From a2de66574fd99caebf43cd0ca9d8c9b0745b8d50 Mon Sep 17 00:00:00 2001 From: old-tom <892955278@qq.com> Date: Wed, 12 Apr 2023 17:06:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E5=85=83=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E7=AE=A1=E7=90=86=E3=80=81=E6=89=AB=E6=8F=8F?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/fudb/connectors/connector_factory.py | 2 +- common/futool/__init__.py | 7 + common/futool/fu_function.py | 21 +++ common/futool/fu_id.py | 134 ++++++++++++++++++ common/log_conf.py | 2 +- datahub/datasource/datasource_manage.py | 25 ++-- datahub/datasource/dsdao/ds_dao.py | 39 +++-- datahub/local_db_conf.py | 13 ++ datahub/logout/info_log.txt | 0 datahub/metadata/metadata_reader.py | 37 ++++- datahub/metadata/metadata_warehouse.py | 27 ++++ datahub/metadata/metadatadao/metadata_dao.py | 35 +++++ datahub/metadata/reader_conf.ini | 9 +- datahub/relation/relation_analyze.py | 16 ++- datahub/scheduletask/__init__.py | 7 + datahub/scheduletask/scan_task.py | 61 ++++++++ datahub/scheduletask/scandao/__init__.py | 7 + datahub/scheduletask/scandao/scan_task_dao.py | 67 +++++++++ datahub/scheduletask/schedule.py | 38 +++++ datahub/scheduletask/task_executor.py | 32 +++++ logout/info_log.txt | 3 - 21 files changed, 542 insertions(+), 40 deletions(-) create mode 100644 common/futool/__init__.py create mode 100644 common/futool/fu_function.py create mode 100644 common/futool/fu_id.py create mode 100644 datahub/local_db_conf.py delete mode 100644 datahub/logout/info_log.txt create mode 100644 datahub/metadata/metadata_warehouse.py create mode 100644 datahub/scheduletask/__init__.py create mode 100644 datahub/scheduletask/scan_task.py create mode 100644 datahub/scheduletask/scandao/__init__.py create mode 100644 datahub/scheduletask/scandao/scan_task_dao.py create mode 100644 datahub/scheduletask/schedule.py create mode 100644 datahub/scheduletask/task_executor.py delete mode 100644 logout/info_log.txt diff --git a/common/fudb/connectors/connector_factory.py b/common/fudb/connectors/connector_factory.py index e0368b4..67665dc 100644 --- a/common/fudb/connectors/connector_factory.py +++ b/common/fudb/connectors/connector_factory.py @@ -28,7 +28,7 @@ class ConnFactory(object): self.connector_id = self._gen_connector_id(conf.db_type, conf.user, conf.password, conf.host, conf.port, conf.database) # 尝试从缓存获取 - if connector_cache.exist(self.connector_id): + if connector_cache.exist(self.connector_id) and connector_cache.get(self.connector_id): self.connector = connector_cache.get(self.connector_id) else: # urlquote 用于处理密码中的特殊字符例如@ diff --git a/common/futool/__init__.py b/common/futool/__init__.py new file mode 100644 index 0000000..1a85cc4 --- /dev/null +++ b/common/futool/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/9 14:53 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-tiny-datahub +# @Desc : diff --git a/common/futool/fu_function.py b/common/futool/fu_function.py new file mode 100644 index 0000000..1f5ab63 --- /dev/null +++ b/common/futool/fu_function.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/12 15:09 +# @Author : old tom +# @File : fu_function.py +# @Project : futool-tiny-datahub +# @Desc : +def singleton(cls): + """ + 单例装饰器 + :param cls: + :return: + """ + _instance = {} + + def inner(): + if cls not in _instance: + _instance[cls] = cls() + return _instance[cls] + + return inner diff --git a/common/futool/fu_id.py b/common/futool/fu_id.py new file mode 100644 index 0000000..bd74ceb --- /dev/null +++ b/common/futool/fu_id.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/9 14:54 +# @Author : old tom +# @File : fu_id.py +# @Project : futool-tiny-datahub +# @Desc : ID 生成器 +import time +import abc +from common.log_conf import Logger + +logger = Logger().get_logger() + + +class IdGenerator(object): + """ + ID 生成器 + """ + + def __init__(self, id_type='snowflake'): + """ + :param id_type: id类型,默认雪花算法 + """ + self.id_type = id_type + self.snowflake = SnowFlakeId(datacenter_id=1, worker_id=1) + self.uuid = UUID() + + def get_id(self): + return { + 'snowflake': self.snowflake, + 'uuid': self.uuid + }[self.id_type].get_id() + + +class AbsIdGenerator(metaclass=abc.ABCMeta): + @abc.abstractmethod + def get_id(self): + pass + + +class SnowFlakeId(AbsIdGenerator): + """ + 雪花ID生成 + 会生成一个64bit的整数,最终存到数据库就只占用8字节 + 1bit: 一般是符号位,代表正负数的所以这一位不做处理 + 41bit:这个部分用来记录时间戳,如果从1970-01-01 00:00:00来计算开始时间的话,它可以记录到2039年,足够我们用了,并且后续我们可以设置起始时间,这样就不用担心不够的问题, 这一个部分是保证我们生辰的id趋势递增的关键。 + 10bit:这是用来记录机器id的, 默认情况下这10bit会分成两部分前5bit代表数据中心,后5bit代表某个数据中心的机器id,默认情况下计算大概可以支持32*32 - 1= 1023台机器。 + 12bit:循环位,来对应1毫秒内产生的不同的id, 大概可以满足1毫秒并发生成2^12-1=4095次id的要求。 + """ + # 64位ID的划分 + WORKER_ID_BITS = 5 + DATACENTER_ID_BITS = 5 + SEQUENCE_BITS = 12 + # 最大取值计算 + MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS) # 2**5-1 0b11111 + MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS) + # 移位偏移计算 + WOKER_ID_SHIFT = SEQUENCE_BITS + DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS + # 序号循环掩码 + SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS) + # Twitter元年时间戳 + TWEPOCH = 1288834974657 + + def __init__(self, datacenter_id, worker_id, start_seq=0): + """ + :param datacenter_id: 数据中心ID + :param worker_id: 机器ID + :param start_seq: 起始序号 + """ + # sanity check + if worker_id > self.MAX_WORKER_ID or worker_id < 0: + raise ValueError('worker_id值越界') + if datacenter_id > self.MAX_DATACENTER_ID or datacenter_id < 0: + raise ValueError('datacenter_id值越界') + self.worker_id = worker_id + self.datacenter_id = datacenter_id + self.sequence = start_seq + self.last_timestamp = -1 # 上次计算的时间戳 + + @staticmethod + def _gen_timestamp(): + """ + 生成整数时间戳 + :return:int timestamp + """ + return int(time.time() * 1000) + + def get_id(self): + """ + 获取新ID + :return: + """ + timestamp = self._gen_timestamp() + # 时钟回拨 + if timestamp < self.last_timestamp: + logger.error('clock is moving backwards. Rejecting requests until{}'.format(self.last_timestamp)) + raise Exception + if timestamp == self.last_timestamp: + self.sequence = (self.sequence + 1) & self.SEQUENCE_MASK + if self.sequence == 0: + timestamp = self._til_next_millis(self.last_timestamp) + else: + self.sequence = 0 + self.last_timestamp = timestamp + new_id = ((timestamp - self.TWEPOCH) << self.TIMESTAMP_LEFT_SHIFT) | ( + self.datacenter_id << self.DATACENTER_ID_SHIFT) | \ + (self.worker_id << self.WOKER_ID_SHIFT) | self.sequence + return new_id + + def _til_next_millis(self, last_timestamp): + """ + 等到下一毫秒 + """ + timestamp = self._gen_timestamp() + while timestamp <= last_timestamp: + timestamp = self._gen_timestamp() + return timestamp + + +class UUID(AbsIdGenerator): + """ + UUID生成 + """ + + def __init__(self): + pass + + def get_id(self): + pass + + +id_gen = IdGenerator() diff --git a/common/log_conf.py b/common/log_conf.py index 93349ea..d82d48a 100644 --- a/common/log_conf.py +++ b/common/log_conf.py @@ -10,7 +10,7 @@ import sys from loguru import logger # 日志输出路径 -LOG_PATH = '../logout/info_log.txt' +LOG_PATH = '../logout/info_log.log' class Logger(object): diff --git a/datahub/datasource/datasource_manage.py b/datahub/datasource/datasource_manage.py index d1384c7..72b080f 100644 --- a/datahub/datasource/datasource_manage.py +++ b/datahub/datasource/datasource_manage.py @@ -11,6 +11,7 @@ from common.fudb.dbapis.fu_db_api import select_one from datahub.datasource.dsdao.ds_dao import DataSourceDao from common.log_conf import Logger from datahub.datasource.constant import ds_conf_param, DUAL_DB_TYPE +from datahub.local_db_conf import local_conn # 日志 logger = Logger().get_logger() @@ -31,10 +32,10 @@ class DataSourceManage(object): 数据源管理 """ - def __init__(self, local_db): - self.dao = DataSourceDao(local_db) + def __init__(self): + self.dao = DataSourceDao(local_conn) - def add(self, conf: ds_conf_param): + def add(self, conf: ds_conf_param, cron='0 0 0 1/1 * ?'): # 初始化连接器 connector = ConnFactory(conf) if self.dao.exist_by_source(connector.connector_id): @@ -44,8 +45,8 @@ class DataSourceManage(object): checker = DataSourceChecker(connector) result, msg = checker.check() if result: - # 入库 - return self.dao.add_datasource(connector.connector_id, conf), '' + # 入库并添加扫描任务 + return self.dao.add_datasource(connector.connector_id, conf, cron), '' else: # 返回错误信息 return result, f'check failed,{msg}' @@ -103,16 +104,8 @@ class DataSourceChecker(object): return False, f'cannot get connection,e={e}' try: # 测试select 1 - return int(select_one(conn, 'select 1 ' + ( - 'from dual' if self.connector.db_type in DUAL_DB_TYPE else ''))) > 0, 'success' + sql = 'select 1 ' + ( + 'from dual' if self.connector.db_type in DUAL_DB_TYPE else '') + return int(select_one(conn, sql)[0]) > 0, 'success' except Exception as e: return False, f'cannot execute "select 1",e={e}' - - -if __name__ == '__main__': - local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') - factory_1 = ConnFactory(local_ds) - factory_2 = ConnFactory(local_ds) - print(factory_1.connector_id, factory_2.connector_id) - print(factory_1 is factory_2) - print(factory_2.connector is factory_1.connector) diff --git a/datahub/datasource/dsdao/ds_dao.py b/datahub/datasource/dsdao/ds_dao.py index 882458d..31a7303 100644 --- a/datahub/datasource/dsdao/ds_dao.py +++ b/datahub/datasource/dsdao/ds_dao.py @@ -8,17 +8,38 @@ from common.fudb.connectors.connector_factory import ConnFactory from common.fudb.dbapis.fu_dao import BaseDao +from sqlalchemy import text +from common.log_conf import Logger from datahub.datasource.constant import ds_conf_param +# 日志 +logger = Logger().get_logger() + class DataSourceDao(BaseDao): def __init__(self, connector: ConnFactory): super().__init__(connector) - def add_datasource(self, source_id, conf: ds_conf_param): - sql = f"insert into datasource_main (source_id,source_type,host,port,username,password,database_name) values ('{source_id}'" \ - f",'{conf.db_type}','{conf.host}',{conf.port},'{conf.user}','{conf.password}','{conf.database}')" - return self.execute_update(sql) > 0 + def add_datasource(self, source_id, conf: ds_conf_param, cron): + conn = self.connector.get_conn() + try: + # 开启事务 + conn.begin() + # 添加数据源 + sql = f"insert into datasource_main (source_id,source_type,host,port,username,password,database_name) values ('{source_id}'" \ + f",'{conf.db_type}','{conf.host}',{conf.port},'{conf.user}','{conf.password}','{conf.database}')" + conn.execute(text(sql)) + # 创建扫描任务 + scan_sql = f"insert into scan_task_conf (source_id,cron_expression) values ('{source_id}','{cron}')" + conn.execute(text(scan_sql)) + conn.commit() + return True + except Exception as e: + conn.rollback() + logger.error(f'添加数据源[{conf}]失败,e={e}') + return False + finally: + conn.close() def remove_datasource(self, source_id): return self.execute_update(f"delete from datasource_main where source_id='{source_id}'") @@ -33,7 +54,8 @@ class DataSourceDao(BaseDao): :param source_id: :return: """ - return self.query_one(f"select 1 from datasource_main where source_id='{source_id}'") > 0 + sql = f"select 1 from datasource_main where source_id='{source_id}'" + return self.query_one(sql) is not None def edit_datasource_conf(self, source_id, param_dict): """ @@ -49,10 +71,3 @@ class DataSourceDao(BaseDao): conf_field = ['source_type', 'username', 'password', 'host', 'port', 'database_name'] return self.query_one( f"select {','.join(conf_field)} from datasource_main where source_id='{source_id}'") - - -if __name__ == '__main__': - local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') - dao = DataSourceDao(ConnFactory(local_ds)) - rt = dao.query_datasource_conf('db143d11741a9575fdea92ed2b39dc53') - print(ds_conf_param._make(rt)) diff --git a/datahub/local_db_conf.py b/datahub/local_db_conf.py new file mode 100644 index 0000000..761fafa --- /dev/null +++ b/datahub/local_db_conf.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/9 15:37 +# @Author : old tom +# @File : local_db_conf.py +# @Project : futool-tiny-datahub +# @Desc : 数据库连接配置 +from datahub.datasource.constant import ds_conf_param +from common.fudb.connectors.connector_factory import ConnFactory + +# 系统使用数据库配置 数据库类型 用户名 密码 host 端口 默认数据库 +local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') +local_conn = ConnFactory(local_db) diff --git a/datahub/logout/info_log.txt b/datahub/logout/info_log.txt deleted file mode 100644 index e69de29..0000000 diff --git a/datahub/metadata/metadata_reader.py b/datahub/metadata/metadata_reader.py index 7139129..e74cfe2 100644 --- a/datahub/metadata/metadata_reader.py +++ b/datahub/metadata/metadata_reader.py @@ -13,6 +13,7 @@ from datahub.datasource.constant import ds_conf_param from datahub.datasource.datasource_manage import DataSource, DataSourceManage from datahub.metadata.metadatadao.metadata_dao import MetadataDao from common.fudb.connectors.connector_factory import ConnFactory +from sqllineage.runner import LineageRunner reader_conf = ConfigParser() reader_conf.read('./reader_conf.ini') @@ -73,16 +74,42 @@ class MetadataReader(AbsMetadataReader): return self.dao.query_all_views(reader_conf[self.db_type]['views']) def query_procedure(self): - pass + return self.dao.query_all_procedure(reader_conf[self.db_type]['procedure']) def query_table_fields(self, table): - pass + return self.dao.query_table_field(reader_conf[self.db_type]['table_field'].replace('#$#', table)) + + def query_metadata_detail(self, obj_name, obj_type): + """ + 查询元数据明细 + 例:obj_type 为view或procedure 则查询对应创建语句 + 为table 则查询中文注释 + :param obj_name: 对象名称 + :param obj_type: 对象类型 + :return: + """ + return { + 'table': '', + 'view': self.dao.query_view_detail(obj_name, + reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', + obj_name)), + 'procedure': self.dao.query_procedure_detail( + reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name)) + }[obj_type] if __name__ == '__main__': local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') dsm = DataSourceManage(ConnFactory(local_ds)) - ds = dsm.get('db143d11741a9575fdea92ed2b39dc53') + ds = dsm.get('834164a2d62de959c0261e6239dd1e55') mtr = MetadataReader(ds) - print(mtr.query_tables()) - print(mtr.query_views()) + # print(mtr.query_tables()) + # print(mtr.query_views()) + # print(mtr.query_procedure()) + # sql = mtr.query_metadata_detail('DW__FINANCE_SURVEY_MONTH_TJ_VW', 'view') + # runner = LineageRunner(sql, dialect='oracle') + # 画图 + # runner.draw(dialect='oracle') + # print(runner) + # runner.draw(dialect='oracle') + print(mtr.query_table_fields('ODS_CONTRACT_PAY_INFO')) diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py new file mode 100644 index 0000000..c589adf --- /dev/null +++ b/datahub/metadata/metadata_warehouse.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/9 14:41 +# @Author : old tom +# @File : metadata_warehouse.py +# @Project : futool-tiny-datahub +# @Desc : 元数据存储 + +from common.futool.fu_id import id_gen + + +class MetadataWareHouse(object): + """ + 元数据仓库 + """ + + def __init__(self, source_id): + self.source_id = source_id + + def save_metadata_obj(self): + pass + + def save_metadata_obj_detail(self): + pass + + def save_metadata_obj_field(self): + pass diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index f0ce952..269989d 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -29,3 +29,38 @@ class MetadataDao(BaseDao): :return: """ return [v[0] for v in self.query_all(sql)] + + def query_all_procedure(self, sql): + """ + 查询所有存储过程 + :param sql: + :return: + """ + return [p[0] for p in self.query_all(sql)] + + def query_procedure_detail(self, sql): + """ + 查询视图及存储过程明细 + :param sql: + :return: + """ + rt = self.query_all(sql) + # 合并为完整SQL + return 'CREATE OR REPLACE ' + '\r'.join([x[0] for x in rt]) + + def query_view_detail(self, view_name, sql): + """ + 查询视图创建语句 + :param view_name: + :param sql: + :return: + """ + return f'CREATE OR REPLACE VIEW {view_name} AS ' + '\r\n' + self.query_one(sql)[0] + + def query_table_field(self, sql): + """ + 查询表字段 + :param sql: + :return: + """ + return self.query_all(sql) diff --git a/datahub/metadata/reader_conf.ini b/datahub/metadata/reader_conf.ini index a9a827d..73a92aa 100644 --- a/datahub/metadata/reader_conf.ini +++ b/datahub/metadata/reader_conf.ini @@ -1,7 +1,14 @@ [oracle] tables = select distinct table_name from user_tab_comments where table_type='TABLE' views = select distinct table_name from user_tab_comments where table_type='VIEW' +procedure = select distinct name From user_source where type = 'PROCEDURE' +view_detail = select text from all_views where view_name='#$#' +procedure_detail = SELECT text FROM user_source WHERE NAME = '#$#' ORDER BY line +table_field = select b.COLUMN_NAME,a.COMMENTS, b.COLUMN_ID, b.DATA_TYPE, b.DATA_LENGTH, b.NULLABLE, b.DEFAULT_LENGTH, b.DATA_DEFAULT from user_col_comments a left join user_tab_columns b on a.table_name=b.table_name + and a.COLUMN_NAME = b.COLUMN_NAME where a.table_name = '#$#' ORDER BY b.COLUMN_ID asc [postgresql] tables = SELECT distinct table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name -views = select distinct table_name from information_schema.views WHERE table_schema = 'public' ORDER BY table_name \ No newline at end of file +views = select distinct table_name from information_schema.views WHERE table_schema = 'public' ORDER BY table_name +view_detail = +procedure_detail = \ No newline at end of file diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index f48580b..e71ba8e 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -6,10 +6,24 @@ # @Project : futool-tiny-datahub # @Desc : +from sqllineage.runner import LineageRunner + + class MetadataRelationAnalyzer(object): """ 元数据关系分析 + 流程:从元数据存储读取视图-->分析视图(递归)-->视图与表关联关系-->存入Neo4j """ def __init__(self, source_id): - pass + self.source_id = source_id + + +if __name__ == '__main__': + with open(r'D:\文档\工作\ATD\2023上半年\项目\事权\DW_FINANCE_PLAN_DETAIL_BASE_VW.sql', 'r', encoding='utf-8') as f: + sql = f.read() + runner = LineageRunner(sql, dialect='oracle') + # 画图 + # runner.draw(dialect='oracle') + print(runner) + print(runner.source_tables) diff --git a/datahub/scheduletask/__init__.py b/datahub/scheduletask/__init__.py new file mode 100644 index 0000000..40805f5 --- /dev/null +++ b/datahub/scheduletask/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/11 18:52 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-tiny-datahub +# @Desc : 任务定时调度模块 diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py new file mode 100644 index 0000000..afa1376 --- /dev/null +++ b/datahub/scheduletask/scan_task.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/11 19:08 +# @Author : old tom +# @File : scan_task.py +# @Project : futool-tiny-datahub +# @Desc : 扫描任务,通过调度扫描数据源-->获取数据源-->读取元数据并同步 +from datahub.local_db_conf import local_conn +from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao + + +class ScanTaskManage(object): + def __init__(self): + self.dao = ScanTaskDao(local_conn) + + def add_task(self, source_id, cron='0 0 0 1/1 * ?'): + """ + 添加任务 + :param source_id: 数据源ID + :param cron: cron表达式 + :return: + """ + if self.dao.exist_by_id(source_id): + return False, f'[{source_id}] all ready exist' + return self.dao.add_task(source_id, cron), 'add success' + + def switch_task(self, source_id, enable='N'): + """ + 开关任务 + :param source_id: 源ID + :param enable: 是否开启 Y开|N关 + :return: + """ + return self.dao.switch_task(source_id, enable) + + def edit_cron(self, source_id, cron): + """ + 修改任务CRON表达式 + :param source_id: 源ID + :param cron: cron表达式 + :return: + """ + return self.dao.edit_task_cron(source_id, cron) + + def query_task(self, enable=None): + """ + 任务查询 + :param enable: + :return: + """ + return self.dao.query_all_task(enable) + + def query_task_by_id(self, source_id): + """ + 根据ID 查任务 + :return: + """ + return self.dao.query_task_by_id(source_id) + + +scan_task_manage = ScanTaskManage() diff --git a/datahub/scheduletask/scandao/__init__.py b/datahub/scheduletask/scandao/__init__.py new file mode 100644 index 0000000..0918e2a --- /dev/null +++ b/datahub/scheduletask/scandao/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/12 15:24 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-tiny-datahub +# @Desc : diff --git a/datahub/scheduletask/scandao/scan_task_dao.py b/datahub/scheduletask/scandao/scan_task_dao.py new file mode 100644 index 0000000..b78e647 --- /dev/null +++ b/datahub/scheduletask/scandao/scan_task_dao.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/12 15:24 +# @Author : old tom +# @File : scan_task_dao.py +# @Project : futool-tiny-datahub +# @Desc : + +from common.fudb.connectors.connector_factory import ConnFactory +from common.fudb.dbapis.fu_dao import BaseDao + + +class ScanTaskDao(BaseDao): + def __init__(self, connector: ConnFactory): + super().__init__(connector) + + def add_task(self, source_id, cron): + return self.execute_update( + f"insert into scan_task_conf (source_id,cron_expression) values ('{source_id}','{cron}')") > 0 + + def exist_by_id(self, source_id): + """ + 判断任务是否存在 + :param source_id: + :return: + """ + return self.query_one(f"select count(1) from scan_task_conf where source_id='{source_id}'")[0] > 0 + + def remove_task(self, source_id): + return self.execute_update(f"delete from scan_task_conf where source_id='{source_id}'") > 0 + + def switch_task(self, source_id, enable): + """ + 开关任务 + :param source_id: 数据源ID + :param enable: Y开启 N关闭 + :return: + """ + return self.execute_update(f"update scan_task_conf set enable='{enable}' where source_id='{source_id}'") > 0 + + def edit_task_cron(self, source_id, cron): + """ + 修改任务cron表达式 + :param source_id: 数据源ID + :param cron: cron表达式 + :return: + """ + return self.execute_update( + f"update scan_task_conf set cron_expression='{cron}' where source_id='{source_id}'") > 0 + + def query_all_task(self, enable): + """ + 查询所有已开启任务 + :return: + """ + if not enable: + return self.query_all("select source_id,cron_expression from scan_task_conf") + return self.query_all(f"select source_id,cron_expression from scan_task_conf where enable='{enable}'") + + def query_task_by_id(self, source_id): + """ + 源ID 查任务 + :param source_id: + :return: + """ + return self.query_one( + f"select source_id,cron_expression,enable from scan_task_conf where source_id='{source_id}'") diff --git a/datahub/scheduletask/schedule.py b/datahub/scheduletask/schedule.py new file mode 100644 index 0000000..2885e58 --- /dev/null +++ b/datahub/scheduletask/schedule.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/12 15:15 +# @Author : old tom +# @File : schedule.py +# @Project : futool-tiny-datahub +# @Desc : 定时任务配置 +from apscheduler.triggers.cron import CronTrigger +from apscheduler.schedulers.background import BackgroundScheduler + +# 后台指定调度,配合fastapi使用 +# BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时(如上例)使用。 +# BackgroundScheduler: 调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。 +# 每隔 1分钟 运行一次 job 方法 +# scheduler.add_job(tick, trigger=CronExpTrigger.parse_crontab('0/1 * * * * * *'), kwargs={ +# "name": "bob" +# } +scheduler = BackgroundScheduler() + + +class CronExpTrigger(CronTrigger): + """ + 重写cron触发器,支持6\7位 cron表达式 + 7位:* * * * * * * + 秒、分、时、天、月、周、年 + """ + + @classmethod + def parse_crontab(cls, expr, timezone=None): + values = expr.split() + if len(values) == 6: + # 6位转7位-->? 转 *-->末尾补充 * + values[5] = '*' + values = values + ['*'] + if len(values) != 7: + raise ValueError('Wrong number of fields; got {}, expected 7'.format(len(values))) + return cls(second=values[0], minute=values[1], hour=values[2], day=values[3], month=values[4], + day_of_week=values[5], year=values[6], timezone=timezone) diff --git a/datahub/scheduletask/task_executor.py b/datahub/scheduletask/task_executor.py new file mode 100644 index 0000000..3b27cc6 --- /dev/null +++ b/datahub/scheduletask/task_executor.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/11 19:10 +# @Author : old tom +# @File : task_executor.py +# @Project : futool-tiny-datahub +# @Desc : 任务执行器,负责将任务添加到scheduler队列中 +from common.futool.fu_function import singleton + + +@singleton +class CommonTaskExecutor(object): + """ + 通用任务执行器 + """ + + def __init__(self, scheduler, cron_trigger): + """ + :param scheduler: 调度器 + :param cron_trigger: cron触发器 + """ + self.cron_trigger = cron_trigger + self.scheduler = scheduler + + def submit(self, source_id, cron): + """ + 提交任务 + :param source_id: 数据源ID + :param cron: cron表达式 + :return: + """ + pass diff --git a/logout/info_log.txt b/logout/info_log.txt deleted file mode 100644 index 0a6512c..0000000 --- a/logout/info_log.txt +++ /dev/null @@ -1,3 +0,0 @@ -20230408 11:11:37 - MainProcess | MainThread | test_log.test:16 - INFO -fefefe -20230408 11:11:37 - MainProcess | MainThread | test_log.test:17 - ERROR -223232 -20230408 11:11:37 - MainProcess | MainThread | test_log.test:18 - WARNING -999