Last active
May 28, 2025 05:23
-
-
Save wassname2/3abd785602293c166c688b2ceff7102b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import os | |
from anycache import anycache | |
from loguru import logger | |
import time | |
def chunking_by_time(startTime: pd.Timestamp, endTime: pd.Timestamp, freqs=('YE', 'ME', 'D')) -> list:
    """
    Split a time span into chunk boundaries, coarsest frequency first.

    How do we cache timespans? We want to take year chunks, then month chunks
    for the remainder, and so on. That way old data is cached in big chunks,
    and new data is rechunked as needed.

    Args:
        startTime: start of the span (anything ``pd.to_datetime`` accepts).
        endTime: end of the span (anything ``pd.to_datetime`` accepts).
        freqs: pandas offset aliases, ordered from coarsest to finest.

    Returns:
        Sorted, de-duplicated list of ``pd.Timestamp`` boundaries. It always
        contains ``startTime`` and ``endTime``; every other entry is a bin
        label of one of ``freqs`` lying inside ``[startTime, endTime]``.

    Example:
        ```py
        startTime = pd.to_datetime('2023-01-02 01:01')
        endTime = pd.to_datetime('2024-04-06 05:55')
        chunks = chunking_by_time(startTime, endTime, ['YE', 'ME', 'D'])
        [Timestamp('2023-01-02 01:01:00'),
         Timestamp('2023-12-31 00:00:00'),
         Timestamp('2024-01-31 00:00:00'),
         Timestamp('2024-02-29 00:00:00'),
         Timestamp('2024-03-31 00:00:00'),
         Timestamp('2024-04-01 00:00:00'),
         Timestamp('2024-04-02 00:00:00'),
         Timestamp('2024-04-03 00:00:00'),
         Timestamp('2024-04-04 00:00:00'),
         Timestamp('2024-04-05 00:00:00'),
         Timestamp('2024-04-06 00:00:00'),
         Timestamp('2024-04-06 05:55:00')]
        ```
    """
    startTime = pd.to_datetime(startTime)
    endTime = pd.to_datetime(endTime)

    # One row per day over the span; the bin labels produced by grouping these
    # rows at each frequency are the candidate chunk boundaries.
    date_range = pd.date_range(startTime, endTime)
    df = pd.DataFrame(date_range, columns=['date'], index=date_range)

    groups = [startTime, endTime]  # the span's own ends are always boundaries
    for freq in freqs:
        if df.empty:
            # nothing left to subdivide at finer frequencies
            break
        labels = [label for label, _ in df.groupby(pd.Grouper(key='date', freq=freq))]
        # only keep the labels that fall inside the requested span
        labels = [ts for ts in labels if startTime <= ts <= endTime]
        if not labels:
            # No boundary at this granularity (e.g. the span sits inside a
            # single year): leave the remainder untouched so the finer
            # frequencies still get to split all of it.
            continue
        groups += labels
        # Finer frequencies only subdivide what lies after the last coarse boundary.
        df = df.loc[labels[-1]:]

    # dedup (coarse and fine labels can coincide) and order chronologically
    return sorted(set(groups))
@anycache('.anycache')
def cached_getTagDataV2(*args, startTime, endTime, **kwargs):
    """
    Fetch one chunk with ``get_data`` and return it indexed by its ``time`` column.

    The call is wrapped by ``anycache`` (cache directory ``.anycache``). The
    checks below raise instead of returning so that incomplete or empty
    chunks are not cached.

    Args:
        *args: forwarded positionally to ``get_data``.
        startTime: chunk start, forwarded to ``get_data``.
        endTime: chunk end, forwarded to ``get_data``; must equal the last
            timestamp of the returned data.
        **kwargs: forwarded to ``get_data``.

    Returns:
        The fetched DataFrame with the parsed ``time`` column as its index.

    Raises:
        AssertionError: if no rows came back, or the last timestamp is not
            ``endTime``. Raised explicitly (not via ``assert``) so the check
            still runs under ``python -O``.
    """
    df = get_data(*args, startTime=startTime, endTime=endTime, **kwargs)
    df['time'] = pd.to_datetime(df['time'])
    df = df.set_index('time')

    # don't cache incomplete or empty data
    if len(df.index) == 0:
        raise AssertionError(f"no data returned between {startTime} and {endTime}")
    # FIXME the first timestamp is the end of the first period, so startTime
    # cannot be validated the same way as endTime.
    if endTime != df.index[-1]:
        raise AssertionError(f"endTime {endTime} != {df.index[-1]}: df.time[-1]")
    return df
