You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

405 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:17
# @Author : old tom
# @File : fu_date.py
# @Project : Futool
# @Desc : Date and time utilities
import time
from datetime import datetime, date, timedelta
import calendar
from decimal import Decimal
# Nanoseconds per millisecond
NANOSECOND_2_MILLISECOND = 1_000_000
# Nanoseconds per second
NANOSECOND_2_SECOND = 1_000_000_000
# Nanoseconds per minute
NANOSECOND_2_MINUTE = 60 * NANOSECOND_2_SECOND
# Milliseconds per second
MILLISECOND_UNIT = 1_000
# Seconds per hour
ONE_HOUR_SECOND = 60 * 60
# Last hour of the day (0-23) that still counts as morning
AM_PM_DIV = 11
# Days per week
ONE_WEEK_DAYS = 7
# Maps the human-readable format names accepted by this module to the
# strftime/strptime patterns that implement them.
FMT_MAPPING = {
    'yyyy-MM-dd': '%Y-%m-%d',
    'yyyyMMdd': '%Y%m%d',
    # 'hh' is the 12-hour clock; 'hh24' and 'HH' are both the 24-hour clock.
    'yyyy-MM-dd hh:mm:ss': '%Y-%m-%d %I:%M:%S',
    'yyyy-MM-dd hh24:mm:ss': '%Y-%m-%d %H:%M:%S',
    'yyyy-MM-dd HH:mm:ss': '%Y-%m-%d %H:%M:%S',
}
def current_year() -> int:
    """
    Return the year of the current local date.

    :return: the current year
    """
    now = datetime.now()
    return now.year
def current_month() -> int:
    """
    Return the month of the current local date.

    :return: the current month, 1-12
    """
    now = datetime.now()
    return now.month
def current_day() -> int:
    """
    Return the day of the month of the current local date.

    :return: the current day of the month, 1-31
    """
    now = datetime.now()
    return now.day
def current_date() -> str:
    """
    Return the current local date as an ISO ``YYYY-MM-DD`` string.

    :return: today's date as text
    """
    today = datetime.now().date()
    return today.isoformat()
def current_datetime(fmt='yyyy-MM-dd hh24:mm:ss') -> str:
    """
    Render the current local date and time as text.

    :param fmt: a format name that is a key of FMT_MAPPING
    :return: the formatted current date/time
    :raises KeyError: if fmt is not a key of FMT_MAPPING
    """
    now = datetime.now()
    pattern = FMT_MAPPING[fmt]
    return now.strftime(pattern)
def current_time(fmt='s') -> int:
    """
    Return the current time since the epoch in the requested unit.

    Only the clock reading for the requested unit is taken. Building a dict
    of already-computed values would read the clock three times per call and
    would delay the nanosecond reading behind two unrelated ones.

    :param fmt: 's' for seconds, 'ms' for milliseconds, 'ns' for nanoseconds
    :return: the current time as an integer in the requested unit
    :raises KeyError: if fmt is not one of 's', 'ms', 'ns'
    """
    # Values are callables so that nothing is evaluated until it is selected.
    readers = {
        's': lambda: int(time.time()),
        'ms': lambda: int(time.time() * MILLISECOND_UNIT),
        'ns': time.time_ns,
    }
    return readers[fmt]()
def current_timestamp() -> float:
    """
    Return the POSIX timestamp of the current local date/time.

    :return: seconds since the epoch as a float
    """
    now = datetime.now()
    return now.timestamp()
