Skip to content

Instantly share code, notes, and snippets.

@8one6
Created January 19, 2014 15:36
Show Gist options
  • Save 8one6/8506455 to your computer and use it in GitHub Desktop.
Save 8one6/8506455 to your computer and use it in GitHub Desktop.
A review of various functions for computing drawdowns and rolling drawdowns of a given 1-d data structure (numpy array or pandas series).
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"name": "Drawdown functions"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": "import numpy as np\nimport pandas as pd\nfrom numpy.lib.stride_tricks import as_strided\n\ndef gen_series(n):\n return pd.Series(np.random.randn(n)).cumsum()",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": "def dd_1(ser):\n # naively loop over all possible start and end points\n # O(n^2)\n servals = ser.values\n drawdowns = []\n for end in range(len(servals)):\n for start in range(end):\n drawdowns.append(servals[end] - servals[start])\n return min(min(drawdowns), 0)\n\ndef dd_2(ser):\n # only compare each point to the previous running peak\n # O(N)\n running_max = pd.expanding_max(ser)\n cur_dd = ser - running_max\n return min(0, cur_dd.min())",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": "anomalies = []\nfor i in range(100):\n ser = gen_series(1000)\n answers = pd.Series([dd_1(ser), dd_2(ser)])\n diffs = answers - answers[0]\n max_abs_diff = diffs.abs().max()\n if max_abs_diff != 0:\n anomalies.append(ser)\n \nlen(anomalies)",
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 3,
"text": "0"
}
],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": "ser = gen_series(10)\nprint 'n = 10'\n%timeit dd_1(ser)\n%timeit dd_2(ser)\n\nser = gen_series(100)\nprint 'n = 100'\n%timeit dd_1(ser)\n%timeit dd_2(ser)\n\nser = gen_series(1000)\nprint 'n = 1000'\n%timeit dd_1(ser)\n%timeit dd_2(ser)\n\nser = gen_series(10000)\nprint 'n = 10000'\n%timeit dd_1(ser)\n%timeit dd_2(ser)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "n = 10\n10000 loops, best of 3: 31.9 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10000 loops, best of 3: 125 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn = 100\n100 loops, best of 3: 3.1 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10000 loops, best of 3: 123 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn = 1000\n1 loops, best of 3: 290 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10000 loops, best of 3: 154 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn = 10000\n1 loops, best of 3: 33 s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n1000 loops, best of 3: 441 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n"
}
],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": "def dd_3(ser):\n if isinstance(ser, pd.Series):\n ser = ser.values\n running_global_peak = ser[0]\n min_since_global_peak = ser[0]\n running_max_dd = 0\n running_global_peak_id = np.nan\n running_max_dd_peak_id = np.nan\n running_max_dd_trough_id = np.nan\n\n for i, val in enumerate(ser):\n if val >= running_global_peak:\n running_global_peak = val\n running_global_peak_id = i\n min_since_global_peak = val\n if val < min_since_global_peak:\n min_since_global_peak = val\n if val - running_global_peak <= running_max_dd:\n running_max_dd = val - running_global_peak\n running_max_dd_peak_id = running_global_peak_id\n running_max_dd_trough_id = i\n return (running_max_dd, running_max_dd_peak_id, running_max_dd_trough_id, running_global_peak_id)",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": "anomalies = []\nfor i in range(100):\n ser = gen_series(1000)\n answers = pd.Series([dd_2(ser), dd_3(ser)[0]])\n diffs = answers - answers[0]\n max_abs_diff = diffs.abs().max()\n if max_abs_diff != 0:\n anomalies.append(ser)\n \nlen(anomalies)",
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 6,
"text": "0"
}
],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": "ser = gen_series(100)\nprint 'n = 100'\n%timeit dd_2(ser)\n%timeit dd_3(ser)\n\nser = gen_series(1000)\nprint 'n = 1000'\n%timeit dd_2(ser)\n%timeit dd_3(ser)\n\nser = gen_series(10000)\nprint 'n = 10000'\n%timeit dd_2(ser)\n%timeit dd_3(ser)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "n = 100\n10000 loops, best of 3: 118 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10000 loops, best of 3: 41.3 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn = 1000\n10000 loops, best of 3: 159 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n1000 loops, best of 3: 351 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn = 10000\n1000 loops, best of 3: 436 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n100 loops, best of 3: 2.41 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n"
}
],
"prompt_number": 7
},
{
"cell_type": "code",
"collapsed": false,
"input": "def rolling_dd_using_dd1(ser, win):\n return pd.rolling_apply(ser, win, dd_2, min_periods=0)\n\ndef rolling_dd_custom(ser, window):\n index = ser.index\n name = ser.name\n ser = ser.values\n \n n = len(ser)\n result = np.zeros((n, 4))\n running_global_peak = ser[0]\n min_since_global_peak = ser[0]\n running_max_dd = 0\n running_global_peak_id = np.nan\n running_max_dd_peak_id = np.nan\n running_max_dd_trough_id = np.nan\n for i, val in enumerate(ser):\n if i < window:\n if val >= running_global_peak:\n running_global_peak = val\n running_global_peak_id = i\n min_since_global_peak = val\n if val < min_since_global_peak:\n min_since_global_peak = val\n if val - running_global_peak <= running_max_dd:\n running_max_dd = val - running_global_peak\n running_max_dd_peak_id = running_global_peak_id\n running_max_dd_trough_id = i\n result[i, :] = np.array(\n (running_max_dd, \n running_max_dd_peak_id, \n running_max_dd_trough_id, \n running_global_peak_id)\n )\n else:\n # current window max index is i\n # current window min index is i-window+1\n # possible difficult situations\n # 1) previous running_global_peak falls out of early edge of sample\n prob_1 = result[i-1, 3]<=i-window\n # 2) previous running_max_peak_id falls out of early edge of sample\n prob_2 = result[i-1, 1]<=i-window\n if prob_1 or prob_2:\n rolling_window = ser[i-window+1: i+1]\n intermed = dd_3(rolling_window)\n result[i, :] = np.array((intermed[0],\n intermed[1] + i - window + 1,\n intermed[2] + i - window + 1,\n intermed[3] + i - window + 1))\n else:\n # the new windowed global peak is straightforward:\n result[i, 3] = i if ser[i]>=ser[result[i-1,3]] else result[i-1,3]\n # one candidate is previous row's drawdown. other candidate is from previous row's\n # running_global_peak to the newly entered value\n new_candidate_drawdown = val - ser[result[i-1, 3]]\n if new_candidate_drawdown <= result[i-1, 0]:\n result[i, 0] = new_candidate_drawdown\n result[i, 1] = result[i-1, 3]\n result[i, 2] = i\n else:\n result[i, 0] = result[i-1, 0]\n result[i, 1] = result[i-1, 1]\n result[i, 2] = result[i-1, 2]\n \n result = result[:, 0]\n result = pd.Series(result, name=name, index=index)\n return result",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 8
},
{
"cell_type": "code",
"collapsed": false,
"input": "def windowed_view(x, window_size):\n \"\"\"Creat a 2d windowed view of a 1d array.\n\n `x` must be a 1d numpy array.\n\n `numpy.lib.stride_tricks.as_strided` is used to create the view.\n The data is not copied.\n\n Example:\n\n >>> x = np.array([1, 2, 3, 4, 5, 6])\n >>> windowed_view(x, 3)\n array([[1, 2, 3],\n [2, 3, 4],\n [3, 4, 5],\n [4, 5, 6]])\n \"\"\"\n y = as_strided(x, shape=(x.size - window_size + 1, window_size),\n strides=(x.strides[0], x.strides[0]))\n return y\n\ndef rolling_max_dd_so(x, window_size, min_periods=1):\n \"\"\"Compute the rolling maximum drawdown of `x`.\n\n `x` must be a 1d numpy array.\n `min_periods` should satisfy `1 <= min_periods <= window_size`.\n\n Returns an 1d array with length `len(x) - min_periods + 1`.\n \"\"\"\n if min_periods < window_size:\n pad = np.empty(window_size - min_periods)\n pad.fill(x[0])\n x = np.concatenate((pad, x))\n y = windowed_view(x, window_size)\n running_max_y = np.maximum.accumulate(y, axis=1)\n dd = y - running_max_y\n return dd.min(axis=1)\n\ndef rolling_max_dd_so_wrapped(ser, window_size, min_periods=1):\n vals = ser.values\n rmdd = rolling_max_dd_so(vals, window_size, min_periods)\n return pd.Series(data=rmdd, index=ser.index, name=ser.name)",
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 9
},
{
"cell_type": "code",
"collapsed": false,
"input": "ser = gen_series(1000)\nprint 'n=1000, w=10'\n%timeit rolling_dd_using_dd1(ser, 10)\n%timeit rolling_dd_custom(ser, 10)\n%timeit rolling_max_dd_so_wrapped(ser, 10)\n\nser = gen_series(1000)\nprint 'n=1000, w=100'\n%timeit rolling_dd_using_dd1(ser, 100)\n%timeit rolling_dd_custom(ser, 100)\n%timeit rolling_max_dd_so_wrapped(ser, 100)\n\nser = gen_series(10000)\nprint 'n=10000, w=100'\n%timeit rolling_dd_using_dd1(ser, 100)\n%timeit rolling_dd_custom(ser, 100)\n%timeit rolling_max_dd_so_wrapped(ser, 100)\n\nser = gen_series(10000)\nprint 'n=10000, w=1000'\n%timeit rolling_dd_using_dd1(ser, 1000)\n%timeit rolling_dd_custom(ser, 1000)\n%timeit rolling_max_dd_so_wrapped(ser, 1000)",
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": "n=1000, w=10\n10 loops, best of 3: 27.4 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n100 loops, best of 3: 12.3 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n1000 loops, best of 3: 209 \u00b5s per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn=1000, w=100\n10 loops, best of 3: 30.3 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n100 loops, best of 3: 10.2 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n1000 loops, best of 3: 1.09 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn=10000, w=100\n1 loops, best of 3: 299 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10 loops, best of 3: 118 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n100 loops, best of 3: 11.2 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\nn=10000, w=1000\n1 loops, best of 3: 421 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10 loops, best of 3: 130 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n10 loops, best of 3: 152 ms per loop"
},
{
"output_type": "stream",
"stream": "stdout",
"text": "\n"
}
],
"prompt_number": 10
},
{
"cell_type": "code",
"collapsed": false,
"input": "anomalies = []\nfor i in range(100):\n ser = gen_series(1000)\n one = rolling_dd_using_dd1(ser, 50)\n two = rolling_dd_custom(ser, 50)\n three = rolling_max_dd_so_wrapped(ser, 50)\n df = pd.concat([one, two, three], axis=1)\n check = df.sub(df.iloc[:, 0], axis=0).abs().sum(1).max()\n if check > 0:\n anomalies.append(ser)\n \nlen(anomalies)",
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 11,
"text": "0"
}
],
"prompt_number": 11
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment