You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

214 lines
5.7 KiB

2 years ago
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/2 9:08
# @Author : old tom
# @File : fu_excel.py
# @Project : futool
# @Desc : Read and write Excel files; requires openpyxl
from futool.core import fu_file
from openpyxl import load_workbook, Workbook
class ExcelNotFoundError(Exception):
    """Raised when the requested Excel file does not exist."""

    def __init__(self, msg=''):
        # Forward the message so it ends up in ``args`` / ``str(err)``.
        super().__init__(msg)
class SheetNotExistError(Exception):
    """Raised when a requested sheet is not present in the workbook."""

    def __init__(self, msg=''):
        # Forward the message so it ends up in ``args`` / ``str(err)``.
        super().__init__(msg)
class ExcelReader(object):
    """
    Read rows and columns from one sheet of an Excel workbook.

    The workbook is opened through openpyxl in read-only mode. A sheet has to
    be selected with ``load_sheet()``, ``load_sheet_by_index()`` or
    ``load_sheet_first()`` before any ``read_*`` method is used. Every row and
    column index handled by this class is 0-based.

    The reader can be used as a context manager; the workbook is closed when
    the ``with`` block exits.

    TODO: performance still needs a proper refactor; add streaming reads;
    the way sheets are sliced is problematic.
    """

    class SheetNotLoadError(Exception):
        """Raised when a read is attempted before a sheet has been loaded."""

        def __init__(self):
            Exception.__init__(self, 'sheet not load,you need call load_sheet()')

    class OutOfColIndexError(Exception):
        """Raised when a requested column index is beyond the header width."""

        def __init__(self, msg):
            Exception.__init__(self, msg)

    def __init__(self, file_path):
        """
        Open the workbook at ``file_path`` in read-only mode.

        :param file_path: path of the Excel file to read
        :raises ExcelNotFoundError: if ``fu_file.exist`` reports the path as missing
        """
        if not fu_file.exist(file_path):
            raise ExcelNotFoundError(msg=f'path={file_path}')
        self.wb = load_workbook(file_path, read_only=True)
        self.sheetnames = self.wb.sheetnames
        self.sheet = None
        self.col_nums = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """
        Close the underlying workbook.

        The workbook is opened with ``read_only=True``; openpyxl requires such
        workbooks to be closed explicitly to release the file handle.
        """
        self.wb.close()

    def load_sheet(self, sheet_name: str):
        """
        Select the sheet called ``sheet_name`` as the current sheet.

        :param sheet_name: name of the sheet to load
        :return: the loaded sheet object
        :raises SheetNotExistError: if the workbook has no sheet with that name
        """
        if sheet_name not in self.sheetnames:
            raise SheetNotExistError(msg=f'{sheet_name} not exists')
        self.sheet = self.wb[sheet_name]
        return self.sheet

    def load_sheet_by_index(self, sheet_index: int):
        """
        Select a sheet by its position in ``sheetnames`` (0-based).

        :param sheet_index: position of the sheet
        :return: the loaded sheet object
        :raises IndexError: if ``sheet_index`` is outside ``sheetnames``
        """
        return self.load_sheet(self.sheetnames[sheet_index])

    def load_sheet_first(self):
        """
        Select the first sheet of the workbook.

        :return: the loaded sheet object
        """
        return self.load_sheet_by_index(0)

    @staticmethod
    def _row_values(row):
        """Return the cell values of ``row`` as a tuple."""
        return tuple(cell.value for cell in row)

    def read_row(self, row_index):
        """
        Read a single row.

        :param row_index: 0-based row index
        :return: list of cell values, or an empty list if the row does not exist
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        for i, row in enumerate(self.sheet):
            if i == row_index:
                # Stop as soon as the row is found instead of scanning the
                # remainder of the sheet.
                return [cell.value for cell in row]
        return []

    def read_range_rows(self, start, end):
        """
        Read the rows whose index lies in ``[start, end]`` (both inclusive).

        :param start: first 0-based row index
        :param end: last 0-based row index
        :return: list of tuples, one tuple of cell values per row
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        elements = []
        for i, row in enumerate(self.sheet):
            if i > end:
                # Nothing past ``end`` can match; avoid reading the rest.
                break
            if i >= start:
                elements.append(self._row_values(row))
        return elements

    def read_rows(self, row_index=None):
        """
        Read several specific rows.

        Rows are returned in sheet order, not in the order given by
        ``row_index``.

        :param row_index: iterable of 0-based row indexes, e.g. ``[1, 3, 5]``;
            ``None`` (the default) or an empty iterable selects no rows
        :return: list of tuples, one tuple of cell values per matched row
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        if row_index is None:
            return []
        wanted = set(row_index)
        if not wanted:
            return []
        last_wanted = max(wanted)
        elements = []
        for i, row in enumerate(self.sheet):
            if i > last_wanted:
                break
            if i in wanted:
                elements.append(self._row_values(row))
        return elements

    def read_first(self):
        """
        Read the header row (row 0).

        :return: list of cell values of the first row
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        return self.read_row(0)

    def read_all(self, skip_head=True):
        """
        Read every row of the current sheet.

        :param skip_head: when true, the first row is left out
        :return: list of tuples, one tuple of cell values per row
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        rows = iter(self.sheet)
        if skip_head:
            # Drop the header row; harmless on an empty sheet.
            next(rows, None)
        return [self._row_values(row) for row in rows]

    def read_column(self, col_index, skip_head=True):
        """
        Read a single column.

        Rows that are too short to contain ``col_index`` contribute nothing.

        :param col_index: 0-based column index
        :param skip_head: when true, the first row is left out
        :return: list of cell values
        :raises SheetNotLoadError: if no sheet has been loaded
        """
        self._check_sheet()
        element = []
        for i, row in enumerate(self.sheet):
            if skip_head and i == 0:
                continue
            # Explicit bounds check: negative indexes must not wrap around.
            if 0 <= col_index < len(row):
                element.append(row[col_index].value)
        return element

    def read_range_column(self, start, end, skip_head=True):
        """
        Read the columns whose index lies in ``[start, end]`` (both inclusive).

        :param start: first 0-based column index
        :param end: last 0-based column index
        :param skip_head: when true, the first row is left out
        :return: list of tuples, one tuple of cell values per column
        :raises SheetNotLoadError: if no sheet has been loaded
        :raises OutOfColIndexError: if ``end`` is not smaller than the number
            of cells in the first row
        """
        self._col_nums()
        if end >= self.col_nums:
            raise self.OutOfColIndexError(msg=f'out of column index,max col was {self.col_nums} ')
        wanted = range(start, end + 1)
        columns = [[] for _ in wanted]
        # Single pass over the sheet, filling every requested column at once.
        for i, row in enumerate(self.sheet):
            if skip_head and i == 0:
                continue
            width = len(row)
            for bucket, col in zip(columns, wanted):
                if 0 <= col < width:
                    bucket.append(row[col].value)
        return [tuple(bucket) for bucket in columns]

    def _check_sheet(self):
        """Raise ``SheetNotLoadError`` unless a sheet has been loaded."""
        if self.sheet is None:
            raise self.SheetNotLoadError()

    def _col_nums(self):
        """Store the number of cells in the first row in ``self.col_nums``."""
        self.col_nums = len(self.read_first())
class SimpleExcelWriter(object):
    """
    Write a header row plus data rows into a new Excel file.

    The workbook is created with openpyxl in write-only mode and is saved to
    ``write_path`` at the end of every ``write()`` call.

    NOTE(review): openpyxl documents that a write-only workbook can be saved
    only once, so a second ``write()`` on the same instance is expected to
    fail -- confirm against the openpyxl version in use.
    """

    class ExcelFileExistsError(Exception):
        """Raised when the target Excel file already exists."""

        def __init__(self, msg=''):
            super().__init__(msg)

    def __init__(self, write_path):
        """
        Prepare a write-only workbook destined for ``write_path``.

        :param write_path: path of the file to create
        :raises ExcelFileExistsError: if ``fu_file.exist`` reports that the
            path is already present
        """
        already_present = fu_file.exist(write_path)
        if already_present:
            raise self.ExcelFileExistsError(f'{write_path} all ready exists')
        self.write_path = write_path
        self.wb = Workbook(write_only=True)

    def write(self, head: [], data: [], sheet_name='Sheet1', index=0):
        """
        Create a sheet, fill it with ``head`` and ``data``, then save the file.

        :param head: values of the first (header) row
        :param data: iterable of rows; each row is appended as-is
        :param sheet_name: title of the sheet to create
        :param index: position of the new sheet inside the workbook
        :return: None
        """
        worksheet = self.wb.create_sheet(title=sheet_name, index=index)
        # The header goes in first, followed by every data record in order.
        worksheet.append(head)
        for record in data:
            worksheet.append(record)
        self.wb.save(self.write_path)