#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/9/12 21:03
# @Author  : old tom
# @File    : fh_file_path.py
# @Project : Futool
# @Desc    : File-path helpers

import pathlib
import os
import typing


class PathNotExistError(Exception):
    """
    Raised when a required path does not exist.
    """

    def __init__(self, msg=''):
        Exception.__init__(self, msg)


def resolve_2_abs(path) -> pathlib.Path:
    """
    Resolve a (possibly relative) path to an absolute path.
    :param path: path string or path-like object
    :return: the resolved ``pathlib.Path``
    """
    return pathlib.Path(path).resolve()


def isabs(path):
    """
    Tell whether the path is absolute.
    :param path: path string or path-like object
    :return: True when absolute
    """
    return os.path.isabs(path)


def exist(path):
    """
    Tell whether a file or directory exists.
    :param path: path string or path-like object
    :return: True when the path exists
    """
    return os.path.exists(path)


def rm_dir(dir_path):
    """
    Remove a directory; the directory must be empty.
    :param dir_path: directory to remove
    :return: None
    """
    pathlib.Path(dir_path).rmdir()


def find_file_by_pattern(path, pattern) -> typing.Generator:
    """
    Recursively search for files matching a glob pattern.
    :param path: root directory of the search
    :param pattern: glob pattern, e.g. ``*.txt`` for every txt file
    :return: generator of matching ``pathlib.Path`` objects
    """
    return pathlib.Path(path).rglob(pattern)


def loop_mk_dir(dir_path, mode=0o777, exist_ok=False):
    """
    Create a directory together with any missing parents.
    :param dir_path: directory path
    :param mode: permission bits
    :param exist_ok: when False an existing directory raises FileExistsError
    :return: None
    """
    pathlib.Path(dir_path).mkdir(parents=True, mode=mode, exist_ok=exist_ok)


def loop_dir(dir_path, file_container: list = None, filter_fun=None):
    """
    Walk a directory recursively and collect file paths.
    :param dir_path: directory to walk
    :param file_container: list that receives the paths; a new list is
        created when omitted
    :param filter_fun: optional predicate; only paths for which it returns a
        truthy value are collected
    :return: the container holding every collected file path
    :raises PathNotExistError: when ``dir_path`` does not exist
    """
    if file_container is None:
        file_container = []
    if not exist(dir_path):
        raise PathNotExistError('目标文件夹不存在')
    for entry in os.listdir(dir_path):
        full_path = os.path.join(dir_path, entry)
        if os.path.isdir(full_path):
            loop_dir(full_path, file_container, filter_fun)
        elif not filter_fun or filter_fun(full_path):
            file_container.append(full_path)
    return file_container


def is_windows_path(path) -> bool:
    """
    Tell whether the path carries a Windows drive (``C:`` or a UNC share).
    :param path: path string or path-like object
    :return: True when a drive component is present
    """
    # PureWindowsPath parses drives on every platform; the host-flavour Path
    # never reports a drive on POSIX, so 'C:\\x' used to be rejected there.
    return bool(pathlib.PureWindowsPath(path).drive)


def parent_path_str(path) -> str:
    """
    Parent directory as a string.
    :param path: path string or path-like object
    :return: string form of the parent path
    """
    return str(parent_path(path))


def parent_path(path) -> pathlib.Path:
    """
    Parent directory.
    :param path: path string or path-like object
    :return: parent as ``pathlib.Path``
    """
    return pathlib.Path(path).parent


def pwd():
    """
    Current working directory.
    :return: ``pathlib.Path`` of the working directory
    """
    return pathlib.Path.cwd()


def home():
    """
    Home directory of the current user.
    :return: ``pathlib.Path`` of the home directory
    """
    return pathlib.Path.home()
# @Author  : old tom
# @File    : fu_date.py
# @Project : Futool
# @Desc    : Date and time utilities

import time
from datetime import datetime, date, timedelta
import calendar
from decimal import Decimal

# nanoseconds per millisecond
NANOSECOND_2_MILLISECOND = 1000000
# nanoseconds per second
NANOSECOND_2_SECOND = 1000000000
# nanoseconds per minute
NANOSECOND_2_MINUTE = 60000000000
# milliseconds per second
MILLISECOND_UNIT = 1000
# seconds per hour
ONE_HOUR_SECOND = 3600
# last hour (0-23) that still counts as morning
AM_PM_DIV = 11
# days per week
ONE_WEEK_DAYS = 7

# maps the human-readable format names to strftime/strptime patterns
FMT_MAPPING = {
    'yyyy-MM-dd': '%Y-%m-%d',
    'yyyyMMdd': '%Y%m%d',
    'yyyy-MM-dd hh:mm:ss': '%Y-%m-%d %I:%M:%S',
    'yyyy-MM-dd hh24:mm:ss': '%Y-%m-%d %H:%M:%S',
    'yyyy-MM-dd HH:mm:ss': '%Y-%m-%d %H:%M:%S'
}


def current_year() -> int:
    """
    Current year.
    :return: year number
    """
    return datetime.now().year


def current_month() -> int:
    """
    Current month.
    :return: month number, 1-12
    """
    return datetime.now().month


def current_day() -> int:
    """
    Current day of the month.
    :return: day number
    """
    return datetime.now().day


def current_date() -> str:
    """
    Current date in ISO form (YYYY-MM-DD).
    :return: date string
    """
    return str(datetime.now().date())


def current_datetime(fmt='yyyy-MM-dd hh24:mm:ss') -> str:
    """
    Current date-time rendered with the given format.
    :param fmt: key of FMT_MAPPING
    :return: formatted string
    """
    return datetime.now().strftime(FMT_MAPPING[fmt])


def current_time(fmt='s') -> int:
    """
    Current time since the epoch.
    :param fmt: s: seconds, ms: milliseconds, ns: nanoseconds
    :return: integer time in the requested unit
    :raises KeyError: for an unknown unit
    """
    # Only the requested clock is read; the original built all three values.
    if fmt == 's':
        return int(time.time())
    if fmt == 'ms':
        return int(time.time() * MILLISECOND_UNIT)
    if fmt == 'ns':
        return time.time_ns()
    raise KeyError(fmt)


