From a001e27f01f552d7c282dc022c4f573fd4150cd9 Mon Sep 17 00:00:00 2001 From: old-tom <892955278@qq.com> Date: Wed, 10 May 2023 11:03:16 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=85=83=E6=95=B0=E6=8D=AE=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E5=A2=9E=E5=8A=A0=E7=89=88=E6=9C=AC=E5=8F=B7=E6=8E=A7?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/futool/core/fu_id.py | 4 +- datahub/datasource/dsdao/ds_dao.py | 5 +- datahub/local_db_conf.py | 2 +- datahub/metadata/metadat_scan.py | 107 +++++++++++++++++++ datahub/metadata/metadata_warehouse.py | 38 ++++--- datahub/metadata/metadatadao/metadata_dao.py | 61 +++++++---- datahub/relation/relation_analyze.py | 14 +-- datahub/scheduletask/scan_task.py | 53 +-------- 8 files changed, 183 insertions(+), 101 deletions(-) create mode 100644 datahub/metadata/metadat_scan.py diff --git a/common/futool/core/fu_id.py b/common/futool/core/fu_id.py index b1fda37..73e8ff5 100644 --- a/common/futool/core/fu_id.py +++ b/common/futool/core/fu_id.py @@ -9,8 +9,6 @@ import time import abc from datahub.log_conf import log -logger = Logger().get_logger() - class IdGenerator(object): """ @@ -95,7 +93,7 @@ class SnowFlakeId(AbsIdGenerator): timestamp = self._gen_timestamp() # 时钟回拨 if timestamp < self.last_timestamp: - logger.error('clock is moving backwards. Rejecting requests until{}'.format(self.last_timestamp)) + log.error('clock is moving backwards. Rejecting requests until{}'.format(self.last_timestamp)) raise Exception if timestamp == self.last_timestamp: self.sequence = (self.sequence + 1) & self.SEQUENCE_MASK diff --git a/datahub/datasource/dsdao/ds_dao.py b/datahub/datasource/dsdao/ds_dao.py index 711d90f..9c275c3 100644 --- a/datahub/datasource/dsdao/ds_dao.py +++ b/datahub/datasource/dsdao/ds_dao.py @@ -12,9 +12,6 @@ from sqlalchemy import text from datahub.log_conf import log from datahub.datasource.constant import ds_conf_param -# 日志 -logger = Logger().get_logger() - class DataSourceDao(BaseDao): def __init__(self, connector: ConnFactory): @@ -36,7 +33,7 @@ class DataSourceDao(BaseDao): return True except Exception as e: conn.rollback() - logger.error(f'添加数据源[{conf}]失败,e={e}') + log.error(f'添加数据源[{conf}]失败,e={e}') return False finally: conn.close() diff --git a/datahub/local_db_conf.py b/datahub/local_db_conf.py index 16e513b..ff89f3e 100644 --- a/datahub/local_db_conf.py +++ b/datahub/local_db_conf.py @@ -14,4 +14,4 @@ local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432 local_conn = ConnFactory(local_db) # neo4j配置 -graph = Graph("bolt://localhost:7687", auth=("neo4j", "root@12345")) +graph = Graph("bolt://localhost:7687", auth=("neo4j", "havana-academy-emerald-herman-scale-5422")) diff --git a/datahub/metadata/metadat_scan.py b/datahub/metadata/metadat_scan.py new file mode 100644 index 0000000..6f3070e --- /dev/null +++ b/datahub/metadata/metadat_scan.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/5/9 16:06 +# @Author : old tom +# @File : metadat_scan.py +# @Project : futool-tiny-datahub +# @Desc : +from datahub.datasource.datasource_manage import DataSourceManage +from datahub.metadata.metadata_reader import MetadataReader +from datahub.metadata.metadata_warehouse import MetadataWareHouse +from datahub.metadata.constant.metadata_constant import MetaDataObjType +from datahub.log_conf import log +from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper +from common.futool.core.fu_date import StopWatch + + +class MetadataScanner(object): + + def __init__(self): + self.datasource_manage = DataSourceManage() + + @staticmethod + def _join_meta_detail(meta_datas, metadata_reader: MetadataReader, obj_type): + """ + 组装明细结果 + :return: + """ + save_datas = [] + for i, t in enumerate(meta_datas): + meta_id, obj_name = t[0], t[1] + meta_result = metadata_reader.query_metadata_detail(obj_name, obj_type) + # 组装插入数据 + obj_data = ([meta_id] + list(x) for x in meta_result) + for d in obj_data: + save_datas.append(d) + log.info(f"剩余{len(meta_datas) - (i + 1)}") + return save_datas + + def _scan_detail(self, warehouse: MetadataWareHouse, metadata_reader: MetadataReader): + """ + 获取明细数据 + :param warehouse: 元数据仓库 + :param metadata_reader: 元数据读取器 + :return: + """ + source_id = warehouse.source_id + watch = StopWatch() + # 读取表字段 + watch.start(f"{source_id}_table") + meta_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value) + log.info(f"开始查询[{warehouse.source_id}] 表字段数据,共{len(meta_tables)}张表") + table_field_datas = self._join_meta_detail(meta_tables, metadata_reader, MetaDataObjType.Table.value) + warehouse.save_metadata_obj_field(table_field_datas) + table_time = watch.stop(f"{source_id}_table") + log.info(f"[{warehouse.source_id}] 表字段数据查询结束,耗时{table_time}秒") + # 读取视图创建语句 + watch.start(f"{source_id}_view") + meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value) + log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图") + view_datas = self._join_meta_detail(meta_views, metadata_reader, MetaDataObjType.View.value) + warehouse.save_metadata_obj_detail(view_datas) + view_time = watch.stop(f"{source_id}_view") + log.info(f"[{warehouse.source_id}] 视图语句查询结束,耗时{view_time}秒") + # 读取存储过程创建语句 + watch.start(f"{source_id}_procedure") + meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) + log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_views)}张过程") + procedure_datas = self._join_meta_detail(meta_procedure, metadata_reader, MetaDataObjType.Procedure.value) + warehouse.save_metadata_obj_detail(procedure_datas) + procedure_time = watch.stop(f"{source_id}_procedure") + log.info(f"[{warehouse.source_id}] 存储过程语句查询结束,耗时{procedure_time}秒") + + def scan_metadata(self, source_id): + """ + TODO python暂时无法实现注解事务,后续使用动态代理或装饰器实现 + 扫描元数据 + :param source_id: + :return: + """ + # 元数据仓库API + warehouse = MetadataWareHouse(source_id) + # 获取待扫描数据源 + datasource = self.datasource_manage.get(source_id) + # 初始化元数据读取器 + metadata_reader = MetadataReader(datasource) + # 获取版本号 + + # version_keeper = MetadataVersionKeeper(source_id) + # version_code = version_keeper.increase_version() + # # 分别读取表\视图\存储过程并入库 + # log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]') + # tables = metadata_reader.query_tables() + # warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code) + # log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') + # views = metadata_reader.query_views() + # warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code) + # log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') + # procedures = metadata_reader.query_procedure() + # warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code) + # log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个') + # 查询明细数据 + self._scan_detail(warehouse, metadata_reader) + + +if __name__ == '__main__': + scanner = MetadataScanner() + scanner.scan_metadata('834164a2d62de959c0261e6239dd1e55') diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index 1d06475..7e75c4e 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -21,14 +21,15 @@ class MetadataWareHouse(object): self.source_id = source_id self.dao = MetadataDao(local_conn) - def save_metadata_obj(self, objs, obj_type): + def save_metadata_obj(self, objs, obj_type, version_code): """ 保存元数据对象 + :param version_code: 版本号 :param obj_type: 元数据类型 :param objs: :return: """ - data = [(id_gen.get_id(), self.source_id, obj_type, x[0], x[1]) for x in objs] + data = [(id_gen.get_id(), self.source_id, obj_type, x[0], x[1], version_code) for x in objs] return self.dao.save_metadata_obj(data) def save_metadata_obj_detail(self, details): @@ -55,21 +56,23 @@ class MetadataWareHouse(object): """ return self.dao.query_metadata_by_type(obj_type) - def query_metadata_name(self, obj_type=None): + def query_metadata_name(self, obj_type=None, version_code=None): """ 查询元数据名称 + :param version_code: 版本号 :param obj_type: :return: """ - return self.dao.query_metadata_name_by_type(obj_type) + return self.dao.query_metadata_name_by_type(obj_type, self.source_id, version_code) - def query_metadata_id_name(self, obj_type=None): + def query_metadata_id_name(self, obj_type=None, version_code=None): """ 查询元数据ID及名称 + :param version_code: 版本号 :param obj_type: :return: """ - return self.dao.query_metadata_id(obj_type) + return self.dao.query_metadata_id_name(obj_type, self.source_id, version_code) def query_view_create(self, meta_id): """ @@ -79,13 +82,14 @@ class MetadataWareHouse(object): """ return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id) - def query_view_create_by_name(self, view_name): + def query_view_create_by_name(self, view_name, version_code=None): """ 视图查询创建语句 + :param version_code: 版本号 :param view_name: :return: """ - return self.dao.query_metadata_create_by_name(view_name, self.source_id) + return self.dao.query_metadata_create_by_name(view_name, self.source_id, version_code) def query_procedure_create(self, meta_id): """ @@ -95,24 +99,26 @@ class MetadataWareHouse(object): """ return self.dao.query_metadata_create(MetaDataObjType.Procedure.value, meta_id, self.source_id) - def query_metadata_type(self, meta_id): + def query_metadata_type(self, meta_id, version_code=None): """ 查询元数据类型 + :param version_code: 版本号 :param meta_id: :return: """ - return self.dao.query_metadata_type(meta_id, self.source_id) + return self.dao.query_metadata_type(meta_id, self.source_id, version_code) - def query_metadata_type_by_name(self, meta_name): + def query_metadata_type_by_name(self, meta_name, version_code=None): """ 根据名称查询元数据类型 + :param version_code: 版本号 :param meta_name: 表名、视图名、存储过程名 :return: """ - return self.dao.query_metadata_type_by_name(meta_name, self.source_id) + return self.dao.query_metadata_type_by_name(meta_name, self.source_id, version_code) - def is_table(self, table_name): - return self.query_metadata_type_by_name(table_name) == MetaDataObjType.Table.value + def is_table(self, table_name, version_code=None): + return self.query_metadata_type_by_name(table_name, version_code) == MetaDataObjType.Table.value - def is_view(self, view_name): - return self.query_metadata_type_by_name(view_name) == MetaDataObjType.View.value + def is_view(self, view_name, version_code=None): + return self.query_metadata_type_by_name(view_name, version_code) == MetaDataObjType.View.value diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index a7d4ce4..305548c 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -82,7 +82,7 @@ class MetadataDao(BaseDao): :return: """ return self.batch_insert( - sql_tpl='insert into metadata_object(meta_id,source_id,meta_type,meta_name,meta_ch_name) values %s', + sql_tpl='insert into metadata_object(meta_id,source_id,meta_type,meta_name,meta_ch_name,version_code) values %s', data=objs, batch_size=1500) def save_metadata_create(self, details): @@ -90,24 +90,39 @@ class MetadataDao(BaseDao): sql_tpl='insert into metadata_object_create(meta_id,create_sql) values %s', data=details, batch_size=1500) + @staticmethod + def _get_version_sql(source_id, version_code=None): + return ' and ' + (f'version_code={version_code}' if version_code \ + else f"version_code=(select max(version_code) from metadata_object_version_record where source_id='{source_id}')") + def query_metadata_by_type(self, obj_type): return self.query_all( f"select * from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all( 'select * from metadata_object') - def query_metadata_name_by_type(self, obj_type): + def query_metadata_name_by_type(self, obj_type, source_id, version_code=None): """ 查询元数据名称 - :param obj_type: + :param version_code: 版本号,默认查最新 + :param source_id: 源ID + :param obj_type: 类型 :return: """ - sql = f"select meta_name from metadata_object where meta_type='{obj_type}'" if obj_type else 'select meta_name from metadata_object' + sql = f"select meta_name from metadata_object where source_id='{source_id}' and meta_type='{obj_type}'" + self._get_version_sql( + source_id, version_code) return [x[0] for x in self.query_all(sql)] - def query_metadata_id(self, obj_type): - return self.query_all( - f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all( - 'select meta_id,meta_name from metadata_object') + def query_metadata_id_name(self, obj_type, source_id, version_code=None): + """ + 查询元数据meta_id及meta_name + :param obj_type: + :param source_id: + :param version_code: 版本号 + :return: + """ + sql = f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'" \ + + self._get_version_sql(source_id, version_code)+" limit 10" + return self.query_all(sql) def query_metadata_create(self, obj_type, meta_id, source_id): """ @@ -118,39 +133,45 @@ class MetadataDao(BaseDao): :return: """ # metadata_object_create与metadata_object非强外键关系,所以加入source_id保证查询结果正确 - create_sql = self.query_one( - sql=f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')") + sql = f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')" + create_sql = self.query_one(sql) return create_sql[0] if create_sql and len(create_sql) > 0 else '' - def query_metadata_create_by_name(self, meta_name, source_id): + def query_metadata_create_by_name(self, meta_name, source_id, version_code=None): """ 名称查询创建语句 + :param version_code: 版本号 :param meta_name: :param source_id: :return: """ - create_sql = self.query_one( - sql=f"select create_sql from metadata_object_create where meta_id=(select meta_id from metadata_object where meta_name='{meta_name}' and source_id='{source_id}')") + sql = f"select create_sql from metadata_object_create where meta_id=(select meta_id from metadata_object where meta_name='{meta_name}' and source_id='{source_id}'" + self._get_version_sql( + source_id, version_code) + ")" + create_sql = self.query_one(sql) return create_sql[0] if create_sql and len(create_sql) > 0 else '' - def query_metadata_type(self, meta_id, source_id): + def query_metadata_type(self, meta_id, source_id, version_code=None): """ 查询元数据类型 + :param version_code: 版本号 :param meta_id: :param source_id: :return: """ - rt = self.query_one( - f"select meta_type from metadata_object where meta_id={meta_id} and source_id='{source_id}'") + sql = f"select meta_type from metadata_object where meta_id={meta_id} and source_id='{source_id}'" \ + + self._get_version_sql(source_id, version_code) + rt = self.query_one(sql) return rt[0] if rt and len(rt) > 0 else None - def query_metadata_type_by_name(self, meta_name, source_id): + def query_metadata_type_by_name(self, meta_name, source_id, version_code=None): """ 根据元数据名称查询类型 + :param version_code: 版本号 :param meta_name: 元数据名 :param source_id: 源ID :return: """ - rt = self.query_one( - f"select meta_type from metadata_object where meta_name='{meta_name}' and source_id='{source_id}' limit 1") - return rt[0] if rt and len(rt) > 0 else None \ No newline at end of file + sql = f"select meta_type from metadata_object where meta_name='{meta_name}' and source_id='{source_id}'" \ + + self._get_version_sql(source_id, version_code) + rt = self.query_one(sql) + return rt[0] if rt and len(rt) > 0 else None diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index ef35b42..df686e2 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -35,7 +35,7 @@ def analyze_view_sql(sql: str): try: analyze_result = LineageRunner(sql) except Exception as e: - logger.error(f'sql解析异常,e={e}') + log.error(f'sql解析异常,e={e}') return None try: # 获取源表 @@ -43,10 +43,10 @@ def analyze_view_sql(sql: str): target_table = analyze_result.target_tables if source_table[0] == target_table[0]: # 防止无限递归 - logger.warning(f'源表与目标表相同') + log.warning(f'源表与目标表相同') return None except Exception as e: - logger.error(f'获取源表异常,e={e}') + log.error(f'获取源表异常,e={e}') return None else: return [str(s).split(sep='.')[1].upper() for s in source_table] @@ -66,7 +66,7 @@ def analyze_procedure_sql(sql: str): try: analyze_result = LineageRunner(sql) except Exception as e: - logger.error(f'sql解析异常,e={e}') + log.error(f'sql解析异常,e={e}') return None try: # 获取源表 @@ -79,7 +79,7 @@ def analyze_procedure_sql(sql: str): 'source': [_format_table_name(s) for s in source_table] }) except Exception as e: - logger.error(f'获取源表异常,e={e}') + log.error(f'获取源表异常,e={e}') return None return result @@ -121,7 +121,7 @@ class MetadataRelationAnalyzer(object): try: self._recurrence_view(view[1]) except Exception as e: - logger.error(f'视图{view[1]}分析异常,e={e}') + log.error(f'视图{view[1]}分析异常,e={e}') finally: stop_time = time.time() log.info(f'视图{view[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒') @@ -197,7 +197,7 @@ class MetadataRelationAnalyzer(object): source_type, 'name') except Exception as e: - logger.error(f'存储过程{procedure[1]}分析异常,e={e}') + log.error(f'存储过程{procedure[1]}分析异常,e={e}') finally: stop_time = time.time() log.info( diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py index 9613a95..86b466e 100644 --- a/datahub/scheduletask/scan_task.py +++ b/datahub/scheduletask/scan_task.py @@ -5,16 +5,12 @@ # @File : scan_task.py # @Project : futool-tiny-datahub # @Desc : 扫描任务,通过调度扫描数据源-->获取数据源-->读取元数据并同步 - -from datahub.datasource.datasource_manage import DataSourceManage from datahub.local_db_conf import local_conn -from datahub.metadata.metadata_reader import MetadataReader -from datahub.metadata.metadata_warehouse import MetadataWareHouse +from datahub.metadata.metadat_scan import MetadataScanner from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao from datahub.scheduletask.task_executor import ScheduleExecutor from datahub.scheduletask.schedule import CronExpTrigger from datahub.log_conf import log -from datahub.metadata.constant.metadata_constant import MetaDataObjType class ScanTaskManage(object): @@ -74,7 +70,7 @@ class ScanTaskRunner(object): def __init__(self): self.dao = ScanTaskDao(local_conn) self.executor = ScheduleExecutor() - self.scanner = ScanTaskExecutor() + self.scanner = MetadataScanner() def run(self): """ @@ -84,48 +80,5 @@ class ScanTaskRunner(object): enable_task = self.dao.query_all_task('Y') if enable_task: for task in enable_task: - self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_print) + self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata) log.info(f'datasource scan task [{task[0]}] submit success') - - -class ScanTaskExecutor(object): - - def __init__(self): - self.datasource_manage = DataSourceManage() - - def scan_print(self, source_id): - # 元数据仓库API - warehouse = MetadataWareHouse(source_id) - # 获取待扫描数据源 - datasource = self.datasource_manage.get(source_id) - # 初始化元数据读取器 - metadata_reader = MetadataReader(datasource) - # 分别读取表\视图\存储过程并入库 - log.info(f'开始扫描[{source_id}]元数据') - tables = metadata_reader.query_tables() - log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') - - def scan_metadata(self, source_id): - """ - TODO python暂时无法实现注解事务,后续使用动态代理处理 - 扫描元数据 - :param source_id: - :return: - """ - # 元数据仓库API - warehouse = MetadataWareHouse(source_id) - # 获取待扫描数据源 - datasource = self.datasource_manage.get(source_id) - # 初始化元数据读取器 - metadata_reader = MetadataReader(datasource) - # 分别读取表\视图\存储过程并入库 - log.info(f'开始扫描[{source_id}]元数据') - tables = metadata_reader.query_tables() - warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value) - log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') - views = metadata_reader.query_views() - warehouse.save_metadata_obj(views, MetaDataObjType.View.value) - log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') - procedures = metadata_reader.query_procedure() - warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value) - log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个')