Skip to content

Instantly share code, notes, and snippets.

@wassname2
Last active May 28, 2025 05:23
Show Gist options
  • Save wassname2/3abd785602293c166c688b2ceff7102b to your computer and use it in GitHub Desktop.
import pandas as pd
import os
from anycache import anycache
from loguru import logger
import time
def chunking_by_time(startTime: pd.Timestamp, endTime: pd.Timestamp, freqs=['YE', 'ME', 'D']):
"""
How do we cache timespans? We want to take year chunks, then for the remainder months, and so on.
That way old data is cached in big chunks, and new data is rechunked as needed
```py
startTime = pd.to_datetime('2023-01-02 01:01')
endTime = pd.to_datetime('2024-04-06 05:55')
chunks = chunking_by_time(startTime, endTime, ['Y', 'M', 'D'])
[Timestamp('2023-01-02 01:01:00'),
Timestamp('2023-12-31 00:00:00'),
Timestamp('2024-01-31 00:00:00'),
Timestamp('2024-02-29 00:00:00'),
Timestamp('2024-03-31 00:00:00'),
Timestamp('2024-04-01 00:00:00'),
Timestamp('2024-04-02 00:00:00'),
Timestamp('2024-04-03 00:00:00'),
Timestamp('2024-04-04 00:00:00'),
Timestamp('2024-04-05 00:00:00'),
Timestamp('2024-04-06 00:00:00'),
Timestamp('2024-04-06 01:01:00')]
"""
# Create date range
startTime = pd.to_datetime(startTime)
endTime = pd.to_datetime(endTime)
date_range = pd.date_range(startTime, endTime)
# Create DataFrame
df = pd.DataFrame(date_range, columns=['date'], index=date_range)
groupers = [pd.Grouper(key='date', freq=f) for f in freqs]
groups = [startTime, endTime] # start with our first and last
for g in groupers:
grouped = df.groupby(g)
gs = list(dict(list(grouped)).keys())
# only take the ones that are before our last ts
gs = [ts for ts in gs if ts<=endTime
and ts>=startTime]
groups += gs
df = df.loc[groups[-1]:]
gs = [ts for ts in gs if (ts<=df.index.max()) & (ts>=df.index.min())]
# dedup
groups = sorted(set(groups))
return groups
@anycache('.anycache')  # disk-memoizes on the full argument tuple (tags, startTime, endTime, kwargs)
def cached_getTagDataV2(*args, startTime: pd.Timestamp, endTime: pd.Timestamp, **kwargs) -> pd.DataFrame:
    """Fetch one time chunk via ``get_data`` and cache it on disk.

    ``startTime``/``endTime`` are keyword-only so they always participate in
    the cache key by name.  The trailing assert refuses to cache a chunk whose
    data does not extend all the way to ``endTime`` (i.e. incomplete/empty
    data), since a cached partial chunk would never be refreshed.

    NOTE(review): ``get_data`` is not defined in this file — presumably a
    project-level data-source client; confirm its return schema includes a
    'time' column.  Also, ``assert`` is stripped under ``python -O``; an
    explicit ``raise`` would be safer for this validation.
    """
    df = get_data(*args, startTime=startTime, endTime=endTime, **kwargs)
    df['time'] = pd.to_datetime(df['time'])
    df = df.set_index('time')
    # don't cache incomplete or empty data
    # FIXME df.time[0] is end of first period
    # assert startTime == df.time[0], f"startTime {startTime} != {df.time[0]}: df.time[0]"
    assert endTime == df.index[-1], f"endTime {endTime} != {df.index[-1]}: df.time[-1]"
    return df
def paginateTagDataV2(tags: dict, startTime: pd.Timestamp, endTime: pd.Timestamp, **kwargs) -> pd.DataFrame:
    """Fetch tag data over [startTime, endTime] in cacheable time chunks.

    The span is split by ``chunking_by_time`` so old data is requested (and
    disk-cached) in coarse chunks while recent data lands in fine ones; each
    chunk is fetched through ``cached_getTagDataV2`` and concatenated.

    Args:
        tags: tag mapping passed through to the data source.
        startTime: inclusive start of the span.
        endTime: inclusive end of the span.
        **kwargs: forwarded to ``cached_getTagDataV2`` / ``get_data``.

    Returns:
        Time-indexed ``pd.DataFrame`` (the original ``-> dict`` annotation was
        wrong: ``pd.concat`` returns a DataFrame).

    NOTE(review): consecutive chunks share a boundary timestamp, so boundary
    rows can appear twice in the result; callers may want
    ``drop_duplicates(keep='last')`` — confirm against the data source.
    """
    bins = chunking_by_time(startTime, endTime)
    dfs = []
    # Distinct loop names: the original shadowed the startTime/endTime
    # parameters, making them unusable after the loop.
    for chunk_start, chunk_end in zip(bins[:-1], bins[1:]):
        logger.debug(f"paging data from {chunk_start} to {chunk_end}")
        df = cached_getTagDataV2(
            tags,
            startTime=chunk_start,
            endTime=chunk_end,
            **kwargs)
        dfs.append(df)
    return pd.concat(dfs)
if __name__ == "__main__":
    # Demo/benchmark: compare a single uncached fetch against the paginated,
    # disk-cached fetch for the same span, then two longer/shorter spans to
    # exercise cache reuse.  Requires the project-level `get_data` (not
    # defined in this file).
    tags = {
        # NOTE(review): duplicate key "g4.H" — in a dict literal the second
        # entry silently overwrites the first, so "a" is dead.  Presumably one
        # of the keys is a typo; confirm the intended tag names.
        "g4.H": "a",
        "g4.H": "b~MED_60Min",
    }
    # Baseline: one direct, uncached call covering the whole span.
    t1a = time.time()
    data = get_data(
        tags = tags,
        startTime = "2020-01-01 00:00:00",
        endTime = "2021-01-01 02:00:00"
    )
    data['time'] = pd.to_datetime(data['time'])
    data = data.set_index('time')
    logger.info(f"data took {time.time()-t1a:.2f} seconds, size {data.shape}")
    # Same span via the paginated/cached path — fills the cache.
    # FIXME the problem here is that the remainer is under the data freq
    t1a = time.time()
    data2 = paginateTagDataV2(
        tags = tags,
        startTime = "2020-01-01 00:00:00",
        endTime = "2021-01-01 02:00:00"
    )
    logger.info(f"data2 took {time.time()-t1a:.2f} seconds, size {data2.shape}")
    # Extended span — coarse (year) chunks should come from cache, only the
    # extra month/day chunks are fetched fresh.
    t2a = time.time()
    data3 = paginateTagDataV2(
        tags = tags,
        startTime = "2020-01-01 00:00:00",
        endTime = "2021-02-01 03:00:00"
    )
    logger.info(f"data3 took {time.time()-t2a:.2f} seconds, size {data3.shape}")
    # Span whose tail remainder (10 minutes) is finer than the smallest
    # chunking frequency ('D') — the case the FIXME below refers to.
    # FIXME the problem here is that the remainer is under the data freq
    t1a = time.time()
    data4 = paginateTagDataV2(
        tags = tags,
        startTime = "2020-01-01 00:00:00",
        endTime = "2021-01-01 00:10:00"
    )
    logger.info(f"data4 took {time.time()-t1a:.2f} seconds, size {data4.shape}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment