From 75835a4213f500295ea6dbcbbf997eda3aae322f Mon Sep 17 00:00:00 2001 From: old-tom <892955278@msn.cn> Date: Sun, 28 May 2023 21:23:55 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E5=85=83=E6=95=B0=E6=8D=AE=E6=89=AB?= =?UTF-8?q?=E6=8F=8F=E7=A9=BA=E9=9B=86=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datahub/metadata/metadat_scan.py | 38 +++++++++++-------- .../metaversion/metadata_comparator.py | 3 ++ .../metaversion/metadata_compare_task.py | 12 ++++-- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/datahub/metadata/metadat_scan.py b/datahub/metadata/metadat_scan.py index 320bfa4..9ea9ce4 100644 --- a/datahub/metadata/metadat_scan.py +++ b/datahub/metadata/metadat_scan.py @@ -13,6 +13,7 @@ from datahub.metadata.constant.metadata_constant import MetaDataObjType from datahub.log_conf import log from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper from common.futool.core.fu_date import StopWatch +from common.futool.core.fu_collection import is_not_empty class MetadataScanner(object): @@ -63,19 +64,21 @@ class MetadataScanner(object): # 读取视图创建语句 watch.start(f"{source_id}_view") meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value) - log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图") - view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value) - warehouse.save_metadata_obj_detail(view_datas) - view_time = watch.stop(f"{source_id}_view") - log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}秒") + if is_not_empty(meta_views): + log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图") + view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value) + warehouse.save_metadata_obj_detail(view_datas) + view_time = watch.stop(f"{source_id}_view") + log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}秒") # 读取存储过程创建语句 watch.start(f"{source_id}_procedure") meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) - log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程") - procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value) - warehouse.save_metadata_obj_detail(procedure_datas) - procedure_time = watch.stop(f"{source_id}_procedure") - log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}秒") + if is_not_empty(meta_procedure): + log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程") + procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value) + warehouse.save_metadata_obj_detail(procedure_datas) + procedure_time = watch.stop(f"{source_id}_procedure") + log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}秒") watch.clear() def scan_metadata(self, source_id): @@ -97,14 +100,17 @@ class MetadataScanner(object): log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]') try: tables = metadata_reader.query_tables() - warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code) - log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') + if is_not_empty(tables): + warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code) + log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') views = metadata_reader.query_views() - warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code) - log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') + if is_not_empty(views): + warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code) + log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') procedures = metadata_reader.query_procedure() - warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code) - log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个') + if is_not_empty(procedures): + warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code) + log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个') # 查询明细数据 self._scan_detail(warehouse) except Exception as e: diff --git a/datahub/metadata/metaversion/metadata_comparator.py b/datahub/metadata/metaversion/metadata_comparator.py index 6cf8b77..698a1eb 100644 --- a/datahub/metadata/metaversion/metadata_comparator.py +++ b/datahub/metadata/metaversion/metadata_comparator.py @@ -12,6 +12,7 @@ from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.metadata.constant.metadata_constant import MetaDataObjType from common.futool.core.fu_collection import list_diff, list_intersection, is_not_empty, list_2_md5 from common.futool.core.fu_lang import str_md5 +from datahub.metadata.metaversion.metadata_compare_task import MetadataCompareTaskManage, MetadataCompareTask class MetadataComparator(metaclass=abc.ABCMeta): @@ -23,10 +24,12 @@ class MetadataComparator(metaclass=abc.ABCMeta): :param source_id_1: 数据源1 :param source_id_2: 数据源2 """ + self.task_manage = MetadataCompareTaskManage() self.version_1 = version_1 self.version_2 = version_2 self.warehouse_1 = MetadataWareHouse(source_id_1) self.warehouse_2 = MetadataWareHouse(source_id_2) if source_id_2 else None + self.task_manage.create_task(MetadataCompareTask(version_1, version_2, source_id_1, source_id_2)) @abc.abstractmethod def compare(self): diff --git a/datahub/metadata/metaversion/metadata_compare_task.py b/datahub/metadata/metaversion/metadata_compare_task.py index f0c2a21..2ca9950 100644 --- a/datahub/metadata/metaversion/metadata_compare_task.py +++ b/datahub/metadata/metaversion/metadata_compare_task.py @@ -49,15 +49,21 @@ class MetadataCompareTaskManage(object): def __init__(self): self.dao = MetaTaskDao(local_conn) - def create_task(self, task: MetadataCompareTask) -> bool: + def create_task(self, task: MetadataCompareTask) -> str: """ 创建任务 :return: """ compare_id = task.compare_id_gen() self._check_task_exist(compare_id) - return self.dao.create_task(compare_id, task.version_code_1, task.version_code_2, task.source_id_1, - task.source_id_2, task.compare_task_type) + rt = self.dao.create_task(compare_id, task.version_code_1, task.version_code_2, task.source_id_1, + task.source_id_2, task.compare_task_type) + if rt: + log.info('比较任务创建成功,任务ID=[{}]', compare_id) + else: + log.error('比较任务创建失败') + raise self.MetadataCompareTaskError() + return compare_id def finish_task(self, compare_id): """