Skip to content

Instantly share code, notes, and snippets.

@1328
Created February 17, 2015 20:09
Show Gist options
  • Select an option

  • Save 1328/015bb2afa8dde9a3d1e8 to your computer and use it in GitHub Desktop.

Select an option

Save 1328/015bb2afa8dde9a3d1e8 to your computer and use it in GitHub Desktop.
fifties
import unittest
import json
import time
from pprint import pprint
from collections import defaultdict, namedtuple
from itertools import product
import numpy as np
# Named tuples make the record data simpler to work with: instead of keeping
# parallel lists, each fob is a single list of Record objects, so that
# records[i].value is the value, records[i].start is the start, and so on.
Record = namedtuple('Record', ('id', 'start', 'stop', 'value'))
def get_data(fn):
    '''
    Load sample records from a json file (json keeps the loading code simpler
    than parsing a tab or comma separated file).
    : fn is the path of a json file holding a list of rows; every row is
      unpacked positionally into a Record (id, start, stop, value)
    : returns a list of Record objects, in file order
    '''
    # JSON text is UTF-8 by specification, so name the encoding explicitly
    # rather than depending on the platform's locale default.
    with open(fn, mode='r', encoding='utf-8') as fh:
        rows = json.load(fh)
    return [Record(*row) for row in rows]
def old_fifties(fob1, fob2):
    '''
    Brute-force reference implementation of the fifties calculation.
    For every offset from -4400 to 4400 (inclusive) in steps of 50, cross all
    records of fob1 with all records of fob2 (via itertools.product), keep the
    pairs that share an id and overlap by at least 26 once the fob1 record is
    shifted by the offset, and average the fob2 values of the kept pairs.
    : fob1, fob2 are sequences of records with id, start, stop and value
    : returns a dict of offset -> mean value; offsets without any qualifying
      pair are left out
    Prints the elapsed wall-clock time as a side effect.
    '''
    began = time.time()
    result = {}
    for shift in range(-4400, 4401, 50):
        kept = []
        for left, right in product(fob1, fob2):
            # skip pairs from different ids or with too little overlap
            if left.id != right.id or find_offset_overlap(left, right, shift) < 26:
                continue
            kept.append(right.value)
        if kept:
            result[shift] = np.mean(kept)
    elapsed = time.time() - began
    print('old_fifties took {}'.format(elapsed))
    return result
def int_50(n):
    ''' like int(float), but snaps a number to a multiple of 50 instead of a
    multiple of 1; the truncation is toward zero, so 99 -> 50 and -99 -> -50 '''
    fifties = int(n / 50)
    return fifties * 50
def find_offset_overlap(a, b, offset=0):
    '''
    measure how much segments a and b overlap once a has been shifted
    : a,b are two objects with attributes .start and .stop
    : offset is the amount added to both ends of segment a
    : returns (end of the shared span) - (beginning of the shared span); the
      result is zero or negative when the shifted a does not reach into b
    '''
    shared_begin = max(a.start + offset, b.start)
    shared_end = min(a.stop + offset, b.stop)
    return shared_end - shared_begin
def find_setoffs(a, b, min_overlap=26):
    ''' find the set offs, in increments of 50, that shift a onto b with at
    least min_overlap of overlap
    : a,b must be segment type objects with start and stop params
    : min_overlap is the minimum desired amount of overlap; it is expected to
      be positive, because only offsets where the segments can touch are scanned
    : returns the qualifying offsets in ascending order (possibly empty);
      candidates are limited to the range -4400..4400
    '''
    # For segments with start < stop the overlap is positive only for offsets
    # between (b.start - a.stop) and (b.stop - a.start), so only that window
    # has to be scanned.
    boundaries = [
        int_50(b.start - a.stop),
        int_50(b.stop - a.start),
    ]
    # range() needs the smaller boundary first; both ends are further limited
    # to the -4400..4400 window.
    start = max(min(boundaries), -4400)
    stop = min(max(boundaries), 4400)
    # stop + 50 so that the upper boundary itself is tested as well.
    # Compare against min_overlap (previously a hard-coded 26 ignored it).
    return [
        offset
        for offset in range(start, stop + 50, 50)
        if find_offset_overlap(a, b, offset) >= min_overlap
    ]
