Skip to content

Instantly share code, notes, and snippets.

@twiecki
Created July 11, 2016 13:43
Show Gist options
  • Save twiecki/d5f86b28e3f0304a76b6599a7a164bdd to your computer and use it in GitHub Desktop.
Save twiecki/d5f86b28e3f0304a76b6599a7a164bdd to your computer and use it in GitHub Desktop.
Rolling window and rolling apply with flexible lookback and resample behavior that also support parallelism.
import pandas as pd
from pandas.tseries.offsets import *
def rolling_window(data, resample='1BM', lookback=12 * BMonthEnd()):
    """Yield successive slices of ``data`` ending on resampled dates.

    Parameters
    ----------
    data : pandas Series or DataFrame with a datetime-like index
        The object to slice. The index is expected to be sorted so that
        label slicing with ``.loc`` selects a contiguous date range.
    resample : str or pandas offset
        Frequency passed to ``data.resample``; the resulting bin labels
        are the end dates of the windows. (Recent pandas versions spell
        the business-month-end alias ``'BME'`` instead of ``'BM'``;
        pass the frequency explicitly if the default alias is rejected.)
    lookback : pandas offset / timedelta-like, or the string ``'aggregate'``
        Length of each window. With ``'aggregate'`` every window starts
        at the beginning of ``data`` (expanding window).

    Yields
    ------
    Slices ``data.loc[start:end]``; label slicing includes both end
    points when they are present in the index.
    """
    # A Resampler object no longer exposes ``.index`` directly; run a cheap
    # aggregation to obtain the bin labels (empty bins are kept as well).
    dts = data.resample(resample).size().index

    # Guard with isinstance so an offset object is never compared to a string.
    if isinstance(lookback, str) and lookback == 'aggregate':
        for dt in dts:
            yield data.loc[:dt]
    else:
        # Only keep end dates for which a full lookback period of data exists.
        dts = dts[dts > data.index[0] + lookback]
        for dt in dts:
            yield data.loc[dt - lookback:dt]
def rolling_apply(func, data, resample='1BM', lookback=12 * BMonthEnd(), column_wise=False,
                  view=None, args=(), kwargs=None):
    """Apply ``func`` to every window produced by ``rolling_window``.

    Parameters
    ----------
    func : callable
        Called as ``func(window, *args, **kwargs)``. Its result must be
        something ``pd.concat(..., axis=1)`` accepts (e.g. a Series).
    data : pandas Series or DataFrame
        Data to be windowed; forwarded to ``rolling_window``.
    resample, lookback
        Forwarded unchanged to ``rolling_window``.
    column_wise : bool
        If true, run the rolling apply separately on each column of
        ``data`` and concatenate the per-column results.
    view : object or None
        Optional parallel view (such as an ipyparallel view) exposing
        ``apply_async(func, *args, **kwargs)`` whose return value has a
        ``get()`` method. When ``None`` the windows are processed serially.
    args : tuple
        Extra positional arguments for ``func``.
    kwargs : dict or None
        Extra keyword arguments for ``func``.

    Returns
    -------
    The per-window results concatenated and transposed so that each row
    is labelled by the last index value of its window. ``pd.concat``
    raises ``ValueError`` if no window was produced.
    """
    # Avoid a shared mutable default for the keyword arguments.
    kwargs = {} if kwargs is None else kwargs

    if column_wise:
        # Forward view/args/kwargs as well, so column-wise calls behave like
        # the plain call (they were previously dropped on recursion).
        return pd.concat([
            rolling_apply(func, data[col], resample=resample, lookback=lookback,
                          column_wise=False, view=view, args=args, kwargs=kwargs)
            for col in data
        ])

    windows = rolling_window(data, resample=resample, lookback=lookback)

    if view is None:
        results = {
            window.index[-1]: func(window, *args, **kwargs)
            for window in windows
        }
    else:
        # Submit every window first so the jobs can run concurrently,
        # then collect the results.
        pending = {
            window.index[-1]: view.apply_async(func, window, *args, **kwargs)
            for window in windows
        }
        # dict.items() works on Python 3 (iteritems() was Python 2 only).
        results = {dt: job.get() for dt, job in pending.items()}

    return pd.concat(results, axis=1).T
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment