|
|
#!/usr/bin/env python
|
|
|
# -*- coding: utf-8 -*-
|
|
|
# @Time : 2023/5/14 21:15
|
|
|
# @Author : old tom
|
|
|
# @File : metadata_comparator.py
|
|
|
# @Project : futool-tiny-datahub
|
|
|
# @Desc : 版本比较器
|
|
|
|
|
|
import abc
|
|
|
|
|
|
from datahub.metadata.metadata_warehouse import MetadataWareHouse
|
|
|
from datahub.metadata.constant.metadata_constant import MetaDataObjType
|
|
|
from common.futool.core.fu_collection import list_diff, list_intersection, is_not_empty, list_2_md5
|
|
|
from common.futool.core.fu_lang import str_md5
|
|
|
|
|
|
|
|
|
class MetadataComparator(metaclass=abc.ABCMeta):
    """
    Base class for comparing two metadata versions.

    Both versions are read from ``warehouse_1`` unless a second data source id
    is supplied, in which case version 2 is read from ``warehouse_2``.
    """

    def __init__(self, version_1: int, version_2: int, source_id_1, source_id_2=None):
        """
        :param version_1: version number 1
        :param version_2: version number 2
        :param source_id_1: data source 1
        :param source_id_2: data source 2; when falsy, version 2 is read from data source 1
        """
        self.version_1 = version_1
        self.version_2 = version_2
        self.warehouse_1 = MetadataWareHouse(source_id_1)
        self.warehouse_2 = MetadataWareHouse(source_id_2) if source_id_2 else None

    @abc.abstractmethod
    def compare(self):
        """Run the comparison; every concrete comparator must implement this."""
        pass

    def _do_compare(self):
        """
        Compare tables, views and stored procedures, in that order.

        :return: dict with the keys 'table', 'view' and 'procedure'; each value is the
                 (v1_to_v2, v2_to_v1, same_name_compare_result) tuple returned by the
                 matching ``_compare_*`` method
        """
        # Dict displays are evaluated top to bottom, so the original call order is kept.
        return {
            'table': self._compare_table(),
            'view': self._compare_view(),
            'procedure': self._compare_procedure(),
        }

    def _compare_table(self):
        """
        Compare tables:
        1. table names
        2. field name, field type, length, nullable, default value

        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        return self._compare_metadata(MetaDataObjType.Table.value)

    def _compare_view(self):
        """
        Compare views.

        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        return self._compare_metadata(MetaDataObjType.View.value)

    def _compare_procedure(self):
        """
        Compare stored procedures.

        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        return self._compare_metadata(MetaDataObjType.Procedure.value)

    def _compare_metadata(self, obj_type):
        """
        Compare the metadata object names of both versions, then the details of
        the objects that exist under the same name in both.

        :param obj_type: metadata object type value (a ``MetaDataObjType`` member's ``.value``)
        :return: (v1_to_v2, v2_to_v1, same_name_compare_result)
        """
        # Metadata names belonging to each version number
        obj_v1, obj_v2 = self._query_metadata_name(obj_type)
        # Compare v1 against v2 and v2 against v1 to get the created and deleted metadata names
        v1_to_v2, v2_to_v1 = list_diff(obj_v1, obj_v2), list_diff(obj_v2, obj_v1)
        # Objects present under the same name in both versions. When neither diff has
        # entries, every v1 name is shared, so the intersection is skipped and obj_v1 is
        # used directly (the previous fallback was the empty diff, which silently skipped
        # the detail comparison).
        # NOTE(review): assumes list_diff(a, b) yields the items of a missing from b -- confirm.
        if is_not_empty(v1_to_v2) or is_not_empty(v2_to_v1):
            same_name_objs = list_intersection(obj_v1, obj_v2)
        else:
            same_name_objs = obj_v1
        same_name_compare_result = self._compare_detail(same_name_objs, obj_type)
        return v1_to_v2, v2_to_v1, same_name_compare_result

    def _second_warehouse(self):
        """Return the warehouse used for version 2: ``warehouse_2`` if set, otherwise ``warehouse_1``."""
        return self.warehouse_2 or self.warehouse_1

    def _query_metadata_name(self, obj_type):
        """
        Query the metadata object names of both versions.

        :param obj_type: metadata object type value
        :return: (names of version 1, names of version 2)
        """
        obj_v1_name = self.warehouse_1.query_metadata_name(obj_type=obj_type, version_code=self.version_1)
        obj_v2_name = self._second_warehouse().query_metadata_name(obj_type=obj_type, version_code=self.version_2)
        return obj_v1_name, obj_v2_name

    def _query_table_field(self, table_name):
        """
        Query the fields of a table in both versions.

        :param table_name: table name
        :return: (fields of version 1, fields of version 2); each is a dict keyed by the
                 first element of a field row, with the remaining elements as the value
        """
        table_field_v1 = self.warehouse_1.query_table_fields(table_name, self.version_1)
        table_field_v2 = self._second_warehouse().query_table_fields(table_name, self.version_2)

        # Wrap the rows as a dict: key is the field name, value is
        # field_ch_name, field_type, nullable, default_value, field_length
        def trans_field_to_kv(fields):
            return {f[0]: f[1:] for f in fields}

        return trans_field_to_kv(table_field_v1), trans_field_to_kv(table_field_v2)

    def _query_obj_create(self, obj_name):
        """
        Query the creation SQL of a view or stored procedure in both versions.

        :param obj_name: object name
        :return: (creation SQL of version 1, creation SQL of version 2)
        """
        create_v1 = self.warehouse_1.query_create_by_name(obj_name, self.version_1)
        create_v2 = self._second_warehouse().query_create_by_name(obj_name, self.version_2)
        return create_v1, create_v2

    def _compare_detail(self, metadata_obj, obj_type):
        """
        Compare object details:
        tables: fields
        views and stored procedures: creation statement

        :param metadata_obj: names of the objects to compare
        :param obj_type: metadata object type value
        :return: list of the non-empty per-object comparison results
        """
        # Value equality, not identity: the enum value is plain data and ``is`` is
        # not guaranteed to hold for equal values.
        is_table = obj_type == MetaDataObjType.Table.value
        compare_result = []
        for obj_name in metadata_obj:
            if is_table:
                obj_compare_result = self._compare_table_field(obj_name)
            else:
                obj_compare_result = self._compare_create_sql(obj_name)
            # An empty dict means "no difference" and is left out
            if obj_compare_result:
                compare_result.append(obj_compare_result)
        return compare_result

    def _compare_create_sql(self, obj_name):
        """
        Compare the creation SQL of a view or stored procedure.

        :param obj_name: object name
        :return: ``{obj_name: {'create_v1': ..., 'create_v2': ...}}`` when the md5 digests
                 of the two statements differ, otherwise an empty dict
        """
        compare_result = {}
        create_v1, create_v2 = self._query_obj_create(obj_name)
        if str_md5(create_v1) != str_md5(create_v2):
            compare_result[obj_name] = {
                'create_v1': create_v1,
                'create_v2': create_v2
            }
        return compare_result

    def _compare_table_field(self, table_name):
        """
        Compare the fields of one table between the two versions.

        :param table_name: table name
        :return: ``{table_name: {'v1_to_v2': ..., 'v2_to_v1': ..., 'same_compare_result': ...}}``
                 when any difference is found, otherwise an empty dict
        """
        compare_result = {}
        table_field_v1, table_field_v2 = self._query_table_field(table_name)
        # Field names of each version
        table_v1_fields, table_v2_fields = list(table_field_v1), list(table_field_v2)
        # Version 1 relative to version 2, version 2 relative to version 1
        k1_to_k2 = list_diff(table_v1_fields, table_v2_fields)
        k2_to_k1 = list_diff(table_v2_fields, table_v1_fields)
        # Fields present in both versions. When neither diff has entries, every v1 field
        # is shared, so use the v1 field list directly (the previous fallback was the
        # empty diff, which skipped the attribute comparison for unchanged column sets).
        # NOTE(review): assumes list_diff(a, b) yields the items of a missing from b -- confirm.
        if is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1):
            same_fields = list_intersection(table_v1_fields, table_v2_fields)
        else:
            same_fields = table_v1_fields
        # Compare the shared fields by md5 of their merged attributes. For large
        # data volumes minHash could be used instead; not necessary for now.
        # Collected here: same field name, different attributes.
        same_field_compare_result = [
            field_name for field_name in same_fields
            if list_2_md5(table_field_v1[field_name]) != list_2_md5(table_field_v2[field_name])
        ]
        if is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1) or is_not_empty(same_field_compare_result):
            compare_result[table_name] = {
                'v1_to_v2': k1_to_k2,
                'v2_to_v1': k2_to_k1,
                'same_compare_result': same_field_compare_result
            }
        return compare_result
|
|
|
|
|
|
|
|
|
class SameSourceComparator(MetadataComparator):
    """
    Comparator for two versions stored in the same data source.
    """

    def __init__(self, source_id, version_1, version_2):
        """
        :param source_id: data source that holds both versions
        :param version_1: version number 1
        :param version_2: version number 2
        """
        # No second source id is passed on, so only the first warehouse is configured.
        super().__init__(version_1=version_1, version_2=version_2, source_id_1=source_id)

    def compare(self):
        """Delegate to the base-class ``_do_compare`` and hand back whatever it returns."""
        outcome = super()._do_compare()
        return outcome
|
|
|
|
|
|
|
|
|
class DiffSourceComparator(MetadataComparator):
    """
    Comparator for two versions stored in different data sources.
    """

    def __init__(self, source_id_1, version_1, source_id_2, version_2):
        """
        :param source_id_1: data source 1
        :param version_1: version number in data source 1
        :param source_id_2: data source 2
        :param version_2: version number in data source 2
        """
        super().__init__(
            version_1=version_1,
            version_2=version_2,
            source_id_1=source_id_1,
            source_id_2=source_id_2,
        )

    def compare(self):
        """Delegate to the base-class ``_do_compare`` and hand back whatever it returns."""
        outcome = super()._do_compare()
        return outcome
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Ad-hoc manual run: version 24 of the first data source against version 1 of the second.
    comparator = DiffSourceComparator(
        '834164a2d62de959c0261e6239dd1e55', 24,
        'f98fede74826c709329a65d63db167df', 1,
    )
    comparator.compare()
|