#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/8 10:11
# @Author : old tom
# @File : ds_dao.py
# @Project : futool-tiny-datahub
# @Desc : database operations for data sources
from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_dao import BaseDao
from sqlalchemy import text
from datahub.log_conf import log
from datahub.datasource.constant import ds_conf_param


class DataSourceDao(BaseDao):
    """Data-access object for the ``datasource_main`` and ``scan_task_conf`` tables.

    NOTE(review): every method except ``add_datasource`` still interpolates
    ``source_id`` directly into the SQL string, because the signatures of the
    inherited ``BaseDao`` helpers (``execute_update``, ``query_one``,
    ``dynamic_update_by_param``) are not visible from this file. Callers must
    pass trusted identifiers only until those helpers accept bind parameters.
    """

    def __init__(self, connector: ConnFactory):
        super().__init__(connector)

    def add_datasource(self, source_id, conf: ds_conf_param, cron):
        """Insert a data source and its scan task in a single transaction.

        Both statements use named bind parameters, so the values are never
        spliced into the SQL text. This prevents SQL injection and also keeps
        values containing a quote or a ``:name`` sequence (e.g. a password
        such as ``ab:cd``) from corrupting the statement handed to ``text()``.

        :param source_id: primary key of the new data source
        :param conf: connection settings; ``db_type``, ``host``, ``port``,
            ``user``, ``password`` and ``database`` are read from it
        :param cron: cron expression stored for the scan task
        :return: True when both rows were committed, False when anything
            failed (the transaction is rolled back and the error is logged)
        """
        insert_source = text(
            "insert into datasource_main "
            "(source_id,source_type,host,port,username,password,database_name) "
            "values (:source_id,:source_type,:host,:port,:username,:password,:database_name)"
        )
        insert_scan_task = text(
            "insert into scan_task_conf (source_id,cron_expression) "
            "values (:source_id,:cron_expression)"
        )
        conn = self.connector.get_conn()
        try:
            # Open the transaction explicitly so both inserts succeed or fail together.
            conn.begin()
            # Register the data source itself.
            conn.execute(insert_source, {
                'source_id': source_id,
                'source_type': conf.db_type,
                'host': conf.host,
                'port': conf.port,
                'username': conf.user,
                'password': conf.password,
                'database_name': conf.database,
            })
            # Create the scan task that belongs to the new data source.
            conn.execute(insert_scan_task, {
                'source_id': source_id,
                'cron_expression': cron,
            })
            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            # NOTE(review): formatting ``conf`` (and possibly the driver error)
            # may write the data source password to the log - confirm and mask.
            log.error(f'添加数据源[{conf}]失败,e={e}')
            return False
        finally:
            conn.close()

    def remove_datasource(self, source_id):
        """Delete the ``datasource_main`` row identified by *source_id*.

        :param source_id: primary key of the data source
        :return: whatever ``BaseDao.execute_update`` returns (presumably the
            affected row count - confirm against BaseDao)
        """
        # NOTE(review): source_id is interpolated into the SQL; pass trusted values only.
        return self.execute_update(f"delete from datasource_main where source_id='{source_id}'")

    def deactivate_datasource(self, source_id):
        """Disable a data source by setting ``has_used`` to ``'N'``.

        :param source_id: primary key of the data source
        :return: True when the result of ``execute_update`` is greater than 0
        """
        # NOTE(review): source_id is interpolated into the SQL; pass trusted values only.
        return self.execute_update(
            f"update datasource_main set has_used='N' where source_id='{source_id}'") > 0

    def exist_by_source(self, source_id):
        """Check whether a data source exists.

        :param source_id: primary key of the data source
        :return: True when ``query_one`` returns something other than None
        """
        # NOTE(review): source_id is interpolated into the SQL; pass trusted values only.
        sql = f"select 1 from datasource_main where source_id='{source_id}'"
        return self.query_one(sql) is not None

    def edit_datasource_conf(self, source_id, param_dict):
        """Update columns of a data source.

        :param source_id: primary key of the data source
        :param param_dict: key/value pairs to update; keys must equal the
            column names of ``datasource_main``
        :return: whatever ``BaseDao.dynamic_update_by_param`` returns
        """
        # NOTE(review): source_id is interpolated into the where clause; pass trusted values only.
        return self.dynamic_update_by_param('datasource_main', f"where source_id='{source_id}'", param_dict)

    def query_datasource_conf(self, source_id):
        """Fetch the connection settings stored for a data source.

        :param source_id: primary key of the data source
        :return: the result of ``query_one`` for the columns source_type,
            username, password, host, port and database_name, in that order
        """
        conf_field = ['source_type', 'username', 'password', 'host', 'port', 'database_name']
        # NOTE(review): source_id is interpolated into the SQL; pass trusted values only.
        return self.query_one(
            f"select {','.join(conf_field)} from datasource_main where source_id='{source_id}'")