From d2fe8e4e379a7caac051b53017b23e2434fc255e Mon Sep 17 00:00:00 2001 From: old-tom <892955278@msn.cn> Date: Sun, 21 May 2023 16:39:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=89=88=E6=9C=AC=E6=AF=94=E8=BE=83?= =?UTF-8?q?=E5=99=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/futool/core/fu_collection.py | 24 ++ common/futool/core/fu_lang.py | 13 + datahub/local_db_conf.py | 2 +- datahub/metadata/metadata_warehouse.py | 11 +- datahub/metadata/metadatadao/metadata_dao.py | 14 + .../metaversion/metadata_comparator.py | 257 +++++++++++------- datahub/relation/relation_analyze.py | 4 +- datahub/scheduletask/scan_task.py | 3 +- 8 files changed, 221 insertions(+), 107 deletions(-) diff --git a/common/futool/core/fu_collection.py b/common/futool/core/fu_collection.py index 6ec9066..dd34df2 100644 --- a/common/futool/core/fu_collection.py +++ b/common/futool/core/fu_collection.py @@ -5,6 +5,8 @@ # @File : fu_collection.py # @Project : futool-db # @Desc : 集合类工具 +import hashlib + def split_coll(data: [], part_size=5): """ @@ -45,9 +47,31 @@ def list_symmetric_diff(coll_1: list, coll_2: list) -> list: def list_diff(coll_1: list, coll_2: list) -> list: """ + 差集 coll_1中有而coll_2中没有 :param coll_1: :param coll_2: :return: """ return list(set(coll_1).difference(set(coll_2))) + + +def list_intersection(coll_1: list, coll_2: list) -> list: + """ + 求交集 + """ + return list(set(coll_1).intersection(set(coll_2))) + + +def list_2_md5(coll: list) -> str: + """ + 合并列表内容 + :param coll: + :return: + """ + md5 = hashlib.md5() + content = '' + for e in coll: + content += str(e) + md5.update(content.encode(encoding='utf-8')) + return md5.hexdigest() diff --git a/common/futool/core/fu_lang.py b/common/futool/core/fu_lang.py index 5f235a2..70904ae 100644 --- a/common/futool/core/fu_lang.py +++ b/common/futool/core/fu_lang.py @@ -5,3 +5,16 @@ # @File : fu_lang.py # @Project : Futool # @Desc : 字符串相关 + +import hashlib + + +def str_md5(content: str): + """ + 字符串计算MD5 + :param content: + :return: + """ + md5 = hashlib.md5() + md5.update(content.encode(encoding='utf-8')) + return md5.hexdigest() diff --git a/datahub/local_db_conf.py b/datahub/local_db_conf.py index ff89f3e..f2116b9 100644 --- a/datahub/local_db_conf.py +++ b/datahub/local_db_conf.py @@ -14,4 +14,4 @@ local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432 local_conn = ConnFactory(local_db) # neo4j配置 -graph = Graph("bolt://localhost:7687", auth=("neo4j", "havana-academy-emerald-herman-scale-5422")) +#graph = Graph("bolt://localhost:7687", auth=("neo4j", "havana-academy-emerald-herman-scale-5422")) \ No newline at end of file diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index 2e108b9..ff58a41 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -65,6 +65,15 @@ class MetadataWareHouse(object): """ return self.dao.query_metadata_name_by_type(obj_type, self.source_id, version_code) + def query_table_fields(self, table_name, version_code=None): + """ + 查询已入库表字段 + :param table_name: + :param version_code: + :return: + """ + return self.dao.query_metadata_table_files(self.source_id, table_name, version_code) + def query_metadata_id_name(self, obj_type=None, version_code=None): """ 查询元数据ID及名称 @@ -82,7 +91,7 @@ class MetadataWareHouse(object): """ return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id) - def query_view_create_by_name(self, view_name, version_code=None): + def query_create_by_name(self, view_name, version_code=None): """ 视图查询创建语句 :param version_code: 版本号 diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index b7e27e4..359e155 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -189,3 +189,17 @@ class MetadataDao(BaseDao): del_metadata_obj = f"delete from metadata_object where source_id = '{source_id}' AND version_code = {version_code}" self.execute_update_tx( [del_version_sql, del_metadata_obj_field_detail, del_metadata_obj_create_detail, del_metadata_obj]) + + def query_metadata_table_files(self, source_id, table_name, version_code): + """ + 查询表字段 + :param source_id: + :param table_name: + :param version_code: + :return: + """ + sql = f"select field_name, field_ch_name, field_type, nullable, default_value, field_length " \ + f"from metadata_object_field where meta_id in (select meta_id from metadata_object " \ + f"where meta_type='table' and meta_name='{table_name}' and source_id = '{source_id}' " \ + f"AND version_code = {version_code})" + return self.query_all(sql) diff --git a/datahub/metadata/metaversion/metadata_comparator.py b/datahub/metadata/metaversion/metadata_comparator.py index f47357e..0f37374 100644 --- a/datahub/metadata/metaversion/metadata_comparator.py +++ b/datahub/metadata/metaversion/metadata_comparator.py @@ -10,150 +10,205 @@ import abc from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.metadata.constant.metadata_constant import MetaDataObjType -from common.futool.core.fu_collection import list_diff +from common.futool.core.fu_collection import list_diff, list_intersection, is_not_empty, list_2_md5 +from common.futool.core.fu_lang import str_md5 -class SourceComparator(metaclass=abc.ABCMeta): +class MetaDataComparator(metaclass=abc.ABCMeta): - def __init__(self, warehouse_1: MetadataWareHouse, warehouse_2: MetadataWareHouse = None): - self.warehouse_1 = warehouse_1 - if warehouse_2: - self.warehouse_2 = warehouse_2 + def __init__(self, version_1: int, version_2: int, source_id_1, source_id_2=None): + """ + :param version_1: 版本号1 + :param version_2: 版本号2 + :param source_id_1: 数据源1 + :param source_id_2: 数据源2 + """ + self.version_1 = version_1 + self.version_2 = version_2 + self.warehouse_1 = MetadataWareHouse(source_id_1) + self.warehouse_2 = MetadataWareHouse(source_id_2) if source_id_2 else None @abc.abstractmethod - def compare_table(self, version_1, version_2): + def compare(self): + pass + + def _do_compare(self): + self._compare_table() + self._compare_view() + self._compare_procedure() + + def _compare_table(self): """ - 比较表名 - :param version_1: - :param version_2: + 比较表 + 1.表名 + 2.字段名、字段类型、长度、是否可为空、默认值 :return: """ - pass + v1_to_v2, v2_to_v1, same_name_compare_result = self._compare_metadata(MetaDataObjType.Table.value) + return v1_to_v2, v2_to_v1, same_name_compare_result - @abc.abstractmethod - def compare_view(self, version_1, version_2): + def _compare_view(self): """ - 比较视图名 - :param version_1: - :param version_2: + 比较视图 :return: """ - pass + self._compare_metadata(MetaDataObjType.View.value) - @abc.abstractmethod - def compare_procedure(self, version_1, version_2): + def _compare_procedure(self): """ 比较存储过程 - :param version_1: - :param version_2: :return: """ - pass + self._compare_metadata(MetaDataObjType.Procedure.value) - @abc.abstractmethod - def compare_table_field(self, version_1, version_2): + def _compare_metadata(self, obj_type): """ - 比较字段 - :param version_1: - :param version_2: + 比较元数据名称 + :param obj_type: :return: """ - pass - - @abc.abstractmethod - def compare_view_create(self, version_1, version_2): + # 版本号对应元数据名 + obj_v1, obj_v2 = self._query_metadata_name(obj_type) + # v1与v2比,v2与v1比,得到新建及删除的元数据名称 + v1_to_v2, v2_to_v1 = list_diff(obj_v1, obj_v2), list_diff(obj_v2, obj_v1) + # 相同名称比较 + same_name_objs = list_intersection(obj_v1, obj_v2) if (is_not_empty(v1_to_v2) or is_not_empty(v2_to_v1)) \ + else v1_to_v2 + same_name_compare_result = self._compare_detail(same_name_objs, obj_type) + return v1_to_v2, v2_to_v1, same_name_compare_result + + def _query_metadata_name(self, obj_type): """ - 比较视图创建语句 - :param version_1: - :param version_2: + 查询表名称 :return: """ - pass + obj_v1_name = self.warehouse_1.query_metadata_name(obj_type=obj_type, version_code=self.version_1) + obj_v2_name = self.warehouse_2.query_metadata_name(obj_type=obj_type, + version_code=self.version_2) if self.warehouse_2 \ + else self.warehouse_1.query_metadata_name(obj_type=obj_type, version_code=self.version_2) + return obj_v1_name, obj_v2_name - @abc.abstractmethod - def compare_procedure_create(self, version_1, version_2): + def _query_table_field(self, table_name): """ - 比较存储过程创建语句 - :param version_1: - :param version_2: + 查询表字段 + :param table_name: :return: """ - pass + table_field_v1 = self.warehouse_1.query_table_fields(table_name, self.version_1) + table_field_v2 = self.warehouse_2.query_table_fields(table_name, self.version_2) if self.warehouse_2 \ + else self.warehouse_1.query_table_fields(table_name, self.version_2) + + # 包装返回为dict,key为字段名,value为 field_ch_name, field_type, nullable, default_value, field_length + def trans_field_to_kv(fields): + field_kv = {} + for f in fields: + field_name = f[0] + field_kv[field_name] = f[1:] + return field_kv - def query_table(self, version_1, version_2): + return trans_field_to_kv(table_field_v1), trans_field_to_kv(table_field_v2) + + def _query_obj_create(self, obj_name): """ - 查询表名称 - :param version_1: 版本号1 - :param version_2: 版本号2 + 查询试图或存储过程SQL + :param obj_name: :return: """ - table_v1 = self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value, version_code=version_1) - table_v2 = self.warehouse_2.query_metadata_name(obj_type=MetaDataObjType.Table.value, - version_code=version_2) if self.warehouse_2 \ - else self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value, - version_code=version_2) - return self._compare_data_list(table_v1, table_v2) + create_v1 = self.warehouse_1.query_create_by_name(obj_name, self.version_1) + create_v2 = self.warehouse_2.query_create_by_name(obj_name, self.version_2) if self.warehouse_2 \ + else self.warehouse_1.query_create_by_name(obj_name, self.version_2) + return create_v1, create_v2 - @staticmethod - def _compare_data_list(version_1_data, version_2_data): - # 计算2次用于区分版本间差异 - # version_1有而version_2没有 - v1_to_v2 = list_diff(version_1_data, version_2_data) - # version_2有而version_1没有 - v2_to_v1 = list_diff(version_2_data, version_1_data) - return v1_to_v2, v2_to_v1 - - -class SameSourceComparator(SourceComparator): + def _compare_detail(self, metadata_obj, obj_type): + """ + 比较明细 + 表:字段 + 试图、存储过程:创建语句 + :param metadata_obj: + :param obj_type: + :return: + """ + compare_result = [] + for obj_name in metadata_obj: + obj_compare_result = self._compare_table_field( + obj_name) if obj_type is MetaDataObjType.Table.value else self._compare_create_sql(obj_name) + if obj_compare_result: + compare_result.append(obj_compare_result) + return compare_result + + def _compare_create_sql(self, obj_name): + """ + 比较试图或存储过程SQL + :param obj_name: + :return: + """ + compare_result = {} + create_v1, create_v2 = self._query_obj_create(obj_name) + if str_md5(create_v1) != str_md5(create_v2): + compare_result[obj_name] = { + 'create_v1': create_v1, + 'create_v2': create_v2 + } + return compare_result + + def _compare_table_field(self, table_name): + """ + 比较字段 + :param table_name: 表名 + :return: + """ + compare_result = {} + table_field_v1, table_field_v2 = self._query_table_field(table_name) + # 得到差异字段 + table_v1_fields, table_v2_fields = list(table_field_v1.keys()), list(table_field_v2.keys()) + # 版本1相对版本2,版本2相对版本1 + k1_to_k2, k2_to_k1 = list_diff(table_v1_fields, table_v2_fields), list_diff(table_v2_fields, + table_v1_fields) + # 得到相同字段 + same_fields = list_intersection(table_v1_fields, table_v2_fields) if ( + is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1)) \ + else k1_to_k2 + # 比较相同字段,内容合并转为md5进行比较。如果数据量较大可以使用minHash算法,现在没必要 + same_field_compare_result = [] + for field_name in same_fields: + # 取出相同字段比较 + f_v1, f_v2 = table_field_v1[field_name], table_field_v2[field_name] + if list_2_md5(f_v1) != list_2_md5(f_v2): + # 记录相同字段,不同属性 + same_field_compare_result.append(field_name) + if is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1) or is_not_empty(same_field_compare_result): + compare_result[table_name] = { + 'v1_to_v2': k1_to_k2, + 'v2_to_v1': k2_to_k1, + 'same_compare_result': same_field_compare_result + } + return compare_result + + +class SameSourceComparator(MetaDataComparator): """ - 同库比较器 + 同库比较 """ - def __init__(self, source_id): - super().__init__(MetadataWareHouse(source_id)) - - def compare_table(self, version_1, version_2): - v1_to_v2, v2_to_v1 = self.query_table(version_1, version_2) - - def compare_view(self, version_1, version_2): - pass - - def compare_procedure(self, version_1, version_2): - pass - - def compare_table_field(self, version_1, version_2): - pass + def __init__(self, source_id, version_1, version_2): + super().__init__(version_1, version_2, source_id) - def compare_view_create(self, version_1, version_2): - pass + def compare(self): + return super()._do_compare() - def compare_procedure_create(self, version_1, version_2): - pass - -class DiffSourceComparator(SourceComparator): +class DiffSourceComparator(MetaDataComparator): """ 不同库比较 """ - def __init__(self, source_id_1, source_id_2): - warehouse = MetadataWareHouse(source_id_1) - warehouse_dst = MetadataWareHouse(source_id_2) - - def compare_table(self, version_1, version_2): - pass - - def compare_view(self, version_1, version_2): - pass + def __init__(self, source_id_1, version_1, source_id_2, version_2): + super().__init__(version_1, version_2, source_id_1, source_id_2) - def compare_procedure(self, version_1, version_2): - pass + def compare(self): + return super()._do_compare() - def compare_table_field(self, version_1, version_2): - pass - def compare_view_create(self, version_1, version_2): - pass - - def compare_procedure_create(self, version_1, version_2): - pass +if __name__ == '__main__': + ssc = DiffSourceComparator('834164a2d62de959c0261e6239dd1e55', 24, 'f98fede74826c709329a65d63db167df', 1) + ssc.compare() diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index df686e2..8ede615 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -132,7 +132,7 @@ class MetadataRelationAnalyzer(object): :param view_name: 视图名称 :return: """ - create_sql = self.warehouse.query_view_create_by_name(view_name) + create_sql = self.warehouse.query_create_by_name(view_name) # source_table 可能包含表或视图 source_tables = analyze_view_sql(create_sql) if is_not_empty(source_tables): @@ -177,7 +177,7 @@ class MetadataRelationAnalyzer(object): try: log.info(f'开始分析存储过程{procedure[1]}') start_time = time.time() - create_sql = self.warehouse.query_view_create_by_name(procedure[1]) + create_sql = self.warehouse.query_create_by_name(procedure[1]) analyze_result = analyze_procedure_sql(create_sql) if is_not_empty(analyze_result): # 构建节点与关系 diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py index caf4aed..6c13083 100644 --- a/datahub/scheduletask/scan_task.py +++ b/datahub/scheduletask/scan_task.py @@ -85,5 +85,4 @@ class ScanTaskRunner(object): if __name__ == '__main__': - runner = ScanTaskRunner() - runner.run() + runner=ScanTaskRunner()