|
|
#!/usr/bin/env python
|
|
|
# -*- coding: utf-8 -*-
|
|
|
# @Time : 2023/4/2 9:08
|
|
|
# @Author : old tom
|
|
|
# @File : fu_excel.py
|
|
|
# @Project : futool
|
|
|
# @Desc : Read and write Excel files; requires openpyxl
|
|
|
|
|
|
from futool.core import fu_file
|
|
|
from openpyxl import load_workbook, Workbook
|
|
|
|
|
|
|
|
|
class ExcelNotFoundError(Exception):
    """Raised when the Excel file to read does not exist."""

    def __init__(self, msg=''):
        # Forward the message unchanged so str(err) and err.args carry it.
        super().__init__(msg)
|
|
|
|
|
|
|
|
|
class SheetNotExistError(Exception):
    """Raised when a requested sheet is not present in the workbook."""

    def __init__(self, msg=''):
        # Forward the message unchanged so str(err) and err.args carry it.
        super().__init__(msg)
|
|
|
|
|
|
|
|
|
class ExcelReader(object):
    """
    Read rows and columns from an Excel workbook opened in read-only mode.

    A sheet must be selected with ``load_sheet()``, ``load_sheet_by_index()``
    or ``load_sheet_first()`` before any ``read_*`` method is called. Row and
    column indexes are 0-based; row 0 is treated as the header row.

    The reader can be used as a context manager; the workbook is closed when
    the ``with`` block exits.

    TODO: every read re-scans the sheet from the top (performance); add
    streaming reads; revisit how sheets are sliced.
    """

    class SheetNotLoadError(Exception):
        """Raised when a read is attempted before a sheet has been loaded."""

        def __init__(self):
            Exception.__init__(self, 'sheet not load,you need call load_sheet()')

    class OutOfColIndexError(Exception):
        """Raised when a requested column index is beyond the header width."""

        def __init__(self, msg):
            Exception.__init__(self, msg)

    def __init__(self, file_path):
        """
        Open the workbook at ``file_path`` in read-only mode.

        :param file_path: path of the Excel file to read
        :raises ExcelNotFoundError: if ``fu_file.exist(file_path)`` is falsy
        """
        if not fu_file.exist(file_path):
            raise ExcelNotFoundError(msg=f'path={file_path}')
        self.wb = load_workbook(file_path, read_only=True)
        self.sheetnames = self.wb.sheetnames
        # Currently selected sheet; None until one of the load_sheet* calls.
        self.sheet = None
        # Width of the header row; refreshed by _col_nums().
        self.col_nums = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Never swallow an exception raised inside the with-block.
        return False

    def close(self):
        """Close the underlying workbook and release its file handle."""
        self.wb.close()

    def load_sheet(self, sheet_name: str):
        """
        Select the sheet called ``sheet_name`` for subsequent reads.

        :param sheet_name: name of the sheet
        :return: the selected sheet object
        :raises SheetNotExistError: if the workbook has no such sheet
        """
        if sheet_name not in self.sheetnames:
            raise SheetNotExistError(msg=f'{sheet_name} not exists')
        self.sheet = self.wb[sheet_name]
        return self.sheet

    def load_sheet_by_index(self, sheet_index: int):
        """
        Select a sheet by its position in ``sheetnames`` (0-based).

        :param sheet_index: position of the sheet
        :return: the selected sheet object
        :raises IndexError: if ``sheet_index`` is out of range
        """
        return self.load_sheet(self.sheetnames[sheet_index])

    def load_sheet_first(self):
        """
        Select the first sheet of the workbook.

        :return: the selected sheet object
        """
        return self.load_sheet_by_index(0)

    @staticmethod
    def _row_values(row):
        """Return the values of every cell in ``row`` as a list."""
        return [cell.value for cell in row]

    def read_row(self, row_index):
        """
        Read a single row.

        :param row_index: 0-based row index
        :return: list of cell values, or an empty list if the row is absent
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        for i, row in enumerate(self.sheet):
            if i == row_index:
                # Stop scanning as soon as the requested row is found.
                return self._row_values(row)
        return []

    def read_range_rows(self, start, end):
        """
        Read the rows whose index lies in ``[start, end]`` (both inclusive).

        :param start: first row index, 0-based
        :param end: last row index, 0-based
        :return: list of tuples, one per row, in sheet order
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        elements = []
        for i, row in enumerate(self.sheet):
            if i > end:
                # Nothing past ``end`` can match; avoid scanning the rest.
                break
            if i >= start:
                elements.append(tuple(self._row_values(row)))
        return elements

    def read_rows(self, row_index: list = None):
        """
        Read several specific rows.

        :param row_index: row indexes to read, e.g. ``[1, 3, 5]``; ``None``
            or an empty collection yields an empty result
        :return: list of tuples, one per matching row, in sheet order
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        wanted = set(row_index) if row_index is not None else set()
        if not wanted:
            return []
        last_wanted = max(wanted)
        elements = []
        for i, row in enumerate(self.sheet):
            if i > last_wanted:
                break
            if i in wanted:
                elements.append(tuple(self._row_values(row)))
        return elements

    def read_first(self):
        """
        Read the header row (row 0).

        :return: list of cell values of the first row
        """
        return self.read_row(0)

    def read_all(self, skip_head=True):
        """
        Read every row of the loaded sheet.

        :param skip_head: when true, row 0 (the header) is left out
        :return: list of tuples, one per row
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        first_row = 1 if skip_head else 0
        return [
            tuple(self._row_values(row))
            for i, row in enumerate(self.sheet)
            if i >= first_row
        ]

    def read_column(self, col_index, skip_head=True):
        """
        Read a single column.

        Rows that have no cell at ``col_index`` contribute nothing.

        :param col_index: 0-based column index
        :param skip_head: when true, row 0 (the header) is left out
        :return: list of cell values
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        first_row = 1 if skip_head else 0
        element = []
        for i, row in enumerate(self.sheet):
            if i < first_row:
                continue
            for j, cell in enumerate(row):
                if j == col_index:
                    element.append(cell.value)
                    break
        return element

    def read_range_column(self, start, end, skip_head=True):
        """
        Read the columns whose index lies in ``[start, end]`` (both inclusive).

        :param start: first column index, 0-based
        :param end: last column index, 0-based
        :param skip_head: when true, row 0 (the header) is left out
        :return: list of tuples, one per column
        :raises OutOfColIndexError: if ``end`` is not below the header width
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._col_nums()
        if end >= self.col_nums:
            raise self.OutOfColIndexError(msg=f'out of column index,max col was {self.col_nums} ')
        columns = [[] for _ in range(start, end + 1)]
        if not columns:
            return []
        first_row = 1 if skip_head else 0
        # Fill every requested column in one pass over the sheet instead of
        # re-scanning the sheet once per column.
        for i, row in enumerate(self.sheet):
            if i < first_row:
                continue
            for j, cell in enumerate(row):
                if j > end:
                    break
                if j >= start:
                    columns[j - start].append(cell.value)
        return [tuple(column) for column in columns]

    def _check_sheet(self):
        """Raise SheetNotLoadError unless a sheet has been loaded."""
        if self.sheet is None:
            raise self.SheetNotLoadError()

    def _col_nums(self):
        """Refresh ``self.col_nums`` from the width of the header row."""
        self.col_nums = len(self.read_first())
|
|
|
|
|
|
|
|
|
class SimpleExcelWriter(object):
    """
    Write a header row followed by data rows into a new Excel file.
    """

    class ExcelFileExistsError(Exception):
        """Raised when the target Excel file already exists."""

        def __init__(self, msg=''):
            super().__init__(msg)

    def __init__(self, write_path):
        """
        Prepare a write-only workbook that will be saved to ``write_path``.

        :param write_path: destination path of the Excel file
        :raises ExcelFileExistsError: if ``fu_file.exist(write_path)`` is truthy
        """
        target_taken = fu_file.exist(write_path)
        if target_taken:
            raise self.ExcelFileExistsError(f'{write_path} all ready exists')
        self.wb = Workbook(write_only=True)
        self.write_path = write_path

    def write(self, head: [], data: [], sheet_name='Sheet1', index=0):
        """
        Create a sheet, fill it and save the workbook to ``self.write_path``.

        NOTE(review): the workbook is saved on every call; openpyxl documents
        that a write-only workbook can be saved only once, so a second call
        presumably fails -- confirm before relying on multiple sheets.

        :param head: values of the header (first) row
        :param data: iterable of rows, each appended after the header
        :param sheet_name: title of the created sheet
        :param index: position at which the sheet is inserted
        :return: None
        """
        worksheet = self.wb.create_sheet(title=sheet_name, index=index)
        # The header goes in first, then one appended row per record.
        worksheet.append(head)
        for record in data:
            worksheet.append(record)
        self.wb.save(self.write_path)
|