diff --git a/executor/sql_executor.py b/executor/sql_executor.py index 4179c6a..4255832 100644 --- a/executor/sql_executor.py +++ b/executor/sql_executor.py @@ -24,10 +24,30 @@ class SQLExecutor(object): return text(sql) def query(self, sql) -> CursorResult: + """ + 关于auto begin官方文档给出的解释 https://docs.sqlalchemy.org/en/20/core/connections.html#commit-as-you-go + auto begin会导致出现: can't call begin() here unless rollback() or commit() is called first. + """ try: return self._conn.execute(self._format_sql(sql)) except Exception as e: raise SQLExecutorError(msg=f'{e}') + finally: + self._conn.commit() + + def query_by_stream(self, sql, stream_size) -> CursorResult: + """ + 流式查询 + :param sql: sql + :param stream_size: 每次返回数据量 + :return: + """ + try: + return self._conn.execution_options(yield_per=stream_size).execute(self._format_sql(sql)) + except Exception as e: + raise SQLExecutorError(msg=f'{e}') + finally: + self._conn.commit() def execute_update(self, sql) -> int: try: @@ -45,7 +65,8 @@ class SQLExecutor(object): ) :return: """ - return self._conn.exec_driver_sql(sql_template, data, execution_options={'insertmanyvalues_page_size': batch_size}) + return self._conn.exec_driver_sql(sql_template, data, + execution_options={'insertmanyvalues_page_size': batch_size}) def get_connection(self): return self._conn diff --git a/session/session.py b/session/session.py index 3798be3..370dede 100644 --- a/session/session.py +++ b/session/session.py @@ -50,32 +50,6 @@ class SqlSessionCache(object): pass -class SqlsessionFactory(object): - """ - 工厂模式创建sqlSession - """ - - def __init__(self, engine: Engine): - self._engine = engine - self.cache = SqlSessionCache(self.create_session) - - def open_session(self): - """ - 从缓存获取,如果没有会自动调用create_session创建 - """ - return self.cache() - - def create_session(self): - try: - conn = self._engine.connect() - tx_factory = TransactionFactory(conn) - tx = tx_factory.create_transaction() - executor = SQLExecutor(tx) - return Sqlsession(executor, self.cache) - except Exception as e: - raise CreateSessionError(msg=f'创建sqlSession异常,e={e}') - - class Sqlsession(object): def __init__(self, executor: SQLExecutor, session_cache: SqlSessionCache): self.executor = executor @@ -90,6 +64,19 @@ class Sqlsession(object): def select_all(self, sql): return self.executor.query(sql).fetchall() + def select_by_stream(self, sql, stream_size): + """ + 流式查询,循环取数据 + :param sql: + :param stream_size: 每次返回量 + for partition in result.partitions(): + # partition is an iterable that will be at most stream_size items + for row in partition: + print(f"{row}") + :return: + """ + return self.executor.query_by_stream(sql, stream_size) + def insert(self, sql): return self._execute_with_tx(sql) @@ -152,3 +139,29 @@ class Sqlsession(object): """ self.executor.get_connection().close() self.session_cache.remove_session() + + +class SqlsessionFactory(object): + """ + 工厂模式创建sqlSession + """ + + def __init__(self, engine: Engine): + self._engine = engine + self.cache = SqlSessionCache(self.create_session) + + def open_session(self) -> Sqlsession: + """ + 从缓存获取,如果没有会自动调用create_session创建 + """ + return self.cache() + + def create_session(self) -> Sqlsession: + try: + conn = self._engine.connect() + tx_factory = TransactionFactory(conn) + tx = tx_factory.create_transaction() + executor = SQLExecutor(tx) + return Sqlsession(executor, self.cache) + except Exception as e: + raise CreateSessionError(msg=f'创建sqlSession异常,e={e}')