diff --git a/fudb/connectors/__init__.py b/fudb/connectors/__init__.py new file mode 100644 index 0000000..a228065 --- /dev/null +++ b/fudb/connectors/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 16:02 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-db +# @Desc : 数据库连接器 diff --git a/fudb/connectors/connector_factory.py b/fudb/connectors/connector_factory.py new file mode 100644 index 0000000..776be22 --- /dev/null +++ b/fudb/connectors/connector_factory.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 18:33 +# @Author : old tom +# @File : connector_factory.py +# @Project : futool-db +# @Desc : 连接器工厂 + +from fudb.connectors.dialect.dialect_connector import OracleConnector, PostgresqlConnector +from urllib.parse import quote_plus as urlquote + + +class ConnFactory(object): + """ + 数据库连接器工厂 + """ + CONNECTION_CONTAINER = { + 'oracle': OracleConnector, + 'postgresql': PostgresqlConnector + } + + def __init__(self, db_type, user, password, host, port, database): + self.db_type = db_type + # urlquote 用于处理密码中的特殊字符例如@ + self.connector = self.CONNECTION_CONTAINER[self.db_type](user, urlquote(password), host, port, database) + + def get_conn(self): + """ + 获取连接 + :return: + """ + return self.connector.get_conn() diff --git a/fudb/connectors/dialect/__init__.py b/fudb/connectors/dialect/__init__.py new file mode 100644 index 0000000..c5abc5d --- /dev/null +++ b/fudb/connectors/dialect/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 18:54 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-db +# @Desc : diff --git a/fudb/connectors/dialect/abs_connector.py b/fudb/connectors/dialect/abs_connector.py new file mode 100644 index 0000000..d5a9744 --- /dev/null +++ b/fudb/connectors/dialect/abs_connector.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 16:04 +# @Author : old tom +# @File : abs_connector.py +# @Project : futool-db +# @Desc : 抽象层 + +import abc +from sqlalchemy import create_engine + + +class CommonConnector(metaclass=abc.ABCMeta): + + def __init__(self, db_conf: str): + # 初始化 + self.engine = create_engine(db_conf, pool_size=15, pool_recycle=3600) + + @abc.abstractmethod + def get_conn(self): + """ + 获取连接 + :return: + """ + pass diff --git a/fudb/connectors/dialect/dialect_connector.py b/fudb/connectors/dialect/dialect_connector.py new file mode 100644 index 0000000..e954f92 --- /dev/null +++ b/fudb/connectors/dialect/dialect_connector.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 18:55 +# @Author : old tom +# @File : dialect_connector.py +# @Project : futool-db +# @Desc : +from fudb.connectors.dialect.abs_connector import CommonConnector + + +class OracleConnector(CommonConnector): + """ + oracle 连接器,目前仅支持sever_name方式,SID方式待开发 + """ + + DSN = 'oracle+cx_oracle://{0}:{1}@{2}:{3}/?service_name={4}' + + def __init__(self, user, password, host, port=1521, server_name='orcl'): + super().__init__(self.DSN.format(user, password, host, port, server_name)) + + def get_conn(self): + return self.engine.connect() + + +class PostgresqlConnector(CommonConnector): + """ + pg连接器 + """ + PG_DIALECT = 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}' + + def __init__(self, user, password, host, port=5432, database='postgres'): + super().__init__(self.PG_DIALECT.format(user, password, host, port, database)) + + def get_conn(self): + return self.engine.connect() diff --git a/fudb/dbapis/__init__.py b/fudb/dbapis/__init__.py new file mode 100644 index 0000000..30f1242 --- /dev/null +++ b/fudb/dbapis/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 16:03 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-db +# @Desc : 操作数据库API diff --git a/fudb/dbapis/fu_collection.py b/fudb/dbapis/fu_collection.py new file mode 100644 index 0000000..af3ad4b --- /dev/null +++ b/fudb/dbapis/fu_collection.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/5 11:26 +# @Author : old tom +# @File : fu_collection.py +# @Project : futool-db +# @Desc : 集合类工具 + +def split_coll(data: [], part_size=5): + """ + 分割集合 + :param data: + :param part_size: + :return: + """ + rt = [] + if len(data) <= part_size: + rt.append(data) + else: + rt.append(data[0:part_size]) + for j, d in enumerate(data): + if j > 0 and j % part_size == 0: + rt.append(data[j:j + part_size]) + return rt diff --git a/fudb/dbapis/fu_db_api.py b/fudb/dbapis/fu_db_api.py new file mode 100644 index 0000000..66d275a --- /dev/null +++ b/fudb/dbapis/fu_db_api.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 17:29 +# @Author : old tom +# @File : fu_db_api.py +# @Project : futool-db +# @Desc : + +from sqlalchemy import Connection, text, CursorResult +from fudb.dbapis.fu_collection import split_coll +from psycopg2.extras import execute_values + + +class SqlExecuteError(Exception): + def __init__(self, msg=''): + Exception.__init__(self, msg) + + +def _select(conn: Connection, sql) -> CursorResult: + """ + 自动关闭连接 + :param conn: + :return: + """ + try: + return conn.execute(text(sql)) + finally: + conn.close() + + +def _execute_with_tx(conn: Connection, sql): + """ + 带事务执行SQL + :return: + """ + try: + conn.begin() + rt = conn.execute(text(sql)) + conn.commit() + return rt.rowcount + except Exception as e: + conn.rollback() + raise SqlExecuteError(msg=f'sql [{sql}] 执行失败,开始回滚,e={e}') + finally: + conn.close() + + +def select_one(conn: Connection, sql): + """ + 查询一个 + :param conn: + :param sql: + :return: + """ + return _select(conn, sql).fetchone() + + +def select_all(conn: Connection, sql): + """ + 查询全部 + :param conn: + :param sql: + :return: + """ + return _select(conn, sql).fetchall() + + +def count(conn: Connection, table): + """ + 统计数据量 + :param conn: + :param table: + :return: + """ + count_tpl = f'select count(1) from {table}' + return select_one(conn, count_tpl)[0] + + +def execute_update(conn: Connection, sql): + """ + 带事务执行,可用于insert update delete 语句 + :param conn: + :param sql: + :return: 受影响的行数,与java-jdbc的execute_update返回true|false相似,可用于判断是否执行成功 + """ + return _execute_with_tx(conn, sql) + + +def batch_insert(conn: Connection, db_type, sql_tpl, data, batch_size=1500): + """ + 批量插入 + :param conn: 数据库连接 + :param batch_size: 每次插入量 + :param db_type: 数据库类型 + :param sql_tpl: insert into t1 (f1,f2,f3) values %s + :param data: [(1,'tom',29),(2,'jack',30)] + :return: + """ + handler = BatchInsertHandler(db_type, sql_tpl, data, batch_size) + insert_sqls = handler.build_insert() + # 整个插入都在一个事务内 + row_count = 0 + try: + conn.begin() + for sql_set in insert_sqls: + rt = conn.execute(text(sql_set)) + row_count += rt.rowcount + conn.commit() + return row_count + except Exception as e: + conn.rollback() + raise SqlExecuteError(msg=f"批量插入异常,e={e}") + finally: + conn.close() + + +class BatchInsertHandler(object): + """ + 批量插入处理器 + oracle : + insert all + into oracle_table ( id, code ) values( 1 , '1' ) + into oracle_table ( id, code ) values( 2 , '2' ) + into oracle_table ( id, code ) values( 3 , '3' ) + into oracle_table ( id, code ) values( 4 , '4' ) + select 1 from dual ; + postgresql and mysql + into oracle_table ( id, code ) values( 1 , '1' ),( 2 , '2' ),( 3 , '3' ) + + """ + + BUILD_INSERT = { + 'oracle': 'build_oracle_insert', + 'postgresql': 'build_pg_insert', + 'mysql': 'build_mysql_insert' + } + + class NotSupportError(Exception): + def __init__(self, msg=''): + Exception.__init__(self, msg) + + def __init__(self, db_type, sql_tpl, data, batch_size): + """ + :param db_type: 数据库类型 + :param sql_tpl: insert into t1 (f1,f2,f3) values %s + :param data: [(1,'tom',29),(2,'jack',30)] + :param batch_size: + """ + if db_type not in ['oracle', 'postgresql']: + raise self.NotSupportError() + self.db_type = db_type + self.sql_tpl = sql_tpl + self.data = data + self.batch_size = batch_size + + def gen_batch_sql(self) -> []: + """ + 生成批量插入SQL + :return: + """ + pass + + def _split_data(self): + return split_coll(self.data, self.batch_size) + + def build_insert(self): + data_set = self._split_data() + sql_set = [] + for part in data_set: + sql_set.append(getattr(self, self.BUILD_INSERT[self.db_type])(part)) + return sql_set + + def build_oracle_insert(self, data_set): + begin = 'insert all \r ' + for ds in data_set: + val = '(' + for ele in ds: + val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ',' + val = val[0:-1] + ')' + begin += (self.sql_tpl.replace('%s', val) + ' \r ') + end = 'select 1 from dual' + return begin + end + + def build_pg_insert(self, data_set): + vals = '' + for ds in data_set: + val = '(' + for ele in ds: + val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ',' + val = val[0:-1] + ')' + vals += val + ',' + return self.sql_tpl.replace('%s', vals[0:-1]) + + def build_mysql_insert(self, data_set): + return self.build_pg_insert(data_set) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ebb51d6 Binary files /dev/null and b/requirements.txt differ diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..45994ff --- /dev/null +++ b/test/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 17:37 +# @Author : old tom +# @File : __init__.py.py +# @Project : futool-db +# @Desc : diff --git a/test/test_connector_factory.py b/test/test_connector_factory.py new file mode 100644 index 0000000..188470d --- /dev/null +++ b/test/test_connector_factory.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 18:42 +# @Author : old tom +# @File : test_connector_factory.py +# @Project : futool-db +from unittest import TestCase +from fudb.connectors.connector_factory import ConnFactory +from fudb.dbapis.fu_db_api import select_one, count, select_all + + +# @Desc : +class TestConnFactory(TestCase): + def test_get_conn(self): + factory = ConnFactory('oracle', 'yngs_pro_constr', 'yngs_pro_constr', '172.16.1.36', 1523, 'testdb') + conn = factory.get_conn() + print(select_all(conn, 'select * from ODS_FINANCE_PLAN where rownum<=2')) diff --git a/test/test_dialect_connector.py b/test/test_dialect_connector.py new file mode 100644 index 0000000..0e5bdce --- /dev/null +++ b/test/test_dialect_connector.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 22:05 +# @Author : old tom +# @File : test_dialect_connector.py +# @Project : futool-db +from unittest import TestCase +from fudb.dbapis.fu_db_api import select_all, select_one, execute_update, batch_insert +from fudb.connectors.connector_factory import ConnFactory + +dds = [(4, 'zr4', 34), (5, 'zr5', 35), (6, 'zr6', 36), (7, 'zr7', 37), (8, 'zr8', 38), (9, 'zr9', 39), + (10, 'zr10', 40), (11, 'zr11', 41), (12, 'zr12', 42), (13, 'zr13', 43), (14, 'zr14', 44), (15, 'zr15', 45), + (16, 'zr16', 46), (17, 'zr17', 47), (18, 'zr18', 48), (19, 'zr19', 49), (20, 'zr20', 50), (21, 'zr21', 51), + (22, 'zr22', 52), (23, 'zr23', 53), (24, 'zr24', 54), (25, 'zr25', 55), (26, 'zr26', 56), (27, 'zr27', 57), + (28, 'zr28', 58), (29, 'zr29', 59), (30, 'zr30', 60), (31, 'zr31', 61), (32, 'zr32', 62), (33, 'zr33', 63), + (34, 'zr34', 64), (35, 'zr35', 65), (36, 'zr36', 66), (37, 'zr37', 67), (38, 'zr38', 68), (39, 'zr39', 69), + (40, 'zr40', 70), (41, 'zr41', 71), (42, 'zr42', 72), (43, 'zr43', 73), (44, 'zr44', 74), (45, 'zr45', 75), + (46, 'zr46', 76), (47, 'zr47', 77), (48, 'zr48', 78), (49, 'zr49', 79), (50, 'zr50', 80), (51, 'zr51', 81), + (52, 'zr52', 82), (53, 'zr53', 83), (54, 'zr54', 84), (55, 'zr55', 85), (56, 'zr56', 86), (57, 'zr57', 87), + (58, 'zr58', 88), (59, 'zr59', 89), (60, 'zr60', 90), (61, 'zr61', 91), (62, 'zr62', 92), (63, 'zr63', 93), + (64, 'zr64', 94), (65, 'zr65', 95), (66, 'zr66', 96), (67, 'zr67', 97), (68, 'zr68', 98), (69, 'zr69', 99), + (70, 'zr70', 100), (71, 'zr71', 101), (72, 'zr72', 102), (73, 'zr73', 103), (74, 'zr74', 104), + (75, 'zr75', 105), (76, 'zr76', 106), (77, 'zr77', 107), (78, 'zr78', 108), (79, 'zr79', 109), + (80, 'zr80', 110), (81, 'zr81', 111), (82, 'zr82', 112), (83, 'zr83', 113), (84, 'zr84', 114), + (85, 'zr85', 115), (86, 'zr86', 116), (87, 'zr87', 117), (88, 'zr88', 118), (89, 'zr89', 119), + (90, 'zr90', 120), (91, 'zr91', 121), (92, 'zr92', 122), (93, 'zr93', 123), (94, 'zr94', 124), + (95, 'zr95', 125), (96, 'zr96', 126), (97, 'zr97', 127), (98, 'zr98', 128), (99, 'zr99', 129), + (100, 'zr100', 130), (101, 'zr101', 131), (102, 'zr102', 132), (103, 'zr103', 133)] + + +# @Desc : +class TestPostgresqlConnector(TestCase): + def test_get_conn(self): + # factory = ConnFactory('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') + factory = ConnFactory('oracle', 'zr', 'root@123', 'localhost', 1521, 'XE') + # factory2 = ConnFactory('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') + # print(id(factory), id(factory2)) + conn = factory.get_conn() + # print(select_all(factory.get_conn(), 'select * from test limit 20')) + # print(select_all(factory.get_conn(), 'select * from t_user limit 20')) + # conn.close() + # execute_update(conn, 'update t_user set age=100 where id=3') + # print(execute_update(conn, 'delete from t_user where id=3')) + print(batch_insert(conn, 'oracle', 'into t_user (id,name,age) values %s', dds, 10)) diff --git a/test/test_oracle_connector.py b/test/test_oracle_connector.py new file mode 100644 index 0000000..fe9342f --- /dev/null +++ b/test/test_oracle_connector.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/4 17:38 +# @Author : old tom +# @File : test_oracle_connector.py +# @Project : futool-db +from unittest import TestCase +from fudb.connectors.dialect.dialect_connector import OracleConnector +from fudb.dbapis.fu_db_api import select_all + + +# @Desc : +class TestOracleConnector(TestCase): + def test_get_conn(self): + oc = OracleConnector('yngs_pro_constr', 'yngs_pro_constr', '172.16.1.36', 1523, 'testdb') + oc2 = OracleConnector('yngs_pro_constr', 'yngs_pro_constr', '172.16.1.36', 1523, 'testdb') + print(oc is oc2) + # conn = oc.get_conn() + # print(select_all(conn, 'select * from ODS_FINANCE_PLAN where rownum<=100')) + # # print(count(conn, 'ODS_FINANCE_PLAN'))