def current_timestamp():
    """
    Current timestamp.
    :return: float seconds since the epoch
    """
    return datetime.now().timestamp()


def format_datetime_str(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> datetime:
    """
    Parse a date-time string.
    :param dt: date-time string
    :param fmt: key of FMT_MAPPING, e.g. yyyy-MM-dd
    :return: datetime object
    """
    return datetime.strptime(dt, FMT_MAPPING[fmt])


def format_date_str(dt: str, fmt='yyyy-MM-dd') -> date:
    """
    Parse a date string.
    :param dt: date string
    :param fmt: key of FMT_MAPPING
    :return: date object
    """
    return datetime.strptime(dt, FMT_MAPPING[fmt]).date()


def datetime_2_second(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> int:
    """
    Convert a date-time string to epoch seconds.
    :param dt: date-time string matching ``fmt``
    :param fmt: key of FMT_MAPPING
    :return: integer epoch seconds
    """
    return int(format_datetime_str(dt, fmt).timestamp())


def sec_2_datatime(sec_time: int, fmt='yyyy-MM-dd hh24:mm:ss') -> str:
    """
    Convert epoch seconds to a local date-time string.
    :param sec_time: epoch seconds
    :param fmt: key of FMT_MAPPING
    :return: formatted string
    """
    timed = time.localtime(sec_time)
    return time.strftime(FMT_MAPPING[fmt], timed)


def is_leap(year: int) -> bool:
    """
    Tell whether the year is a leap year.
    :param year: year number
    :return: True for leap years
    """
    return calendar.isleap(year)


def begin_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
    """
    Monday of the week containing the date.
    :param date_str: date string matching ``fmt``
    :param fmt: key of FMT_MAPPING
    :return: date string in the same format
    """
    formated_dt = format_date_str(date_str, fmt)
    week_idx = weekday(date_str, fmt)
    if week_idx == 0:
        return date_str
    # Render with the caller's format; str(date) always produced ISO text,
    # which disagreed with the input for formats such as yyyyMMdd.
    return (formated_dt - timedelta(days=week_idx)).strftime(FMT_MAPPING[fmt])


def end_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
    """
    Sunday of the week containing the date.
    :param date_str: date string matching ``fmt``
    :param fmt: key of FMT_MAPPING
    :return: date string in the same format
    """
    formated_dt = format_date_str(date_str, fmt)
    week_idx = weekday(date_str, fmt)
    if week_idx == 6:
        return date_str
    return (formated_dt + timedelta(days=(6 - week_idx))).strftime(FMT_MAPPING[fmt])


def end_of_month(y, m) -> int:
    """
    Last day number of a month.
    :param y: year
    :param m: month
    :return: number of days in the month
    """
    return calendar.monthrange(y, m)[1]


def weekday(date_str: str, fmt='yyyy-MM-dd') -> int:
    """
    Day of the week of a date.
    :param date_str: date string matching ``fmt``
    :param fmt: key of FMT_MAPPING
    :return: 0-6, 0 is Monday
    """
    fmted_date = format_datetime_str(date_str, fmt)
    return calendar.weekday(fmted_date.year, fmted_date.month, fmted_date.day)


def age(birth: str, compare_date: str, fmt='yyyy-MM-dd') -> int:
    """
    Age in completed years on ``compare_date``.
    :param birth: birth date
    :param compare_date: date the age is computed for
    :param fmt: date format, yyyy-MM-dd|yyyyMMdd
    :return: completed years
    """
    fmt_birth = format_date_str(birth, fmt)
    fmt_compare = format_date_str(compare_date, fmt)
    years = fmt_compare.year - fmt_birth.year
    # Compare (month, day) directly: the birthday itself now counts, and a
    # 29 February birth no longer fails in non-leap years.
    if (fmt_compare.month, fmt_compare.day) < (fmt_birth.month, fmt_birth.day):
        years -= 1
    return years


def age_of_now(birth: str) -> int:
    """
    Age today.
    :param birth: birth date, yyyy-MM-dd
    :return: completed years
    """
    return age(birth, current_datetime('yyyy-MM-dd'))


def between(dt_1: str, dt_2: str, fmt='yyyy-MM-dd', time_unit='day') -> int:
    """
    Absolute difference between two points in time.
    :param dt_1: first date or date-time
    :param dt_2: second date or date-time
    :param fmt: key of FMT_MAPPING
    :param time_unit: day, hour, minute or second
    :return: non-negative difference; hour and minute results may be fractional
    :raises KeyError: for an unknown unit
    """
    fmt_dt1, fmt_dt2 = format_datetime_str(dt_1, fmt), format_datetime_str(dt_2, fmt)
    if time_unit == 'day':
        # abs() before .days: a negative timedelta floors its day count.
        return abs(fmt_dt1 - fmt_dt2).days
    seconds = abs(int(fmt_dt1.timestamp() - fmt_dt2.timestamp()))
    if time_unit == 'hour':
        return seconds / ONE_HOUR_SECOND
    if time_unit == 'minute':
        return seconds / 60
    if time_unit == 'second':
        return seconds
    raise KeyError(time_unit)


def time_offset(start_dt: str, offset: int, fmt='yyyy-MM-dd HH:mm:ss', time_unit='day') -> datetime:
    """
    Shift a date-time, e.g. compute the date N days later.
    :param start_dt: start date or date-time string
    :param fmt: key of FMT_MAPPING
    :param offset: amount to shift, positive or negative
    :param time_unit: day, hour, minute or second
    :return: shifted datetime
    :raises KeyError: for an unknown unit
    """
    fmt_dt = format_datetime_str(start_dt, fmt)
    units = {'day': 'days', 'hour': 'hours', 'minute': 'minutes', 'second': 'seconds'}
    return fmt_dt + timedelta(**{units[time_unit]: offset})


def is_am(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
    """
    Tell whether the time is in the morning.
    :param dt: date-time string
    :param fmt: key of FMT_MAPPING
    :return: True when the hour is at most AM_PM_DIV
    """
    fmt_dt = format_datetime_str(dt, fmt)
    return fmt_dt.hour <= AM_PM_DIV


def is_pm(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
    """
    Tell whether the time is in the afternoon.
    :param dt: date-time string
    :param fmt: key of FMT_MAPPING
    :return: True when the hour is after AM_PM_DIV
    """
    fmt_dt = format_datetime_str(dt, fmt)
    return fmt_dt.hour > AM_PM_DIV


def next_week(fmt='yyyy-MM-dd HH:mm:ss') -> datetime:
    """
    Same weekday next week.
    :param fmt: key of FMT_MAPPING used for the round trip through a string
    :return: datetime seven days from now
    """
    now = current_datetime(fmt)
    return time_offset(now, ONE_WEEK_DAYS, fmt)


def next_month() -> datetime:
    """
    Same day next month.
    :return: datetime one month from now; the day is clamped to the last day
        of the target month (31 January becomes 28/29 February)
    """
    now = datetime.now()
    # December rolls over into January of the following year.
    year, month = (now.year + 1, 1) if now.month == 12 else (now.year, now.month + 1)
    day = min(now.day, calendar.monthrange(year, month)[1])
    return now.replace(year=year, month=month, day=day)


def convert_time(t, from_unit, to_unit):
    """
    TODO time-unit conversion (not implemented yet)
    :param t: value to convert
    :param from_unit: source unit
    :param to_unit: target unit
    :return: None
    """
    pass


class StopWatch(object):
    """
    Simple named-task stopwatch.
    """

    class TaskInfo(object):
        def __init__(self, task_name, start_nanos):
            # task name
            self.__task_name = task_name
            # start time in nanoseconds
            self.__start_nanos = start_nanos
            # running flag
            self.__task_running = True
            # elapsed time in nanoseconds, set when the task stops
            self.__interval_nanos = 0

        @property
        def task_name(self):
            return self.__task_name

        @property
        def start_nanos(self):
            return self.__start_nanos

        @start_nanos.setter
        def start_nanos(self, value):
            self.__start_nanos = value

        @task_name.setter
        def task_name(self, value):
            self.__task_name = value

        @property
        def interval_nanos(self):
            return self.__interval_nanos

        def set_task_stop(self, time_nanos):
            # record the elapsed time and mark the task as finished
            self.__interval_nanos = time_nanos
            self.__task_running = False

        @property
        def task_running(self):
            return self.__task_running

        @task_running.setter
        def task_running(self, values=False):
            self.__task_running = values

    class StopWatchError(Exception):
        def __init__(self, msg):
            Exception.__init__(self, msg)

    def __init__(self):
        # task name -> TaskInfo
        self.task_queue = {}

    def _task_exist(self, task_name):
        """
        Tell whether a task with this name was registered.
        :param task_name: task name
        :return: True when known
        """
        return task_name in self.task_queue

    def start(self, task_name):
        """
        Register a task and record its start time.
        :param task_name: task name
        :raises StopWatchError: when the name is already registered
        """
        if self._task_exist(task_name):
            raise self.StopWatchError(f'{task_name} already running')
        self.task_queue[task_name] = self.TaskInfo(task_name, current_time(fmt='ns'))

    def stop(self, task_name, time_unit='s'):
        """
        Stop a task and return its elapsed time.
        :param task_name: task name
        :param time_unit: ns, ms, s (whole seconds) or m (minutes, 2 decimals)
        :return: elapsed time in the requested unit
        :raises StopWatchError: when the task is unknown
        """
        if not self._task_exist(task_name):
            raise self.StopWatchError(f'{task_name} not running')
        task = self.task_queue[task_name]
        interval_time_nanos = current_time(fmt='ns') - task.start_nanos
        # store the elapsed time and flip the running flag
        task.set_task_stop(time_nanos=interval_time_nanos)
        return {
            'ns': interval_time_nanos,
            'ms': interval_time_nanos / NANOSECOND_2_MILLISECOND,
            's': interval_time_nanos // NANOSECOND_2_SECOND,
            'm': Decimal(interval_time_nanos / NANOSECOND_2_MINUTE).quantize(Decimal("0.01"),
                                                                             rounding="ROUND_HALF_UP")
        }[time_unit]

    def all_task(self):
        """
        Number of registered tasks.
        :return: task count
        """
        return len(self.task_queue)

    def pretty_print(self, task_name):
        """
        Print the running time of a task to the console, stopping it first
        when it is still running.
        :param task_name: task name
        :raises StopWatchError: when the task is unknown
        """
        if not self._task_exist(task_name):
            raise self.StopWatchError(f'{task_name} not running')
        task = self.task_queue[task_name]
        if task.task_running:
            self.stop(task_name)
        print(f'[{task_name}] running time = {task.interval_nanos // NANOSECOND_2_SECOND} second')

    def clear(self):
        """
        Forget every task.
        :return: None
        """
        self.task_queue = {}


# ---------------------------------------------------------------------------
# common/futool/core/fu_file.py
# ---------------------------------------------------------------------------
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/9/10 10:45
# @Author  : old tom
# @File    : fu_file.py
# @Project : Futool
# @Desc    : File operations
import shutil
import os
import hashlib
import zipfile
from pathlib import Path
from futool.core.fh_file_path import loop_mk_dir, parent_path, exist
from enum import Enum


class FileSuffix(Enum):
    """
    Common file suffixes.
    """
    XLSX = '.xlsx'
    XLS = '.xls'
    DOC = '.doc'
    DOCX = '.docx'
    PPT = '.ppt'
    PPTX = '.pptx'
    EXE = '.exe'
    MSI = '.msi'
    ISO = '.iso'
    PGP = '.pgp'
    PNG = '.png'
    # the leading dot was missing, unlike every other member
    JPG = '.jpg'
    JPEG = '.jpeg'
    GIF = '.gif'
    WAV = '.wav'
    JAR = '.jar'
    PY = '.py'
    BAT = '.bat'
    DLL = '.dll'
    ZIP = '.zip'
    SEVEN_ZIP = '.7z'
    TAR = '.tar'
    RAR = '.rar'


class FileNotExistError(Exception):
    def __init__(self, msg=''):
        Exception.__init__(self, msg)


def split_text():
    pass


def split_json():
    pass


def compress(path, z_path, z_name='zip'):
    """
    Compress a file or directory (not implemented yet).
    :param path: source path
    :param z_path: path of the archive to create
    :param z_name: archive format, zip rar gz
    :return: None
    """
    pass


def un_compress(path):
    """
    Extract an archive (not implemented yet).
    :param path: archive path
    :return: None
    """
    pass


def md5(file_path) -> str:
    """
    MD5 digest of a file.
    :param file_path: file to hash
    :return: hexadecimal digest
    :raises FileNotExistError: when the file does not exist
    """
    if not exist(file_path):
        raise FileNotExistError('文件不存在')
    digest = hashlib.md5()
    with open(file_path, 'rb') as f:
        # hash in chunks so large files are not loaded into memory at once
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            digest.update(chunk)
    return digest.hexdigest()


def move(src, dst):
    """
    Move a file or directory.
    :param src: source
    :param dst: destination
    :return: None
    """
    shutil.move(src, dst)


def copy(src, dst, override=False):
    """
    Copy a file; ``dst`` may be a file path or an existing directory.
    :param src: source file
    :param dst: destination
    :param override: remove an existing destination file first
    :return: None
    """
    # Only a regular file is removed; os.remove on a directory target raised.
    if override and os.path.isfile(dst):
        os.remove(dst)
    shutil.copy(src, dst)


def copy_dir(src, dst, override=False) -> str:
    """
    Copy a directory tree.
    :param src: source directory
    :param dst: destination directory
    :param override: allow copying into an existing destination
    :return: destination path
    """
    # An existing destination directory is handled by dirs_exist_ok; the
    # original os.remove(dst) always failed on directories.
    if override and os.path.isfile(dst):
        os.remove(dst)
    return shutil.copytree(src, dst, dirs_exist_ok=override)


def copy_file(src, dst, override=False) -> str:
    """
    Copy a single file.
    :param src: source file
    :param dst: destination file or destination directory
    :param override: remove an existing destination file first
    :return: destination file path
    """
    # Resolve a directory target to the final file path before any removal.
    if Path(dst).is_dir():
        dst = os.path.join(dst, file_full_name(src))
    if override and os.path.isfile(dst):
        os.remove(dst)
    return shutil.copyfile(src, dst)


def rename(file_path, neo_name) -> Path:
    """
    Rename a file inside its current directory.
    :param file_path: file path
    :param neo_name: new name
    :return: result of ``Path.rename``
    """
    return Path(file_path).rename(parent_path(file_path).joinpath(neo_name))


def delete(file_path):
    """
    Delete a file when it exists.
    :param file_path: file path
    :return: None
    """
    if exist(file_path):
        os.remove(file_path)


def touch(file_path, mode=0o777, cover=True):
    """
    Create a file, creating missing parent directories first.
    :param file_path: file path
    :param mode: permission bits
    :param cover: passed as ``exist_ok``; False raises when the file exists
    :return: None
    """
    parent = parent_path(file_path)
    if not parent.exists():
        loop_mk_dir(str(parent), mode=mode, exist_ok=cover)
    Path(file_path).touch(mode=mode, exist_ok=cover)


def file_name(file_path) -> str:
    """
    File name up to its first dot.
    :param file_path: file path
    :return: name without any suffix ('a.tar.gz' gives 'a')
    """
    full_name = file_full_name(file_path)
    return full_name.split(sep='.')[0] if (full_name and '.' in full_name) else full_name


def file_full_name(file_path) -> str:
    """
    File name including its suffix.
    :param file_path: file path
    :return: final path component
    """
    return Path(file_path).name


def suffix(file_path) -> str:
    """
    File suffix.
    :param file_path: file path
    :return: last suffix including the dot
    """
    return Path(file_path).suffix


def suffixes(file_path) -> list:
    """
    Every suffix of the file, e.g. xxx.tar.gz
    :param file_path: file path
    :return: [.tar,.gz]
    """
    return Path(file_path).suffixes


def file_info(file_path):
    """
    File metadata.
    :param file_path: file path
    :return: ``os.stat_result`` with, among others:
    st_mode   file type and permission bits
    st_ino    inode number on Unix, file index on Windows
    st_dev    identifier of the device holding the file
    st_nlink  number of hard links
    st_uid    user id of the owner
    st_gid    group id of the owner
    st_size   size in bytes
    st_atime  last access time in seconds
    st_mtime  last modification time in seconds
    st_ctime  creation time on Windows, last metadata change on Unix
    """
    return Path(file_path).stat()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/8/31 23:34
# @Author  : old tom
# @File    : http_downloader.py
# @Project : Futool
# @Desc    : File downloader
import os
import shutil
import time

from futool.http.http_request import head
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import urllib.request as req

_USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
               "Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70")


class HttpDownloader(object):
    """
    HTTP downloader that fetches a file in parallel byte ranges.
    """

    def __init__(self, pool=None):
        # A thread pool is required: the per-part worker is a closure, which a
        # process pool cannot pickle. A caller-supplied pool must therefore be
        # thread based as well.
        self.pool = ThreadPool(16) if not pool else pool

    def download(self, url, dst, chunk_size=1000):
        """
        Download a file, in parallel when the server supports range requests.
        The pool is closed after a ranged download, so an instance serves one
        ranged download.
        :param url: download link
        :param dst: path the file is saved to
        :param chunk_size: bytes fetched per range request
        :return: None
        """
        is_support, content_length = HttpDownloader.is_support_range(url)
        if is_support:
            # byte range handled by each worker
            offset = self.fork(int(content_length), chunk_size)
            self.__join(offset, url, dst)
        else:
            print('无法获取Content-Length,使用单线程下载')
            # The message announces a single-threaded download; perform it.
            _request = req.Request(url=url, headers={"User-Agent": _USER_AGENT}, method='GET')
            with req.urlopen(_request) as response, open(dst, 'wb') as f:
                shutil.copyfileobj(response, f)

    @staticmethod
    def is_support_range(url):
        """
        Probe with a HEAD request whether the server accepts range requests.
        :param url: download link
        :return: (True, Content-Length value) or (False, 0)
        """
        wrapper = head(url)
        header = wrapper.header()
        h_keys = header.keys()
        if 'Accept-Ranges' in h_keys and 'Content-Length' in h_keys and header['Accept-Ranges'] != 'none':
            return True, header['Content-Length']
        else:
            return False, 0

    @staticmethod
    def fork(content_length: int, chunk_size):
        """
        Split the content into inclusive, non-overlapping byte ranges.
        :param chunk_size: size of each range in bytes
        :param content_length: total number of bytes
        :return: list of (first_byte, last_byte) tuples covering 0..content_length-1
        :raises ValueError: when chunk_size is not positive
        """
        if chunk_size <= 0:
            raise ValueError('chunk_size must be positive')
        # HTTP ranges are inclusive on both ends, hence the "- 1".
        return [(start, min(start + chunk_size, content_length) - 1)
                for start in range(0, content_length, chunk_size)]

    def __join(self, offset, url, dst):
        """
        Download every range concurrently, then merge the parts into ``dst``.
        :param offset: list of (first_byte, last_byte) ranges
        :param url: download link
        :param dst: path the file is saved to
        :return: None
        """

        def download_by_thread(task):
            index, part = task
            _request = req.Request(url=url, headers={
                "User-Agent": _USER_AGENT,
                # byte-range syntax is "bytes=first-last"
                'Range': f'bytes={part[0]}-{part[1]}'
            }, method='GET')
            part_path = f'{dst}.part{index}'
            with req.urlopen(_request) as response, open(part_path, 'wb') as f:
                shutil.copyfileobj(response, f)
            return part_path

        # map() keeps input order, so the parts come back in file order
        part_paths = self.pool.map(download_by_thread, list(enumerate(offset)))
        self.pool.close()
        self.pool.join()
        with open(dst, 'wb') as merged:
            for part_path in part_paths:
                with open(part_path, 'rb') as part_file:
                    shutil.copyfileobj(part_file, merged)
                os.remove(part_path)
# @Project : Futool

import urllib.request as req
import urllib.parse
from futool.http.http_response import ResponseWrapper
from http import cookiejar
import json
import shutil
import os

DEFAULT_HEADER = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70",
    "Accept": "*/*",
    "Connection": "keep-alive"
}

DEFAULT_TIMEOUT = 10


class HttpRequestError(Exception):
    def __init__(self, msg=''):
        Exception.__init__(self, msg)


def get(url, param=None, header=None, timeout=DEFAULT_TIMEOUT):
    """
    Plain GET request.
    :param url: request URL
    :param param: dict of parameters, appended to the URL
    :param header: request headers
    :param timeout: timeout in seconds
    :return: ResponseWrapper
    """
    return request(url, 'GET', param, header=header, timeout=timeout)


def head(url, param=None, header=None, timeout=DEFAULT_TIMEOUT):
    """
    HEAD request.
    :param url: request URL
    :param param: dict of parameters, appended to the URL
    :param header: request headers
    :param timeout: timeout in seconds
    :return: ResponseWrapper
    """
    return request(url, 'HEAD', param, header=header, timeout=timeout)


def post_form(url, data=None, header=None, timeout=DEFAULT_TIMEOUT):
    """
    POST request submitting an url-encoded form.
    :param url: request URL
    :param data: dict of form fields
    :param header: request headers
    :param timeout: timeout in seconds
    :return: ResponseWrapper
    """
    return request(url, 'POST', data=data, header=header, timeout=timeout)


def post(url, json_param=None, header=None, timeout=DEFAULT_TIMEOUT):
    """
    POST request submitting a JSON body.
    :param url: request URL
    :param json_param: object serialised as the JSON body
    :param header: request headers
    :param timeout: timeout in seconds
    :return: ResponseWrapper
    """
    return request(url, 'POST', json_param=json_param, header=header, timeout=timeout)


def request(url, method, param=None, data=None, json_param=None, header=None, timeout=DEFAULT_TIMEOUT, before_send=None,
            success_handler=None,
            error_handler=None):
    """
    Base request function.
    :param method: request method; GET, POST and HEAD are supported
    :param url: request URL
    :param param: parameters appended to the URL (GET/HEAD)
    :param data: form fields (POST)
    :param json_param: JSON body (POST); wins over ``data`` when both are given
    :param header: request headers; DEFAULT_HEADER when omitted
    :param timeout: timeout in seconds
    :param before_send: hook called as before_send(opener, cookie, header)
    :param success_handler: hook receiving the wrapper of a successful response
    :param error_handler: hook receiving the wrapper of a failed response
    :return: the hook result when a hook fired, otherwise the ResponseWrapper
    :raises HttpRequestError: for an unsupported method
    """
    _request = None
    if method not in ('GET', 'POST', 'HEAD'):
        raise HttpRequestError('not support method')
    # Work on a copy: the Content-Type added below must neither leak into the
    # shared DEFAULT_HEADER nor into the caller's dict.
    header = dict(DEFAULT_HEADER if header is None else header)
    opener, cookie = _init_cookie_jar()
    if before_send:
        before_send(opener, cookie, header)
    if method in ('GET', 'HEAD'):
        if param:
            param_str = urllib.parse.urlencode(param)
            # keep a query string that is already part of the URL
            url = url + ('&' if '?' in url else '?') + param_str
        _request = req.Request(url, headers=header, method=method)
    if 'POST' == method:
        if data:
            # url-encoded form
            data = urllib.parse.urlencode(data).encode('utf-8')
            header.update({"Content-Type": "application/x-www-form-urlencoded"})
        if json_param:
            # json body
            data = bytes(json.dumps(json_param), 'utf-8')
            header.update({"Content-Type": "application/json"})
        _request = req.Request(url, data=data, headers=header, method=method)
    wrapper = ResponseWrapper(opener.open(_request, timeout=timeout), cookie)
    if success_handler and wrapper.is_ok():
        return success_handler(wrapper)
    elif error_handler and not wrapper.is_ok():
        return error_handler(wrapper)
    return wrapper


def _init_cookie_jar():
    """
    Build an opener that stores cookies in a fresh jar.
    :return: (opener, cookie jar)
    """
    cookie = cookiejar.CookieJar()
    handler = req.HTTPCookieProcessor(cookie)
    return req.build_opener(handler), cookie


def upload_file():
    # TODO
    pass


def download_file(url, dst_path, overwrite=True, duplicate_handler=None, buffer=16 * 1024):
    """
    Download a file.
    :param url: URL of the file
    :param dst_path: target location
    :param overwrite: replace an existing file of the same name
    :param duplicate_handler: called with ``dst_path`` when the file exists and
        ``overwrite`` is False; the download continues afterwards
    :param buffer: copy buffer size in bytes
    :return: None
    :raises FileExistsError: when the file exists, ``overwrite`` is False and
        no ``duplicate_handler`` was given
    """
    if os.path.isfile(dst_path):
        if overwrite:
            os.remove(dst_path)
        elif duplicate_handler:
            duplicate_handler(dst_path)
        else:
            # overwrite=False used to be ignored and the file was clobbered
            raise FileExistsError(dst_path)
    with req.urlopen(url) as b_resp, open(dst_path, 'wb') as f:
        shutil.copyfileobj(b_resp, f, buffer)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/8/29 23:14
# @Author  : old tom
# @File    : http_response.py
# @Project : Futool
# @Desc    : Response parsing

from http.client import HTTPResponse
from http.cookiejar import CookieJar
import json

DEFAULT_ENCODING = 'UTF-8'

# header carrying the content encoding of the response
RESPONSE_CONTENT_ENCODING = "Content-Encoding"

# known compression encodings
COMPRESS_TYPE = ('gzip', 'deflate', 'br')


class ResponseWrapper(object):
    """
    Convenience wrapper around an ``HTTPResponse`` and its cookie jar.
    """

    def __init__(self, response: HTTPResponse, cookie: CookieJar = None):
        self.resp = response
        # Always define the attribute: it used to be missing for an empty or
        # absent jar, which made cookies() raise AttributeError.
        self.cookie = cookie if cookie is not None and len(cookie) > 0 else None
        # the stream can be read only once, so the payload is cached
        self._raw_body = None

    def _read(self):
        """
        Read the response payload once and cache it.
        :return: raw body bytes
        """
        if self._raw_body is None:
            self._raw_body = self.resp.read()
        return self._raw_body

    def body(self, encoding=DEFAULT_ENCODING):
        """
        Response body as text; may be called repeatedly.
        :param encoding: text encoding of the payload
        :return: decoded body
        """
        return self._read().decode(encoding)

    def json_body(self, encoding=DEFAULT_ENCODING):
        """
        Response body parsed as JSON.
        :param encoding: text encoding of the payload
        :return: decoded JSON value
        """
        return json.loads(self.body(encoding))

    def status(self):
        """
        HTTP status code.
        :return: integer status
        """
        return self.resp.status

    def is_ok(self):
        """
        Tell whether the status is a 2xx success code.
        :return: True for 200-299
        """
        st_code = self.resp.status
        return 200 <= st_code < 300

    def header(self, name=None):
        """
        One header value, or every header when no name is given.
        :param name: header name
        :return: header value, or a dict of all headers
        """
        return self.resp.getheader(name) if name else self._parse_header_dict()

    def _parse_header_dict(self):
        """
        Collect the response headers into a dict.
        :return: header name -> value
        """
        headers = self.resp.getheaders()
        header_dict = {}
        if headers:
            for h in headers:
                header_dict[h[0]] = h[1]
        return header_dict

    def is_compress(self):
        """
        Tell whether the body uses a known compression encoding.
        :return: True when Content-Encoding is one of COMPRESS_TYPE
        """
        return self.compress_type() in COMPRESS_TYPE

    def compress_type(self):
        """
        Compression encoding of the body.
        :return: Content-Encoding value, or None when the header is absent
        """
        # getheader matches header names case-insensitively
        return self.resp.getheader(RESPONSE_CONTENT_ENCODING)

    def cookies(self):
        """
        Cookies received with the response.
        :return: cookie name -> value, empty when there are none
        """
        ck = {}
        if self.cookie:
            for item in self.cookie:
                ck[item.name] = item.value
        return ck
-0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/2 9:07 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool +# @Desc : excel、word操作相关 diff --git a/common/futool/poi/fu_excel.py b/common/futool/poi/fu_excel.py new file mode 100644 index 0000000..6978baa --- /dev/null +++ b/common/futool/poi/fu_excel.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/2 9:08 +# @Author : old tom +# @File : fu_excel.py +# @Project : futool +# @Desc : 读写excel,需要引入openPyxl + +from futool.core import fu_file +from openpyxl import load_workbook, Workbook + + +class ExcelNotFoundError(Exception): + """ + excel文件不存在 + """ + + def __init__(self, msg=''): + Exception.__init__(self, msg) + + +class SheetNotExistError(Exception): + """ + sheet不存在 + """ + + def __init__(self, msg=''): + Exception.__init__(self, msg) + + +class ExcelReader(object): + """ + excel读取 + TODO 存在性能问题需要重构;增加流式读取功能;sheet分片方式存在问题 + """ + + class SheetNotLoadError(Exception): + def __init__(self): + Exception.__init__(self, 'sheet not load,you need call load_sheet()') + + class OutOfColIndexError(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) + + def __init__(self, file_path): + if not fu_file.exist(file_path): + raise ExcelNotFoundError(msg=f'path={file_path}') + self.wb = load_workbook(file_path, read_only=True) + self.sheetnames = self.wb.sheetnames + self.sheet = None + self.col_nums = 0 + + def load_sheet(self, sheet_name: str): + """ + 指定名称读取sheet + """ + if sheet_name not in self.sheetnames: + raise SheetNotExistError(msg=f'{sheet_name} not exists') + self.sheet = self.wb[sheet_name] + return self.sheet + + def load_sheet_by_index(self, sheet_index: int): + """ + 指定下标读取sheet,从0开始 + """ + return self.load_sheet(self.sheetnames[sheet_index]) + + def load_sheet_first(self): + """ + 加载第一个sheet + :return: + """ + return self.load_sheet_by_index(0) + + def read_row(self, row_index): + """ + 读取某一行 + """ + 
self._check_sheet() + element = [] + for i, row in enumerate(self.sheet): + if i == row_index: + for cell in row: + element.append(cell.value) + return element + + def read_range_rows(self, start, end): + """ + 范围读取 + """ + self._check_sheet() + elements = [] + for i, row in enumerate(self.sheet): + if start <= i <= end: + element = [] + for cell in row: + element.append(cell.value) + elements.append(tuple(element)) + return elements + + def read_rows(self, row_index: [] = None): + """ + 指定读取多行 + :param row_index: 行号,例如:[1,3,5] + :return: + """ + self._check_sheet() + elements = [] + for i, row in enumerate(self.sheet): + if i in row_index: + element = [] + for cell in row: + element.append(cell.value) + elements.append(tuple(element)) + return elements + + def read_first(self): + """ + 读取标题行 + """ + return self.read_row(0) + + def read_all(self, skip_head=True): + """ + 读取全部 + :return: + """ + self._check_sheet() + elements = [] + for i, row in enumerate(self.sheet): + if skip_head: + if i > 0: + element = [] + for cell in row: + element.append(cell.value) + elements.append(tuple(element)) + return elements + + def read_column(self, col_index, skip_head=True): + """ + 按列读取 + :param col_index: 列下标,从0开始 + :param skip_head: 跳过第一行 + :return: list + """ + self._check_sheet() + element = [] + for i, row in enumerate(self.sheet): + if skip_head: + if i > 0: + for j, cell in enumerate(row): + if j == col_index: + element.append(cell.value) + return element + + def read_range_column(self, start, end, skip_head=True): + """ + 范围读取列,下标从0开始 + :param start: 开始下标 + :param end: 结束下标 + :param skip_head: 跳过第一行 + :return: + """ + self._col_nums() + if end >= self.col_nums: + raise self.OutOfColIndexError(msg=f'out of column index,max col was {self.col_nums} ') + elements = [] + for i in range(start, end + 1): + elements.append(tuple(self.read_column(i, skip_head))) + return elements + + def _check_sheet(self): + if self.sheet is None: + raise self.SheetNotLoadError() + + def 
_col_nums(self): + self.col_nums = len(self.read_first()) + + +class SimpleExcelWriter(object): + """ + excel写入 + """ + + class ExcelFileExistsError(Exception): + """ + excel文件已存在 + """ + + def __init__(self, msg=''): + Exception.__init__(self, msg) + + def __init__(self, write_path): + if fu_file.exist(write_path): + raise self.ExcelFileExistsError(f'{write_path} all ready exists') + self.write_path = write_path + self.wb = Workbook(write_only=True) + + def write(self, head: [], data: [], sheet_name='Sheet1', index=0): + """ + 写excel文件 + :param index: sheet 下标 + :param head: 第一行标题 + :param data: 数据 + :param sheet_name: sheet名称 + :return: + """ + ws = self.wb.create_sheet(title=sheet_name, index=index) + # 写入列头 + ws.append(head) + for d in data: + ws.append(d) + self.wb.save(self.write_path) diff --git a/common/futool/system/__init__.py b/common/futool/system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/common/futool/system/fu_sys.py b/common/futool/system/fu_sys.py new file mode 100644 index 0000000..3563bfc --- /dev/null +++ b/common/futool/system/fu_sys.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2022/9/9 0:04 +# @Author : old tom +# @File : fu_sys.py +# @Project : Futool +# @Desc : 用于获取当前系统信息,内存及硬盘信息获取请使用psutil + +import os +import platform +import getpass + + +def is_windows(): + """ + 是否windows + :return: + """ + return 'Windows' == platform.system() + + +def is_linux(): + """ + 是否linux + :return: + """ + return 'Linux' == platform.system() + + +def is_unix(): + """ + 是否unix + :return: + """ + return 'Unix' == platform.system() + + +def os_name(): + """ + 操作系统名称 + :return: + """ + return platform.platform() + + +def host_name(): + """ + 获取主机名 + :return: + """ + return platform.node() + + +def sys_user(): + """ + 系统用户 + :return: + """ + return getpass.getuser() + + +def sys_user_dir(): + """ + 当前用户目录 + :return: + """ + return os.path.expanduser('~') + + +class CpuInfo(object): + @staticmethod + def 
class CpuInfo(object):
    """
    Static helpers that describe the CPU of the current machine.
    """

    @staticmethod
    def cpu_architecture():
        """
        CPU architecture, e.g. AMD64 or i386.

        :return: the machine type string from ``platform.machine()``
        """
        architecture = platform.machine()
        return architecture

    @staticmethod
    def cpu_count():
        """
        Number of CPU cores.

        :return: the value of ``os.cpu_count()``
        """
        cores = os.cpu_count()
        return cores
def deactivate_datasource(self, source_id):
    """
    Disable a data source by setting its ``has_used`` flag to 'N'.

    :param source_id: id of the data source row in ``datasource_main``
    :return: True when the update affected at least one row, else False
    """
    # NOTE(review): source_id is interpolated directly into the SQL text;
    # confirm it always comes from a trusted, internally generated id.
    statement = f"update datasource_main set has_used='N' where source_id='{source_id}'"
    affected_rows = self.execute_update(statement)
    return affected_rows > 0
'__main__': + # 启动调度 + sch.start() diff --git a/datahub/metadata/metadata_reader.py b/datahub/metadata/metadata_reader.py index ea13770..6fc080b 100644 --- a/datahub/metadata/metadata_reader.py +++ b/datahub/metadata/metadata_reader.py @@ -9,7 +9,7 @@ import abc import os from configparser import ConfigParser - +from datahub.log_conf import Logger from datahub.datasource.datasource_manage import DataSource from datahub.metadata.metadatadao.metadata_dao import MetadataDao @@ -18,6 +18,8 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__)) reader_conf = ConfigParser() reader_conf.read(os.path.join(BASE_DIR, 'reader_conf.ini')) +logger = Logger().get_logger() + class AbsMetadataReader(metaclass=abc.ABCMeta): """ diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index bccacab..1d06475 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -6,7 +6,7 @@ # @Project : futool-tiny-datahub # @Desc : 元数据存储 -from common.futool.fu_id import id_gen +from common.futool.core.fu_id import id_gen from datahub.metadata.metadatadao.metadata_dao import MetadataDao from datahub.local_db_conf import local_conn from datahub.metadata.constant.metadata_constant import MetaDataObjType @@ -116,8 +116,3 @@ class MetadataWareHouse(object): def is_view(self, view_name): return self.query_metadata_type_by_name(view_name) == MetaDataObjType.View.value - - -if __name__ == '__main__': - mtw = MetadataWareHouse('834164a2d62de959c0261e6239dd1e55') - print(mtw.query_metadata_name('view')) diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index cdb94e2..a7d4ce4 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -153,4 +153,4 @@ class MetadataDao(BaseDao): """ rt = self.query_one( f"select meta_type from metadata_object where meta_name='{meta_name}' and source_id='{source_id}' limit 1") - return 
class IncreaseVersionError(Exception):
    """
    Raised when the metadata version number cannot be incremented.
    """

    def __init__(self, msg):
        Exception.__init__(self, msg)


class MetadataVersionDao(BaseDao):
    """
    Data access for the ``metadata_object_version_record`` table.
    """

    def __init__(self, connector: ConnFactory):
        super().__init__(connector)

    def init_version(self, source_id):
        """
        Insert the first version record for a data source.

        Only ``source_id`` is written; ``version_code`` is left to the table.

        :param source_id: data source id
        :return: the result of ``execute_update`` for the insert
        """
        # NOTE(review): source_id is interpolated into the SQL text here and
        # below; confirm it is always an internally generated id.
        return self.execute_update(f"insert into metadata_object_version_record (source_id) values ('{source_id}')")

    def query_latest_version(self, source_id):
        """
        Query the highest version number recorded for a data source.

        :param source_id: data source id
        :return: the latest ``version_code``, or None when no row is found
        """
        result = self.query_one(
            f"select version_code from metadata_object_version_record where source_id='{source_id}' "
            f"order by version_code desc limit 1")
        return result[0] if result else None

    def add_version(self, source_id):
        """
        Record the next version number (latest + 1) and return it.

        :param source_id: data source id
        :return: the newly recorded version number
        :raises IncreaseVersionError: if the data source has no version yet
            (``init_version`` was not run) or the insert did not affect
            exactly one row
        """
        last_version = self.query_latest_version(source_id)
        if last_version is None:
            # Without a starting version ``None + 1`` would fail with a bare
            # TypeError; report the problem in the documented error type.
            raise IncreaseVersionError(f'[{source_id}] 版本号未初始化,无法递增')
        latest_version = last_version + 1
        flag = self.execute_update(
            f"insert into metadata_object_version_record (source_id,version_code) values ('{source_id}',{latest_version})")
        if flag != 1:
            raise IncreaseVersionError(f'[{source_id}] 更新版本号失败')
        return latest_version
class MetadataVersionKeeper(object):
    """
    Metadata version manager for a single data source.
    """

    def __init__(self, source_id):
        """
        :param source_id: id of the data source whose versions are managed
        """
        self.source_id = source_id
        self.dao = MetadataVersionDao(local_conn)

    def init_version(self):
        """
        Create the initial version record and log the outcome.

        :return: True when exactly one row was inserted, otherwise False, so
            callers can test the result
        """
        flag = self.dao.init_version(self.source_id) == 1
        if flag:
            logger.info(f"[{self.source_id}] 数据源初始化版本成功")
        else:
            logger.error(f"[{self.source_id}] 数据源初始化版本失败")
        return flag

    def get_latest_version(self):
        """
        Get the latest version.

        :return: whatever the DAO's ``query_latest_version`` returns for this
            data source
        """
        return self.dao.query_latest_version(self.source_id)

    def increase_version(self):
        """
        Increment the version and return the new value.

        The DAO call runs while holding the module-level ``lock``.

        :return: the new version number returned by the DAO's ``add_version``
        """
        # ``with`` releases the lock only if it was actually acquired.
        with lock:
            return self.dao.add_version(self.source_id)
user_source where type = 'PROCEDURE' view_detail = select text from all_views where view_name='#$#' diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index 09aaacc..d9e7a65 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -11,7 +11,7 @@ import sqlparse from sqllineage.runner import LineageRunner from datahub.graph.graph_helper import Neo4jHelper from datahub.local_db_conf import graph -from common.log_conf import Logger +from datahub.log_conf import Logger from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.metadata.constant.metadata_constant import MetaDataObjType from common.futool.fu_collection import is_not_empty diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py index ffda781..5beda88 100644 --- a/datahub/scheduletask/scan_task.py +++ b/datahub/scheduletask/scan_task.py @@ -11,16 +11,14 @@ from datahub.local_db_conf import local_conn from datahub.metadata.metadata_reader import MetadataReader from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao -from common.futool.fu_function import singleton from datahub.scheduletask.task_executor import ScheduleExecutor from datahub.scheduletask.schedule import CronExpTrigger -from common.log_conf import Logger +from datahub.log_conf import Logger from datahub.metadata.constant.metadata_constant import MetaDataObjType logger = Logger().get_logger() -@singleton class ScanTaskManage(object): def __init__(self): self.dao = ScanTaskDao(local_conn) @@ -29,7 +27,7 @@ class ScanTaskManage(object): """ 添加任务 :param source_id: 数据源ID - :param cron: cron表达式 + :param cron: cron表达式,默认一天一次 :return: """ if self.dao.exist_by_id(source_id): @@ -88,8 +86,8 @@ class ScanTaskRunner(object): enable_task = self.dao.query_all_task('Y') if enable_task: for task in enable_task: - self.executor.submit(task[0], 
def scan_print(self, source_id):
    """
    Read the table list of a data source and log how many tables were found.

    Nothing is saved by this method.

    :param source_id: id of the data source to scan
    :return: None
    """
    # Metadata warehouse API; constructed here but not used by this method.
    warehouse = MetadataWareHouse(source_id)
    # Look up the data source and build a metadata reader on top of it.
    reader = MetadataReader(self.datasource_manage.get(source_id))
    logger.info(f'开始扫描[{source_id}]元数据')
    tables = reader.query_tables()
    logger.info(f'[{source_id}]读取表完毕,共{len(tables)}张')
warehouse.query_metadata_id_name(MetaDataObjType.Table.value) - # for t in w_tables: - # fields = metadata_reader.query_metadata_detail(t[1], MetaDataObjType.Table.value) - # fields_data = [] - # for f in fields: - # fields_data.append(tuple([list(t)[0]] + list(f))) - # warehouse.save_metadata_obj_field(fields_data) - - -if __name__ == '__main__': - ste = ScanTaskExecutor() - ste.scan_metadata('834164a2d62de959c0261e6239dd1e55') + logger.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个')