From 1f288162fbb62f4bfed31635bdda63d85f08e98a Mon Sep 17 00:00:00 2001
From: old-tom <892955278@qq.com>
Date: Fri, 19 May 2023 12:54:24 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E7=89=88=E6=9C=AC=E6=AF=94=E8=BE=83?=
 =?UTF-8?q?=E5=99=A8=E5=BC=80=E5=A4=B4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Reviewer notes (free text in this position is skipped when the patch is applied):
 * Line structure and indentation were reconstructed from a
   whitespace-collapsed copy of this patch. Context-line indentation is
   inferred from the Python structure; if a hunk is rejected, retry with
   `git apply --ignore-whitespace`.
 * metadata_comparator.py: SourceComparator always binds warehouse_2,
   DiffSourceComparator hands both warehouses to the base class, and
   SameSourceComparator.compare_table returns the computed diff.
 * Comments and docstrings on added lines are in English. The blob ids on
   the `index` lines predate these edits.

 common/futool/core/fu_collection.py           |  20 +++
 common/futool/core/fu_function.py             |   4 +-
 datahub/metadata/metadat_scan.py              |   2 +-
 .../metaversion/metadata_comparator.py        | 159 ++++++++++++++++++
 .../metadata/metaversion/metadata_version.py  |   2 +-
 datahub/scheduletask/scan_task.py             |   5 +
 6 files changed, 188 insertions(+), 4 deletions(-)
 create mode 100644 datahub/metadata/metaversion/metadata_comparator.py

diff --git a/common/futool/core/fu_collection.py b/common/futool/core/fu_collection.py
index 9c54f56..6ec9066 100644
--- a/common/futool/core/fu_collection.py
+++ b/common/futool/core/fu_collection.py
@@ -31,3 +31,23 @@ def is_not_empty(coll: list) -> bool:
     :return:
     """
     return coll is not None and len(coll) > 0
+
+
+def list_symmetric_diff(coll_1: list, coll_2: list) -> list:
+    """
+    Symmetric difference of the two lists (set-based: order and duplicates are not preserved)
+    :param coll_1:
+    :param coll_2:
+    :return:
+    """
+    return list(set(coll_1).symmetric_difference(set(coll_2)))
+
+
+def list_diff(coll_1: list, coll_2: list) -> list:
+    """
+    Elements present in coll_1 but not in coll_2 (set-based: order and duplicates are not preserved)
+    :param coll_1:
+    :param coll_2:
+    :return:
+    """
+    return list(set(coll_1).difference(set(coll_2)))
diff --git a/common/futool/core/fu_function.py b/common/futool/core/fu_function.py
index 1f5ab63..61f6df2 100644
--- a/common/futool/core/fu_function.py
+++ b/common/futool/core/fu_function.py
@@ -13,9 +13,9 @@ def singleton(cls):
     """
     _instance = {}
 
-    def inner():
+    def inner(*args, **kwargs):
         if cls not in _instance:
-            _instance[cls] = cls()
+            _instance[cls] = cls(*args, **kwargs)
         return _instance[cls]
 
     return inner
diff --git a/datahub/metadata/metadat_scan.py b/datahub/metadata/metadat_scan.py
index 8e404f4..320bfa4 100644
--- a/datahub/metadata/metadat_scan.py
+++ b/datahub/metadata/metadat_scan.py
@@ -115,4 +115,4 @@ class MetadataScanner(object):
 
 if __name__ == '__main__':
     scanner = MetadataScanner()
