fix:元数据扫描空集处理

dev
old-tom 2 years ago
parent 6b4f4f8d4a
commit 75835a4213

@ -13,6 +13,7 @@ from datahub.metadata.constant.metadata_constant import MetaDataObjType
from datahub.log_conf import log
from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper
from common.futool.core.fu_date import StopWatch
from common.futool.core.fu_collection import is_not_empty
class MetadataScanner(object):
@ -63,19 +64,21 @@ class MetadataScanner(object):
# 读取视图创建语句
watch.start(f"{source_id}_view")
meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value)
log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图")
view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value)
warehouse.save_metadata_obj_detail(view_datas)
view_time = watch.stop(f"{source_id}_view")
log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}")
if is_not_empty(meta_views):
log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图")
view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value)
warehouse.save_metadata_obj_detail(view_datas)
view_time = watch.stop(f"{source_id}_view")
log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}")
# 读取存储过程创建语句
watch.start(f"{source_id}_procedure")
meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value)
log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程")
procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value)
warehouse.save_metadata_obj_detail(procedure_datas)
procedure_time = watch.stop(f"{source_id}_procedure")
log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}")
if is_not_empty(meta_procedure):
log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程")
procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value)
warehouse.save_metadata_obj_detail(procedure_datas)
procedure_time = watch.stop(f"{source_id}_procedure")
log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}")
watch.clear()
def scan_metadata(self, source_id):
@ -97,14 +100,17 @@ class MetadataScanner(object):
log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]')
try:
tables = metadata_reader.query_tables()
warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code)
log.info(f'[{source_id}]读取表完毕,共{len(tables)}')
if is_not_empty(tables):
warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code)
log.info(f'[{source_id}]读取表完毕,共{len(tables)}')
views = metadata_reader.query_views()
warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code)
log.info(f'[{source_id}]读取视图完毕,共{len(views)}')
if is_not_empty(views):
warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code)
log.info(f'[{source_id}]读取视图完毕,共{len(views)}')
procedures = metadata_reader.query_procedure()
warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code)
log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}')
if is_not_empty(procedures):
warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code)
log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}')
# 查询明细数据
self._scan_detail(warehouse)
except Exception as e:

@ -12,6 +12,7 @@ from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from common.futool.core.fu_collection import list_diff, list_intersection, is_not_empty, list_2_md5
from common.futool.core.fu_lang import str_md5
from datahub.metadata.metaversion.metadata_compare_task import MetadataCompareTaskManage, MetadataCompareTask
class MetadataComparator(metaclass=abc.ABCMeta):
@ -23,10 +24,12 @@ class MetadataComparator(metaclass=abc.ABCMeta):
:param source_id_1: 数据源1
:param source_id_2: 数据源2
"""
self.task_manage = MetadataCompareTaskManage()
self.version_1 = version_1
self.version_2 = version_2
self.warehouse_1 = MetadataWareHouse(source_id_1)
self.warehouse_2 = MetadataWareHouse(source_id_2) if source_id_2 else None
self.task_manage.create_task(MetadataCompareTask(version_1, version_2, source_id_1, source_id_2))
@abc.abstractmethod
def compare(self):

@ -49,15 +49,21 @@ class MetadataCompareTaskManage(object):
def __init__(self):
self.dao = MetaTaskDao(local_conn)
def create_task(self, task: MetadataCompareTask) -> bool:
def create_task(self, task: MetadataCompareTask) -> str:
"""
创建任务
:return:
"""
compare_id = task.compare_id_gen()
self._check_task_exist(compare_id)
return self.dao.create_task(compare_id, task.version_code_1, task.version_code_2, task.source_id_1,
task.source_id_2, task.compare_task_type)
rt = self.dao.create_task(compare_id, task.version_code_1, task.version_code_2, task.source_id_1,
task.source_id_2, task.compare_task_type)
if rt:
log.info('比较任务创建成功,任务ID=[{}]', compare_id)
else:
log.error('比较任务创建失败')
raise self.MetadataCompareTaskError()
return compare_id
def finish_task(self, compare_id):
"""

Loading…
Cancel
Save