From f0bf23da6f0a30e68f27cb3da9f5e2c30d3afa9a Mon Sep 17 00:00:00 2001 From: old-tom <892955278@msn.cn> Date: Wed, 23 Aug 2023 14:22:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=89=B9=E9=87=8F=E6=8F=92=E5=85=A5?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db-conf.toml | 2 +- executor/sql_executor.py | 15 +++++++++++---- session/engine/dbengine.py | 4 +++- session/session.py | 33 +++++++++++++++++++++++++++------ 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/db-conf.toml b/db-conf.toml index fb5760e..c5dc1ab 100644 --- a/db-conf.toml +++ b/db-conf.toml @@ -16,4 +16,4 @@ pool_size = 15 # 连接超时回收(秒) pool_recycle = 3600 # 打印SQL -show_sql = true \ No newline at end of file +show_sql = false \ No newline at end of file diff --git a/executor/sql_executor.py b/executor/sql_executor.py index 838b5e0..4179c6a 100644 --- a/executor/sql_executor.py +++ b/executor/sql_executor.py @@ -6,7 +6,6 @@ # @Project : futool-db-lite # @Desc : 获取游标,执行SQL from sqlalchemy import text, CursorResult - from transaction.connect_transaction import Transaction @@ -32,14 +31,22 @@ class SQLExecutor(object): def execute_update(self, sql) -> int: try: - self._tx.begin_transaction() rt = self._conn.execute(self._format_sql(sql)) - self._tx.commit() return rt.rowcount except Exception as e: - self._tx.rollback() raise SQLExecutorError(msg=f'{e}') + def execute_batch(self, sql_template, data, batch_size=1000) -> int: + """ + 批量执行 + conn.exec_driver_sql( + "INSERT INTO table (id, value) VALUES (:id, :value)", + [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}] + ) + :return: + """ + return self._conn.exec_driver_sql(sql_template, data, execution_options={'insertmanyvalues_page_size': batch_size}) + def get_connection(self): return self._conn diff --git a/session/engine/dbengine.py b/session/engine/dbengine.py index b7bcb37..c4d5e6a 100644 --- a/session/engine/dbengine.py +++ b/session/engine/dbengine.py @@ -53,7 +53,9 @@ class DatabaseEngine(object): DB_URL = { 'postgresql': 'postgresql+psycopg2://{0}:{1}@{2}:{3}/{4}', # 使用thin客户端,不需要oracle client - 'oracle': 'oracle+oracledb://{0}:{1}@{2}:{3}/?service_name={4}' + 'oracle': 'oracle+oracledb://{0}:{1}@{2}:{3}/?service_name={4}', + # arm下没连接成功,需要x86版本python + 'sqlserver': 'mssql+pymssql://{0}:{1}@{2}/{4}?charset=CP936' } # 连接池容器名称,具体看ObjContainer对象 attr = 'engines' diff --git a/session/session.py b/session/session.py index ef15731..3798be3 100644 --- a/session/session.py +++ b/session/session.py @@ -6,6 +6,8 @@ # @Project : futool-db-lite # @Desc : import threading + +import sqlalchemy from sqlalchemy import Engine from executor.sql_executor import SQLExecutor from transaction.connect_transaction import TransactionFactory @@ -89,19 +91,38 @@ class Sqlsession(object): return self.executor.query(sql).fetchall() def insert(self, sql): - return self.executor.execute_update(sql) + return self._execute_with_tx(sql) - def insert_batch(self, sql): + def insert_batch(self, sql_template, data, batch_size=1000): """ - todo implement + 批量插入实现 + :param data: [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}] 或者 [(1,'v1'),(2,'v2')] + :param sql_template: "INSERT INTO table (id, value) VALUES (:id, :value)" + :param batch_size: 批量插入size + :return: """ - pass + if data and isinstance(data, list) and len(data) > 0: + # 特殊处理SQLAlchemy row对象 + if isinstance(data[0], sqlalchemy.engine.row.Row): + # 转为元组 + data = [tuple(x) for x in data] + return self.executor.execute_batch(sql_template, data, batch_size) def update(self, sql): - return self.executor.execute_update(sql) + return self._execute_with_tx(sql) def delete(self, sql): - return self.executor.execute_update(sql) + return self._execute_with_tx(sql) + + def _execute_with_tx(self, sql): + try: + self.begin_transaction() + rt = self.executor.execute_update(sql) + self.commit() + return rt + except Exception as e: + self.rollback() + print(e) def begin_transaction(self): """