Compare commits

...

No commits in common. 'master' and 'main' have entirely different histories.
master ... main

4
.gitignore vendored

@ -3,7 +3,6 @@
__pycache__/
*.py[cod]
*$py.class
*.pyc
# C extensions
*.so
@ -159,4 +158,5 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
#.idea/

@ -0,0 +1,9 @@
MIT License
Copyright (c) <year> <copyright holders>
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@ -1,11 +0,0 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
[requires]
python_version = "3.8"

20
Pipfile.lock generated

@ -1,20 +0,0 @@
{
"_meta": {
"hash": {
"sha256": "7f7606f08e0544d8d012ef4d097dabdd6df6843a28793eb6551245d4b2db4242"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.8"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {},
"develop": {}
}

@ -0,0 +1,3 @@
# futool
python工具包

@ -1,144 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/12 21:03
# @Author : old tom
# @File : fh_file_path.py
# @Project : Futool
# @Desc : 文件路径
import pathlib
import os
import typing
class PathNotExistError(Exception):
"""
路径不存在
"""
def __init__(self, msg=''):
Exception.__init__(self, msg)
def resolve_2_abs(path) -> pathlib.Path:
"""
相对路径解析为绝对路径
:param path:
:return:
"""
return pathlib.Path(path).resolve()
def isabs(path):
"""
是否绝对路径
:param path:
:return:
"""
return os.path.isabs(path)
def exist(path):
"""
文件或文件夹是否存在
:param path:
:return:
"""
return os.path.exists(path)
def rm_dir(dir_path):
"""
删除文件夹,此目录必须为空
:param dir_path:
:return:
"""
pathlib.Path(dir_path).rmdir()
def find_file_by_pattern(path, pattern) -> typing.Generator:
"""
根据匹配规则查找文件
:param path: 路径
:param pattern: 所有txt,*.txt
:return:
"""
return pathlib.Path(path).rglob(pattern)
def loop_mk_dir(dir_path, mode=0o777, exist_ok=False):
"""
创建文件夹及其子路径
:param dir_path: 文件夹路径
:param mode: 权限
:param exist_ok: 是否覆盖
:return:
"""
pathlib.Path(dir_path).mkdir(parents=True, mode=mode, exist_ok=exist_ok)
def loop_dir(dir_path, file_container: list, filter_fun=None):
"""
递归文件夹
:param dir_path:
:param file_container: 路径容器
:param filter_fun: 自定义过滤
:return:
"""
if not exist(dir_path):
raise PathNotExistError('目标文件夹不存在')
file_list = os.listdir(dir_path)
for f in file_list:
full_path = os.path.join(dir_path, f)
if os.path.isdir(full_path):
loop_dir(full_path, file_container, filter_fun)
else:
if filter_fun:
if filter_fun(full_path):
file_container.append(full_path)
else:
file_container.append(full_path)
return file_container
def is_windows_path(path) -> bool:
"""
是否windows路径
:param path:
:return:
"""
return len(str(pathlib.Path(path).drive).rstrip()) > 0
def parent_path_str(path) -> str:
"""
父级路径
:param path:
:return:
"""
return str(parent_path(path))
def parent_path(path) -> pathlib.Path:
"""
父级路径
:param path:
:return:
"""
return pathlib.Path(path).parent
def pwd():
"""
当前路径
:return:
"""
return pathlib.Path().cwd()
def home():
"""
home路径
:return:
"""
return pathlib.Path().home()

@ -1,273 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:17
# @Author : old tom
# @File : fu_date.py
# @Project : Futool
# @Desc : 日期时间工具
import time
from datetime import datetime, date, timedelta
import calendar
# 毫秒单位
MILLISECOND_UNIT = 1000
# 1小时3600秒
ONE_HOUR_SECOND = 3600
# 上午下午分界
AM_PM_DIV = 11
# 一周7天
ONE_WEEK_DAYS = 7
# 日期时间格式化转换
FMT_MAPPING = {
'yyyy-MM-dd': '%Y-%m-%d',
'yyyyMMdd': '%Y%m%d',
'yyyy-MM-dd hh:mm:ss': '%Y-%m-%d %I:%M:%S',
'yyyy-MM-dd hh24:mm:ss': '%Y-%m-%d %H:%M:%S',
'yyyy-MM-dd HH:mm:ss': '%Y-%m-%d %H:%M:%S'
}
def current_year() -> int:
"""
本年
:return:
"""
return datetime.now().year
def current_month() -> int:
"""
本月
:return:
"""
return datetime.now().month
def current_day() -> int:
return datetime.now().day
def current_date() -> str:
return str(datetime.now().date())
def current_datetime(fmt='yyyy-MM-dd hh24:mm:ss') -> str:
"""
获取当前日期时间
:param fmt:
:return:
"""
return datetime.now().strftime(FMT_MAPPING[fmt])
def current_time(fmt='s') -> int:
"""
获取当前时间绝对秒
:param fmt: s: ms:毫秒 ns:纳秒
:return:
"""
time_jar = {
's': int(time.time()),
'ms': int(time.time() * MILLISECOND_UNIT),
'ns': time.time_ns()
}
return time_jar[fmt]
def current_timestamp():
"""
当前时间戳
:return:
"""
return datetime.now().timestamp()
def format_datetime_str(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> datetime:
"""
格式化日期时间字符串
:param dt: 日期时间字符串
:param fmt: 格式化yyyy-MM-dd
:return: datetime对象
"""
return datetime.strptime(dt, FMT_MAPPING[fmt])
def format_date_str(dt: str, fmt='yyyy-MM-dd') -> date:
"""
格式化日期字符串
:param dt: 日期字符串
:param fmt: yyyy-MM-dd
:return:
"""
return datetime.strptime(dt, FMT_MAPPING[fmt]).date()
def datetime_2_second(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> int:
"""
日期时间字符串转绝对秒
:param dt: 日期时间字符串 yyyy-MM-dd hh:mm:ss 格式
:param fmt: 格式化函数
:return:
"""
return int(format_datetime_str(dt, fmt).timestamp())
def sec_2_datatime(sec_time: int, fmt='yyyy-MM-dd hh24:mm:ss') -> str:
"""
绝对秒转日期时间
:param sec_time:
:param fmt: 格式化函数
:return:
"""
timed = time.localtime(sec_time)
return time.strftime(FMT_MAPPING[fmt], timed)
def is_leap(year: int) -> bool:
"""
是否闰年
:param year:
:return:
"""
return calendar.isleap(year)
def begin_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
"""
周开始日期
:param date_str:
:param fmt: 格式化
:return:
"""
formated_dt = format_date_str(date_str, fmt)
week_idx = weekday(date_str, fmt)
return date_str if week_idx == 0 else str(formated_dt - timedelta(days=week_idx))
def end_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
"""
周结束日期
:param date_str:
:param fmt: 格式化
:return:
"""
formated_dt = format_date_str(date_str, fmt)
week_idx = weekday(date_str, fmt)
return date_str if week_idx == 6 else str(formated_dt + timedelta(days=(6 - week_idx)))
def end_of_month(y, m) -> int:
"""
月结束日期
:param y
:param m
:return:
"""
return calendar.monthrange(y, m)[1]
def weekday(date_str: str, fmt='yyyy-MM-dd') -> int:
"""
返回日期是周几
:param date_str:
:param fmt: 格式化
:return: 0-7 ,0:周一
"""
fmted_date = format_datetime_str(date_str, fmt)
return calendar.weekday(fmted_date.year, fmted_date.month, fmted_date.day)
def age(birth: str, compare_date: str, fmt='yyyy-MM-dd') -> int:
"""
年龄计算
:param birth: 生日
:param compare_date: 被比较日期
:param fmt: 日期格式化 yyyy-MM-dd|yyyyMMdd
:return:
"""
fmt_birth = format_date_str(birth, fmt)
fmt_compare = format_date_str(compare_date, fmt)
birth_m = fmt_birth.replace(year=fmt_compare.year)
return fmt_compare.year - fmt_birth.year if fmt_compare > birth_m else fmt_compare.year - fmt_birth.year - 1
def age_of_now(birth: str) -> int:
"""
当前年龄
:param birth: 出生日期
:return:
"""
return age(birth, current_datetime('yyyy-MM-dd'))
def between(dt_1: str, dt_2: str, fmt='yyyy-MM-dd', time_unit='day') -> int:
"""
计算两个时间差
:param dt_1: 时间1 日期或日期时间
:param dt_2: 时间2 日期或日期时间
:param fmt: 格式化
:param time_unit: 时间单位 day: ,hour 小时 ,minute分钟,second
:return:
"""
fmt_dt1, fmt_dt2 = format_datetime_str(dt_1, fmt), format_datetime_str(dt_2, fmt)
return abs({
'day': (fmt_dt1 - fmt_dt2).days,
'hour': int(fmt_dt1.timestamp() - fmt_dt2.timestamp()) / ONE_HOUR_SECOND,
'second': int(fmt_dt1.timestamp() - fmt_dt2.timestamp())
}[time_unit])
def time_offset(start_dt: str, offset: int, fmt='yyyy-MM-dd HH:mm:ss', time_unit='day') -> datetime:
"""
时间偏移计算计算相隔N天后的日期
:param start_dt: 开始日期日期时间
:param fmt: 时间格式化
:param offset: 偏移量支持正负数
:param time_unit: 偏移量单位
:return:
"""
fmt_dt = format_datetime_str(start_dt, fmt)
return {'day': fmt_dt + timedelta(days=offset),
'hour': fmt_dt + timedelta(hours=offset),
'second': fmt_dt + timedelta(seconds=offset)}[time_unit]
def is_am(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
"""
是否上午
:param dt:
:param fmt:
:return:
"""
fmt_dt = format_datetime_str(dt, fmt)
return fmt_dt.hour <= AM_PM_DIV
def is_pm(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
"""
是否下午
:param dt:
:param fmt:
:return:
"""
fmt_dt = format_datetime_str(dt, fmt)
return fmt_dt.hour > AM_PM_DIV
def next_week(fmt='yyyy-MM-dd HH:mm:ss') -> datetime:
"""
下周同一天
:return:
"""
now = current_datetime(fmt)
return time_offset(now, ONE_WEEK_DAYS, fmt)
def next_month() -> datetime:
"""
下个月同一天
:return:
"""
return datetime.now().replace(month=current_month() + 1)

@ -1,230 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:45
# @Author : old tom
# @File : fu_file.py
# @Project : Futool
# @Desc : 文件操作
import shutil
import os
import hashlib
import zipfile
from pathlib import Path
from futool.core.fh_file_path import loop_mk_dir, parent_path, exist
from enum import Enum
class FileSuffix(Enum):
"""
常见文件后缀
"""
XLSX = '.xlsx'
XLS = '.xls'
DOC = '.doc'
DOCX = '.docx'
PPT = '.ppt'
PPTX = '.pptx'
EXE = '.exe'
MSI = '.msi'
ISO = '.iso'
PGP = '.pgp'
PNG = '.png'
JPG = 'jpg'
JPEG = '.jpeg'
GIF = '.gif'
WAV = '.wav'
JAR = '.jar'
PY = '.py'
BAT = '.bat'
DLL = '.dll'
ZIP = '.zip'
SEVEN_ZIP = '.7z'
TAR = '.tar'
RAR = '.rar'
class FileNotExistError(Exception):
def __init__(self, msg=''):
Exception.__init__(self, msg)
def split_text():
pass
def split_json():
pass
def compress(path, z_path, z_name='zip'):
"""
压缩文件或文件夹
:param path:
:param z_path: 压缩后路径
:param z_name: 压缩格式 zip rar gz
:return:
"""
pass
def un_compress(path):
"""
解压
:param path:
:return:
"""
pass
def md5(file_path) -> str:
"""
文件MD5
:param file_path:
:return:
"""
if not exist(file_path):
raise FileNotExistError('文件不存在')
with open(file_path, 'rb') as f:
data = f.read()
return hashlib.md5(data).hexdigest()
def move(src, dst):
"""
移动文件夹或文件
:param src:
:param dst: 目标
:return:
"""
shutil.move(src, dst)
def copy(src, dst, override=False):
"""
复制
:param src:
:param dst:
:param override:
:return:
"""
if override and exist(dst):
os.remove(dst)
shutil.copy(src, dst)
def copy_dir(src, dst, override=False) -> str:
"""
复制文件夹
:param src: 源目录
:param dst: 目的目录
:param override: 是否覆盖
:return:
"""
if override and exist(dst):
os.remove(dst)
return shutil.copytree(src, dst, dirs_exist_ok=override)
def copy_file(src, dst, override=False) -> str:
"""
复制文件
:param src: 源文件
:param dst: 目的文件或目的目录
:param override: 是否覆盖
:return:
"""
if override and exist(dst):
os.remove(dst)
if Path(dst).is_dir():
dst = os.path.join(dst, file_full_name(src))
return shutil.copyfile(src, dst)
def rename(file_path, neo_name) -> Path:
"""
重命名
:param file_path: 文件路径
:param neo_name: 新命名
:return:
"""
return Path(file_path).rename(parent_path(file_path).joinpath(neo_name))
def delete(file_path):
"""
删除文件
:param file_path:
:return:
"""
if exist(file_path):
os.remove(file_path)
def touch(file_path, mode=0o777, cover=True):
"""
创建文件
:param file_path: 文件路径
:param mode: 权限
:param cover: 是否覆盖
:return:
"""
parent = parent_path(file_path)
if not parent.exists():
loop_mk_dir(str(parent), mode=mode, exist_ok=cover)
Path(file_path).touch(mode=mode, exist_ok=cover)
def file_name(file_path) -> str:
"""
获取文件名
:param file_path:
:return:
"""
full_name = file_full_name(file_path)
return full_name.split(sep='.')[0] if (full_name and '.' in full_name) else full_name
def file_full_name(file_path) -> str:
"""
文件全名,带后缀
:param file_path:
:return:
"""
return Path(file_path).name
def suffix(file_path) -> str:
"""
文件后缀
:return:
"""
return Path(file_path).suffix
def suffixes(file_path) -> list:
"""
多后缀:xxx.tar.gz
:param file_path:
:return: [.tar,.gz]
"""
return Path(file_path).suffixes
def file_info(file_path):
"""
返回文件信息
:param file_path:
:return:
st_mode=33206 文件模式包括文件类型和文件模式位即权限位
st_ino=281474976714543 与平台有关但如果不为零则根据 st_dev 值唯一地标识文件
通常 Unix 上该值表示索引节点号 (inode number) Windows 上该值表示 文件索引号
st_dev=10943705 该文件所在设备的标识符
st_nlink=1 硬链接的数量
st_uid=0 文件所有者的用户 ID
st_gid=0 文件所有者的用户组 ID
st_size=453 文件大小以字节为单位
st_atime=1662966762 最近的访问时间以秒为单位
st_mtime=1652331424 最近的修改时间以秒为单位
st_ctime=1652331424 Windows 上表示创建时间以秒为单位 Unix 上表示最近的元数据更改时间
"""
return Path(file_path).stat()

@ -1,7 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:15
# @Author : old tom
# @File : fu_lang.py
# @Project : Futool
# @Desc : 字符串相关

@ -1,7 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:17
# @Author : old tom
# @File : fu_math.py
# @Project : Futool
# @Desc : 数学计算

@ -1,7 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:46
# @Author : old tom
# @File : fu_parser.py
# @Project : Futool
# @Desc : 解析器(CSV,JSON,NB文件)

@ -1,94 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/8/31 23:34
# @Author : old tom
# @File : http_downloader.py
# @Project : Futool
# @Desc : 文件下载器
import time
from futool.http.http_request import head
from multiprocessing import Pool
import urllib.request as req
class HttpDownloader(object):
"""
HTTP 下载器
"""
def __init__(self, pool=None):
self.pool = Pool(16) if not pool else pool
def download(self, url, dst, chunk_size=1000):
"""
文件下,自动开启多线程
:param url: 下载链接
:param dst: 保存路径
:param chunk_size: 文件块
:return:
"""
is_support, content_length = HttpDownloader.is_support_range(url)
if is_support:
# 每个线程下载字节偏移量
offset = self.fork(int(content_length), chunk_size)
self.__join(offset, url, dst)
else:
print('无法获取Content-Length,使用单线程下载')
pass
@staticmethod
def is_support_range(url):
"""
判断是否支持range请求
:return:
"""
wrapper = head(url)
header = wrapper.header()
h_keys = header.keys()
if 'Accept-Ranges' in h_keys and 'Content-Length' in h_keys and header['Accept-Ranges'] != 'none':
return True, header['Content-Length']
else:
return False, 0
@staticmethod
def fork(content_length: int, chunk_size):
"""
拆分线程
:param chunk_size: 文件块大小
:param content_length:
:return:
"""
offset = []
if content_length <= chunk_size:
offset.append((0, content_length))
else:
for i in range(content_length // chunk_size):
start_offset = chunk_size * i + 1
end_offset = start_offset - 1 + chunk_size
offset.append((0 if i == 0 else start_offset, end_offset))
offset.append((chunk_size * (content_length // chunk_size), content_length))
return offset
def __join(self, offset, url, dst):
"""
多线程下载
:param offset:
:param url:
:param dst:
:return:
"""
def download_by_thread(part):
_request = req.Request(url=url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70",
'Range': f'bytes:{part[0]}-{part[1]}'
}, method='GET')
response = req.urlopen(_request)
with open(dst + f'.{time.time_ns()}', 'wb') as f:
f.write(response.read())
self.pool.map(download_by_thread, offset)
self.pool.close()
self.pool.join()

@ -1,158 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/8/29 21:31
# @Author : old tom
# @File : http_request.py
# @Project : Futool
import urllib.request as req
import urllib.parse
from futool.http.http_response import ResponseWrapper
from http import cookiejar
import json
import shutil
import os
DEFAULT_HEADER = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70",
"Accept": "*/*",
"Connection": "keep-alive"
}
DEFAULT_TIMEOUT = 10
class HttpRequestError(Exception):
def __init__(self, msg=''):
Exception.__init__(self, msg)
def get(url, param=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
普通get请求
:param url: 请求地址
:param param: 参数字典 自动拼接url
:param header: 请求头
:param timeout: 超时()
:return:
"""
return request(url, 'GET', param, header=header, timeout=timeout)
def head(url, param=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
发起head请求
:param url:
:param param:
:param header:
:param timeout:
:return:
"""
return request(url, 'HEAD', param, header, timeout)
def post_form(url, data=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
post 请求提交普通表单
:param url:
:param data:
:param header:
:param timeout:
:return:
"""
return request(url, 'POST', data=data, header=header, timeout=timeout)
def post(url, json_param=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
post 请求提交body 参数
:param url:
:param json_param:
:param header:
:param timeout:
:return:
"""
return request(url, 'POST', json_param=json_param, header=header, timeout=timeout)
def request(url, method, param=None, data=None, json_param=None, header=None, timeout=DEFAULT_TIMEOUT, before_send=None,
success_handler=None,
error_handler=None):
"""
基础请求方法
:param method: 请求方法 目前支持get post 方法
:param url: 请求地址
:param param: 请求参数拼接url或表单
:param data: 表单参数
:param json_param: json body参数
:param header: 请求头
:param timeout: 超时
:param before_send: 发送前处理
:param success_handler: 钩子函数成功处理
:param error_handler: 钩子函数失败处理
:return:
"""
_request = None
if method not in ['GET', 'POST', 'HEAD']:
raise HttpRequestError('not support method')
if header is None:
header = DEFAULT_HEADER
opener, cookie = _init_cookie_jar()
if before_send:
before_send(opener, cookie, header)
if method in ['GET', 'HEAD']:
if param:
param_str = urllib.parse.urlencode(param)
url = url + '?' + param_str
_request = req.Request(url, headers=header, method=method)
if 'POST' == method:
if data:
# 表单
data = urllib.parse.urlencode(data).encode('utf-8')
header.update({"Content-Type": "application/x-www-form-urlencoded"})
if json_param:
# json body
data = bytes(json.dumps(json_param), 'utf-8')
header.update({"Content-Type": "application/json"})
_request = req.Request(url, data=data, headers=header, method=method)
wrapper = ResponseWrapper(opener.open(_request, timeout=timeout), cookie)
if success_handler and wrapper.is_ok():
return success_handler(wrapper)
elif error_handler and not wrapper.is_ok():
return error_handler(wrapper)
return wrapper
def _init_cookie_jar():
"""
初始化cookie容器
:return:
"""
cookie = cookiejar.CookieJar()
handler = req.HTTPCookieProcessor(cookie)
return req.build_opener(handler), cookie
def upload_file():
# TODO
pass
def download_file(url, dst_path, overwrite=True, duplicate_handler=None, buffer=16 * 1024):
"""
文件下载
:param url: URL路径
:param dst_path: 目标位置
:param overwrite: 是否覆盖同名文件
:param duplicate_handler: 自定义重复文件处理
:param buffer:
:return:
"""
if os.path.isfile(dst_path) and overwrite:
os.remove(dst_path)
if os.path.isfile(dst_path) and not overwrite and duplicate_handler:
duplicate_handler(dst_path)
b_resp = req.urlopen(url)
with open(dst_path, 'wb') as f:
shutil.copyfileobj(b_resp, f, buffer)

@ -1,79 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/8/29 23:14
# @Author : old tom
# @File : http_response.py
# @Project : Futool
# @Desc : 响应解析
from http.client import HTTPResponse
from http.cookiejar import CookieJar
import json
DEFAULT_ENCODING = 'UTF-8'
# 响应类型编码
RESPONSE_CONTENT_ENCODING = "Content-Encoding"
# 压缩类型
COMPRESS_TYPE = ('gzip', 'deflate', 'br')
class ResponseWrapper(object):
def __init__(self, response: HTTPResponse, cookie: CookieJar = None):
self.resp = response
if cookie and len(cookie) > 0:
self.cookie = cookie
def body(self, encoding=DEFAULT_ENCODING):
return self.resp.read().decode(encoding)
def json_body(self, encoding=DEFAULT_ENCODING):
return json.loads(self.body(encoding))
def status(self):
return self.resp.status
def is_ok(self):
st_code = self.resp.status
return 200 <= st_code <= 300
def header(self, name=None):
return self.resp.getheader(name) if name else self._parse_header_dict()
def _parse_header_dict(self):
headers = self.resp.getheaders()
header_dict = {}
if headers:
for h in headers:
header_dict[h[0]] = h[1]
return header_dict
def is_compress(self):
"""
是否压缩
:return:
"""
return self.compress_type() in COMPRESS_TYPE
def compress_type(self):
"""
压缩格式
:return:
"""
header = self.header()
if RESPONSE_CONTENT_ENCODING in header.keys():
res_content_encoding = header[RESPONSE_CONTENT_ENCODING]
return res_content_encoding
def cookies(self):
"""
获取cookie
:return:
"""
ck = {}
if self.cookie:
for item in self.cookie:
ck[item.name] = item.value
return ck

@ -1,7 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/2 9:07
# @Author : old tom
# @File : __init__.py.py
# @Project : futool
# @Desc : excel、word操作相关

@ -1,213 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/2 9:08
# @Author : old tom
# @File : fu_excel.py
# @Project : futool
# @Desc : 读写excel,需要引入openPyxl
from futool.core import fu_file
from openpyxl import load_workbook, Workbook
class ExcelNotFoundError(Exception):
"""
excel文件不存在
"""
def __init__(self, msg=''):
Exception.__init__(self, msg)
class SheetNotExistError(Exception):
"""
sheet不存在
"""
def __init__(self, msg=''):
Exception.__init__(self, msg)
class ExcelReader(object):
"""
excel读取
TODO 存在性能问题需要重构;增加流式读取功能;sheet分片方式存在问题
"""
class SheetNotLoadError(Exception):
def __init__(self):
Exception.__init__(self, 'sheet not load,you need call load_sheet()')
class OutOfColIndexError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
def __init__(self, file_path):
if not fu_file.exist(file_path):
raise ExcelNotFoundError(msg=f'path={file_path}')
self.wb = load_workbook(file_path, read_only=True)
self.sheetnames = self.wb.sheetnames
self.sheet = None
self.col_nums = 0
def load_sheet(self, sheet_name: str):
"""
指定名称读取sheet
"""
if sheet_name not in self.sheetnames:
raise SheetNotExistError(msg=f'{sheet_name} not exists')
self.sheet = self.wb[sheet_name]
return self.sheet
def load_sheet_by_index(self, sheet_index: int):
"""
指定下标读取sheet,从0开始
"""
return self.load_sheet(self.sheetnames[sheet_index])
def load_sheet_first(self):
"""
加载第一个sheet
:return:
"""
return self.load_sheet_by_index(0)
def read_row(self, row_index):
"""
读取某一行
"""
self._check_sheet()
element = []
for i, row in enumerate(self.sheet):
if i == row_index:
for cell in row:
element.append(cell.value)
return element
def read_range_rows(self, start, end):
"""
范围读取
"""
self._check_sheet()
elements = []
for i, row in enumerate(self.sheet):
if start <= i <= end:
element = []
for cell in row:
element.append(cell.value)
elements.append(tuple(element))
return elements
def read_rows(self, row_index: [] = None):
"""
指定读取多行
:param row_index: 行号,例如:[1,3,5]
:return:
"""
self._check_sheet()
elements = []
for i, row in enumerate(self.sheet):
if i in row_index:
element = []
for cell in row:
element.append(cell.value)
elements.append(tuple(element))
return elements
def read_first(self):
"""
读取标题行
"""
return self.read_row(0)
def read_all(self, skip_head=True):
"""
读取全部
:return:
"""
self._check_sheet()
elements = []
for i, row in enumerate(self.sheet):
if skip_head:
if i > 0:
element = []
for cell in row:
element.append(cell.value)
elements.append(tuple(element))
return elements
def read_column(self, col_index, skip_head=True):
"""
按列读取
:param col_index: 列下标从0开始
:param skip_head: 跳过第一行
:return: list
"""
self._check_sheet()
element = []
for i, row in enumerate(self.sheet):
if skip_head:
if i > 0:
for j, cell in enumerate(row):
if j == col_index:
element.append(cell.value)
return element
def read_range_column(self, start, end, skip_head=True):
"""
范围读取列,下标从0开始
:param start: 开始下标
:param end: 结束下标
:param skip_head: 跳过第一行
:return:
"""
self._col_nums()
if end >= self.col_nums:
raise self.OutOfColIndexError(msg=f'out of column index,max col was {self.col_nums} ')
elements = []
for i in range(start, end + 1):
elements.append(tuple(self.read_column(i, skip_head)))
return elements
def _check_sheet(self):
if self.sheet is None:
raise self.SheetNotLoadError()
def _col_nums(self):
self.col_nums = len(self.read_first())
class SimpleExcelWriter(object):
"""
excel写入
"""
class ExcelFileExistsError(Exception):
"""
excel文件已存在
"""
def __init__(self, msg=''):
Exception.__init__(self, msg)
def __init__(self, write_path):
if fu_file.exist(write_path):
raise self.ExcelFileExistsError(f'{write_path} all ready exists')
self.write_path = write_path
self.wb = Workbook(write_only=True)
def write(self, head: [], data: [], sheet_name='Sheet1', index=0):
"""
写excel文件
:param index: sheet 下标
:param head: 第一行标题
:param data: 数据
:param sheet_name: sheet名称
:return:
"""
ws = self.wb.create_sheet(title=sheet_name, index=index)
# 写入列头
ws.append(head)
for d in data:
ws.append(d)
self.wb.save(self.write_path)

@ -1,85 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/9 0:04
# @Author : old tom
# @File : fu_sys.py
# @Project : Futool
# @Desc : 用于获取当前系统信息内存及硬盘信息获取请使用psutil
import os
import platform
import getpass
def is_windows():
"""
是否windows
:return:
"""
return 'Windows' == platform.system()
def is_linux():
"""
是否linux
:return:
"""
return 'Linux' == platform.system()
def is_unix():
"""
是否unix
:return:
"""
return 'Unix' == platform.system()
def os_name():
"""
操作系统名称
:return:
"""
return platform.platform()
def host_name():
"""
获取主机名
:return:
"""
return platform.node()
def sys_user():
"""
系统用户
:return:
"""
return getpass.getuser()
def sys_user_dir():
"""
当前用户目录
:return:
"""
return os.path.expanduser('~')
class CpuInfo(object):
@staticmethod
def cpu_architecture():
"""
CPU架构AMD64i386
:return:
"""
return platform.machine()
@staticmethod
def cpu_count():
"""
CPU核数
:return:
"""
return os.cpu_count()

@ -1 +0,0 @@
openpyxl~=3.1.2

@ -1,84 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/2 9:01
# @Author : old tom
# @File : test_fu_date.py
# @Project : futool
from unittest import TestCase
from futool.core import fu_date
# @Desc :
class Test(TestCase):
def test_current_year(self):
rt = fu_date.current_year()
self.assertTrue(rt)
def test_current_month(self):
self.fail()
def test_current_day(self):
self.fail()
def test_current_date(self):
self.fail()
def test_current_datetime(self):
self.fail()
def test_current_time(self):
self.fail()
def test_current_timestamp(self):
self.fail()
def test_format_datetime_str(self):
self.fail()
def test_format_date_str(self):
self.fail()
def test_datetime_2_second(self):
self.fail()
def test_sec_2_datatime(self):
self.fail()
def test_is_leap(self):
self.fail()
def test_begin_of_week(self):
self.fail()
def test_end_of_week(self):
self.fail()
def test_end_of_month(self):
self.fail()
def test_weekday(self):
self.fail()
def test_age(self):
self.fail()
def test_age_of_now(self):
self.fail()
def test_between(self):
self.fail()
def test_time_offset(self):
self.fail()
def test_is_am(self):
self.fail()
def test_is_pm(self):
self.fail()
def test_next_week(self):
self.fail()
def test_next_month(self):
self.fail()

@ -1,67 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/2 9:33
# @Author : old tom
# @File : test_fu_excel.py
# @Project : futool
from unittest import TestCase
from futool.poi.fu_excel import ExcelReader, SimpleExcelWriter
ex_reader = ExcelReader(file_path=r'D:\test\test3.xlsx')
# @Desc :
class TestExcelReader(TestCase):
def test_read_sheet(self):
sheet = ex_reader.load_sheet('Sheet1')
# for row in sheet:
# for cell in row:
# print(cell.value)
for i, row in enumerate(sheet):
if i == 0:
for cell in row:
print(cell.value)
def test_read_sheet_by_index(self):
sheet = ex_reader.load_sheet_by_index(0)
for row in sheet:
for cell in row:
print(cell.value)
def test_read_row(self):
ex_reader.load_sheet('Sheet1')
print(ex_reader.read_row(0))
def test_read_range_row(self):
ex_reader.load_sheet('Sheet1')
print(ex_reader.read_range_rows(1, 3))
def test_read_column(self):
ex_reader.load_sheet('Sheet1')
print(ex_reader.read_column(4))
def test_read_rows(self):
ex_reader.load_sheet('Sheet1')
print(ex_reader.read_rows(row_index=[0, 3]))
def test_read_all(self):
ex_reader.load_sheet('Sheet1')
print(ex_reader.read_all())
def test_read_range_column(self):
ex_reader.load_sheet('Sheet1')
print(ex_reader.read_range_column(1, 4))
class TestExcelWriter(TestCase):
def test_write(self):
ew = SimpleExcelWriter(r'D:\test\test3.xlsx')
# ew.write(head=['序号', '姓名', '年龄', '身份证', '住址'], data=[('1', '张三', '1', '101111', '特特特特特')])
ew.write(head=['序号', '姓名', '年龄', '身份证', '住址'], data=self.gen_big_excel())
def gen_big_excel(self):
big_data = []
for i in range(0, 999999):
big_data.append(('1', '张三', '1', '101111', '特特特特特'))
return big_data

@ -1,44 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:07
# @Author : old tom
# @File : test_fu_sys.py
# @Project : Futool
from unittest import TestCase
from futool.system import fu_sys
# @Desc :
class Test(TestCase):
def test_is_windows(self):
rt = fu_sys.is_windows()
self.assertTrue(rt)
def test_is_linux(self):
rt = fu_sys.is_linux()
self.assertFalse(rt)
def test_is_unix(self):
rt = fu_sys.is_unix()
self.assertFalse(rt)
def test_os_name(self):
rt = fu_sys.os_name()
self.assertIsNotNone(rt)
def test_host_name(self):
rt = fu_sys.host_name()
self.assertIsNotNone(rt)
def test_sys_user(self):
rt = fu_sys.sys_user()
self.assertIsNotNone(rt)
def test_sys_user_dir(self):
rt = fu_sys.sys_user_dir()
self.assertIsNotNone(rt)
def test_cpu_info(self):
cpu_info = fu_sys.CpuInfo()
self.assertIsNotNone(cpu_info.cpu_count())
self.assertIsNotNone(cpu_info.cpu_architecture())
Loading…
Cancel
Save