#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/6/22 21:55
# @Author : old tom
# @File : sql_executor.py
# @Project : futool-db-lite
# @Desc : Obtain a connection from a transaction and execute SQL
from sqlalchemy import text, CursorResult

from db.transaction.connect_transaction import Transaction


class SQLExecutorError(Exception):
    """Raised when a statement run through :class:`SQLExecutor` fails."""

    def __init__(self, msg):
        super().__init__(msg)


class SQLExecutor(object):
    """Run textual SQL on the connection owned by a ``Transaction``.

    The connection is fetched once, at construction time, via
    ``tx.get_connection()`` and reused for every statement.
    """

    def __init__(self, tx: Transaction):
        """Store the transaction and grab its connection.

        :param tx: transaction object providing ``get_connection()``,
            ``begin_transaction()``, ``commit()`` and ``rollback()``.
        """
        self._tx = tx
        self._conn = tx.get_connection()

    @staticmethod
    def _format_sql(sql):
        """Wrap a raw SQL string in SQLAlchemy's ``text()`` construct."""
        return text(sql)

    def _run(self, sql, params=None):
        """Execute *sql* on the held connection, binding *params* if given."""
        statement = self._format_sql(sql)
        if params is None:
            # Keep the exact call shape of the parameter-less path.
            return self._conn.execute(statement)
        return self._conn.execute(statement, params)

    def query(self, sql, params=None) -> CursorResult:
        """Execute a statement and return its result object.

        :param sql: SQL string; may contain ``:name`` bind placeholders.
        :param params: optional bound parameters (e.g. a dict) handed to
            ``Connection.execute``. Prefer this over interpolating values
            into *sql*.
        :return: whatever ``Connection.execute`` returns for the statement.
        :raises SQLExecutorError: if building or executing the statement
            fails; the original exception is attached as ``__cause__``.
        """
        try:
            return self._run(sql, params)
        except Exception as e:
            raise SQLExecutorError(msg=f'{e}') from e

    def execute_update(self, sql, params=None) -> int:
        """Execute a modifying statement between begin and commit.

        Calls ``begin_transaction()``, executes the statement, then calls
        ``commit()``. On any failure ``rollback()`` is attempted.

        :param sql: SQL string; may contain ``:name`` bind placeholders.
        :param params: optional bound parameters handed to
            ``Connection.execute``.
        :return: ``rowcount`` of the execution result.
        :raises SQLExecutorError: if any step fails; the original exception
            is attached as ``__cause__``.
        """
        try:
            self._tx.begin_transaction()
            rt = self._run(sql, params)
            self._tx.commit()
            return rt.rowcount
        except Exception as e:
            try:
                self._tx.rollback()
            except Exception as rollback_err:
                # A failing rollback must not hide the error that caused it;
                # report both, still as the documented exception type.
                raise SQLExecutorError(
                    msg=f'{e} (rollback also failed: {rollback_err})'
                ) from e
            raise SQLExecutorError(msg=f'{e}') from e

    def get_connection(self):
        """Return the connection obtained from the transaction."""
        return self._conn

    def get_transaction(self):
        """Return the transaction this executor was created with."""
        return self._tx