feat: 新增元数据比较任务管理器

dev
old-tom 2 years ago
parent f84e6b79f2
commit 7d9f888930

@ -6,54 +6,92 @@
# @Project : futool-tiny-datahub
# @Desc : 版本比较任务管理
from common.futool.core.fu_lang import str_md5
from datahub.local_db_conf import local_conn
from datahub.log_conf import log
from dataclasses import dataclass
from datahub.metadata.metaversion.versiondao.metadata_task_dao import MetaTaskDao
@dataclass
class MetadataCompareTask(object):
# 版本号1
version_code_1: int
# 版本号2
version_code_2: int
# 版本号1对应的数据源ID
source_id_1: str
# 版本号2对应的数据源ID
source_id_2: str = None
# 比较任务类型
compare_task_type: str = 'same' if source_id_2 is None else 'diff'
def compare_id_gen(self) -> str:
"""
生成任务ID
:return:
"""
return str_md5(''.join([str(getattr(self, x)) for x in self.__dict__.keys()]))
class MetadataCompareTaskManage(object):
class MetadataCompareTaskError(Exception):
"""
自定义异常
"""
def __init__(self, msg=''):
Exception.__init__(self, msg)
"""
版本比较任务管理
"""
def __init__(self, version_code_1, version_code_2, source_id_1, source_id_2=None):
def __init__(self):
self.dao = MetaTaskDao(local_conn)
def create_task(self, task: MetadataCompareTask) -> bool:
"""
初始化任务
:param version_code_1: 版本号1
:param version_code_2: 版本号2
:param source_id_1: 数据源1
:param source_id_2: 数据源2
创建任务
:return:
"""
self.version_code_1 = version_code_1
self.version_code_2 = version_code_2
self.source_id_1 = source_id_1
self.compare_task_type = 'same'
if source_id_2:
self.source_id_2 = source_id_2
self.compare_task_type = 'diff'
compare_id = task.compare_id_gen()
self._check_task_exist(compare_id)
return self.dao.create_task(compare_id, task.version_code_1, task.version_code_2, task.source_id_1,
task.source_id_2, task.compare_task_type)
def _compare_id_gen(self):
def finish_task(self, compare_id):
"""
生成任务ID
结束任务
:return:
"""
pass
self._check_task_not_exist(compare_id)
return self.dao.finish_task(compare_id)
def create_task(self):
def has_finished(self, compare_id):
"""
创建任务
判断任务是否结束
:param compare_id:
:return:
"""
pass
self._check_task_not_exist(compare_id)
return self.dao.check_task_status(compare_id)
def finish_task(self):
def _check_task_exist(self, compare_id):
"""
结束任务
校验任务是否存在
:param compare_id:
:return:
"""
pass
if self.dao.task_exists(compare_id):
log.error(f'版本比较任务已存在,任务ID={compare_id}')
raise self.MetadataCompareTaskError()
def has_finished(self, compare_id):
def _check_task_not_exist(self, compare_id):
"""
判断任务是否结束
校验任务是否不存在
:param compare_id:
:return:
"""
pass
if not self.dao.task_exists(compare_id):
log.error(f'版本比较任务不存在,请先创建任务')
raise self.MetadataCompareTaskError()

@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/5/27 22:06
# @Author : old tom
# @File : metadata_task_dao.py
# @Project : futool-tiny-datahub
# @Desc :
from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_dao import BaseDao
class MetaTaskDao(BaseDao):
def __init__(self, connector: ConnFactory):
super().__init__(connector)
def create_task(self, compare_id, version_code_1, version_code_2, source_id_1, source_id_2, compare_task_type):
sql = f"insert into metadata_compare_task (compare_id,compare_task_type,source_id_1,source_id_2,version_code_1,version_code_2)" \
f" values ('{compare_id}','{compare_task_type}','{source_id_1}','{source_id_2}',{version_code_1},{version_code_2})"
return self.execute_update(sql) > 0
def finish_task(self, compare_id):
sql = f"update metadata_compare_task set has_finish='Y',finish_time=now() where compare_id='{compare_id}'"
return self.execute_update(sql) > 0
def check_task_status(self, compare_id):
"""
校验任务是否完成
:param compare_id:
:return:
"""
sql = f"select has_finish,to_char(created_time,'yyyy-MM-dd hh24:mi:ss') from metadata_compare_task where compare_id='{compare_id}'"
return self.query_one(sql)
def task_exists(self, compare_id):
rt = self.query_one(f"select count(1) from metadata_compare_task where compare_id='{compare_id}'")[0]
return rt > 0
Loading…
Cancel
Save