#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/8 10:11
# @Author : old tom
# @File : ds_dao.py
# @Project : futool-tiny-datahub
# @Desc : Database operations for data source records

from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_dao import BaseDao
from datahub.datasource.constant import ds_conf_param


def _sql_str(value) -> str:
    """Render *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled so a value such as ``o'brien`` cannot
    terminate the literal early. For values without quotes the output is
    identical to plain ``'{value}'`` interpolation.

    NOTE(review): this is sufficient only when the backend treats backslashes
    literally inside string constants (the PostgreSQL default). If BaseDao
    supports bound parameters, prefer those over string building.
    """
    escaped = str(value).replace("'", "''")
    return f"'{escaped}'"


class DataSourceDao(BaseDao):
    """DAO for rows of the ``datasource_main`` table."""

    def __init__(self, connector: ConnFactory):
        """Create the DAO on top of the given connection factory."""
        super().__init__(connector)

    def add_datasource(self, source_id, conf: ds_conf_param) -> bool:
        """Insert a new data source row.

        :param source_id: primary key of the data source
        :param conf: connection settings; ``db_type``, ``host``, ``port``,
            ``user``, ``password`` and ``database`` are read from it
        :return: True when ``execute_update`` reports a value greater than 0
        :raises ValueError, TypeError: if ``conf.port`` is not integer-like
        """
        # The port is emitted unquoted, so force it to an integer instead of
        # trusting whatever text the caller supplied.
        values = ",".join([
            _sql_str(source_id),
            _sql_str(conf.db_type),
            _sql_str(conf.host),
            str(int(conf.port)),
            _sql_str(conf.user),
            _sql_str(conf.password),
            _sql_str(conf.database),
        ])
        sql = (
            "insert into datasource_main "
            "(source_id,source_type,host,port,username,password,database_name) "
            f"values ({values})"
        )
        return self.execute_update(sql) > 0

    def remove_datasource(self, source_id):
        """Delete the data source row with the given primary key.

        :param source_id: primary key of the data source
        :return: the raw result of ``execute_update`` (not converted to bool)
        """
        return self.execute_update(
            f"delete from datasource_main where source_id={_sql_str(source_id)}")

    def deactivate_datasource(self, source_id) -> bool:
        """Mark a data source as unused by setting ``has_used`` to ``'N'``.

        :param source_id: primary key of the data source
        :return: True when ``execute_update`` reports a value greater than 0
        """
        return self.execute_update(
            f"update datasource_main set has_used='N' where source_id={_sql_str(source_id)}") > 0

    def exist_by_source(self, source_id) -> bool:
        """Check whether a data source exists.

        :param source_id: primary key of the data source
        :return: True if ``query_one`` yields a non-empty result
        """
        row = self.query_one(
            f"select 1 from datasource_main where source_id={_sql_str(source_id)}")
        # query_one hands back a row (see query_datasource_conf, whose result
        # is unpacked field by field), so comparing it with 0 would raise
        # TypeError; test for presence instead.
        return bool(row)

    def edit_datasource_conf(self, source_id, param_dict):
        """Edit a data source.

        :param source_id: primary key of the data source
        :param param_dict: key/value parameters; keys are the same as the
            database column names
        :return: the result of ``dynamic_update_by_param``
        """
        where_clause = f"where source_id={_sql_str(source_id)}"
        return self.dynamic_update_by_param('datasource_main', where_clause, param_dict)

    def query_datasource_conf(self, source_id):
        """Fetch the connection settings stored for a data source.

        :param source_id: primary key of the data source
        :return: the ``query_one`` result for the columns source_type,
            username, password, host, port, database_name (in that order)
        """
        conf_field = ['source_type', 'username', 'password', 'host', 'port', 'database_name']
        return self.query_one(
            f"select {','.join(conf_field)} from datasource_main where source_id={_sql_str(source_id)}")


if __name__ == '__main__':
    # Manual check: look up one stored configuration and print it.
    local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
    dao = DataSourceDao(ConnFactory(local_ds))
    rt = dao.query_datasource_conf('db143d11741a9575fdea92ed2b39dc53')
    print(ds_conf_param._make(rt))