You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
3.5 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/8 9:25
# @Author : old tom
# @File : datasource_manage.py
# @Project : futool-tiny-datahub
# @Desc : 元数据管理
from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_db_api import select_one
from datahub.datasource.dsdao.ds_dao import DataSourceDao
from datahub.datasource.constant import ds_conf_param, DUAL_DB_TYPE
from datahub.local_db_conf import local_conn
from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper
class DataSource(object):
def __init__(self, source_id, conf: ds_conf_param):
self.source_id = source_id
self.conf = conf
self.connector = ConnFactory(conf)
def get_connector(self):
return self.connector
class DataSourceManage(object):
"""
数据源管理
"""
def __init__(self):
self.dao = DataSourceDao(local_conn)
def add(self, conf: ds_conf_param, cron='0 0 0 1/1 * ?'):
# 初始化连接器
connector = ConnFactory(conf)
if self.dao.exist_by_source(connector.connector_id):
# 数据源已存在
return False, 'datasource all ready exists'
# 初始化校验器
checker = DataSourceChecker(connector)
result, msg = checker.check()
if result:
# 入库并添加扫描任务 TODO 添加事务管理
rt = self.dao.add_datasource(connector.connector_id, conf, cron), ''
# 初始化版本
if rt:
version_keeper = MetadataVersionKeeper(connector.connector_id)
add_version_rt = version_keeper.init_version()
return True, "添加数据源并初始化版本成功" if add_version_rt else False, "添加数据源成功,初始化版本失败"
return False, '添加数据源失败'
else:
# 返回错误信息
return result, f'check failed,{msg}'
def remove(self, source_id):
self.dao.remove_datasource(source_id)
def check(self, source_id):
pass
def deactivate(self, source_id):
"""
停用数据源
:param source_id:
:return:
"""
return self.dao.deactivate_datasource(source_id)
def edit(self, source_id, param_dict):
"""
编辑数据源
:return:
"""
return self.dao.edit_datasource_conf(source_id, param_dict)
def get(self, source_id) -> DataSource:
"""
获取数据源
:param source_id:
:return:
"""
conf = self.dao.query_datasource_conf(source_id)
# 转为命名元组
conf_tuple = ds_conf_param._make(conf)
return DataSource(source_id, conf_tuple)
class DataSourceChecker(object):
"""
数据源检测
"""
def __init__(self, connector: ConnFactory):
self.connector = connector
def check(self):
"""
未知数据源连接测试
:return:
"""
try:
# 获取连接
conn = self.connector.get_conn()
except Exception as e:
return False, f'cannot get connection,e={e}'
try:
# 测试select 1
sql = 'select 1 ' + (
'from dual' if self.connector.db_type in DUAL_DB_TYPE else '')
return int(select_one(conn, sql)[0]) > 0, 'success'
except Exception as e:
return False, f'cannot execute "select 1",e={e}'