You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

405 lines
10 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2022/9/10 10:17
# @Author : old tom
# @File : fu_date.py
# @Project : Futool
# @Desc : 日期时间工具
import time
from datetime import datetime, date, timedelta
import calendar
from decimal import Decimal
# 纳秒到毫秒
NANOSECOND_2_MILLISECOND = 1000000
# 纳秒到秒
NANOSECOND_2_SECOND = 1000000000
# 纳秒到分钟
NANOSECOND_2_MINUTE = 60000000000
# 毫秒单位
MILLISECOND_UNIT = 1000
# 1小时3600秒
ONE_HOUR_SECOND = 3600
# 上午下午分界
AM_PM_DIV = 11
# 一周7天
ONE_WEEK_DAYS = 7
# 日期时间格式化转换
FMT_MAPPING = {
'yyyy-MM-dd': '%Y-%m-%d',
'yyyyMMdd': '%Y%m%d',
'yyyy-MM-dd hh:mm:ss': '%Y-%m-%d %I:%M:%S',
'yyyy-MM-dd hh24:mm:ss': '%Y-%m-%d %H:%M:%S',
'yyyy-MM-dd HH:mm:ss': '%Y-%m-%d %H:%M:%S'
}
def current_year() -> int:
"""
本年
:return:
"""
return datetime.now().year
def current_month() -> int:
"""
本月
:return:
"""
return datetime.now().month
def current_day() -> int:
return datetime.now().day
def current_date() -> str:
return str(datetime.now().date())
def current_datetime(fmt='yyyy-MM-dd hh24:mm:ss') -> str:
"""
获取当前日期时间
:param fmt:
:return:
"""
return datetime.now().strftime(FMT_MAPPING[fmt])
def current_time(fmt='s') -> int:
"""
获取当前时间绝对秒
:param fmt: s: ms:毫秒 ns:纳秒
:return:
"""
time_jar = {
's': int(time.time()),
'ms': int(time.time() * MILLISECOND_UNIT),
'ns': time.time_ns()
}
return time_jar[fmt]
def current_timestamp():
"""
当前时间戳
:return:
"""
return datetime.now().timestamp()
def format_datetime_str(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> datetime:
"""
格式化日期时间字符串
:param dt: 日期时间字符串
:param fmt: 格式化yyyy-MM-dd
:return: datetime对象
"""
return datetime.strptime(dt, FMT_MAPPING[fmt])
def format_date_str(dt: str, fmt='yyyy-MM-dd') -> date:
"""
格式化日期字符串
:param dt: 日期字符串
:param fmt: yyyy-MM-dd
:return:
"""
return datetime.strptime(dt, FMT_MAPPING[fmt]).date()
def datetime_2_second(dt: str, fmt='yyyy-MM-dd hh24:mm:ss') -> int:
"""
日期时间字符串转绝对秒
:param dt: 日期时间字符串 yyyy-MM-dd hh:mm:ss 格式
:param fmt: 格式化函数
:return:
"""
return int(format_datetime_str(dt, fmt).timestamp())
def sec_2_datatime(sec_time: int, fmt='yyyy-MM-dd hh24:mm:ss') -> str:
"""
绝对秒转日期时间
:param sec_time:
:param fmt: 格式化函数
:return:
"""
timed = time.localtime(sec_time)
return time.strftime(FMT_MAPPING[fmt], timed)
def is_leap(year: int) -> bool:
"""
是否闰年
:param year:
:return:
"""
return calendar.isleap(year)
def begin_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
"""
周开始日期
:param date_str:
:param fmt: 格式化
:return:
"""
formated_dt = format_date_str(date_str, fmt)
week_idx = weekday(date_str, fmt)
return date_str if week_idx == 0 else str(formated_dt - timedelta(days=week_idx))
def end_of_week(date_str: str, fmt='yyyy-MM-dd') -> str:
"""
周结束日期
:param date_str:
:param fmt: 格式化
:return:
"""
formated_dt = format_date_str(date_str, fmt)
week_idx = weekday(date_str, fmt)
return date_str if week_idx == 6 else str(formated_dt + timedelta(days=(6 - week_idx)))
def end_of_month(y, m) -> int:
"""
月结束日期
:param y
:param m
:return:
"""
return calendar.monthrange(y, m)[1]
def weekday(date_str: str, fmt='yyyy-MM-dd') -> int:
"""
返回日期是周几
:param date_str:
:param fmt: 格式化
:return: 0-7 ,0:周一
"""
fmted_date = format_datetime_str(date_str, fmt)
return calendar.weekday(fmted_date.year, fmted_date.month, fmted_date.day)
def age(birth: str, compare_date: str, fmt='yyyy-MM-dd') -> int:
"""
年龄计算
:param birth: 生日
:param compare_date: 被比较日期
:param fmt: 日期格式化 yyyy-MM-dd|yyyyMMdd
:return:
"""
fmt_birth = format_date_str(birth, fmt)
fmt_compare = format_date_str(compare_date, fmt)
birth_m = fmt_birth.replace(year=fmt_compare.year)
return fmt_compare.year - fmt_birth.year if fmt_compare > birth_m else fmt_compare.year - fmt_birth.year - 1
def age_of_now(birth: str) -> int:
"""
当前年龄
:param birth: 出生日期
:return:
"""
return age(birth, current_datetime('yyyy-MM-dd'))
def between(dt_1: str, dt_2: str, fmt='yyyy-MM-dd', time_unit='day') -> int:
"""
计算两个时间差
:param dt_1: 时间1 日期或日期时间
:param dt_2: 时间2 日期或日期时间
:param fmt: 格式化
:param time_unit: 时间单位 day: ,hour 小时 ,minute分钟,second
:return:
"""
fmt_dt1, fmt_dt2 = format_datetime_str(dt_1, fmt), format_datetime_str(dt_2, fmt)
return abs({
'day': (fmt_dt1 - fmt_dt2).days,
'hour': int(fmt_dt1.timestamp() - fmt_dt2.timestamp()) / ONE_HOUR_SECOND,
'second': int(fmt_dt1.timestamp() - fmt_dt2.timestamp())
}[time_unit])
def time_offset(start_dt: str, offset: int, fmt='yyyy-MM-dd HH:mm:ss', time_unit='day') -> datetime:
"""
时间偏移计算计算相隔N天后的日期
:param start_dt: 开始日期日期时间
:param fmt: 时间格式化
:param offset: 偏移量支持正负数
:param time_unit: 偏移量单位
:return:
"""
fmt_dt = format_datetime_str(start_dt, fmt)
return {'day': fmt_dt + timedelta(days=offset),
'hour': fmt_dt + timedelta(hours=offset),
'second': fmt_dt + timedelta(seconds=offset)}[time_unit]
def is_am(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
"""
是否上午
:param dt:
:param fmt:
:return:
"""
fmt_dt = format_datetime_str(dt, fmt)
return fmt_dt.hour <= AM_PM_DIV
def is_pm(dt: str, fmt='yyyy-MM-dd HH:mm:ss') -> bool:
"""
是否下午
:param dt:
:param fmt:
:return:
"""
fmt_dt = format_datetime_str(dt, fmt)
return fmt_dt.hour > AM_PM_DIV
def next_week(fmt='yyyy-MM-dd HH:mm:ss') -> datetime:
"""
下周同一天
:return:
"""
now = current_datetime(fmt)
return time_offset(now, ONE_WEEK_DAYS, fmt)
def next_month() -> datetime:
"""
下个月同一天
:return:
"""
return datetime.now().replace(month=current_month() + 1)
def convert_time(t, from_unit, to_unit):
"""
TODO 时间转换
:param t:
:param from_unit:
:param to_unit:
:return:
"""
pass
class StopWatch(object):
"""
简单计时器实现
"""
class TaskInfo(object):
def __init__(self, task_name, start_nanos):
# 任务名称
self.__task_name = task_name
# 开始时间
self.__start_nanos = start_nanos
# 运行状态
self.__task_running = True
# 时间花费
self.__interval_nanos = 0
@property
def task_name(self):
return self.__task_name
@property
def start_nanos(self):
return self.__start_nanos
@start_nanos.setter
def start_nanos(self, value):
self.__start_nanos = value
@task_name.setter
def task_name(self, value):
self.__task_name = value
@property
def interval_nanos(self):
return self.__interval_nanos
def set_task_stop(self, time_nanos):
self.__interval_nanos = time_nanos
self.__task_running = False
@property
def task_running(self):
return self.__task_running
@task_running.setter
def task_running(self, values=False):
self.__task_running = values
class StopWatchError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
def __init__(self):
self.task_queue = {}
def _task_exist(self, task_name):
"""
判断任务是否存在
:param task_name:
:return:
"""
return task_name in self.task_queue.keys()
def start(self, task_name):
if self._task_exist(task_name):
raise self.StopWatchError(f'{task_name} already running')
else:
self.task_queue[task_name] = self.TaskInfo(task_name, current_time(fmt='ns'))
def stop(self, task_name, time_unit='s'):
if not self._task_exist(task_name):
raise self.StopWatchError(f'{task_name} not running')
else:
task = self.task_queue[task_name]
interval_time_nanos = current_time(fmt='ns') - task.start_nanos
# 设置执行时间及修改任务状态
task.set_task_stop(time_nanos=interval_time_nanos)
return {
'ns': interval_time_nanos,
'ms': interval_time_nanos / NANOSECOND_2_MILLISECOND,
's': interval_time_nanos // NANOSECOND_2_SECOND,
'm': Decimal(interval_time_nanos / NANOSECOND_2_MINUTE).quantize(Decimal("0.01"),
rounding="ROUND_HALF_UP")
}[time_unit]
def all_task(self):
"""
返回总任务数
:return:
"""
return len(self.task_queue)
def pretty_print(self, task_name):
"""
格式化输出控制台
:return:
"""
if not self._task_exist(task_name):
raise self.StopWatchError(f'{task_name} not running')
# 判断任务状态
task = self.task_queue[task_name]
if task.task_running:
self.stop(task_name)
print(f'[{task_name}] running time = {task.interval_nanos // NANOSECOND_2_SECOND} second')
def clear(self):
"""
清空任务
:return:
"""
self.task_queue = {}