You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

76 lines
2.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/6/22 21:55
# @Author : old tom
# @File : sql_executor.py
# @Project : futool-db-lite
# @Desc : 获取游标,执行SQL
from sqlalchemy import text, CursorResult
from transaction.connect_transaction import Transaction
class SQLExecutorError(Exception):
    """Error raised when executing a SQL statement fails."""

    def __init__(self, msg):
        # Hand the message to the base class so that ``str(err)`` and
        # ``err.args`` both carry it.
        super().__init__(msg)
class SQLExecutor(object):
    """Execute SQL statements on the connection supplied by a transaction.

    The connection is obtained once, from ``tx.get_connection()``, and is
    reused by every method of the instance.
    """

    def __init__(self, tx: Transaction):
        """
        :param tx: transaction object that provides the connection to use
        """
        self._tx = tx
        self._conn = tx.get_connection()

    @staticmethod
    def _format_sql(sql):
        """Wrap a raw SQL string in a SQLAlchemy ``text()`` construct."""
        return text(sql)

    def _run_query(self, sql, **statement_options) -> CursorResult:
        """Execute ``sql`` and always commit afterwards.

        For the official explanation of autobegin see
        https://docs.sqlalchemy.org/en/20/core/connections.html#commit-as-you-go
        Autobegin otherwise leads to the error "can't call begin() here
        unless rollback() or commit() is called first.", which is why the
        connection is committed in ``finally``.

        :param sql: raw SQL string
        :param statement_options: execution options applied to this statement
            only, leaving the shared connection's own options untouched
        :return: the result of the execution
        :raises SQLExecutorError: if building or executing the statement fails
        """
        try:
            statement = self._format_sql(sql)
            if statement_options:
                # Executable.execution_options() returns a copy of the
                # statement, so the options do not outlive this call.
                statement = statement.execution_options(**statement_options)
            return self._conn.execute(statement)
        except Exception as e:
            # Chain explicitly so the original driver error stays attached
            # as __cause__.
            raise SQLExecutorError(msg=f'{e}') from e
        finally:
            # NOTE(review): this also runs on the failure path; if commit()
            # itself raises, that error replaces the SQLExecutorError.
            self._conn.commit()

    def query(self, sql) -> CursorResult:
        """Execute a query and commit the connection afterwards.

        :param sql: raw SQL string
        :return: the result of the query
        :raises SQLExecutorError: if the statement fails
        """
        return self._run_query(sql)

    def query_by_stream(self, sql, stream_size) -> CursorResult:
        """Streaming query.

        ``yield_per`` is set on the statement rather than through
        ``Connection.execution_options()``: in SQLAlchemy 2.0 the latter
        modifies the connection in place, which would leave every later
        statement on this shared connection streaming as well.

        NOTE(review): the connection is committed before the caller consumes
        the result; confirm the target driver keeps a streamed cursor usable
        after commit.

        :param sql: raw SQL string
        :param stream_size: number of rows returned per fetch
        :return: the result of the query
        :raises SQLExecutorError: if the statement fails
        """
        return self._run_query(sql, yield_per=stream_size)

    def execute_update(self, sql) -> int:
        """Execute a statement and return its ``rowcount``.

        No commit is issued here.

        :param sql: raw SQL string
        :return: ``rowcount`` reported by the result
        :raises SQLExecutorError: if the statement fails
        """
        try:
            result = self._conn.execute(self._format_sql(sql))
            return result.rowcount
        except Exception as e:
            raise SQLExecutorError(msg=f'{e}') from e

    def execute_batch(self, sql_template, data, batch_size=1000) -> CursorResult:
        """Batch execution through ``Connection.exec_driver_sql``.

        Example::

            conn.exec_driver_sql(
                "INSERT INTO table (id, value) VALUES (:id, :value)",
                [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
            )

        NOTE(review): ``exec_driver_sql`` hands the string to the DBAPI
        driver as-is, so the placeholders presumably have to follow the
        driver's own paramstyle; confirm ``:name`` is valid for the target
        driver.

        Unlike the other methods, errors are not wrapped in
        ``SQLExecutorError`` and no commit is issued.

        :param sql_template: driver-level SQL containing parameter placeholders
        :param data: parameter sets, one entry per execution
        :param batch_size: value passed as the ``insertmanyvalues_page_size``
            execution option
        :return: the result object returned by ``exec_driver_sql``
        """
        return self._conn.exec_driver_sql(
            sql_template,
            data,
            execution_options={'insertmanyvalues_page_size': batch_size},
        )

    def get_connection(self):
        """Return the connection this executor runs statements on."""
        return self._conn

    def get_transaction(self):
        """Return the transaction object this executor was created with."""
        return self._tx