数据库操作工具
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
old-tom 2dafde0463
更新 'README.md'
2 years ago
fudb feat:first commit 2 years ago
test feat:first commit 2 years ago
.gitignore first commit 2 years ago
Pipfile first commit 2 years ago
Pipfile.lock first commit 2 years ago
README.md 更新 'README.md' 2 years ago
requirements.txt feat:first commit 2 years ago

README.md

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

futool-db

数据库操作工具库,基于sqlalchemy实现oracle、postgresql的原生SQL操作

一、初始化数据库及建立连接

# oracle
oracle_factory = ConnFactory('oracle', 'xx', 'root@123', 'localhost', 1521, 'XE')
# pg
pg_factory = ConnFactory('postgresql', 'xxx', 'root@123', 'localhost', 5432, 'postgres')
# 获取连接
conn = pg_factory.get_conn()

连接池配置

class CommonConnector(metaclass=abc.ABCMeta):

    def __init__(self, db_conf: str):
        # 初始化pool_size可修改初始化连接数,其他参数可参考sqlalchemy连接池配置
        self.engine = create_engine(db_conf, pool_size=15, pool_recycle=3600)

二、sql操作

CRUD

from fudb.dbapis.fu_db_api import select_all, select_one, execute_update, batch_insert

        # 返回第一条
        select_one(oracle_factory.get_conn(), 'select * from test limit 20')
        # 查全部
        select_all(oracle_factory.get_conn(), 'select * from test limit 20')
        # 数据量统计
        count(oracle_factory.get_conn(), 'test')
        # delete \insert \update
        execute_update(oracle_factory.get_conn(),'delete from test where id=1')
        # 批量插入
        batch_insert(conn, 'oracle', 'into t_user (id,name,age) values %s', dds, 10)
# 注意:上述方法都会自动关闭连接,如果不想关闭的话可以参考源码进行修改

事务

    try:
        # 开启事务
        conn.begin()
        # 执行SQL
        rt = conn.execute(text(sql))
        # 提交
        conn.commit()
        # 返回受影响行数,可作为执行是否成功判断依据
        return rt.rowcount
    except Exception as e:
        # 回滚
        conn.rollback()
        raise SqlExecuteError(msg=f'sql [{sql}] 执行失败,开始回滚,e={e}')
    finally:
        # 关闭连接
        conn.close()