Skip to content

Instantly share code, notes, and snippets.

@hhl60492
Created November 27, 2017 19:08
Show Gist options
  • Save hhl60492/e222bff9a4821c4c91a926f65bd7e630 to your computer and use it in GitHub Desktop.
Save hhl60492/e222bff9a4821c4c91a926f65bd7e630 to your computer and use it in GitHub Desktop.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# Load the price history from eth-cad-max.csv; the file must provide a 'price' column.
df = pd.read_csv("eth-cad-max.csv")
# Pull the 'price' column out of the frame as a NumPy array.
prices = np.array(df['price'])
# Collects the accepted Hurst estimates (fit slopes below 1) across the sweep.
foo = []
# Number of consecutive start offsets to evaluate.
sweep = 220
# NOTE(review): indentation is missing in this copy of the script; the loop body
# presumably runs through the `foo.append(H[0])` statement further down -- confirm.
for i in range(sweep):
# Start the analysed window at sample 600 + i, so each pass drops one more leading sample.
start = 600+i
# `holdout` is assigned here but not read anywhere in the visible code.
holdout = 0.05 # 5 percent holdout set to test the Gaussian Process predictions
# Everything from `start` to the end of the series is analysed on this pass.
temp = prices[start:]
# Estimate the Hurst exponent of a time series with the rescaled-range (R/S)
# method... though there are better methods out there.
def find_range(x, chunks):
    """Return the average rescaled range (R/S) of ``x`` cut into ``2**chunks`` pieces.

    The series is split into ``2**chunks`` contiguous, near-equal pieces (when
    the length does not divide evenly, the leading pieces are one element
    longer). For every piece, the range of the cumulative sum of its
    mean-adjusted values is divided by the piece's population standard
    deviation, and the resulting ratios are averaged.

    Args:
        x: One-dimensional sequence of numbers (list or NumPy array).
        chunks: Non-negative integer exponent; the series is split into
            ``2**chunks`` pieces.

    Returns:
        ``[avg_RS, avg_size]`` where ``avg_RS`` is the mean R/S over the pieces
        with non-zero spread (``nan`` when every piece is constant) and
        ``avg_size`` is the mean piece length, ``len(x) / 2**chunks``.

    Raises:
        ValueError: If ``chunks`` is negative, or ``x`` has fewer elements than
            the number of pieces requested.
    """
    if chunks < 0:
        raise ValueError("chunks must be a non-negative integer")

    series = np.asarray(x, dtype=float)
    n_chunks = 2 ** int(chunks)
    if series.size < n_chunks:
        raise ValueError(
            "need at least %d samples to form %d chunks, got %d"
            % (n_chunks, n_chunks, series.size)
        )

    rs_values = []
    # array_split computes the piece boundaries from cumulative offsets, so
    # uneven pieces neither overlap nor leave gaps.
    for piece in np.array_split(series, n_chunks):
        spread = np.std(piece)
        if spread == 0:
            # R/S is undefined for a constant piece; leave it out of the
            # average instead of dividing by zero.
            continue
        # The range must come from this piece's own cumulative deviations,
        # not from those of the first piece.
        cumulative = np.cumsum(piece - np.mean(piece))
        rs_values.append((cumulative.max() - cumulative.min()) / spread)

    avg_RS = np.mean(rs_values) if rs_values else float("nan")
    avg_size = series.size / n_chunks
    return [avg_RS, avg_size]
def Hurst_exponent(x, chunks = 4):
    """Fit a line through log(R/S) versus log(chunk size) for the series ``x``.

    ``find_range`` is evaluated for every level ``0 .. chunks - 1`` (level
    ``k`` splits the series into ``2**k`` pieces). The logs of the returned
    average R/S and average piece size form the points of a degree-1
    least-squares fit.

    Args:
        x: Time series to analyse.
        chunks: Number of split levels to evaluate (default 4).

    Returns:
        The ``np.polyfit`` coefficient array ``[slope, intercept]``; the slope
        is the Hurst exponent estimate. The coefficients are also printed.
    """
    # Multiplicative constant whose log is subtracted from log(R/S); with a
    # value of 1 the subtraction leaves the value unchanged.
    scale = 1
    log_rs_values = []
    log_sizes = []
    for level in range(chunks):
        mean_rs, mean_size = find_range(x, level)
        log_rs_values.append(np.log(mean_rs) - np.log(scale))
        log_sizes.append(np.log(mean_size))

    # Linear fit of the logged values: slope first, intercept second.
    coefficients = np.polyfit(log_sizes, log_rs_values, 1)
    print(coefficients)
    return coefficients
# Progress output: the current sweep index.
print(i)
# Hurst_exponent returns the np.polyfit coefficients; element 0 is the fitted slope.
H = Hurst_exponent(temp)
# Keep only slopes below 1; anything else is left out of the average.
if(H[0] < 1):
foo.append(H[0])
# Report the mean of the retained slopes.
print("Average Hurst exponent")
print(np.average(foo))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment