feat: 版本比较器开头

master
old-tom 2 years ago
parent f73d5e82ab
commit 1f288162fb

@ -31,3 +31,23 @@ def is_not_empty(coll: list) -> bool:
:return: :return:
""" """
return coll is not None and len(coll) > 0 return coll is not None and len(coll) > 0
def list_symmetric_diff(coll_1: list, coll_2: list) -> list:
"""
对称差集
:param coll_1:
:param coll_2:
:return:
"""
return list(set(coll_1).symmetric_difference(set(coll_2)))
def list_diff(coll_1: list, coll_2: list) -> list:
"""
coll_1中有而coll_2中没有
:param coll_1:
:param coll_2:
:return:
"""
return list(set(coll_1).difference(set(coll_2)))

@ -13,9 +13,9 @@ def singleton(cls):
""" """
_instance = {} _instance = {}
def inner(): def inner(*args, **kwargs):
if cls not in _instance: if cls not in _instance:
_instance[cls] = cls() _instance[cls] = cls(*args, **kwargs)
return _instance[cls] return _instance[cls]
return inner return inner

@ -115,4 +115,4 @@ class MetadataScanner(object):
if __name__ == '__main__': if __name__ == '__main__':
scanner = MetadataScanner() scanner = MetadataScanner()
scanner.scan_metadata('834164a2d62de959c0261e6239dd1e55') scanner.scan_metadata('f98fede74826c709329a65d63db167df')

@ -0,0 +1,159 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/5/14 21:15
# @Author : old tom
# @File : metadata_comparator.py
# @Project : futool-tiny-datahub
# @Desc : 版本比较器
import abc
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from common.futool.core.fu_collection import list_diff
class SourceComparator(metaclass=abc.ABCMeta):
def __init__(self, warehouse_1: MetadataWareHouse, warehouse_2: MetadataWareHouse = None):
self.warehouse_1 = warehouse_1
if warehouse_2:
self.warehouse_2 = warehouse_2
@abc.abstractmethod
def compare_table(self, version_1, version_2):
"""
比较表名
:param version_1:
:param version_2:
:return:
"""
pass
@abc.abstractmethod
def compare_view(self, version_1, version_2):
"""
比较视图名
:param version_1:
:param version_2:
:return:
"""
pass
@abc.abstractmethod
def compare_procedure(self, version_1, version_2):
"""
比较存储过程
:param version_1:
:param version_2:
:return:
"""
pass
@abc.abstractmethod
def compare_table_field(self, version_1, version_2):
"""
比较字段
:param version_1:
:param version_2:
:return:
"""
pass
@abc.abstractmethod
def compare_view_create(self, version_1, version_2):
"""
比较视图创建语句
:param version_1:
:param version_2:
:return:
"""
pass
@abc.abstractmethod
def compare_procedure_create(self, version_1, version_2):
"""
比较存储过程创建语句
:param version_1:
:param version_2:
:return:
"""
pass
def query_table(self, version_1, version_2):
"""
查询表名称
:param version_1: 版本号1
:param version_2: 版本号2
:return:
"""
table_v1 = self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value, version_code=version_1)
table_v2 = self.warehouse_2.query_metadata_name(obj_type=MetaDataObjType.Table.value,
version_code=version_2) if self.warehouse_2 \
else self.warehouse_1.query_metadata_name(obj_type=MetaDataObjType.Table.value,
version_code=version_2)
return self._compare_data_list(table_v1, table_v2)
@staticmethod
def _compare_data_list(version_1_data, version_2_data):
# 计算2次用于区分版本间差异
# version_1有而version_2没有
v1_to_v2 = list_diff(version_1_data, version_2_data)
# version_2有而version_1没有
v2_to_v1 = list_diff(version_2_data, version_1_data)
return v1_to_v2, v2_to_v1
class SameSourceComparator(SourceComparator):
"""
同库比较器
"""
def __init__(self, source_id):
super().__init__(MetadataWareHouse(source_id))
def compare_table(self, version_1, version_2):
v1_to_v2, v2_to_v1 = self.query_table(version_1, version_2)
def compare_view(self, version_1, version_2):
pass
def compare_procedure(self, version_1, version_2):
pass
def compare_table_field(self, version_1, version_2):
pass
def compare_view_create(self, version_1, version_2):
pass
def compare_procedure_create(self, version_1, version_2):
pass
class DiffSourceComparator(SourceComparator):
"""
不同库比较
"""
def __init__(self, source_id_1, source_id_2):
warehouse = MetadataWareHouse(source_id_1)
warehouse_dst = MetadataWareHouse(source_id_2)
def compare_table(self, version_1, version_2):
pass
def compare_view(self, version_1, version_2):
pass
def compare_procedure(self, version_1, version_2):
pass
def compare_table_field(self, version_1, version_2):
pass
def compare_view_create(self, version_1, version_2):
pass
def compare_procedure_create(self, version_1, version_2):
pass

@ -27,7 +27,7 @@ class MetadataVersionKeeper(object):
初始化版本号 初始化版本号
:return: :return:
""" """
flag = self.dao.init_version(self.source_id) == 1 flag = (self.dao.init_version(self.source_id) == 1)
if flag: if flag:
log.info(f"[{self.source_id}] 数据源初始化版本成功") log.info(f"[{self.source_id}] 数据源初始化版本成功")
else: else:

@ -82,3 +82,8 @@ class ScanTaskRunner(object):
for task in enable_task: for task in enable_task:
self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata) self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata)
log.info(f'datasource scan task [{task[0]}] submit success') log.info(f'datasource scan task [{task[0]}] submit success')
if __name__ == '__main__':
runner = ScanTaskRunner()
runner.run()

Loading…
Cancel
Save