-    scanner.scan_metadata('834164a2d62de959c0261e6239dd1e55')
+    scanner.scan_metadata('f98fede74826c709329a65d63db167df')
diff --git a/datahub/metadata/metaversion/metadata_comparator.py b/datahub/metadata/metaversion/metadata_comparator.py
new file mode 100644
index 0000000..f47357e
--- /dev/null
+++ b/datahub/metadata/metaversion/metadata_comparator.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Time : 2023/5/14 21:15
+# @Author : old tom
+# @File : metadata_comparator.py
+# @Project : futool-tiny-datahub
+# @Desc : Version comparator
+
+import abc
+
+from datahub.metadata.metadata_warehouse import MetadataWareHouse
+from datahub.metadata.constant.metadata_constant import MetaDataObjType
+from common.futool.core.fu_collection import list_diff
+
+
+class SourceComparator(metaclass=abc.ABCMeta):
+
+    def __init__(self, warehouse_1: MetadataWareHouse, warehouse_2: MetadataWareHouse = None):
+        self.warehouse_1 = warehouse_1
+        # Always bind the attribute (None when omitted): query_table reads it unconditionally
+        self.warehouse_2 = warehouse_2
+
+    @abc.abstractmethod
+    def compare_table(self, version_1, version_2):
+        """
+        Compare table names
+        :param version_1:
+        :param version_2:
+        :return:
+        """
+        pass
+
+    @abc.abstractmethod
+    def compare_view(self, version_1, version_2):
+        """
+        Compare view names
+        :param version_1:
+        :param version_2:
+        :return:
+        """
+        pass
+
+    @abc.abstractmethod
+    def compare_procedure(self, version_1, version_2):
+        """
+        Compare stored procedures
+        :param version_1:
+        :param version_2:
+        :return:
+        """
+        pass
+
+    @abc.abstractmethod
+    def compare_table_field(self, version_1, version_2):
+        """
+        Compare table fields
+        :param version_1:
+        :param version_2:
+        :return:
+        """
+        pass
+
+    @abc.abstractmethod
+    def compare_view_create(self, version_1, version_2):
+        """
+        Compare view creation statements
+        :param version_1:
+        :param version_2:
+        :return:
+        """
+        pass
+
+    @abc.abstractmethod
+    def compare_procedure_create(self, version_1, version_2):
+        """
+        Compare stored procedure creation statements
+        :param version_1:
+        :param version_2:
+        :return:
+        """
+        pass
+
+    def query_table(self, version_1, version_2):
+        """
+        Query the table names of both versions and diff them
+        :param version_1: version code 1
+        :param version_2: version code 2
+        :return: (entries only in version_1, entries only in version_2)
+        """
+        table_v1 = self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value, version_code=version_1)
+        # Without a second warehouse (same-source comparison) both versions
+        # are read from warehouse_1
+        warehouse_v2 = self.warehouse_2 or self.warehouse_1
+        table_v2 = warehouse_v2.query_metadata_name(obj_type=MetaDataObjType.Table.value, version_code=version_2)
+        return self._compare_data_list(table_v1, table_v2)
+
+    @staticmethod
+    def _compare_data_list(version_1_data, version_2_data):
+        # Diff in both directions so each version's own entries can be told apart
+        # present in version_1 but missing from version_2
+        v1_to_v2 = list_diff(version_1_data, version_2_data)
+        # present in version_2 but missing from version_1
+        v2_to_v1 = list_diff(version_2_data, version_1_data)
+        return v1_to_v2, v2_to_v1
+
+
+class SameSourceComparator(SourceComparator):
+    """
+    Comparator for two versions of the same source
+    """
+
+    def __init__(self, source_id):
+        super().__init__(MetadataWareHouse(source_id))
+
+    def compare_table(self, version_1, version_2):
+        return self.query_table(version_1, version_2)
+
+    def compare_view(self, version_1, version_2):
+        pass
+
+    def compare_procedure(self, version_1, version_2):
+        pass
+
+    def compare_table_field(self, version_1, version_2):
+        pass
+
+    def compare_view_create(self, version_1, version_2):
+        pass
+
+    def compare_procedure_create(self, version_1, version_2):
+        pass
+
+
+class DiffSourceComparator(SourceComparator):
+    """
+    Comparator across two different sources
+    """
+
+    def __init__(self, source_id_1, source_id_2):
+        # Register both warehouses with the base class so its query helpers can reach them
+        super().__init__(MetadataWareHouse(source_id_1), MetadataWareHouse(source_id_2))
+
+    def compare_table(self, version_1, version_2):
+        pass
+
+    def compare_view(self, version_1, version_2):
+        pass
+
+    def compare_procedure(self, version_1, version_2):
+        pass
+
+    def compare_table_field(self, version_1, version_2):
+        pass
+
+    def compare_view_create(self, version_1, version_2):
+        pass
+
+    def compare_procedure_create(self, version_1, version_2):
+        pass
diff --git a/datahub/metadata/metaversion/metadata_version.py b/datahub/metadata/metaversion/metadata_version.py
index 4b31804..b875eec 100644
--- a/datahub/metadata/metaversion/metadata_version.py
+++ b/datahub/metadata/metaversion/metadata_version.py
@@ -27,7 +27,7 @@ class MetadataVersionKeeper(object):
         初始化版本号
         :return:
         """
-        flag = self.dao.init_version(self.source_id) == 1
+        flag = (self.dao.init_version(self.source_id) == 1)
         if flag:
             log.info(f"[{self.source_id}] 数据源初始化版本成功")
         else:
diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py
index 86b466e..caf4aed 100644
--- a/datahub/scheduletask/scan_task.py
+++ b/datahub/scheduletask/scan_task.py
@@ -82,3 +82,8 @@ class ScanTaskRunner(object):
         for task in enable_task:
             self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata)
             log.info(f'datasource scan task [{task[0]}] submit success')
+
+
+if __name__ == '__main__':
+    runner = ScanTaskRunner()
+    runner.run()