first commit

master
zr 2 years ago
commit 5ba41f5690

162
.gitignore vendored

@ -0,0 +1,162 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
*.pyc
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

@ -0,0 +1,11 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
[requires]
python_version = "3.8"

20
Pipfile.lock generated

@ -0,0 +1,20 @@
{
"_meta": {
"hash": {
"sha256": "7f7606f08e0544d8d012ef4d097dabdd6df6843a28793eb6551245d4b2db4242"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.8"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {},
"develop": {}
}

@ -0,0 +1,144 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/12 21:03
# @Author : old tom
# @File : fh_file_path.py
# @Project : Futool
# @Desc : 文件路径
import pathlib
import os
import typing
class PathNotExistError(Exception):
"""
路径不存在
"""
def __init__(self, msg=''):
Exception.__init__(self, msg)
def resolve_2_abs(path) -> pathlib.Path:
"""
相对路径解析为绝对路径
:param path:
:return:
"""
return pathlib.Path(path).resolve()
def isabs(path):
"""
是否绝对路径
:param path:
:return:
"""
return os.path.isabs(path)
def exist(path):
"""
文件或文件夹是否存在
:param path:
:return:
"""
return os.path.exists(path)
def rm_dir(dir_path):
"""
删除文件夹,此目录必须为空
:param dir_path:
:return:
"""
pathlib.Path(dir_path).rmdir()
def find_file_by_pattern(path, pattern) -> typing.Generator:
"""
根据匹配规则查找文件
:param path: 路径
:param pattern: 所有txt,*.txt
:return:
"""
return pathlib.Path(path).rglob(pattern)
def loop_mk_dir(dir_path, mode=0o777, exist_ok=False):
"""
创建文件夹及其子路径
:param dir_path: 文件夹路径
:param mode: 权限
:param exist_ok: 是否覆盖
:return:
"""
pathlib.Path(dir_path).mkdir(parents=True, mode=mode, exist_ok=exist_ok)
def loop_dir(dir_path, file_container: list, filter_fun=None):
"""
递归文件夹
:param dir_path:
:param file_container: 路径容器
:param filter_fun: 自定义过滤
:return:
"""
if not exist(dir_path):
raise PathNotExistError('目标文件夹不存在')
file_list = os.listdir(dir_path)
for f in file_list:
full_path = os.path.join(dir_path, f)
if os.path.isdir(full_path):
loop_dir(full_path, file_container, filter_fun)
else:
if filter_fun:
if filter_fun(full_path):
file_container.append(full_path)
else:
file_container.append(full_path)
return file_container
def is_windows_path(path) -> bool:
"""
是否windows路径
:param path:
:return:
"""
return len(str(pathlib.Path(path).drive).rstrip()) > 0
def parent_path_str(path) -> str:
"""
父级路径
:param path:
:return:
"""
return str(parent_path(path))
def parent_path(path) -> pathlib.Path:
"""
父级路径
:param path:
:return:
"""
return pathlib.Path(path).parent
def pwd():
"""
当前路径
:return:
"""
return pathlib.Path().cwd()
def home():
"""
home路径
:return:
"""
return pathlib.Path().home()

@ -0,0 +1,273 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:17
# @Author : old tom
# @File : fu_date.py
# @Project : Futool
# @Desc : 日期时间工具
import time
from datetime import datetime, date, timedelta
import calendar
# 毫秒单位
MILLISECOND_UNIT = 1000
# 1小时3600秒
ONE_HOUR_SECOND = 3600
# 上午下午分界
AM_PM_DIV = 11
# 一周7天
ONE_WEEK_DAYS = 7
# 日期时间格式化转换
FMT_MAPPING = {
'yyyy-MM-dd': '%Y-%m-%d',
'yyyyMMdd': '%Y%m%d',
'yyyy-MM-dd hh:mm:ss': '%Y-%m-%d %I:%M:%S',
'yyyy-MM-dd hh24:mm:ss': '%Y-%m-%d %H:%M:%S',
'yyyy-MM-dd HH:mm:ss': '%Y-%m-%d %H:%M:%S'
}
def current_year() -> int:
"""
本年
:return:
"""
return datetime.now().year
def current_month() -> int:
"""
本月
:return:
"""
return datetime.now().month
def current_day() -> int:
return datetime.now().day
def current_date() -> str:
return str(datetime.now().date())
def current_datetime(fmt='yyyy-MM-dd hh24:mm:ss') -> str:
"""
获取当前日期时间
:param fmt:
:return:
"""
return datetime.now().strftime(FMT_MAPPING[fmt])
def current_time(fmt='s') -> int:
"""
获取当前时间绝对秒
:param fmt: s: ms:毫秒 ns:纳秒
:return:
"""
time_jar = {
's': int(time.time()),
'ms': int(time.time() * MILLISECOND_UNIT),
'ns': time.time_ns()
}
return time_jar[fmt]
def current_timestamp():
"""
当前时间戳
:return:
"""
return datetime.now().timestamp()
def format_datetime_str(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> datetime:
"""
格式化日期时间字符串
:param dt: 日期时间字符串
:param fmt: 格式化yyyy-MM-dd
:return: datetime对象
"""
return datetime.strptime(dt, FMT_MAPPING[fmt])
def format_date_str(dt: str, fmt='yyyy-MM-dd') -> date:
"""
格式化日期字符串
:param dt: 日期字符串
:param fmt: yyyy-MM-dd
:return:
"""
return datetime.strptime(dt, FMT_MAPPING[fmt]).date()
def datetime_2_second(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> int:
"""
日期时间字符串转绝对秒
:param dt: 日期时间字符串 yyyy-MM-dd hh:mm:ss 格式
:param fmt: 格式化函数
:return:
"""
return int(format_datetime_str(dt, fmt).timestamp())
def sec_2_datatime(sec_time: int, fmt='yyyy-MM-dd hh24:mm:ss') -> str:
"""
绝对秒转日期时间
:param sec_time:
:param fmt: 格式化函数
:return:
"""
timed = time.localtime(sec_time)
return time.strftime(FMT_MAPPING[fmt], timed)
def is_leap(year: int) -> bool:
"""
是否闰年
:param year:
:return:
"""
return calendar.isleap(year)
def begin_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
"""
周开始日期
:param date_str:
:param fmt: 格式化
:return:
"""
formated_dt = format_date_str(date_str, fmt)
week_idx = weekday(date_str, fmt)
return date_str if week_idx == 0 else str(formated_dt - timedelta(days=week_idx))
def end_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
"""
周结束日期
:param date_str:
:param fmt: 格式化
:return:
"""
formated_dt = format_date_str(date_str, fmt)
week_idx = weekday(date_str, fmt)
return date_str if week_idx == 6 else str(formated_dt + timedelta(days=(6 - week_idx)))
def end_of_month(y, m) -> int:
"""
月结束日期
:param y
:param m
:return:
"""
return calendar.monthrange(y, m)[1]
def weekday(date_str: str, fmt='yyyy-MM-dd') -> int:
"""
返回日期是周几
:param date_str:
:param fmt: 格式化
:return: 0-7 ,0:周一
"""
fmted_date = format_datetime_str(date_str, fmt)
return calendar.weekday(fmted_date.year, fmted_date.month, fmted_date.day)
def age(birth: str, compare_date: str, fmt='yyyy-MM-dd') -> int:
"""
年龄计算
:param birth: 生日
:param compare_date: 被比较日期
:param fmt: 日期格式化 yyyy-MM-dd|yyyyMMdd
:return:
"""
fmt_birth = format_date_str(birth, fmt)
fmt_compare = format_date_str(compare_date, fmt)
birth_m = fmt_birth.replace(year=fmt_compare.year)
return fmt_compare.year - fmt_birth.year if fmt_compare > birth_m else fmt_compare.year - fmt_birth.year - 1
def age_of_now(birth: str) -> int:
"""
当前年龄
:param birth: 出生日期
:return:
"""
return age(birth, current_datetime('yyyy-MM-dd'))
def between(dt_1: str, dt_2: str, fmt='yyyy-MM-dd', time_unit='day') -> int:
"""
计算两个时间差
:param dt_1: 时间1 日期或日期时间
:param dt_2: 时间2 日期或日期时间
:param fmt: 格式化
:param time_unit: 时间单位 day: ,hour 小时 ,minute分钟,second
:return:
"""
fmt_dt1, fmt_dt2 = format_datetime_str(dt_1, fmt), format_datetime_str(dt_2, fmt)
return abs({
'day': (fmt_dt1 - fmt_dt2).days,
'hour': int(fmt_dt1.timestamp() - fmt_dt2.timestamp()) / ONE_HOUR_SECOND,
'second': int(fmt_dt1.timestamp() - fmt_dt2.timestamp())
}[time_unit])
def time_offset(start_dt: str, offset: int, fmt='yyyy-MM-dd HH:mm:ss', time_unit='day') -> datetime:
"""
时间偏移计算计算相隔N天后的日期
:param start_dt: 开始日期日期时间
:param fmt: 时间格式化
:param offset: 偏移量支持正负数
:param time_unit: 偏移量单位
:return:
"""
fmt_dt = format_datetime_str(start_dt, fmt)
return {'day': fmt_dt + timedelta(days=offset),
'hour': fmt_dt + timedelta(hours=offset),
'second': fmt_dt + timedelta(seconds=offset)}[time_unit]
def is_am(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
"""
是否上午
:param dt:
:param fmt:
:return:
"""
fmt_dt = format_datetime_str(dt, fmt)
return fmt_dt.hour <= AM_PM_DIV
def is_pm(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
"""
是否下午
:param dt:
:param fmt:
:return:
"""
fmt_dt = format_datetime_str(dt, fmt)
return fmt_dt.hour > AM_PM_DIV
def next_week(fmt='yyyy-MM-dd HH:mm:ss') -> datetime:
"""
下周同一天
:return:
"""
now = current_datetime(fmt)
return time_offset(now, ONE_WEEK_DAYS, fmt)
def next_month() -> datetime:
"""
下个月同一天
:return:
"""
return datetime.now().replace(month=current_month() + 1)

@ -0,0 +1,230 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:45
# @Author : old tom
# @File : fu_file.py
# @Project : Futool
# @Desc : 文件操作
import shutil
import os
import hashlib
import zipfile
from pathlib import Path
from futool.core.fh_file_path import loop_mk_dir, parent_path, exist
from enum import Enum
class FileSuffix(Enum):
"""
常见文件后缀
"""
XLSX = '.xlsx'
XLS = '.xls'
DOC = '.doc'
DOCX = '.docx'
PPT = '.ppt'
PPTX = '.pptx'
EXE = '.exe'
MSI = '.msi'
ISO = '.iso'
PGP = '.pgp'
PNG = '.png'
JPG = 'jpg'
JPEG = '.jpeg'
GIF = '.gif'
WAV = '.wav'
JAR = '.jar'
PY = '.py'
BAT = '.bat'
DLL = '.dll'
ZIP = '.zip'
SEVEN_ZIP = '.7z'
TAR = '.tar'
RAR = '.rar'
class FileNotExistError(Exception):
def __init__(self, msg=''):
Exception.__init__(self, msg)
def split_text():
pass
def split_json():
pass
def compress(path, z_path, z_name='zip'):
"""
压缩文件或文件夹
:param path:
:param z_path: 压缩后路径
:param z_name: 压缩格式 zip rar gz
:return:
"""
pass
def un_compress(path):
"""
解压
:param path:
:return:
"""
pass
def md5(file_path) -> str:
"""
文件MD5
:param file_path:
:return:
"""
if not exist(file_path):
raise FileNotExistError('文件不存在')
with open(file_path, 'rb') as f:
data = f.read()
return hashlib.md5(data).hexdigest()
def move(src, dst):
"""
移动文件夹或文件
:param src:
:param dst: 目标
:return:
"""
shutil.move(src, dst)
def copy(src, dst, override=False):
"""
复制
:param src:
:param dst:
:param override:
:return:
"""
if override and exist(dst):
os.remove(dst)
shutil.copy(src, dst)
def copy_dir(src, dst, override=False) -> str:
"""
复制文件夹
:param src: 源目录
:param dst: 目的目录
:param override: 是否覆盖
:return:
"""
if override and exist(dst):
os.remove(dst)
return shutil.copytree(src, dst, dirs_exist_ok=override)
def copy_file(src, dst, override=False) -> str:
"""
复制文件
:param src: 源文件
:param dst: 目的文件或目的目录
:param override: 是否覆盖
:return:
"""
if override and exist(dst):
os.remove(dst)
if Path(dst).is_dir():
dst = os.path.join(dst, file_full_name(src))
return shutil.copyfile(src, dst)
def rename(file_path, neo_name) -> Path:
"""
重命名
:param file_path: 文件路径
:param neo_name: 新命名
:return:
"""
return Path(file_path).rename(parent_path(file_path).joinpath(neo_name))
def delete(file_path):
"""
删除文件
:param file_path:
:return:
"""
if exist(file_path):
os.remove(file_path)
def touch(file_path, mode=0o777, cover=True):
"""
创建文件
:param file_path: 文件路径
:param mode: 权限
:param cover: 是否覆盖
:return:
"""
parent = parent_path(file_path)
if not parent.exists():
loop_mk_dir(str(parent), mode=mode, exist_ok=cover)
Path(file_path).touch(mode=mode, exist_ok=cover)
def file_name(file_path) -> str:
"""
获取文件名
:param file_path:
:return:
"""
full_name = file_full_name(file_path)
return full_name.split(sep='.')[0] if (full_name and '.' in full_name) else full_name
def file_full_name(file_path) -> str:
"""
文件全名,带后缀
:param file_path:
:return:
"""
return Path(file_path).name
def suffix(file_path) -> str:
"""
文件后缀
:return:
"""
return Path(file_path).suffix
def suffixes(file_path) -> list:
"""
多后缀:xxx.tar.gz
:param file_path:
:return: [.tar,.gz]
"""
return Path(file_path).suffixes
def file_info(file_path):
"""
返回文件信息
:param file_path:
:return:
st_mode=33206 文件模式包括文件类型和文件模式位即权限位
st_ino=281474976714543 与平台有关但如果不为零则根据 st_dev 值唯一地标识文件
通常 Unix 上该值表示索引节点号 (inode number) Windows 上该值表示 文件索引号
st_dev=10943705 该文件所在设备的标识符
st_nlink=1 硬链接的数量
st_uid=0 文件所有者的用户 ID
st_gid=0 文件所有者的用户组 ID
st_size=453 文件大小以字节为单位
st_atime=1662966762 最近的访问时间以秒为单位
st_mtime=1652331424 最近的修改时间以秒为单位
st_ctime=1652331424 Windows 上表示创建时间以秒为单位 Unix 上表示最近的元数据更改时间
"""
return Path(file_path).stat()

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:15
# @Author : old tom
# @File : fu_lang.py
# @Project : Futool
# @Desc : 字符串相关

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:17
# @Author : old tom
# @File : fu_math.py
# @Project : Futool
# @Desc : 数学计算

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:46
# @Author : old tom
# @File : fu_parser.py
# @Project : Futool
# @Desc : 解析器(CSV,JSON,NB文件)

@ -0,0 +1,94 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/8/31 23:34
# @Author : old tom
# @File : http_downloader.py
# @Project : Futool
# @Desc : 文件下载器
import time
from futool.http.http_request import head
from multiprocessing import Pool
import urllib.request as req
class HttpDownloader(object):
"""
HTTP 下载器
"""
def __init__(self, pool=None):
self.pool = Pool(16) if not pool else pool
def download(self, url, dst, chunk_size=1000):
"""
文件下,自动开启多线程
:param url: 下载链接
:param dst: 保存路径
:param chunk_size: 文件块
:return:
"""
is_support, content_length = HttpDownloader.is_support_range(url)
if is_support:
# 每个线程下载字节偏移量
offset = self.fork(int(content_length), chunk_size)
self.__join(offset, url, dst)
else:
print('无法获取Content-Length,使用单线程下载')
pass
@staticmethod
def is_support_range(url):
"""
判断是否支持range请求
:return:
"""
wrapper = head(url)
header = wrapper.header()
h_keys = header.keys()
if 'Accept-Ranges' in h_keys and 'Content-Length' in h_keys and header['Accept-Ranges'] != 'none':
return True, header['Content-Length']
else:
return False, 0
@staticmethod
def fork(content_length: int, chunk_size):
"""
拆分线程
:param chunk_size: 文件块大小
:param content_length:
:return:
"""
offset = []
if content_length <= chunk_size:
offset.append((0, content_length))
else:
for i in range(content_length // chunk_size):
start_offset = chunk_size * i + 1
end_offset = start_offset - 1 + chunk_size
offset.append((0 if i == 0 else start_offset, end_offset))
offset.append((chunk_size * (content_length // chunk_size), content_length))
return offset
def __join(self, offset, url, dst):
"""
多线程下载
:param offset:
:param url:
:param dst:
:return:
"""
def download_by_thread(part):
_request = req.Request(url=url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70",
'Range': f'bytes:{part[0]}-{part[1]}'
}, method='GET')
response = req.urlopen(_request)
with open(dst + f'.{time.time_ns()}', 'wb') as f:
f.write(response.read())
self.pool.map(download_by_thread, offset)
self.pool.close()
self.pool.join()

@ -0,0 +1,158 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/8/29 21:31
# @Author : old tom
# @File : http_request.py
# @Project : Futool
import urllib.request as req
import urllib.parse
from futool.http.http_response import ResponseWrapper
from http import cookiejar
import json
import shutil
import os
DEFAULT_HEADER = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70",
"Accept": "*/*",
"Connection": "keep-alive"
}
DEFAULT_TIMEOUT = 10
class HttpRequestError(Exception):
def __init__(self, msg=''):
Exception.__init__(self, msg)
def get(url, param=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
普通get请求
:param url: 请求地址
:param param: 参数字典 自动拼接url
:param header: 请求头
:param timeout: 超时()
:return:
"""
return request(url, 'GET', param, header=header, timeout=timeout)
def head(url, param=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
发起head请求
:param url:
:param param:
:param header:
:param timeout:
:return:
"""
return request(url, 'HEAD', param, header, timeout)
def post_form(url, data=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
post 请求提交普通表单
:param url:
:param data:
:param header:
:param timeout:
:return:
"""
return request(url, 'POST', data=data, header=header, timeout=timeout)
def post(url, json_param=None, header=None, timeout=DEFAULT_TIMEOUT):
"""
post 请求提交body 参数
:param url:
:param json_param:
:param header:
:param timeout:
:return:
"""
return request(url, 'POST', json_param=json_param, header=header, timeout=timeout)
def request(url, method, param=None, data=None, json_param=None, header=None, timeout=DEFAULT_TIMEOUT, before_send=None,
success_handler=None,
error_handler=None):
"""
基础请求方法
:param method: 请求方法 目前支持get post 方法
:param url: 请求地址
:param param: 请求参数拼接url或表单
:param data: 表单参数
:param json_param: json body参数
:param header: 请求头
:param timeout: 超时
:param before_send: 发送前处理
:param success_handler: 钩子函数成功处理
:param error_handler: 钩子函数失败处理
:return:
"""
_request = None
if method not in ['GET', 'POST', 'HEAD']:
raise HttpRequestError('not support method')
if header is None:
header = DEFAULT_HEADER
opener, cookie = _init_cookie_jar()
if before_send:
before_send(opener, cookie, header)
if method in ['GET', 'HEAD']:
if param:
param_str = urllib.parse.urlencode(param)
url = url + '?' + param_str
_request = req.Request(url, headers=header, method=method)
if 'POST' == method:
if data:
# 表单
data = urllib.parse.urlencode(data).encode('utf-8')
header.update({"Content-Type": "application/x-www-form-urlencoded"})
if json_param:
# json body
data = bytes(json.dumps(json_param), 'utf-8')
header.update({"Content-Type": "application/json"})
_request = req.Request(url, data=data, headers=header, method=method)
wrapper = ResponseWrapper(opener.open(_request, timeout=timeout), cookie)
if success_handler and wrapper.is_ok():
return success_handler(wrapper)
elif error_handler and not wrapper.is_ok():
return error_handler(wrapper)
return wrapper
def _init_cookie_jar():
"""
初始化cookie容器
:return:
"""
cookie = cookiejar.CookieJar()
handler = req.HTTPCookieProcessor(cookie)
return req.build_opener(handler), cookie
def upload_file():
# TODO
pass
def download_file(url, dst_path, overwrite=True, duplicate_handler=None, buffer=16 * 1024):
"""
文件下载
:param url: URL路径
:param dst_path: 目标位置
:param overwrite: 是否覆盖同名文件
:param duplicate_handler: 自定义重复文件处理
:param buffer:
:return:
"""
if os.path.isfile(dst_path) and overwrite:
os.remove(dst_path)
if os.path.isfile(dst_path) and not overwrite and duplicate_handler:
duplicate_handler(dst_path)
b_resp = req.urlopen(url)
with open(dst_path, 'wb') as f:
shutil.copyfileobj(b_resp, f, buffer)

@ -0,0 +1,79 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/8/29 23:14
# @Author : old tom
# @File : http_response.py
# @Project : Futool
# @Desc : 响应解析
from http.client import HTTPResponse
from http.cookiejar import CookieJar
import json
DEFAULT_ENCODING = 'UTF-8'
# 响应类型编码
RESPONSE_CONTENT_ENCODING = "Content-Encoding"
# 压缩类型
COMPRESS_TYPE = ('gzip', 'deflate', 'br')
class ResponseWrapper(object):
def __init__(self, response: HTTPResponse, cookie: CookieJar = None):
self.resp = response
if cookie and len(cookie) > 0:
self.cookie = cookie
def body(self, encoding=DEFAULT_ENCODING):
return self.resp.read().decode(encoding)
def json_body(self, encoding=DEFAULT_ENCODING):
return json.loads(self.body(encoding))
def status(self):
return self.resp.status
def is_ok(self):
st_code = self.resp.status
return 200 <= st_code <= 300
def header(self, name=None):
return self.resp.getheader(name) if name else self._parse_header_dict()
def _parse_header_dict(self):
headers = self.resp.getheaders()
header_dict = {}
if headers:
for h in headers:
header_dict[h[0]] = h[1]
return header_dict
def is_compress(self):
"""
是否压缩
:return:
"""
return self.compress_type() in COMPRESS_TYPE
def compress_type(self):
"""
压缩格式
:return:
"""
header = self.header()
if RESPONSE_CONTENT_ENCODING in header.keys():
res_content_encoding = header[RESPONSE_CONTENT_ENCODING]
return res_content_encoding
def cookies(self):
"""
获取cookie
:return:
"""
ck = {}
if self.cookie:
for item in self.cookie:
ck[item.name] = item.value
return ck

@ -0,0 +1,85 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/9 0:04
# @Author : old tom
# @File : fu_sys.py
# @Project : Futool
# @Desc : 用于获取当前系统信息内存及硬盘信息获取请使用psutil
import os
import platform
import getpass
def is_windows():
"""
是否windows
:return:
"""
return 'Windows' == platform.system()
def is_linux():
"""
是否linux
:return:
"""
return 'Linux' == platform.system()
def is_unix():
"""
是否unix
:return:
"""
return 'Unix' == platform.system()
def os_name():
"""
操作系统名称
:return:
"""
return platform.platform()
def host_name():
"""
获取主机名
:return:
"""
return platform.node()
def sys_user():
"""
系统用户
:return:
"""
return getpass.getuser()
def sys_user_dir():
"""
当前用户目录
:return:
"""
return os.path.expanduser('~')
class CpuInfo(object):
@staticmethod
def cpu_architecture():
"""
CPU架构AMD64i386
:return:
"""
return platform.machine()
@staticmethod
def cpu_count():
"""
CPU核数
:return:
"""
return os.cpu_count()

@ -0,0 +1,44 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:07
# @Author : old tom
# @File : test_fu_sys.py
# @Project : Futool
from unittest import TestCase
from futool.system import fu_sys
# @Desc :
class Test(TestCase):
def test_is_windows(self):
rt = fu_sys.is_windows()
self.assertTrue(rt)
def test_is_linux(self):
rt = fu_sys.is_linux()
self.assertFalse(rt)
def test_is_unix(self):
rt = fu_sys.is_unix()
self.assertFalse(rt)
def test_os_name(self):
rt = fu_sys.os_name()
self.assertIsNotNone(rt)
def test_host_name(self):
rt = fu_sys.host_name()
self.assertIsNotNone(rt)
def test_sys_user(self):
rt = fu_sys.sys_user()
self.assertIsNotNone(rt)
def test_sys_user_dir(self):
rt = fu_sys.sys_user_dir()
self.assertIsNotNone(rt)
def test_cpu_info(self):
cpu_info = fu_sys.CpuInfo()
self.assertIsNotNone(cpu_info.cpu_count())
self.assertIsNotNone(cpu_info.cpu_architecture())
Loading…
Cancel
Save