def format_datetime_str(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> datetime:
    """
    Parse a date/time string into a datetime object.

    :param dt: the date/time text to parse
    :param fmt: a format name that is a key of FMT_MAPPING, e.g. yyyy-MM-dd
    :return: the parsed datetime
    :raises KeyError: if fmt is not a key of FMT_MAPPING
    :raises ValueError: if dt does not match the selected pattern
    """
    pattern = FMT_MAPPING[fmt]
    return datetime.strptime(dt, pattern)
def format_date_str(dt: str, fmt='yyyy-MM-dd') -> date:
    """
    Parse a date string into a date object.

    :param dt: the date text to parse
    :param fmt: a format name that is a key of FMT_MAPPING, e.g. yyyy-MM-dd
    :return: the date part of the parsed value
    :raises KeyError: if fmt is not a key of FMT_MAPPING
    :raises ValueError: if dt does not match the selected pattern
    """
    parsed = datetime.strptime(dt, FMT_MAPPING[fmt])
    return parsed.date()
def datetime_2_second(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> int:
    """
    Convert a date/time string to whole seconds since the epoch.

    :param dt: the date/time text, matching the format selected by fmt
    :param fmt: a format name understood by format_datetime_str
    :return: the timestamp of the parsed value, truncated to an int
    """
    parsed = format_datetime_str(dt, fmt)
    epoch_seconds = parsed.timestamp()
    return int(epoch_seconds)
def sec_2_datatime(sec_time: int, fmt='yyyy-MM-dd hh24:mm:ss') -> str:
    """
    Convert seconds since the epoch to a local date/time string.

    :param sec_time: seconds since the epoch
    :param fmt: a format name that is a key of FMT_MAPPING
    :return: the local time rendered with the selected pattern
    :raises KeyError: if fmt is not a key of FMT_MAPPING
    """
    local_struct = time.localtime(sec_time)
    pattern = FMT_MAPPING[fmt]
    return time.strftime(pattern, local_struct)
def is_leap(year: int) -> bool:
    """
    Tell whether a year is a leap year.

    :param year: the year to check
    :return: True for a leap year, False otherwise
    """
    leap = calendar.isleap(year)
    return leap
def begin_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
    """
    Return the Monday of the week that contains the given date.

    The result is always an ISO ``YYYY-MM-DD`` string. Echoing the input back
    unchanged when it already is a Monday would make the output format depend
    on the weekday whenever fmt is not 'yyyy-MM-dd'.

    :param date_str: the date text, matching the format selected by fmt
    :param fmt: a format name understood by format_date_str
    :return: the Monday of that week as an ISO date string
    """
    day = format_date_str(date_str, fmt)
    # date.weekday() is 0 for Monday, so it is the distance back to Monday.
    monday = day - timedelta(days=day.weekday())
    return monday.isoformat()
def end_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
    """
    Return the Sunday of the week that contains the given date.

    The result is always an ISO ``YYYY-MM-DD`` string. Echoing the input back
    unchanged when it already is a Sunday would make the output format depend
    on the weekday whenever fmt is not 'yyyy-MM-dd'.

    :param date_str: the date text, matching the format selected by fmt
    :param fmt: a format name understood by format_date_str
    :return: the Sunday of that week as an ISO date string
    """
    day = format_date_str(date_str, fmt)
    # date.weekday() is 6 for Sunday, so 6 - weekday is the distance forward.
    sunday = day + timedelta(days=6 - day.weekday())
    return sunday.isoformat()
def end_of_month(y, m) -> int:
    """
    Return the number of the last day of a month.

    :param y: the year
    :param m: the month, 1-12
    :return: the last day of that month (28-31)
    """
    _first_weekday, day_count = calendar.monthrange(y, m)
    return day_count
def weekday(date_str: str, fmt='yyyy-MM-dd') -> int:
    """
    Return the day of the week for a date string.

    :param date_str: the date text, matching the format selected by fmt
    :param fmt: a format name understood by format_datetime_str
    :return: 0-6, where 0 is Monday and 6 is Sunday
    """
    parsed = format_datetime_str(date_str, fmt)
    return parsed.weekday()
def age(birth: str, compare_date: str, fmt='yyyy-MM-dd') -> int:
    """
    Compute the age in completed years on a given date.

    The birthday is compared as a (month, day) pair rather than by moving the
    birth date into the comparison year. That move raises ValueError for a
    Feb 29 birthday in a non-leap year. The comparison is inclusive, so the
    age increases on the birthday itself. A Feb 29 birthday counts as reached
    on Mar 1 in non-leap years.

    :param birth: the birth date text
    :param compare_date: the date on which the age is evaluated
    :param fmt: format name for both dates, e.g. yyyy-MM-dd or yyyyMMdd
    :return: the number of completed years between the two dates
    """
    born = format_date_str(birth, fmt)
    on_day = format_date_str(compare_date, fmt)
    years = on_day.year - born.year
    # One year less while this year's birthday is still ahead.
    if (on_day.month, on_day.day) < (born.month, born.day):
        years -= 1
    return years
def age_of_now(birth: str) -> int:
    """
    Compute the age as of today.

    :param birth: the birth date text in yyyy-MM-dd format
    :return: the result of age() evaluated against the current date
    """
    today_text = current_datetime('yyyy-MM-dd')
    return age(birth, today_text)
def between(dt_1: str, dt_2: str, fmt='yyyy-MM-dd', time_unit='day') -> int:
    """
    Compute the absolute difference between two dates or date/times.

    The result does not depend on the order of the two arguments. Only the
    requested unit is computed, so 'day' never converts the values to
    timestamps.

    :param dt_1: first date or date/time text
    :param dt_2: second date or date/time text
    :param fmt: format name for both values
    :param time_unit: 'day', 'hour', 'minute' or 'second'
    :return: whole days for 'day'; whole seconds for 'second'; for 'hour' and
        'minute' the whole-second difference divided by the unit length,
        which may be fractional
    :raises KeyError: if time_unit is not one of the supported units
    """
    first = format_datetime_str(dt_1, fmt)
    second = format_datetime_str(dt_2, fmt)
    seconds_per_minute = 60

    def elapsed_seconds() -> int:
        # Whole seconds between the two timestamps, sign removed.
        return abs(int(first.timestamp() - second.timestamp()))

    converters = {
        # abs() is applied to the timedelta itself: a negative timedelta
        # floors its .days, so abs(delta.days) would count a partial day as a
        # full one in one argument order only.
        'day': lambda: abs(first - second).days,
        'hour': lambda: elapsed_seconds() / ONE_HOUR_SECOND,
        'minute': lambda: elapsed_seconds() / seconds_per_minute,
        'second': elapsed_seconds,
    }
    return converters[time_unit]()
def time_offset(start_dt: str, offset: int, fmt='yyyy-MM-dd HH:mm:ss', time_unit='day') -> datetime:
    """
    Shift a date/time by an offset expressed in the given unit.

    Only the requested shift is computed. Computing every unit up front would
    raise OverflowError for a large 'second' offset, because the same number
    interpreted as days falls outside the supported date range.

    :param start_dt: the starting date or date/time text
    :param offset: the amount to shift by; may be negative
    :param fmt: format name used to parse start_dt
    :param time_unit: 'day', 'hour', 'minute' or 'second'
    :return: the shifted datetime
    :raises KeyError: if time_unit is not one of the supported units
    """
    # Maps the public unit name to the matching timedelta keyword argument.
    unit_keyword = {
        'day': 'days',
        'hour': 'hours',
        'minute': 'minutes',
        'second': 'seconds',
    }
    fmt_dt = format_datetime_str(start_dt, fmt)
    shift = timedelta(**{unit_keyword[time_unit]: offset})
    return fmt_dt + shift
def is_am(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
    """
    Tell whether a date/time falls in the morning.

    :param dt: the date/time text
    :param fmt: format name used to parse dt
    :return: True when the hour is at most AM_PM_DIV
    """
    hour = format_datetime_str(dt, fmt).hour
    return not hour > AM_PM_DIV
def is_pm(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
    """
    Tell whether a date/time falls in the afternoon.

    :param dt: the date/time text
    :param fmt: format name used to parse dt
    :return: True when the hour is greater than AM_PM_DIV
    """
    parsed = format_datetime_str(dt, fmt)
    return AM_PM_DIV < parsed.hour
def next_week(fmt='yyyy-MM-dd HH:mm:ss') -> datetime:
    """
    Return the same day next week.

    The current time is rendered to text with fmt and parsed back before the
    shift, so the result only keeps the fields that fmt carries.

    :param fmt: format name used for the intermediate text form
    :return: the current date/time shifted forward by ONE_WEEK_DAYS days
    """
    now_text = current_datetime(fmt)
    return time_offset(start_dt=now_text, offset=ONE_WEEK_DAYS, fmt=fmt)
def next_month() -> datetime:
    """
    Return the current local date/time moved to the same day next month.

    December rolls over to January of the following year. When next month is
    shorter than today's day number (e.g. Jan 31), the day is clamped to the
    last day of that month. A plain ``replace(month=month + 1)`` raises
    ValueError in both situations.

    :return: the current datetime shifted one calendar month forward
    """
    now = datetime.now()
    # divmod turns month 12 into (1, 0): carry one year, restart at January.
    year_carry, month_index = divmod(now.month, 12)
    target_year = now.year + year_carry
    target_month = month_index + 1
    last_day = calendar.monthrange(target_year, target_month)[1]
    return now.replace(year=target_year, month=target_month, day=min(now.day, last_day))
def convert_time(t, from_unit, to_unit):
    """
    TODO: time conversion. Not implemented yet; the body is a placeholder,
    so every call returns None.

    :param t: presumably the time value to convert -- TODO confirm
    :param from_unit: presumably the unit t is expressed in -- TODO confirm
    :param to_unit: presumably the unit to convert to -- TODO confirm
    :return: None
    """
    pass
class StopWatch(object):
    """
    A simple stopwatch that times named tasks.

    Each task is stored in ``task_queue`` under its name. ``start`` records
    the start time in nanoseconds and ``stop`` computes the elapsed time.
    """

    class TaskInfo(object):
        """State kept for a single timed task."""

        def __init__(self, task_name, start_nanos):
            # Name the task was registered under
            self.__task_name = task_name
            # Start time in nanoseconds
            self.__start_nanos = start_nanos
            # True until set_task_stop() is called
            self.__task_running = True
            # Elapsed nanoseconds, filled in by set_task_stop()
            self.__interval_nanos = 0

        @property
        def task_name(self):
            return self.__task_name

        @task_name.setter
        def task_name(self, value):
            self.__task_name = value

        @property
        def start_nanos(self):
            return self.__start_nanos

        @start_nanos.setter
        def start_nanos(self, value):
            self.__start_nanos = value

        @property
        def interval_nanos(self):
            return self.__interval_nanos

        @property
        def task_running(self):
            return self.__task_running

        @task_running.setter
        def task_running(self, values=False):
            self.__task_running = values

        def set_task_stop(self, time_nanos):
            """
            Record the elapsed time and mark the task as stopped.

            :param time_nanos: elapsed time in nanoseconds
            """
            self.__task_running = False
            self.__interval_nanos = time_nanos

    class StopWatchError(Exception):
        """Raised when a task is started twice or stopped without existing."""

        def __init__(self, msg):
            super().__init__(msg)

    def __init__(self):
        # Maps task name -> TaskInfo
        self.task_queue = {}

    def _task_exist(self, task_name):
        """
        Tell whether a task with this name has been registered.

        :param task_name: the task name to look up
        :return: True if the name is a key of task_queue
        """
        return task_name in self.task_queue

    def start(self, task_name):
        """
        Register a task and record its start time.

        :param task_name: the name to register the task under
        :raises StopWatchError: if a task with this name is already registered
        """
        if self._task_exist(task_name):
            raise self.StopWatchError(f'{task_name} already running')
        started_at = current_time(fmt='ns')
        self.task_queue[task_name] = self.TaskInfo(task_name, started_at)

    def stop(self, task_name, time_unit='s'):
        """
        Stop a task and return its elapsed time.

        :param task_name: the name of a registered task
        :param time_unit: 'ns', 'ms', 's' (whole seconds) or 'm' (minutes as
            a Decimal rounded half-up to two places)
        :return: the elapsed time in the requested unit
        :raises StopWatchError: if no task with this name is registered
        :raises KeyError: if time_unit is not supported
        """
        if not self._task_exist(task_name):
            raise self.StopWatchError(f'{task_name} not running')
        task = self.task_queue[task_name]
        elapsed_nanos = current_time(fmt='ns') - task.start_nanos
        # Store the elapsed time on the task and flag it as stopped.
        task.set_task_stop(time_nanos=elapsed_nanos)
        elapsed_minutes = Decimal(elapsed_nanos / NANOSECOND_2_MINUTE).quantize(
            Decimal("0.01"), rounding="ROUND_HALF_UP")
        elapsed_by_unit = {
            'ns': elapsed_nanos,
            'ms': elapsed_nanos / NANOSECOND_2_MILLISECOND,
            's': elapsed_nanos // NANOSECOND_2_SECOND,
            'm': elapsed_minutes,
        }
        return elapsed_by_unit[time_unit]

    def all_task(self):
        """
        Return the number of registered tasks.

        :return: the size of task_queue
        """
        return len(self.task_queue)

    def pretty_print(self, task_name):
        """
        Print a task's running time in whole seconds to the console.

        A task that is still running is stopped first.

        :param task_name: the name of a registered task
        :raises StopWatchError: if no task with this name is registered
        """
        if not self._task_exist(task_name):
            raise self.StopWatchError(f'{task_name} not running')
        task = self.task_queue[task_name]
        # Stop a running task so that its interval is filled in.
        if task.task_running:
            self.stop(task_name)
        print(f'[{task_name}] running time = {task.interval_nanos // NANOSECOND_2_SECOND} second')

    def clear(self):
        """
        Forget all tasks by replacing task_queue with a new empty dict.
        """
        self.task_queue = dict()