#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/5/14 21:15
# @Author : old tom
# @File : metadata_comparator.py
# @Project : futool-tiny-datahub
# @Desc : Metadata version comparator
import abc

from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from common.futool.core.fu_collection import list_diff, list_intersection, is_not_empty, list_2_md5
from common.futool.core.fu_lang import str_md5


class MetadataComparator(metaclass=abc.ABCMeta):
    """Base class that compares the metadata recorded for two versions.

    Version 1 is always read from ``warehouse_1``. Version 2 is read from
    ``warehouse_2`` when a second data source was supplied, otherwise from
    ``warehouse_1`` as well.
    """

    def __init__(self, version_1: int, version_2: int, source_id_1, source_id_2=None):
        """
        :param version_1: version number 1
        :param version_2: version number 2
        :param source_id_1: data source 1
        :param source_id_2: data source 2; when omitted both versions are read from data source 1
        """
        self.version_1 = version_1
        self.version_2 = version_2
        self.warehouse_1 = MetadataWareHouse(source_id_1)
        self.warehouse_2 = MetadataWareHouse(source_id_2) if source_id_2 else None

    @abc.abstractmethod
    def compare(self):
        pass

    def _do_compare(self):
        """
        Compare tables, views and procedures (in that order).
        :return: dict with the keys 'table', 'view' and 'procedure'; every value is the
                 (v1_to_v2, v2_to_v1, same_name_compare_result) tuple of _compare_metadata
        """
        # The results used to be computed and thrown away, which made compare() return None.
        return {
            'table': self._compare_table(),
            'view': self._compare_view(),
            'procedure': self._compare_procedure(),
        }

    def _compare_table(self):
        """
        Compare tables
        1. table names
        2. field name, field type, length, nullable, default value
        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        return self._compare_metadata(MetaDataObjType.Table.value)

    def _compare_view(self):
        """
        Compare views
        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        return self._compare_metadata(MetaDataObjType.View.value)

    def _compare_procedure(self):
        """
        Compare stored procedures
        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        return self._compare_metadata(MetaDataObjType.Procedure.value)

    def _compare_metadata(self, obj_type):
        """
        Compare metadata object names, then the details of the objects found in both versions
        :param obj_type: metadata object type value (table / view / procedure)
        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        # Object names recorded for each version
        obj_v1, obj_v2 = self._query_metadata_name(obj_type)
        # Diff in both directions to obtain the created / dropped object names
        # (presumably list_diff(a, b) yields the entries of a that are missing from b -- confirm)
        v1_to_v2 = list_diff(obj_v1, obj_v2)
        v2_to_v1 = list_diff(obj_v2, obj_v1)
        # Always compare the objects present in both versions. The previous code fell back to
        # the (empty) diff list when the name sets were identical, so in that case nothing was
        # compared in detail and every change inside a same-named object went unnoticed.
        same_name_objs = list_intersection(obj_v1, obj_v2)
        same_name_compare_result = self._compare_detail(same_name_objs, obj_type)
        return v1_to_v2, v2_to_v1, same_name_compare_result

    def _version_2_warehouse(self):
        """
        Pick the warehouse that version 2 is read from
        :return: warehouse_2 when it is set, otherwise warehouse_1
        """
        return self.warehouse_2 or self.warehouse_1

    def _query_metadata_name(self, obj_type):
        """
        Query the metadata object names of both versions
        :param obj_type: metadata object type value
        :return: (names of version 1, names of version 2)
        """
        obj_v1_name = self.warehouse_1.query_metadata_name(obj_type=obj_type, version_code=self.version_1)
        obj_v2_name = self._version_2_warehouse().query_metadata_name(obj_type=obj_type,
                                                                      version_code=self.version_2)
        return obj_v1_name, obj_v2_name

    def _query_table_field(self, table_name):
        """
        Query the fields of a table in both versions
        :param table_name: table name
        :return: (field dict of version 1, field dict of version 2)
        """
        table_field_v1 = self.warehouse_1.query_table_fields(table_name, self.version_1)
        table_field_v2 = self._version_2_warehouse().query_table_fields(table_name, self.version_2)

        # Wrap the rows as a dict: key is the field name (first column), value is the rest of
        # the row: field_ch_name, field_type, nullable, default_value, field_length
        def trans_field_to_kv(fields):
            return {f[0]: f[1:] for f in fields}

        return trans_field_to_kv(table_field_v1), trans_field_to_kv(table_field_v2)

    def _query_obj_create(self, obj_name):
        """
        Query the creation SQL of a view or stored procedure in both versions
        :param obj_name: object name
        :return: (creation SQL of version 1, creation SQL of version 2)
        """
        create_v1 = self.warehouse_1.query_create_by_name(obj_name, self.version_1)
        create_v2 = self._version_2_warehouse().query_create_by_name(obj_name, self.version_2)
        return create_v1, create_v2

    def _compare_detail(self, metadata_obj, obj_type):
        """
        Compare details
        tables: fields
        views / stored procedures: creation statement
        :param metadata_obj: names of the objects to compare
        :param obj_type: metadata object type value
        :return: list with the non-empty per-object comparison results
        """
        # Compare by value: identity ('is') against an enum's .value is not a reliable test.
        if obj_type == MetaDataObjType.Table.value:
            compare_one = self._compare_table_field
        else:
            compare_one = self._compare_create_sql
        results = (compare_one(obj_name) for obj_name in metadata_obj)
        # Objects without any difference produce an empty dict and are left out
        return [result for result in results if result]

    def _compare_create_sql(self, obj_name):
        """
        Compare the creation SQL of a view or stored procedure
        :param obj_name: object name
        :return: {obj_name: {'create_v1': ..., 'create_v2': ...}} when the digests differ, else {}
        """
        compare_result = {}
        create_v1, create_v2 = self._query_obj_create(obj_name)
        if str_md5(create_v1) != str_md5(create_v2):
            compare_result[obj_name] = {
                'create_v1': create_v1,
                'create_v2': create_v2
            }
        return compare_result

    def _compare_table_field(self, table_name):
        """
        Compare the fields of one table
        :param table_name: table name
        :return: {table_name: {'v1_to_v2': ..., 'v2_to_v1': ..., 'same_compare_result': ...}}
                 when anything differs, else {}
        """
        compare_result = {}
        table_field_v1, table_field_v2 = self._query_table_field(table_name)
        # Field names of each version
        table_v1_fields, table_v2_fields = list(table_field_v1), list(table_field_v2)
        # Version 1 relative to version 2, version 2 relative to version 1
        k1_to_k2 = list_diff(table_v1_fields, table_v2_fields)
        k2_to_k1 = list_diff(table_v2_fields, table_v1_fields)
        # Fields present in both versions. Always intersect: falling back to the empty diff
        # list when the field names were identical skipped the attribute comparison entirely.
        same_fields = list_intersection(table_v1_fields, table_v2_fields)
        # Compare the shared fields by hashing their merged attributes with md5.
        # minHash could be used for large data volumes; not needed for now.
        same_field_compare_result = []
        for field_name in same_fields:
            f_v1, f_v2 = table_field_v1[field_name], table_field_v2[field_name]
            if list_2_md5(f_v1) != list_2_md5(f_v2):
                # Same field name, different attributes
                same_field_compare_result.append(field_name)
        if is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1) or is_not_empty(same_field_compare_result):
            compare_result[table_name] = {
                'v1_to_v2': k1_to_k2,
                'v2_to_v1': k2_to_k1,
                'same_compare_result': same_field_compare_result
            }
        return compare_result


class SameSourceComparator(MetadataComparator):
    """
    Comparison of two versions inside the same data source
    """

    def __init__(self, source_id, version_1, version_2):
        super().__init__(version_1, version_2, source_id)

    def compare(self):
        return self._do_compare()


class DiffSourceComparator(MetadataComparator):
    """
    Comparison of two versions taken from different data sources
    """

    def __init__(self, source_id_1, version_1, source_id_2, version_2):
        super().__init__(version_1, version_2, source_id_1, source_id_2)

    def compare(self):
        return self._do_compare()


if __name__ == '__main__':
    ssc = DiffSourceComparator('834164a2d62de959c0261e6239dd1e55', 24, 'f98fede74826c709329a65d63db167df', 1)
    ssc.compare()