#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2023/6/22 19:54 # @Author : old tom # @File : dbengine.py # @Project : futool-db-lite # @Desc : from urllib.parse import quote_plus as urlquote from sqlalchemy import create_engine from config.db_config import DbConfigLoader, DEFAULT_CONF_PATH from session.session import SqlsessionFactory from util.futool_lang import str_md5 class DatabaseEngineError(Exception): def __init__(self, msg): Exception.__init__(self, msg) class ObjContainer(object): # DatabaseEngine容器 engines = {} # SqlsessionFactory容器 factory = {} def has_obj(self, attr, k): return k in getattr(self, attr) def get_obj(self, attr, k): return getattr(self, attr)[k] def put_obj(self, attr, k, v): getattr(self, attr)[k] = v def remove_obj(self, attr, k): del getattr(self, attr)[k] def clean(self): self.engines = {} self.factory = {} container = ObjContainer() class DatabaseEngine(object): """ 数据库连接 dialect+driver://username:password@host:port/database mysql: mysql+pymysql://scott:tiger@localhost/foo sqlserver: mssql+pymssql://scott:tiger@hostname:port/dbname """ DB_URL = { 'postgresql': 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}', # 使用thin客户端,不需要oracle client 'oracle': 'oracle+oracledb://{0}:{1}@{2}:{3}/?service_name={4}' } # 连接池容器名称,具体看ObjContainer对象 attr = 'engines' def __init__(self, db_name, conf_path=DEFAULT_CONF_PATH): """ pool_size:连接池大小 pool_recycle:连接回收(秒) pool_pre_ping:测试连接,执行select 1 参考 https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration """ loader = DbConfigLoader(db_name, conf_path) # 数据库配置 conf = loader.db_conf if conf.dialect not in self.DB_URL: raise DatabaseEngineError(msg='不支持的数据库类型') # urlquote 处理密码中的特殊字符 url = self.DB_URL[conf.dialect].format(conf.user, urlquote(conf.passwd), conf.host, conf.port, conf.database) # URL生成唯一ID self.engine_id = str_md5(url) # 根据数据源ID判断是否新建连接池 if container.has_obj(self.attr, self.engine_id): self.engine = container.get_obj(self.attr, self.engine_id) else: engine = create_engine(url, pool_size=conf.pool_size, pool_recycle=conf.pool_recycle, pool_pre_ping=True, echo=conf.show_sql) container.put_obj(self.attr, self.engine_id, engine) self.engine = engine def create_session_factory(self): """ 创建session工厂 相同数据库URL下全局只有一个SqlsessionFactory,除非有其他数据源 根据engine_id判断是否新建对象 :return: """ attr = 'factory' if container.has_obj(attr, self.engine_id): return container.get_obj(attr, self.engine_id) else: factory = SqlsessionFactory(self.engine) container.put_obj(attr, self.engine_id, factory) return factory