feat: 版本比较器实现

dev
old-tom 2 years ago
parent ce7b4c5380
commit d2fe8e4e37

@ -5,6 +5,8 @@
# @File : fu_collection.py
# @Project : futool-db
# @Desc : 集合类工具
import hashlib
def split_coll(data: [], part_size=5):
"""
@ -45,9 +47,31 @@ def list_symmetric_diff(coll_1: list, coll_2: list) -> list:
def list_diff(coll_1: list, coll_2: list) -> list:
"""
差集
coll_1中有而coll_2中没有
:param coll_1:
:param coll_2:
:return:
"""
return list(set(coll_1).difference(set(coll_2)))
def list_intersection(coll_1: list, coll_2: list) -> list:
"""
求交集
"""
return list(set(coll_1).intersection(set(coll_2)))
def list_2_md5(coll: list) -> str:
"""
合并列表内容
:param coll:
:return:
"""
md5 = hashlib.md5()
content = ''
for e in coll:
content += str(e)
md5.update(content.encode(encoding='utf-8'))
return md5.hexdigest()

@ -5,3 +5,16 @@
# @File : fu_lang.py
# @Project : Futool
# @Desc : 字符串相关
import hashlib
def str_md5(content: str):
"""
字符串计算MD5
:param content:
:return:
"""
md5 = hashlib.md5()
md5.update(content.encode(encoding='utf-8'))
return md5.hexdigest()

@ -14,4 +14,4 @@ local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432
local_conn = ConnFactory(local_db)
# neo4j配置
graph = Graph("bolt://localhost:7687", auth=("neo4j", "havana-academy-emerald-herman-scale-5422"))
#graph = Graph("bolt://localhost:7687", auth=("neo4j", "havana-academy-emerald-herman-scale-5422"))

@ -65,6 +65,15 @@ class MetadataWareHouse(object):
"""
return self.dao.query_metadata_name_by_type(obj_type, self.source_id, version_code)
def query_table_fields(self, table_name, version_code=None):
"""
查询已入库表字段
:param table_name:
:param version_code:
:return:
"""
return self.dao.query_metadata_table_files(self.source_id, table_name, version_code)
def query_metadata_id_name(self, obj_type=None, version_code=None):
"""
查询元数据ID及名称
@ -82,7 +91,7 @@ class MetadataWareHouse(object):
"""
return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id)
def query_view_create_by_name(self, view_name, version_code=None):
def query_create_by_name(self, view_name, version_code=None):
"""
视图查询创建语句
:param version_code: 版本号

@ -189,3 +189,17 @@ class MetadataDao(BaseDao):
del_metadata_obj = f"delete from metadata_object where source_id = '{source_id}' AND version_code = {version_code}"
self.execute_update_tx(
[del_version_sql, del_metadata_obj_field_detail, del_metadata_obj_create_detail, del_metadata_obj])
def query_metadata_table_files(self, source_id, table_name, version_code):
"""
查询表字段
:param source_id:
:param table_name:
:param version_code:
:return:
"""
sql = f"select field_name, field_ch_name, field_type, nullable, default_value, field_length " \
f"from metadata_object_field where meta_id in (select meta_id from metadata_object " \
f"where meta_type='table' and meta_name='{table_name}' and source_id = '{source_id}' " \
f"AND version_code = {version_code})"
return self.query_all(sql)

@ -10,150 +10,205 @@ import abc
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from common.futool.core.fu_collection import list_diff
from common.futool.core.fu_collection import list_diff, list_intersection, is_not_empty, list_2_md5
from common.futool.core.fu_lang import str_md5
class SourceComparator(metaclass=abc.ABCMeta):
class MetaDataComparator(metaclass=abc.ABCMeta):
def __init__(self, warehouse_1: MetadataWareHouse, warehouse_2: MetadataWareHouse = None):
self.warehouse_1 = warehouse_1
if warehouse_2:
self.warehouse_2 = warehouse_2
@abc.abstractmethod
def compare_table(self, version_1, version_2):
def __init__(self, version_1: int, version_2: int, source_id_1, source_id_2=None):
"""
比较表名
:param version_1:
:param version_2:
:return:
:param version_1: 版本号1
:param version_2: 版本号2
:param source_id_1: 数据源1
:param source_id_2: 数据源2
"""
pass
self.version_1 = version_1
self.version_2 = version_2
self.warehouse_1 = MetadataWareHouse(source_id_1)
self.warehouse_2 = MetadataWareHouse(source_id_2) if source_id_2 else None
@abc.abstractmethod
def compare_view(self, version_1, version_2):
def compare(self):
pass
def _do_compare(self):
self._compare_table()
self._compare_view()
self._compare_procedure()
def _compare_table(self):
"""
比较视图名
:param version_1:
:param version_2:
比较
1.表名
2.字段名字段类型长度是否可为空默认值
:return:
"""
pass
v1_to_v2, v2_to_v1, same_name_compare_result = self._compare_metadata(MetaDataObjType.Table.value)
return v1_to_v2, v2_to_v1, same_name_compare_result
@abc.abstractmethod
def compare_procedure(self, version_1, version_2):
def _compare_view(self):
"""
比较存储过程
:param version_1:
:param version_2:
比较视图
:return:
"""
pass
self._compare_metadata(MetaDataObjType.View.value)
@abc.abstractmethod
def compare_table_field(self, version_1, version_2):
def _compare_procedure(self):
"""
比较字段
:param version_1:
:param version_2:
比较存储过程
:return:
"""
pass
self._compare_metadata(MetaDataObjType.Procedure.value)
@abc.abstractmethod
def compare_view_create(self, version_1, version_2):
def _compare_metadata(self, obj_type):
"""
比较视图创建语句
:param version_1:
:param version_2:
比较元数据名称
:param obj_type:
:return:
"""
pass
# 版本号对应元数据名
obj_v1, obj_v2 = self._query_metadata_name(obj_type)
# v1与v2比,v2与v1比,得到新建及删除的元数据名称
v1_to_v2, v2_to_v1 = list_diff(obj_v1, obj_v2), list_diff(obj_v2, obj_v1)
# 相同名称比较
same_name_objs = list_intersection(obj_v1, obj_v2) if (is_not_empty(v1_to_v2) or is_not_empty(v2_to_v1)) \
else v1_to_v2
same_name_compare_result = self._compare_detail(same_name_objs, obj_type)
return v1_to_v2, v2_to_v1, same_name_compare_result
@abc.abstractmethod
def compare_procedure_create(self, version_1, version_2):
def _query_metadata_name(self, obj_type):
"""
比较存储过程创建语句
:param version_1:
:param version_2:
查询表名称
:return:
"""
pass
obj_v1_name = self.warehouse_1.query_metadata_name(obj_type=obj_type, version_code=self.version_1)
obj_v2_name = self.warehouse_2.query_metadata_name(obj_type=obj_type,
version_code=self.version_2) if self.warehouse_2 \
else self.warehouse_1.query_metadata_name(obj_type=obj_type, version_code=self.version_2)
return obj_v1_name, obj_v2_name
def query_table(self, version_1, version_2):
def _query_table_field(self, table_name):
"""
查询表名称
:param version_1: 版本号1
:param version_2: 版本号2
查询表字段
:param table_name:
:return:
"""
table_v1 = self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value, version_code=version_1)
table_v2 = self.warehouse_2.query_metadata_name(obj_type=MetaDataObjType.Table.value,
version_code=version_2) if self.warehouse_2 \
else self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value,
version_code=version_2)
return self._compare_data_list(table_v1, table_v2)
table_field_v1 = self.warehouse_1.query_table_fields(table_name, self.version_1)
table_field_v2 = self.warehouse_2.query_table_fields(table_name, self.version_2) if self.warehouse_2 \
else self.warehouse_1.query_table_fields(table_name, self.version_2)
@staticmethod
def _compare_data_list(version_1_data, version_2_data):
# 计算2次用于区分版本间差异
# version_1有而version_2没有
v1_to_v2 = list_diff(version_1_data, version_2_data)
# version_2有而version_1没有
v2_to_v1 = list_diff(version_2_data, version_1_data)
return v1_to_v2, v2_to_v1
# 包装返回为dict,key为字段名,value为 field_ch_name, field_type, nullable, default_value, field_length
def trans_field_to_kv(fields):
field_kv = {}
for f in fields:
field_name = f[0]
field_kv[field_name] = f[1:]
return field_kv
return trans_field_to_kv(table_field_v1), trans_field_to_kv(table_field_v2)
class SameSourceComparator(SourceComparator):
def _query_obj_create(self, obj_name):
"""
同库比较器
查询试图或存储过程SQL
:param obj_name:
:return:
"""
create_v1 = self.warehouse_1.query_create_by_name(obj_name, self.version_1)
create_v2 = self.warehouse_2.query_create_by_name(obj_name, self.version_2) if self.warehouse_2 \
else self.warehouse_1.query_create_by_name(obj_name, self.version_2)
return create_v1, create_v2
def __init__(self, source_id):
super().__init__(MetadataWareHouse(source_id))
def _compare_detail(self, metadata_obj, obj_type):
"""
比较明细
字段
试图存储过程创建语句
:param metadata_obj:
:param obj_type:
:return:
"""
compare_result = []
for obj_name in metadata_obj:
obj_compare_result = self._compare_table_field(
obj_name) if obj_type is MetaDataObjType.Table.value else self._compare_create_sql(obj_name)
if obj_compare_result:
compare_result.append(obj_compare_result)
return compare_result
def compare_table(self, version_1, version_2):
v1_to_v2, v2_to_v1 = self.query_table(version_1, version_2)
def _compare_create_sql(self, obj_name):
"""
比较试图或存储过程SQL
:param obj_name:
:return:
"""
compare_result = {}
create_v1, create_v2 = self._query_obj_create(obj_name)
if str_md5(create_v1) != str_md5(create_v2):
compare_result[obj_name] = {
'create_v1': create_v1,
'create_v2': create_v2
}
return compare_result
def compare_view(self, version_1, version_2):
pass
def _compare_table_field(self, table_name):
"""
比较字段
:param table_name: 表名
:return:
"""
compare_result = {}
table_field_v1, table_field_v2 = self._query_table_field(table_name)
# 得到差异字段
table_v1_fields, table_v2_fields = list(table_field_v1.keys()), list(table_field_v2.keys())
# 版本1相对版本2,版本2相对版本1
k1_to_k2, k2_to_k1 = list_diff(table_v1_fields, table_v2_fields), list_diff(table_v2_fields,
table_v1_fields)
# 得到相同字段
same_fields = list_intersection(table_v1_fields, table_v2_fields) if (
is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1)) \
else k1_to_k2
# 比较相同字段,内容合并转为md5进行比较。如果数据量较大可以使用minHash算法现在没必要
same_field_compare_result = []
for field_name in same_fields:
# 取出相同字段比较
f_v1, f_v2 = table_field_v1[field_name], table_field_v2[field_name]
if list_2_md5(f_v1) != list_2_md5(f_v2):
# 记录相同字段,不同属性
same_field_compare_result.append(field_name)
if is_not_empty(k1_to_k2) or is_not_empty(k2_to_k1) or is_not_empty(same_field_compare_result):
compare_result[table_name] = {
'v1_to_v2': k1_to_k2,
'v2_to_v1': k2_to_k1,
'same_compare_result': same_field_compare_result
}
return compare_result
def compare_procedure(self, version_1, version_2):
pass
def compare_table_field(self, version_1, version_2):
pass
class SameSourceComparator(MetaDataComparator):
"""
同库比较
"""
def compare_view_create(self, version_1, version_2):
pass
def __init__(self, source_id, version_1, version_2):
super().__init__(version_1, version_2, source_id)
def compare_procedure_create(self, version_1, version_2):
pass
def compare(self):
return super()._do_compare()
class DiffSourceComparator(SourceComparator):
class DiffSourceComparator(MetaDataComparator):
"""
不同库比较
"""
def __init__(self, source_id_1, source_id_2):
warehouse = MetadataWareHouse(source_id_1)
warehouse_dst = MetadataWareHouse(source_id_2)
def __init__(self, source_id_1, version_1, source_id_2, version_2):
super().__init__(version_1, version_2, source_id_1, source_id_2)
def compare_table(self, version_1, version_2):
pass
def compare(self):
return super()._do_compare()
def compare_view(self, version_1, version_2):
pass
def compare_procedure(self, version_1, version_2):
pass
def compare_table_field(self, version_1, version_2):
pass
def compare_view_create(self, version_1, version_2):
pass
def compare_procedure_create(self, version_1, version_2):
pass
if __name__ == '__main__':
ssc = DiffSourceComparator('834164a2d62de959c0261e6239dd1e55', 24, 'f98fede74826c709329a65d63db167df', 1)
ssc.compare()

@ -132,7 +132,7 @@ class MetadataRelationAnalyzer(object):
:param view_name: 视图名称
:return:
"""
create_sql = self.warehouse.query_view_create_by_name(view_name)
create_sql = self.warehouse.query_create_by_name(view_name)
# source_table 可能包含表或视图
source_tables = analyze_view_sql(create_sql)
if is_not_empty(source_tables):
@ -177,7 +177,7 @@ class MetadataRelationAnalyzer(object):
try:
log.info(f'开始分析存储过程{procedure[1]}')
start_time = time.time()
create_sql = self.warehouse.query_view_create_by_name(procedure[1])
create_sql = self.warehouse.query_create_by_name(procedure[1])
analyze_result = analyze_procedure_sql(create_sql)
if is_not_empty(analyze_result):
# 构建节点与关系

@ -85,5 +85,4 @@ class ScanTaskRunner(object):
if __name__ == '__main__':
runner = ScanTaskRunner()
runner.run()
runner=ScanTaskRunner()

Loading…
Cancel
Save