def paginateTagDataV2(tags: dict, startTime: pd.Timestamp, endTime: pd.Timestamp, **kwargs) -> pd.DataFrame:
    """
    Fetch ``tags`` over ``[startTime, endTime]`` chunk by chunk and concatenate.

    The span is split with ``chunking_by_time`` and each consecutive pair of
    boundaries is fetched through ``cached_getTagDataV2``.

    Args:
        tags: passed as the first positional argument of ``cached_getTagDataV2``.
        startTime: start of the whole span.
        endTime: end of the whole span.
        **kwargs: forwarded to ``cached_getTagDataV2`` for every chunk.

    Returns:
        The per-chunk DataFrames concatenated in chronological chunk order.

    Raises:
        ValueError: from ``pd.concat`` when the span yields fewer than two
            boundaries (e.g. ``startTime == endTime``), so there are no chunks.
    """
    bins = chunking_by_time(startTime, endTime)

    dfs = []
    # Walk consecutive boundary pairs; distinct loop names keep the caller's
    # startTime/endTime arguments intact.
    for chunk_start, chunk_end in zip(bins[:-1], bins[1:]):
        logger.debug(f"paging data from {chunk_start} to {chunk_end}")
        chunk = cached_getTagDataV2(
            tags,
            startTime=chunk_start,
            endTime=chunk_end,
            **kwargs)
        dfs.append(chunk)

    # NOTE(review): neighbouring chunks share a boundary timestamp; whether that
    # produces duplicate rows depends on get_data's interval semantics - confirm.
    return pd.concat(dfs)
if __name__ == "__main__":
    # Manual timing comparison: one direct fetch versus the paginated, cached path.
    # NOTE(review): get_data is neither defined nor imported in the visible part
    # of this file - it must be provided before this script can run.
    # NOTE(review): both entries use the key "g4.H", so only the last value
    # ("b~MED_60Min") ends up in the dict - confirm the intended tag names.
    tags = {
        "g4.H": "a",
        "g4.H": "b~MED_60Min",
    }

    # Baseline: a single direct request for the whole span.
    tic = time.time()
    data = get_data(
        tags=tags,
        startTime="2020-01-01 00:00:00",
        endTime="2021-01-01 02:00:00",
    )
    data['time'] = pd.to_datetime(data['time'])
    data = data.set_index('time')
    logger.info(f"data took {time.time()-tic:.2f} seconds, size {data.shape}")

    # Same span through the paginated path.
    # FIXME the problem here is that the remainder is under the data freq
    tic = time.time()
    data2 = paginateTagDataV2(
        tags=tags,
        startTime="2020-01-01 00:00:00",
        endTime="2021-01-01 02:00:00",
    )
    logger.info(f"data2 took {time.time()-tic:.2f} seconds, size {data2.shape}")

    # Longer span with the same start.
    tic = time.time()
    data3 = paginateTagDataV2(
        tags=tags,
        startTime="2020-01-01 00:00:00",
        endTime="2021-02-01 03:00:00",
    )
    logger.info(f"data3 took {time.time()-tic:.2f} seconds, size {data3.shape}")

    # Span ending ten minutes past a day boundary.
    # FIXME the problem here is that the remainder is under the data freq
    tic = time.time()
    data4 = paginateTagDataV2(
        tags=tags,
        startTime="2020-01-01 00:00:00",
        endTime="2021-01-01 00:10:00",
    )
    logger.info(f"data4 took {time.time()-tic:.2f} seconds, size {data4.shape}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment