[docs]defget_start_datetime_from_range(range:str|None,*,now=None)->datetime|None:""" Generate a list of datetime objects from a range string. """ifnotrange:returnNonenow=nowordatetime.now(UTC)ifrange=="1h":start_datetime=now-timedelta(hours=1)elifrange=="24h":start_datetime=now-timedelta(hours=24)elifrange=="7d":start_datetime=now-timedelta(days=7)elifrange=="1m":start_datetime=now-timedelta(days=30)elifrange=="1y":start_datetime=now-timedelta(days=365)else:raiseValueError(f"Unknown range: {range}")returnstart_datetime
def _truncate_datetime_for_time_bucket(dt: datetime, time_bucket: str = "day") -> datetime:
    """
    Round a datetime down to the start of the bucket that contains it.

    :param dt: Datetime to truncate; all fields coarser than the bucket
        (including ``tzinfo``) are left untouched.
    :param time_bucket: One of ``"day"``, ``"hour"`` or ``"minute"``.
    :return: A copy of ``dt`` with every field finer than the bucket set to 0.
    :raises ValueError: If ``time_bucket`` is not a supported bucket name.
    """
    # For each bucket, the datetime fields that are finer than the bucket itself.
    finer_fields_by_bucket = {
        "day": ("hour", "minute", "second", "microsecond"),
        "hour": ("minute", "second", "microsecond"),
        "minute": ("second", "microsecond"),
    }
    if time_bucket not in finer_fields_by_bucket:
        raise ValueError(f"Unknown time bucket: {time_bucket}")

    zeroed = dict.fromkeys(finer_fields_by_bucket[time_bucket], 0)
    return dt.replace(**zeroed)
def generate_continuous_time_range(
    discontinuous_transactions: List[TransactionsGroupedByTimeBucket],
    start_datetime: datetime | None = None,
    time_bucket: str = "day",
) -> List[TransactionsGroupedByTimeBucket]:
    """
    Fill the gaps in a time-bucketed transaction series up to the current time.

    Builds one entry per bucket from ``start_datetime`` (truncated to the
    bucket) through ``datetime.now(UTC)``. Buckets that have a matching entry
    in ``discontinuous_transactions`` reuse that entry; the others get a
    placeholder whose ``count``, ``errors``, ``meanDuration`` and
    ``meanTpdex`` are ``None`` and whose ``cached`` is ``0``.

    :param discontinuous_transactions: Entries keyed by ``"datetime"``.
        NOTE(review): matching is by exact equality, so this presumably
        expects timezone-aware datetimes already aligned to bucket
        boundaries -- confirm against the caller.
    :param start_datetime: First bucket of the range. When falsy, the
        ``"datetime"`` of the first transaction is used instead.
    :param time_bucket: ``"day"``, ``"hour"`` or ``"minute"``.
    :return: One entry per bucket in chronological order; an empty list when
        there is neither a ``start_datetime`` nor any transaction to anchor
        the range on.
    """
    if not start_datetime:
        if not discontinuous_transactions:
            # No explicit start and no data: there is no range to build.
            return []
        start_datetime = discontinuous_transactions[0]["datetime"]
    start_datetime = _truncate_datetime_for_time_bucket(start_datetime, time_bucket)
    end_datetime = datetime.now(UTC)

    delta = timedelta(
        days=1 if time_bucket == "day" else 0,
        hours=1 if time_bucket == "hour" else 0,
        minutes=1 if time_bucket == "minute" else 0,
    )
    bucket_count = int((end_datetime - start_datetime) / delta) + 1
    datetime_range = [start_datetime + i * delta for i in range(bucket_count)]

    # Index the transactions once instead of rescanning the list per bucket.
    # setdefault keeps the first entry when several share the same datetime.
    transactions_by_datetime = {}
    for transaction in discontinuous_transactions:
        transactions_by_datetime.setdefault(transaction["datetime"], transaction)

    # Fill missing data points
    continuous_transactions = []
    for bucket_start in datetime_range:
        if bucket_start in transactions_by_datetime:
            continuous_transactions.append(transactions_by_datetime[bucket_start])
        else:
            continuous_transactions.append(
                {
                    "datetime": bucket_start,
                    "count": None,
                    "errors": None,
                    "cached": 0,
                    "meanDuration": None,
                    "meanTpdex": None,
                }
            )
    return continuous_transactions