diff --git a/common/fudb/dbapis/fu_dao.py b/common/fudb/dbapis/fu_dao.py index 34d762a..c962389 100644 --- a/common/fudb/dbapis/fu_dao.py +++ b/common/fudb/dbapis/fu_dao.py @@ -27,7 +27,7 @@ class BaseDao(object): def execute_update(self, sql): return execute_update(self.connector.get_conn(), sql) - def batch_insert(self, db_type, sql_tpl, data, batch_size): + def batch_insert(self, sql_tpl, data, batch_size, db_type='postgresql'): return batch_insert(self.connector.get_conn(), db_type, sql_tpl, data, batch_size) def dynamic_update_by_param(self, table_name, condition, param: dict): diff --git a/common/fudb/dbapis/fu_db_api.py b/common/fudb/dbapis/fu_db_api.py index 2bad8f5..5c157ff 100644 --- a/common/fudb/dbapis/fu_db_api.py +++ b/common/fudb/dbapis/fu_db_api.py @@ -88,6 +88,7 @@ def execute_update(conn: Connection, sql): def batch_insert(conn: Connection, db_type, sql_tpl, data, batch_size=1500): """ 批量插入 + 将sql转为into oracle_table ( id, code ) values( 1 , '1' ),( 2 , '2' ),( 3 , '3' ) :param conn: 数据库连接 :param batch_size: 每次插入量 :param db_type: 数据库类型 @@ -168,7 +169,7 @@ class BatchInsertHandler(object): for ds in data_set: val = '(' for ele in ds: - val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ',' + val += self._field_value_convert(ele) val = val[0:-1] + ')' begin += (self.sql_tpl.replace('%s', val) + ' \r ') end = 'select 1 from dual' @@ -179,10 +180,28 @@ class BatchInsertHandler(object): for ds in data_set: val = '(' for ele in ds: - val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ',' + val += self._field_value_convert(ele) val = val[0:-1] + ')' vals += val + ',' return self.sql_tpl.replace('%s', vals[0:-1]) def build_mysql_insert(self, data_set): return self.build_pg_insert(data_set) + + @staticmethod + def _field_value_convert(field_val): + """ + 字段类型转换 + :param field_val: 字段值 + :return: + """ + # None处理 + if field_val is None: + field_val = '' + # 特殊字符处理 + if isinstance(field_val, str) and "'" in field_val: + field_val = field_val.replace("'", '') + # TODO 不可见字符处理,查询创建SQL时会删除\r,导致SQL格式混乱 + # if isinstance(field_val, str): + # field_val = ''.join(x for x in field_val if x.isprintable()) + return "'" + field_val + "'," if isinstance(field_val, str) else str(field_val) + ',' diff --git a/datahub/metadata/constant/__init__.py b/datahub/metadata/constant/__init__.py new file mode 100644 index 0000000..f0b3faf --- /dev/null +++ b/datahub/metadata/constant/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/14 14:09 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-tiny-datahub +# @Desc : diff --git a/datahub/metadata/constant/metadata_constant.py b/datahub/metadata/constant/metadata_constant.py new file mode 100644 index 0000000..fa261f2 --- /dev/null +++ b/datahub/metadata/constant/metadata_constant.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/14 14:09 +# @Author : old tom +# @File : metadata_constant.py +# @Project : futool-tiny-datahub +# @Desc : +from enum import Enum + + +class MetaDataObjType(Enum): + Table = 'table' + View = 'view' + Procedure = 'procedure' diff --git a/datahub/metadata/metadata_reader.py b/datahub/metadata/metadata_reader.py index e74cfe2..ea13770 100644 --- a/datahub/metadata/metadata_reader.py +++ b/datahub/metadata/metadata_reader.py @@ -7,16 +7,16 @@ # @Desc : 元数据读取 import abc +import os from configparser import ConfigParser -from datahub.datasource.constant import ds_conf_param -from datahub.datasource.datasource_manage import DataSource, DataSourceManage +from datahub.datasource.datasource_manage import DataSource from datahub.metadata.metadatadao.metadata_dao import MetadataDao -from common.fudb.connectors.connector_factory import ConnFactory -from sqllineage.runner import LineageRunner + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) reader_conf = ConfigParser() -reader_conf.read('./reader_conf.ini') +reader_conf.read(os.path.join(BASE_DIR, 'reader_conf.ini')) class AbsMetadataReader(metaclass=abc.ABCMeta): @@ -89,27 +89,11 @@ class MetadataReader(AbsMetadataReader): :return: """ return { - 'table': '', + 'table': self.dao.query_table_field( + reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name)), 'view': self.dao.query_view_detail(obj_name, reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name)), 'procedure': self.dao.query_procedure_detail( reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name)) }[obj_type] - - -if __name__ == '__main__': - local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') - dsm = DataSourceManage(ConnFactory(local_ds)) - ds = dsm.get('834164a2d62de959c0261e6239dd1e55') - mtr = MetadataReader(ds) - # print(mtr.query_tables()) - # print(mtr.query_views()) - # print(mtr.query_procedure()) - # sql = mtr.query_metadata_detail('DW__FINANCE_SURVEY_MONTH_TJ_VW', 'view') - # runner = LineageRunner(sql, dialect='oracle') - # 画图 - # runner.draw(dialect='oracle') - # print(runner) - # runner.draw(dialect='oracle') - print(mtr.query_table_fields('ODS_CONTRACT_PAY_INFO')) diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index c589adf..ee00cf0 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -7,6 +7,8 @@ # @Desc : 元数据存储 from common.futool.fu_id import id_gen +from datahub.metadata.metadatadao.metadata_dao import MetadataDao +from datahub.local_db_conf import local_conn class MetadataWareHouse(object): @@ -16,12 +18,54 @@ class MetadataWareHouse(object): def __init__(self, source_id): self.source_id = source_id + self.dao = MetadataDao(local_conn) - def save_metadata_obj(self): - pass + def save_metadata_obj(self, objs, obj_type): + """ + 保存元数据对象 + :param obj_type: 元数据类型 + :param objs: + :return: + """ + data = [(id_gen.get_id(), self.source_id, obj_type, x[0], x[1]) for x in objs] + return self.dao.save_metadata_obj(data) - def save_metadata_obj_detail(self): - pass + def save_metadata_obj_detail(self, details): + """ + 保存元数据对象明细 + :param details: + :return: + """ + return self.dao.save_metadata_create(details) - def save_metadata_obj_field(self): - pass + def save_metadata_obj_field(self, fields): + """ + 保存字段 + :param fields: + :return: + """ + return self.dao.save_table_fields(fields) + + def query_metadata(self, obj_type=None): + """ + 查询元数据 + :param obj_type: + :return: + """ + return self.dao.query_metadata_by_type(obj_type) + + def query_metadata_name(self, obj_type=None): + """ + 查询元数据名称 + :param obj_type: + :return: + """ + return self.dao.query_metadata_name_by_type(obj_type) + + def query_metadata_id_name(self, obj_type=None): + """ + 查询元数据ID及名称 + :param obj_type: + :return: + """ + return self.dao.query_metadata_id(obj_type) diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index 269989d..5f0d4b3 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -20,7 +20,7 @@ class MetadataDao(BaseDao): :param sql: :return: """ - return [t[0] for t in self.query_all(sql)] + return self.query_all(sql) def query_all_views(self, sql): """ @@ -28,7 +28,7 @@ class MetadataDao(BaseDao): :param sql: :return: """ - return [v[0] for v in self.query_all(sql)] + return self.query_all(sql) def query_all_procedure(self, sql): """ @@ -36,7 +36,7 @@ class MetadataDao(BaseDao): :param sql: :return: """ - return [p[0] for p in self.query_all(sql)] + return self.query_all(sql) def query_procedure_detail(self, sql): """ @@ -46,7 +46,7 @@ class MetadataDao(BaseDao): """ rt = self.query_all(sql) # 合并为完整SQL - return 'CREATE OR REPLACE ' + '\r'.join([x[0] for x in rt]) + return 'create or replace ' + (''.join([str(x[0]).replace('\n', '\r') for x in rt])) def query_view_detail(self, view_name, sql): """ @@ -64,3 +64,47 @@ class MetadataDao(BaseDao): :return: """ return self.query_all(sql) + + def save_table_fields(self, data): + """ + 保存表字段 + :param data: + :return: + """ + return self.batch_insert( + sql_tpl='insert into metadata_object_field(meta_id,field_name,field_ch_name,order_num,field_type,nullable,default_value,field_length) values %s', + data=data, batch_size=1500) + + def save_metadata_obj(self, objs): + """ + 保存元数据 + :param objs: + :return: + """ + return self.batch_insert( + sql_tpl='insert into metadata_object(meta_id,source_id,meta_type,meta_name,meta_ch_name) values %s', + data=objs, batch_size=1500) + + def save_metadata_create(self, details): + return self.batch_insert( + sql_tpl='insert into metadata_object_create(meta_id,create_sql) values %s', + data=details, batch_size=1500) + + def query_metadata_by_type(self, obj_type): + return self.query_all( + f"select * from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all( + 'select * from metadata_object') + + def query_metadata_name_by_type(self, obj_type): + """ + 查询元数据名称 + :param obj_type: + :return: + """ + sql = f"select meta_name from metadata_object where meta_type='{obj_type}'" if obj_type else 'select meta_name from metadata_object' + return [x[0] for x in self.query_all(sql)] + + def query_metadata_id(self, obj_type): + return self.query_all( + f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all( + 'select meta_id,meta_name from metadata_object') diff --git a/datahub/metadata/metaversion/__init__.py b/datahub/metadata/metaversion/__init__.py new file mode 100644 index 0000000..4548db7 --- /dev/null +++ b/datahub/metadata/metaversion/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/14 14:04 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-tiny-datahub +# @Desc : 元数据版本 diff --git a/datahub/metadata/reader_conf.ini b/datahub/metadata/reader_conf.ini index 73a92aa..c09b375 100644 --- a/datahub/metadata/reader_conf.ini +++ b/datahub/metadata/reader_conf.ini @@ -1,11 +1,11 @@ [oracle] -tables = select distinct table_name from user_tab_comments where table_type='TABLE' -views = select distinct table_name from user_tab_comments where table_type='VIEW' -procedure = select distinct name From user_source where type = 'PROCEDURE' +tables = select distinct table_name,comments from user_tab_comments where table_type='TABLE' and table_name not like '%BIN$%' +views = select distinct table_name,comments from user_tab_comments where table_type='VIEW' +procedure = select distinct name,'' as comments From user_source where type = 'PROCEDURE' view_detail = select text from all_views where view_name='#$#' procedure_detail = SELECT text FROM user_source WHERE NAME = '#$#' ORDER BY line -table_field = select b.COLUMN_NAME,a.COMMENTS, b.COLUMN_ID, b.DATA_TYPE, b.DATA_LENGTH, b.NULLABLE, b.DEFAULT_LENGTH, b.DATA_DEFAULT from user_col_comments a left join user_tab_columns b on a.table_name=b.table_name - and a.COLUMN_NAME = b.COLUMN_NAME where a.table_name = '#$#' ORDER BY b.COLUMN_ID asc +table_detail = select b.COLUMN_NAME,a.COMMENTS, b.COLUMN_ID, b.DATA_TYPE, b.NULLABLE,b.DATA_DEFAULT,b.DATA_LENGTH from user_col_comments a left join user_tab_columns b on a.table_name=b.table_name + and a.COLUMN_NAME = b.COLUMN_NAME where a.table_name = '#$#' ORDER BY b.COLUMN_ID asc [postgresql] tables = SELECT distinct table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py index afa1376..ffda781 100644 --- a/datahub/scheduletask/scan_task.py +++ b/datahub/scheduletask/scan_task.py @@ -5,10 +5,22 @@ # @File : scan_task.py # @Project : futool-tiny-datahub # @Desc : 扫描任务,通过调度扫描数据源-->获取数据源-->读取元数据并同步 + +from datahub.datasource.datasource_manage import DataSourceManage from datahub.local_db_conf import local_conn +from datahub.metadata.metadata_reader import MetadataReader +from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao +from common.futool.fu_function import singleton +from datahub.scheduletask.task_executor import ScheduleExecutor +from datahub.scheduletask.schedule import CronExpTrigger +from common.log_conf import Logger +from datahub.metadata.constant.metadata_constant import MetaDataObjType + +logger = Logger().get_logger() +@singleton class ScanTaskManage(object): def __init__(self): self.dao = ScanTaskDao(local_conn) @@ -58,4 +70,71 @@ class ScanTaskManage(object): return self.dao.query_task_by_id(source_id) -scan_task_manage = ScanTaskManage() +class ScanTaskRunner(object): + """ + 扫描任务执行器 + """ + + def __init__(self): + self.dao = ScanTaskDao(local_conn) + self.executor = ScheduleExecutor() + self.scanner = ScanTaskExecutor() + + def run(self): + """ + 获取扫描配置提交到执行器 + :return: + """ + enable_task = self.dao.query_all_task('Y') + if enable_task: + for task in enable_task: + self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata) + logger.info(f'task [{task[0]}] submit success') + + +class ScanTaskExecutor(object): + + def __init__(self): + self.datasource_manage = DataSourceManage() + + def scan_metadata(self, source_id): + """ + 扫描元数据 + :param source_id: + :return: + """ + # 元数据仓库API + warehouse = MetadataWareHouse(source_id) + # 获取待扫描数据源 + datasource = self.datasource_manage.get(source_id) + # 初始化元数据读取器 + metadata_reader = MetadataReader(datasource) + # 分别读取表\视图\存储过程并入库 + tables = metadata_reader.query_tables() + warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value) + views = metadata_reader.query_views() + warehouse.save_metadata_obj(views, MetaDataObjType.View.value) + procedures = metadata_reader.query_procedure() + warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value) + + # TODO 临时调用 + # w_procedures = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) + # w_procedures = warehouse.query_metadata_id_name(MetaDataObjType.View.value) + # 查询视图语句 + # for v in w_procedures: + # sql = metadata_reader.query_metadata_detail(v[1], MetaDataObjType.View.value) + # # warehouse.save_metadata_obj_detail([list(v)[0]] + [sql]) + # warehouse.save_metadata_obj_detail([(v[0], sql)]) + + # w_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value) + # for t in w_tables: + # fields = metadata_reader.query_metadata_detail(t[1], MetaDataObjType.Table.value) + # fields_data = [] + # for f in fields: + # fields_data.append(tuple([list(t)[0]] + list(f))) + # warehouse.save_metadata_obj_field(fields_data) + + +if __name__ == '__main__': + ste = ScanTaskExecutor() + ste.scan_metadata('834164a2d62de959c0261e6239dd1e55') diff --git a/datahub/scheduletask/schedule.py b/datahub/scheduletask/schedule.py index 2885e58..91c9b81 100644 --- a/datahub/scheduletask/schedule.py +++ b/datahub/scheduletask/schedule.py @@ -6,7 +6,8 @@ # @Project : futool-tiny-datahub # @Desc : 定时任务配置 from apscheduler.triggers.cron import CronTrigger -from apscheduler.schedulers.background import BackgroundScheduler +# from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.schedulers.blocking import BlockingScheduler # 后台指定调度,配合fastapi使用 # BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时(如上例)使用。 @@ -15,7 +16,7 @@ from apscheduler.schedulers.background import BackgroundScheduler # scheduler.add_job(tick, trigger=CronExpTrigger.parse_crontab('0/1 * * * * * *'), kwargs={ # "name": "bob" # } -scheduler = BackgroundScheduler() +sch = BlockingScheduler() class CronExpTrigger(CronTrigger): diff --git a/datahub/scheduletask/task_executor.py b/datahub/scheduletask/task_executor.py index 3b27cc6..216a19e 100644 --- a/datahub/scheduletask/task_executor.py +++ b/datahub/scheduletask/task_executor.py @@ -5,28 +5,36 @@ # @File : task_executor.py # @Project : futool-tiny-datahub # @Desc : 任务执行器,负责将任务添加到scheduler队列中 -from common.futool.fu_function import singleton +from datahub.scheduletask.schedule import sch -@singleton -class CommonTaskExecutor(object): +class ScheduleExecutor(object): """ - 通用任务执行器 + 定时任务任务执行器 """ - def __init__(self, scheduler, cron_trigger): + def __init__(self, scheduler=sch): """ - :param scheduler: 调度器 - :param cron_trigger: cron触发器 + :param scheduler: 调度器默认使用BackgroundScheduler """ - self.cron_trigger = cron_trigger self.scheduler = scheduler - def submit(self, source_id, cron): + def submit(self, source_id, cron_trigger, execute_fun): """ 提交任务 + :param execute_fun: 定时任务执行函数 + :param cron_trigger: cron触发器 :param source_id: 数据源ID - :param cron: cron表达式 :return: """ - pass + self.scheduler.add_job(execute_fun, trigger=cron_trigger, id=source_id, kwargs={ + 'source_id': source_id + }) + + def remove_job(self, source_id): + """ + 移除任务 + :param source_id: + :return: + """ + self.scheduler.remove_job(job_id=source_id)