Created
November 19, 2013 17:18
-
-
Save raddy/7548933 to your computer and use it in GitHub Desktop.
cython sniper helper methods
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
cimport numpy as np | |
cimport cython | |
from bsm_implieds import bs_tv,delta_py | |
ctypedef np.double_t DTYPE_t | |
cdef extern from "math.h": | |
bint isnan(double x) | |
cdef inline double double_max(double a, double b): return a if a >= b else b | |
cdef inline double double_min(double a, double b): return a if a <= b else b | |
cdef inline double abz(double a) : return a if a >= 0. else -1 * a | |
cdef inline long labz(long a) : return a if a >= 0 else -1 * a | |
def find_triggers(np.ndarray[long,ndim=2] futs,long slug_size,double denominator): | |
cdef: | |
long trade_size,flen = futs.shape[0] | |
np.ndarray[long,ndim=1] prev_bbo = np.zeros(4,dtype=long) | |
np.ndarray[long, ndim=1] res = np.zeros(flen, dtype=long) | |
for i from 0<=i<flen: | |
if futs[i,8]>0 and prev_bbo[0]>0 and prev_bbo[2]>0: | |
trade_size = futs[i,9] | |
if labz(trade_size)>slug_size: #now just need to check prev bbo to make sure its > 50% | |
if trade_size>0: #unfortunately this is pita and requires a lot of checks | |
if futs[i,8]!=prev_bbo[2]: | |
print 'SERIOUS ASSUMPTION ERROR' | |
continue | |
if labz(trade_size)>=(prev_bbo[3]/denominator): #so trade was against prev offer and greater than 50% | |
res[i] = futs[i,8] | |
else: | |
if futs[i,8]!=prev_bbo[0]: | |
print 'SERIOUS ASSUMPTION ERROR' | |
continue | |
if labz(trade_size)>=(prev_bbo[1]/denominator): #so trade was against prev bid and greater than 50% | |
res[i] = -1 * futs[i,8] | |
prev_bbo[0] = futs[i,0] | |
prev_bbo[1] = futs[i,1] | |
prev_bbo[2] = futs[i,2] | |
prev_bbo[3] = futs[i,3] | |
return res | |
cdef int no_md(double bid1,double ask1): | |
return not (bid1>0 and ask1>0) | |
cdef int no_underlying(double und): | |
return not und>0 | |
#[bid1[i],bid1s[i],ask1[i],ask1s[i],vols[i],strikes[i],ttes[i],types[i]] | |
def cy_theo_dict(some_info_dict,spot,rf,edge,buy_trigger,score,fire_time): | |
keys = some_info_dict.keys() | |
res = dict() | |
for k in keys: | |
stuff = some_info_dict[k] | |
tv = bs_tv(spot,stuff[-3],stuff[-2],stuff[-4], rf, stuff[-1]) | |
mydelta = delta_py(spot,stuff[-3],stuff[-2],stuff[-4], rf, stuff[-1]) | |
if buy_trigger: | |
if stuff[-1] == 1 and tv>(stuff[2]+edge): #calls to buy | |
res[k] = (stuff[2],stuff[3],tv,mydelta,'buy_trigger',score,fire_time) | |
elif stuff[-1] == -1 and tv<(stuff[0]-edge): #puts to sell | |
res[k] = (stuff[0],stuff[1],tv,mydelta,'buy_trigger',score,fire_time) | |
else: | |
if stuff[-1] == 1 and tv<(stuff[0]-edge): #calls to sell | |
res[k] = (stuff[0],stuff[1],tv,mydelta,'sell_trigger',score,fire_time) | |
elif stuff[-1] == -1 and tv>(stuff[2]+edge): #puts to buy | |
res[k] = (stuff[2],stuff[3],tv,mydelta,'sell_trigger',score,fire_time) | |
return res | |
def top_of_book_fires(np.ndarray[long, ndim=1] times,np.ndarray[object, ndim=1] syms, np.ndarray[long, ndim=1] bid1,np.ndarray[long, ndim=1] bid1s, | |
np.ndarray[long, ndim=1] ask1,np.ndarray[long, ndim=1] ask1s, np.ndarray[DTYPE_t,ndim=1] basis,np.ndarray[DTYPE_t,ndim=1] vols, | |
np.ndarray[long,ndim=1] strikes,np.ndarray[DTYPE_t,ndim=1] ttes,np.ndarray[long,ndim=1] types,np.ndarray[long,ndim=1] totes_volume, | |
np.ndarray[long,ndim=1] triggers,np.ndarray[long,ndim=1] scores, double rf,double edge): | |
cdef: | |
long sym_len = len(syms) | |
object sym | |
dict last_info = {} | |
dict theos = {} | |
double und | |
dict res = {} | |
for i from 0 <= i < sym_len: | |
sym = syms[i] | |
if no_md(bid1[i],ask1[i]): | |
last_info[sym] = [np.NaN,np.NaN,np.NaN,np.NaN,np.NaN,np.NaN,np.NaN,np.NaN] | |
continue | |
if triggers[i]!=0: | |
und = basis[i]+labz(triggers[i]) | |
theos = cy_theo_dict(last_info,und,rf,edge,triggers[i]>0,scores[i],times[i]) | |
if len(theos.keys())>0: | |
res[totes_volume[i]] = theos | |
last_info[sym] = [bid1[i],bid1s[i],ask1[i],ask1s[i],vols[i],strikes[i],ttes[i],types[i]] | |
return res | |
def bid_ask_next_turns(np.ndarray[long,ndim=1] bids,np.ndarray[long,ndim=1] asks): | |
cdef: | |
long blen = bids.shape[0] | |
np.ndarray[long, ndim=1] res = np.zeros(blen, dtype=long) | |
long ref_bid,ref_ask | |
long i,j | |
long next_tick | |
for i from 0<=i<blen: | |
if not (bids[i]>0 and asks[i]>0): | |
continue | |
ref_bid = bids[i] | |
ref_ask = asks[i] | |
next_tick = 0 | |
for j from i<=j<blen: | |
if not (bids[j]>0 and asks[j]>0): | |
continue | |
if bids[j]>=ref_ask: # we at some point uptick | |
next_tick = 1 | |
break | |
if asks[j]<=ref_bid: | |
next_tick = -1 | |
break | |
res[i] = next_tick | |
return res | |
def ref_next_turns_single_turn(np.ndarray[long,ndim=1] bids,np.ndarray[long,ndim=1] asks,np.ndarray[long,ndim=1] refs,np.ndarray[long,ndim=1] sizes): | |
cdef: | |
long blen = bids.shape[0] | |
np.ndarray[long, ndim=1] res = np.zeros(blen, dtype=long) | |
long ref,sz | |
long i,j | |
long next_tick | |
for i from 0<=i<blen: | |
if not (bids[i]>0 and asks[i]>0 and refs[i]>0): | |
continue | |
ref = refs[i] | |
sz = sizes[i] | |
next_tick = 0 | |
for j from i<=j<blen: | |
if not (bids[j]>0 and asks[j]>0): | |
continue | |
if sz>0: | |
if bids[j]>=ref: | |
next_tick = 1 | |
break | |
if asks[j]<ref: | |
next_tick = -1 | |
break | |
else: | |
if asks[j]<=ref: | |
next_tick = -1 | |
break | |
if bids[j]>ref: | |
next_tick = 1 | |
break | |
res[i] = next_tick | |
return res | |
def ref_next_turns(np.ndarray[long,ndim=1] bids,np.ndarray[long,ndim=1] asks,np.ndarray[long,ndim=1] refs): | |
cdef: | |
long blen = bids.shape[0] | |
np.ndarray[long, ndim=1] res = np.zeros(blen, dtype=long) | |
long ref | |
long i,j | |
long next_tick | |
for i from 0<=i<blen: | |
if not (bids[i]>0 and asks[i]>0 and refs[i]>0): | |
continue | |
ref = refs[i] | |
next_tick = 0 | |
for j from i<=j<blen: | |
if not (bids[j]>0 and asks[j]>0): | |
continue | |
if bids[j]>ref: | |
next_tick = 1 | |
break | |
if asks[j]<ref: | |
next_tick = -1 | |
break | |
res[i] = next_tick | |
return res | |
def find_turns(np.ndarray[double,ndim=1] bids,np.ndarray[double,ndim=1] asks): | |
cdef: | |
long blen = bids.shape[0] | |
np.ndarray[double, ndim=2] res = np.zeros([blen,2], dtype=np.double) | |
double prev_bid,prev_ask | |
for i from 0<=i<blen: | |
if not (bids[i]>0 and asks[i]>0): | |
continue | |
prev_bid = bids[i] | |
prev_ask = asks[i] | |
break | |
for i from 1<= i < blen: | |
if not (bids[i]>0 and asks[i]>0): | |
continue | |
if bids[i]>=prev_ask: | |
res[i][0] = 1 | |
res[i][1] = bids[i] | |
elif asks[i]<=prev_bid: | |
res[i][0] = -1 | |
res[i][1] = asks[i] | |
else: | |
continue | |
prev_bid = bids[i] | |
prev_ask = asks[i] | |
return res | |
def wavgDeltas(np.ndarray[long,ndim=1] prices,np.ndarray[long,ndim=1] sizes, long deltas): | |
cdef: | |
long plen = prices.shape[0],count,i,j,sz | |
np.ndarray[double, ndim=1] res = np.zeros(plen, dtype=np.double) | |
double avg_price | |
for i from 0<=i<plen: | |
count = 0 | |
avg_price =0 | |
for j from i>=j>=0: | |
sz = labz(sizes[j]) | |
if sz>0: #trade! | |
count+=sz | |
avg_price+=prices[j]*sz | |
if count > deltas: | |
avg_price -= (prices[j] * (count-deltas)) | |
count = deltas | |
j=-1 | |
if count>0: | |
avg_price/=count | |
res[i] = avg_price | |
return res |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment