#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2023/4/8 9:25 # @Author : old tom # @File : datasource_manage.py # @Project : futool-tiny-datahub # @Desc : 元数据管理 from common.fudb.connectors.connector_factory import ConnFactory from common.fudb.dbapis.fu_db_api import select_one from datahub.datasource.dsdao.ds_dao import DataSourceDao from datahub.datasource.constant import ds_conf_param, DUAL_DB_TYPE from datahub.local_db_conf import local_conn from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper class DataSource(object): def __init__(self, source_id, conf: ds_conf_param): self.source_id = source_id self.conf = conf self.connector = ConnFactory(conf) def get_connector(self): return self.connector class DataSourceManage(object): """ 数据源管理 """ def __init__(self): self.dao = DataSourceDao(local_conn) def add(self, conf: ds_conf_param, cron='0 0 0 1/1 * ?'): # 初始化连接器 connector = ConnFactory(conf) if self.dao.exist_by_source(connector.connector_id): # 数据源已存在 return False, 'datasource all ready exists' # 初始化校验器 checker = DataSourceChecker(connector) result, msg = checker.check() if result: # 入库并添加扫描任务 TODO 添加事务管理 rt = self.dao.add_datasource(connector.connector_id, conf, cron), '' # 初始化版本 if rt: version_keeper = MetadataVersionKeeper(connector.connector_id) add_version_rt = version_keeper.init_version() return True, "添加数据源并初始化版本成功" if add_version_rt else False, "添加数据源成功,初始化版本失败" return False, '添加数据源失败' else: # 返回错误信息 return result, f'check failed,{msg}' def remove(self, source_id): self.dao.remove_datasource(source_id) def check(self, source_id): pass def deactivate(self, source_id): """ 停用数据源 :param source_id: :return: """ return self.dao.deactivate_datasource(source_id) def edit(self, source_id, param_dict): """ 编辑数据源 :return: """ return self.dao.edit_datasource_conf(source_id, param_dict) def get(self, source_id) -> DataSource: """ 获取数据源 :param source_id: :return: """ conf = self.dao.query_datasource_conf(source_id) # 转为命名元组 conf_tuple = ds_conf_param._make(conf) return DataSource(source_id, conf_tuple) class DataSourceChecker(object): """ 数据源检测 """ def __init__(self, connector: ConnFactory): self.connector = connector def check(self): """ 未知数据源连接测试 :return: """ try: # 获取连接 conn = self.connector.get_conn() except Exception as e: return False, f'cannot get connection,e={e}' try: # 测试select 1 sql = 'select 1 ' + ( 'from dual' if self.connector.db_type in DUAL_DB_TYPE else '') return int(select_one(conn, sql)[0]) > 0, 'success' except Exception as e: return False, f'cannot execute "select 1",e={e}'