def new_fifties(fob1, fob2):
    '''
    another way of doing the 50s function. Instead of stepping through the
    x axis and crossing fob1 with fob2 at every step, we step through the
    crossing of fob1 and fob2 once and build the x axis from it.
    Each pair of records overlaps only over a limited run of offsets, so we
    ask find_setoffs for that run and file the fob2 value under every offset
    in a dictionary of offset: values. Once the dictionary is complete, each
    entry is averaged.
    : fob1, fob2 are sequences of records with id, start, stop and value;
      ids must be hashable because fob2 is grouped by id
    : returns a dict of offset -> mean value
    Prints the elapsed time as a side effect.
    Note: Largest advantage is when fob1/fob2 has smaller segments. For larger
    segments, new_fifties performs like old_fifties
    '''
    # perf_counter is monotonic, so the measurement cannot be disturbed by
    # system clock adjustments.
    start = time.perf_counter()
    # Group fob2 by id once, so each fob1 record is only crossed against the
    # records sharing its id instead of against all of fob2. Iterating fob1 in
    # order and each group in fob2 order keeps the values in the same order as
    # a full cross product filtered by id.
    partners = defaultdict(list)
    for b in fob2:
        partners[b.id].append(b)
    values = defaultdict(list)
    for a in fob1:
        # .get() avoids creating empty groups for ids that only occur in fob1
        for b in partners.get(a.id, ()):
            for setoff in find_setoffs(a, b):
                values[setoff].append(b.value)
    averages = {k: np.mean(v) for k, v in values.items()}
    print('new_fifties took {}'.format(
        (time.perf_counter() - start)))
    return averages
def main():
    '''
    Run both implementations on the first 400 records of the sample files
    't1' and 't2', printing the mean of all per-offset averages for each as a
    sanity check that the two agree.
    '''
    sample_size = 400
    first = get_data('t1')[:sample_size]
    second = get_data('t2')[:sample_size]

    # new implementation first; the average of all averages is a sanity test
    new_result = new_fifties(first, second)
    new_overall = np.mean(list(new_result.values()))
    print('revised average = {}'.format(new_overall))
    print()

    # then the reference implementation, summarised the same way
    old_result = old_fifties(first, second)
    old_overall = np.mean(list(old_result.values()))
    print('old_average = {}'.format(old_overall))
    print()
class TestNew(unittest.TestCase):
    '''
    unittest cases checking each helper function on its own
    '''

    def test_int_50(self):
        # (input, expected) pairs; negatives truncate toward zero
        cases = ((100, 100), (99, 50), (-99, -50), (251, 250))
        for given, expected in cases:
            self.assertEqual(int_50(given), expected)

    def test_find_setoffs(self):
        # b lies directly to the right of a
        rightward = find_setoffs(
            Record('a', 0, 100, 10),
            Record('b', 100, 200, 10),
        )
        self.assertEqual(rightward, [50, 100, 150])
        # b lies directly to the left of a
        leftward = find_setoffs(
            Record('a', -100, 0, 10),
            Record('b', -200, -100, 10),
        )
        self.assertEqual(leftward, [-150, -100, -50])

    def test_find_overlap(self):
        # positive coordinates, no offset
        positive = find_offset_overlap(
            Record('a', 0, 150, 10),
            Record('b', 100, 200, 10),
            0,
        )
        self.assertEqual(positive, 50)
        # mirrored case on the negative side, no offset
        negative = find_offset_overlap(
            Record('a', -150, 0, 10),
            Record('b', -200, -100, 10),
            0,
        )
        self.assertEqual(negative, 50)
# Running the file as a script executes the timing comparison in main().
# To run the unit tests instead, call unittest.main() here in place of main().
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment