#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2023/6/22 21:55 # @Author : old tom # @File : sql_executor.py # @Project : futool-db-lite # @Desc : 获取游标,执行SQL from sqlalchemy import text, CursorResult from transaction.connect_transaction import Transaction class SQLExecutorError(Exception): def __init__(self, msg): Exception.__init__(self, msg) class SQLExecutor(object): def __init__(self, tx: Transaction): self._tx = tx self._conn = tx.get_connection() @staticmethod def _format_sql(sql): return text(sql) def query(self, sql) -> CursorResult: try: return self._conn.execute(self._format_sql(sql)) except Exception as e: raise SQLExecutorError(msg=f'{e}') def execute_update(self, sql) -> int: try: rt = self._conn.execute(self._format_sql(sql)) return rt.rowcount except Exception as e: raise SQLExecutorError(msg=f'{e}') def execute_batch(self, sql_template, data, batch_size=1000) -> int: """ 批量执行 conn.exec_driver_sql( "INSERT INTO table (id, value) VALUES (:id, :value)", [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}] ) :return: """ return self._conn.exec_driver_sql(sql_template, data, execution_options={'insertmanyvalues_page_size': batch_size}) def get_connection(self): return self._conn def get_transaction(self): return self._tx