有一个列表,里面元素是 dict
dict 有个 key 是表示日期的
现在知道开始和结束日期,按天计算
希望把列表中缺少的日期补充完成
比如:
开始日期是 2022-06-19,结束日期是 2022-06-22
[{"dt": "2022-06-20","key": 33}, {"dt": "2022-06-22","key": 45}]
这里就缺少了 2022-06-19 和 2022-06-21 的数据
需要把这个两个日期补上
先根据开始和结束日期生成一个日期列表
创建一个新的列表
然后遍历日期列表,
- 如果日期存在,就取现有的元素,放入新列表
- 否则就生成一个该日期的默认值
import time
import datetime
import copy
import arrow
from typing import List
def timestamp2datetime(timestamp, fmt='%Y-%m-%d %H:%M:%S') -> str:
"""timestamp to date string"""
return time.strftime(fmt, time.localtime(timestamp))
def get_full_dt_sequence(
*,
start_ts: int, # timestamp, ms
end_ts: int, # timestamp, ms
source: List[dict],
date_key: str,
date_fmt: str = '%Y-%m-%d',
) -> List[dict]:
"""
>>> s = [{'dt': '2020-01-02', 'value': 1}, {'dt': '2020-01-04', 'value': 2}]
>>> get_full_dt_sequence(start_ts=1577808000000, end_ts=1578182400000, source=s, date_key='dt')
[{'dt': '2020-01-01', 'value': 0}, {'dt': '2020-01-02', 'value': 1}, {'dt': '2020-01-03', 'value': 0}, {'dt': '2020-01-04', 'value': 2}, {'dt': '2020-01-05', 'value': 0}]
>>> s = [{'dt': '2020-01-02', 'value': 1}, {'dt': '2020-01-04', 'value': 2}]
>>> get_full_dt_sequence(start_ts=None, end_ts=None, source=s, date_key='dt')
[{'dt': '2020-01-02', 'value': 1}, {'dt': '2020-01-03', 'value': 0}, {'dt': '2020-01-04', 'value': 2}]
"""
if not source:
return source
if start_ts is None:
start_ts: int = datestr2timestamp(source[0][date_key], fmt=date_fmt, ts='ms')
if end_ts is None:
end_ts: int = datestr2timestamp(source[-1][date_key], fmt=date_fmt, ts='ms')
start_dt = arrow.get(timestamp2datetime(start_ts / 1000))
end_dt = arrow.get(timestamp2datetime(end_ts / 1000))
date_range: List[str] = [str(arrow_obj.date()) for arrow_obj in
arrow.Arrow.range('day', start=start_dt, end=end_dt)]
default_item: dict = copy.deepcopy(source[0])
# set default value
for k in default_item.keys():
if k != date_key:
default_item[k] = 0
new_list: List[dict] = []
source_date_list: List[str] = [str(item[date_key]) for item in source]
for date in date_range:
if date in source_date_list:
# use source data
item: dict = source[source_date_list.index(date)]
new_list.append(item)
else:
# use default value
default_item_copy = copy.deepcopy(default_item)
default_item_copy[date_key] = date
new_list.append(default_item_copy)
return new_list
if __name__ == '__main__':
import doctest
doctest.testmod()