diff --git a/datahub/metadata/metaversion/metadata_compare_task.py b/datahub/metadata/metaversion/metadata_compare_task.py index 6892492..f0c2a21 100644 --- a/datahub/metadata/metaversion/metadata_compare_task.py +++ b/datahub/metadata/metaversion/metadata_compare_task.py @@ -6,54 +6,92 @@ # @Project : futool-tiny-datahub # @Desc : 版本比较任务管理 from common.futool.core.fu_lang import str_md5 +from datahub.local_db_conf import local_conn +from datahub.log_conf import log +from dataclasses import dataclass +from datahub.metadata.metaversion.versiondao.metadata_task_dao import MetaTaskDao + + +@dataclass +class MetadataCompareTask(object): + # 版本号1 + version_code_1: int + # 版本号2 + version_code_2: int + # 版本号1对应的数据源ID + source_id_1: str + # 版本号2对应的数据源ID + source_id_2: str = None + # 比较任务类型 + compare_task_type: str = 'same' if source_id_2 is None else 'diff' + + def compare_id_gen(self) -> str: + """ + 生成任务ID + :return: + """ + return str_md5(''.join([str(getattr(self, x)) for x in self.__dict__.keys()])) class MetadataCompareTaskManage(object): + class MetadataCompareTaskError(Exception): + """ + 自定义异常 + """ + + def __init__(self, msg=''): + Exception.__init__(self, msg) + """ 版本比较任务管理 """ - def __init__(self, version_code_1, version_code_2, source_id_1, source_id_2=None): + def __init__(self): + self.dao = MetaTaskDao(local_conn) + + def create_task(self, task: MetadataCompareTask) -> bool: """ - 初始化任务 - :param version_code_1: 版本号1 - :param version_code_2: 版本号2 - :param source_id_1: 数据源1 - :param source_id_2: 数据源2 + 创建任务 + :return: """ - self.version_code_1 = version_code_1 - self.version_code_2 = version_code_2 - self.source_id_1 = source_id_1 - self.compare_task_type = 'same' - if source_id_2: - self.source_id_2 = source_id_2 - self.compare_task_type = 'diff' + compare_id = task.compare_id_gen() + self._check_task_exist(compare_id) + return self.dao.create_task(compare_id, task.version_code_1, task.version_code_2, task.source_id_1, + task.source_id_2, task.compare_task_type) - def _compare_id_gen(self): + def finish_task(self, compare_id): """ - 生成任务ID + 结束任务 :return: """ - pass + self._check_task_not_exist(compare_id) + return self.dao.finish_task(compare_id) - def create_task(self): + def has_finished(self, compare_id): """ - 创建任务 + 判断任务是否结束 + :param compare_id: :return: """ - pass + self._check_task_not_exist(compare_id) + return self.dao.check_task_status(compare_id) - def finish_task(self): + def _check_task_exist(self, compare_id): """ - 结束任务 + 校验任务是否存在 + :param compare_id: :return: """ - pass + if self.dao.task_exists(compare_id): + log.error(f'版本比较任务已存在,任务ID={compare_id}') + raise self.MetadataCompareTaskError() - def has_finished(self, compare_id): + def _check_task_not_exist(self, compare_id): """ - 判断任务是否结束 + 校验任务是否不存在 :param compare_id: :return: """ - pass + if not self.dao.task_exists(compare_id): + log.error(f'版本比较任务不存在,请先创建任务') + raise self.MetadataCompareTaskError() diff --git a/datahub/metadata/metaversion/versiondao/metadata_task_dao.py b/datahub/metadata/metaversion/versiondao/metadata_task_dao.py new file mode 100644 index 0000000..bfca54d --- /dev/null +++ b/datahub/metadata/metaversion/versiondao/metadata_task_dao.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/5/27 22:06 +# @Author : old tom +# @File : metadata_task_dao.py +# @Project : futool-tiny-datahub +# @Desc : + +from common.fudb.connectors.connector_factory import ConnFactory +from common.fudb.dbapis.fu_dao import BaseDao + + +class MetaTaskDao(BaseDao): + def __init__(self, connector: ConnFactory): + super().__init__(connector) + + def create_task(self, compare_id, version_code_1, version_code_2, source_id_1, source_id_2, compare_task_type): + sql = f"insert into metadata_compare_task (compare_id,compare_task_type,source_id_1,source_id_2,version_code_1,version_code_2)" \ + f" values ('{compare_id}','{compare_task_type}','{source_id_1}','{source_id_2}',{version_code_1},{version_code_2})" + return self.execute_update(sql) > 0 + + def finish_task(self, compare_id): + sql = f"update metadata_compare_task set has_finish='Y',finish_time=now() where compare_id='{compare_id}'" + return self.execute_update(sql) > 0 + + def check_task_status(self, compare_id): + """ + 校验任务是否完成 + :param compare_id: + :return: + """ + sql = f"select has_finish,to_char(created_time,'yyyy-MM-dd hh24:mi:ss') from metadata_compare_task where compare_id='{compare_id}'" + return self.query_one(sql) + + def task_exists(self, compare_id): + rt = self.query_one(f"select count(1) from metadata_compare_task where compare_id='{compare_id}'")[0] + return rt > 0