Compare commits

...

No commits in common. 'main' and 'master' have entirely different histories.
main ... master

162
.gitignore vendored

@ -0,0 +1,162 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
*.pyc
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

@ -1,9 +0,0 @@
MIT License
Copyright (c) <year> <copyright holders>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@ -0,0 +1,11 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
[requires]
python_version = "3.8"

20
Pipfile.lock generated

@ -0,0 +1,20 @@
{
"_meta": {
"hash": {
"sha256": "7f7606f08e0544d8d012ef4d097dabdd6df6843a28793eb6551245d4b2db4242"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.8"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {},
"develop": {}
}

@ -1,3 +1,59 @@
# futool-db
数据库操作工具
数据库操作工具库,基于sqlalchemy实现oracle、postgresql的原生SQL操作
# 一、初始化数据库及建立连接
```python
# oracle
oracle_factory = ConnFactory('oracle', 'xx', 'root@123', 'localhost', 1521, 'XE')
# pg
pg_factory = ConnFactory('postgresql', 'xxx', 'root@123', 'localhost', 5432, 'postgres')
# 获取连接
conn = pg_factory.get_conn()
```
## 连接池配置
```python
class CommonConnector(metaclass=abc.ABCMeta):
def __init__(self, db_conf: str):
# 初始化pool_size可修改初始化连接数,其他参数可参考sqlalchemy连接池配置
self.engine = create_engine(db_conf, pool_size=15, pool_recycle=3600)
```
# 二、sql操作
## CRUD
```python
from fudb.dbapis.fu_db_api import select_all, select_one, execute_update, batch_insert
# 返回第一条
select_one(oracle_factory.get_conn(), 'select * from test limit 20')
# 查全部
select_all(oracle_factory.get_conn(), 'select * from test limit 20')
# 数据量统计
count(oracle_factory.get_conn(), 'test')
# delete \insert \update
execute_update(oracle_factory.get_conn(),'delete from test where id=1')
# 批量插入
batch_insert(conn, 'oracle', 'into t_user (id,name,age) values %s', dds, 10)
# 注意:上述方法都会自动关闭连接,如果不想关闭的话可以参考源码进行修改
```
## 事务
```python
try:
# 开启事务
conn.begin()
# 执行SQL
rt = conn.execute(text(sql))
# 提交
conn.commit()
# 返回受影响行数,可作为执行是否成功判断依据
return rt.rowcount
except Exception as e:
# 回滚
conn.rollback()
raise SqlExecuteError(msg=f'sql [{sql}] 执行失败,开始回滚,e={e}')
finally:
# 关闭连接
conn.close()
```

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 16:00
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-db
# @Desc : 数据库模块

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 16:02
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-db
# @Desc : 数据库连接器

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 18:33
# @Author : old tom
# @File : connector_factory.py
# @Project : futool-db
# @Desc : 连接器工厂
from fudb.connectors.dialect.dialect_connector import OracleConnector, PostgresqlConnector
from urllib.parse import quote_plus as urlquote
class ConnFactory(object):
"""
数据库连接器工厂
"""
CONNECTION_CONTAINER = {
'oracle': OracleConnector,
'postgresql': PostgresqlConnector
}
def __init__(self, db_type, user, password, host, port, database):
self.db_type = db_type
# urlquote 用于处理密码中的特殊字符例如@
self.connector = self.CONNECTION_CONTAINER[self.db_type](user, urlquote(password), host, port, database)
def get_conn(self):
"""
获取连接
:return:
"""
return self.connector.get_conn()

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 18:54
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-db
# @Desc :

@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 16:04
# @Author : old tom
# @File : abs_connector.py
# @Project : futool-db
# @Desc : 抽象层
import abc
from sqlalchemy import create_engine
class CommonConnector(metaclass=abc.ABCMeta):
def __init__(self, db_conf: str):
# 初始化
self.engine = create_engine(db_conf, pool_size=15, pool_recycle=3600)
@abc.abstractmethod
def get_conn(self):
"""
获取连接
:return:
"""
pass

@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 18:55
# @Author : old tom
# @File : dialect_connector.py
# @Project : futool-db
# @Desc :
from fudb.connectors.dialect.abs_connector import CommonConnector
class OracleConnector(CommonConnector):
"""
oracle 连接器,目前仅支持sever_name方式,SID方式待开发
"""
DSN = 'oracle+cx_oracle://{0}:{1}@{2}:{3}/?service_name={4}'
def __init__(self, user, password, host, port=1521, server_name='orcl'):
super().__init__(self.DSN.format(user, password, host, port, server_name))
def get_conn(self):
return self.engine.connect()
class PostgresqlConnector(CommonConnector):
"""
pg连接器
"""
PG_DIALECT = 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}'
def __init__(self, user, password, host, port=5432, database='postgres'):
super().__init__(self.PG_DIALECT.format(user, password, host, port, database))
def get_conn(self):
return self.engine.connect()

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 16:03
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-db
# @Desc : 操作数据库API

@ -0,0 +1,24 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/5 11:26
# @Author : old tom
# @File : fu_collection.py
# @Project : futool-db
# @Desc : 集合类工具
def split_coll(data: [], part_size=5):
"""
分割集合
:param data:
:param part_size:
:return:
"""
rt = []
if len(data) <= part_size:
rt.append(data)
else:
rt.append(data[0:part_size])
for j, d in enumerate(data):
if j > 0 and j % part_size == 0:
rt.append(data[j:j + part_size])
return rt

@ -0,0 +1,189 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 17:29
# @Author : old tom
# @File : fu_db_api.py
# @Project : futool-db
# @Desc :
from sqlalchemy import Connection, text, CursorResult
from fudb.dbapis.fu_collection import split_coll
from psycopg2.extras import execute_values
class SqlExecuteError(Exception):
def __init__(self, msg=''):
Exception.__init__(self, msg)
def _select(conn: Connection, sql) -> CursorResult:
"""
自动关闭连接
:param conn:
:return:
"""
try:
return conn.execute(text(sql))
finally:
conn.close()
def _execute_with_tx(conn: Connection, sql):
"""
带事务执行SQL
:return:
"""
try:
conn.begin()
rt = conn.execute(text(sql))
conn.commit()
return rt.rowcount
except Exception as e:
conn.rollback()
raise SqlExecuteError(msg=f'sql [{sql}] 执行失败,开始回滚,e={e}')
finally:
conn.close()
def select_one(conn: Connection, sql):
"""
查询一个
:param conn:
:param sql:
:return:
"""
return _select(conn, sql).fetchone()
def select_all(conn: Connection, sql):
"""
查询全部
:param conn:
:param sql:
:return:
"""
return _select(conn, sql).fetchall()
def count(conn: Connection, table):
"""
统计数据量
:param conn:
:param table:
:return:
"""
count_tpl = f'select count(1) from {table}'
return select_one(conn, count_tpl)[0]
def execute_update(conn: Connection, sql):
"""
带事务执行,可用于insert update delete 语句
:param conn:
:param sql:
:return: 受影响的行数与java-jdbc的execute_update返回true|false相似,可用于判断是否执行成功
"""
return _execute_with_tx(conn, sql)
def batch_insert(conn: Connection, db_type, sql_tpl, data, batch_size=1500):
"""
批量插入
:param conn: 数据库连接
:param batch_size: 每次插入量
:param db_type: 数据库类型
:param sql_tpl: insert into t1 (f1,f2,f3) values %s
:param data: [(1,'tom',29),(2,'jack',30)]
:return:
"""
handler = BatchInsertHandler(db_type, sql_tpl, data, batch_size)
insert_sqls = handler.build_insert()
# 插入都在一个事务内
row_count = 0
try:
conn.begin()
for sql_set in insert_sqls:
rt = conn.execute(text(sql_set))
row_count += rt.rowcount
conn.commit()
return row_count
except Exception as e:
conn.rollback()
raise SqlExecuteError(msg=f"批量插入异常,e={e}")
finally:
conn.close()
class BatchInsertHandler(object):
"""
批量插入处理器
oracle :
insert all
into oracle_table ( id, code ) values( 1 , '1' )
into oracle_table ( id, code ) values( 2 , '2' )
into oracle_table ( id, code ) values( 3 , '3' )
into oracle_table ( id, code ) values( 4 , '4' )
select 1 from dual ;
postgresql and mysql
into oracle_table ( id, code ) values( 1 , '1' ),( 2 , '2' ),( 3 , '3' )
"""
BUILD_INSERT = {
'oracle': 'build_oracle_insert',
'postgresql': 'build_pg_insert',
'mysql': 'build_mysql_insert'
}
class NotSupportError(Exception):
def __init__(self, msg=''):
Exception.__init__(self, msg)
def __init__(self, db_type, sql_tpl, data, batch_size):
"""
:param db_type: 数据库类型
:param sql_tpl: pg及mysql: insert into t1 (f1,f2,f3) values %s
oracle: into t1 (f1,f2,f3) values %s
:param data: [(1,'tom',29),(2,'jack',30)]
:param batch_size:
"""
if db_type not in ['oracle', 'postgresql']:
raise self.NotSupportError()
self.db_type = db_type
self.sql_tpl = sql_tpl
self.data = data
self.batch_size = batch_size
def _split_data(self):
return split_coll(self.data, self.batch_size)
def build_insert(self):
data_set = self._split_data()
sql_set = []
for part in data_set:
sql_set.append(getattr(self, self.BUILD_INSERT[self.db_type])(part))
return sql_set
def build_oracle_insert(self, data_set):
begin = 'insert all \r '
for ds in data_set:
val = '('
for ele in ds:
val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ','
val = val[0:-1] + ')'
begin += (self.sql_tpl.replace('%s', val) + ' \r ')
end = 'select 1 from dual'
return begin + end
def build_pg_insert(self, data_set):
vals = ''
for ds in data_set:
val = '('
for ele in ds:
val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ','
val = val[0:-1] + ')'
vals += val + ','
return self.sql_tpl.replace('%s', vals[0:-1])
def build_mysql_insert(self, data_set):
return self.build_pg_insert(data_set)

Binary file not shown.

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 17:37
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-db
# @Desc :

@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 18:42
# @Author : old tom
# @File : test_connector_factory.py
# @Project : futool-db
from unittest import TestCase
from fudb.connectors.connector_factory import ConnFactory
from fudb.dbapis.fu_db_api import select_one, count, select_all
# @Desc :
class TestConnFactory(TestCase):
def test_get_conn(self):
factory = ConnFactory('oracle', 'yngs_pro_constr', 'yngs_pro_constr', '172.16.1.36', 1523, 'testdb')
conn = factory.get_conn()
print(select_all(conn, 'select * from ODS_FINANCE_PLAN where rownum<=2'))

@ -0,0 +1,55 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 22:05
# @Author : old tom
# @File : test_dialect_connector.py
# @Project : futool-db
from unittest import TestCase
from fudb.dbapis.fu_db_api import select_all, select_one, count, execute_update, batch_insert
from fudb.connectors.connector_factory import ConnFactory
dds = [(4, 'zr4', 34), (5, 'zr5', 35), (6, 'zr6', 36), (7, 'zr7', 37), (8, 'zr8', 38), (9, 'zr9', 39),
(10, 'zr10', 40), (11, 'zr11', 41), (12, 'zr12', 42), (13, 'zr13', 43), (14, 'zr14', 44), (15, 'zr15', 45),
(16, 'zr16', 46), (17, 'zr17', 47), (18, 'zr18', 48), (19, 'zr19', 49), (20, 'zr20', 50), (21, 'zr21', 51),
(22, 'zr22', 52), (23, 'zr23', 53), (24, 'zr24', 54), (25, 'zr25', 55), (26, 'zr26', 56), (27, 'zr27', 57),
(28, 'zr28', 58), (29, 'zr29', 59), (30, 'zr30', 60), (31, 'zr31', 61), (32, 'zr32', 62), (33, 'zr33', 63),
(34, 'zr34', 64), (35, 'zr35', 65), (36, 'zr36', 66), (37, 'zr37', 67), (38, 'zr38', 68), (39, 'zr39', 69),
(40, 'zr40', 70), (41, 'zr41', 71), (42, 'zr42', 72), (43, 'zr43', 73), (44, 'zr44', 74), (45, 'zr45', 75),
(46, 'zr46', 76), (47, 'zr47', 77), (48, 'zr48', 78), (49, 'zr49', 79), (50, 'zr50', 80), (51, 'zr51', 81),
(52, 'zr52', 82), (53, 'zr53', 83), (54, 'zr54', 84), (55, 'zr55', 85), (56, 'zr56', 86), (57, 'zr57', 87),
(58, 'zr58', 88), (59, 'zr59', 89), (60, 'zr60', 90), (61, 'zr61', 91), (62, 'zr62', 92), (63, 'zr63', 93),
(64, 'zr64', 94), (65, 'zr65', 95), (66, 'zr66', 96), (67, 'zr67', 97), (68, 'zr68', 98), (69, 'zr69', 99),
(70, 'zr70', 100), (71, 'zr71', 101), (72, 'zr72', 102), (73, 'zr73', 103), (74, 'zr74', 104),
(75, 'zr75', 105), (76, 'zr76', 106), (77, 'zr77', 107), (78, 'zr78', 108), (79, 'zr79', 109),
(80, 'zr80', 110), (81, 'zr81', 111), (82, 'zr82', 112), (83, 'zr83', 113), (84, 'zr84', 114),
(85, 'zr85', 115), (86, 'zr86', 116), (87, 'zr87', 117), (88, 'zr88', 118), (89, 'zr89', 119),
(90, 'zr90', 120), (91, 'zr91', 121), (92, 'zr92', 122), (93, 'zr93', 123), (94, 'zr94', 124),
(95, 'zr95', 125), (96, 'zr96', 126), (97, 'zr97', 127), (98, 'zr98', 128), (99, 'zr99', 129),
(100, 'zr100', 130), (101, 'zr101', 131), (102, 'zr102', 132), (103, 'zr103', 133)]
# @Desc :
class TestPostgresqlConnector(TestCase):
def test_get_conn(self):
# factory = ConnFactory('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
oracle_factory = ConnFactory('oracle', 'zr', 'root@123', 'localhost', 1521, 'XE')
# factory2 = ConnFactory('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
# print(id(factory), id(factory2))
conn = oracle_factory.get_conn()
# 返回第一条
select_one(oracle_factory.get_conn(), 'select * from test limit 20')
# 查全部
select_all(oracle_factory.get_conn(), 'select * from test limit 20')
# 数据量统计
count(oracle_factory.get_conn(), 'test')
# delete \insert \update
execute_update(oracle_factory.get_conn(),'delete from test where id=1')
# 批量插入
batch_insert(conn, 'oracle', 'into t_user (id,name,age) values %s', dds, 10)
# print(select_all(factory.get_conn(), 'select * from test limit 20'))
# print(select_all(factory.get_conn(), 'select * from t_user limit 20'))
# conn.close()
# execute_update(conn, 'update t_user set age=100 where id=3')
# print(execute_update(conn, 'delete from t_user where id=3'))
print(batch_insert(conn, 'oracle', 'into t_user (id,name,age) values %s', dds, 10))

@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/4 17:38
# @Author : old tom
# @File : test_oracle_connector.py
# @Project : futool-db
from unittest import TestCase
from fudb.connectors.dialect.dialect_connector import OracleConnector
from fudb.dbapis.fu_db_api import select_all
# @Desc :
class TestOracleConnector(TestCase):
def test_get_conn(self):
oc = OracleConnector('yngs_pro_constr', 'yngs_pro_constr', '172.16.1.36', 1523, 'testdb')
oc2 = OracleConnector('yngs_pro_constr', 'yngs_pro_constr', '172.16.1.36', 1523, 'testdb')
print(oc is oc2)
# conn = oc.get_conn()
# print(select_all(conn, 'select * from ODS_FINANCE_PLAN where rownum<=100'))
# # print(count(conn, 'ODS_FINANCE_PLAN'))
Loading…
Cancel
Save