-
-
Save 1328/015bb2afa8dde9a3d1e8 to your computer and use it in GitHub Desktop.
fifties
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import unittest | |
| import json | |
| import time | |
| from pprint import pprint | |
| from collections import defaultdict, namedtuple | |
| from itertools import product | |
| import numpy as np | |
# Named tuples make the records easier to work with: instead of keeping
# several parallel lists, each fob is a single list of records whose fields
# are read by name, e.g. fob[i].value is the value, fob[i].start is the
# start, etc.
Record = namedtuple('Record', 'id start stop value')
def get_data(fn):
    '''
    Load sample records from a json file. Json is used because the loading
    code is short and simple, and a bit easier than parsing a tab or comma
    separated file.

    : fn is the path of a json file holding a list of rows, each row being
      the id, start, stop and value of one record, in that order
    : returns a list of Record named tuples, one per row, in file order
    '''
    # JSON text is UTF-8 by specification; pin the encoding so decoding does
    # not depend on the platform's default locale encoding.
    with open(fn, mode='r', encoding='utf-8') as fh:
        rows = json.load(fh)
    return [Record(*row) for row in rows]
def old_fifties(fob1, fob2):
    '''
    Baseline implementation: for every setoff from -4400 to 4400 in steps of
    50, cross all of fob1 with all of fob2 (via itertools.product) and average
    the fob2 values of the same-id pairs that overlap by at least 26 once the
    fob1 record is shifted by that setoff.

    : fob1, fob2 are sequences of records with id, start, stop and value
    : returns a dict of setoff -> mean value; setoffs with no qualifying pair
      are left out
    Prints the elapsed wall-clock time as a side effect.
    '''
    began = time.time()
    averages = {}
    for zero in range(-4400, 4401, 50):
        # keep b's value for each same-id pair that is not short of the
        # 26-unit overlap at this setoff
        kept = [
            b.value
            for a, b in product(fob1, fob2)
            if a.id == b.id and not find_offset_overlap(a, b, zero) < 26
        ]
        if kept:
            averages[zero] = np.mean(kept)
    elapsed = time.time() - began
    print('old_fifties took {}'.format(elapsed))
    return averages
def int_50(n):
    ''' like int(float), but snaps a number onto a multiple of 50: the
    fractional part of n/50 is dropped, so the result is rounded toward zero
    (99 -> 50, -99 -> -50) '''
    whole_steps = int(n / 50)
    return whole_steps * 50
def find_offset_overlap(a, b, offset=0):
    '''
    measure how much segments a and b overlap once a has been shifted
    : a,b are two objects with attributes .start and .stop
    : offset is the amount added to both ends of segment a
    : returns the length of the shared span; the result is zero or negative
      when the shifted a and b do not overlap
    '''
    shifted_start = a.start + offset
    shifted_stop = a.stop + offset
    # the shared span runs from the later start to the earlier stop
    return min(shifted_stop, b.stop) - max(shifted_start, b.start)
def find_setoffs(a, b, min_overlap=26):
    ''' find the set offs necessary to cause a,b to overlap, in increments of 50
    : a,b must be segment type objects with start and stop params
    : min_overlap is the minimum desired amount of overlap
    : returns the qualifying set offs for a as an ascending list, limited to
      the range -4400..4400
    '''
    # a shifted by an offset has a positive overlap with b only for offsets
    # between these two differences; int_50 snaps them onto multiples of 50
    boundaries = [
        int_50(b.start - a.stop),
        int_50(b.stop - a.start),
    ]
    # since we will be using range, we need to start at the smaller of the two
    # boundaries; we further limit the boundaries by -4400, 4400
    start = max(min(boundaries), -4400)
    stop = min(max(boundaries), 4400)
    # stop + 50 so that the upper boundary itself is also tested
    return [
        offset
        for offset in range(start, stop + 50, 50)
        # honour the caller's threshold (previously hard-coded to 26, which
        # silently ignored the min_overlap argument)
        if find_offset_overlap(a, b, offset) >= min_overlap
    ]
def new_fifties(fob1, fob2):
    '''
    another way of doing the 50s function. Instead of stepping along the
    x-axis and crossing fob1 with fob2 at every step, we step through the
    cross of fob1 and fob2 once and build the x-axis from it.

    Each same-id pair is handed to find_setoffs, and b's value is filed under
    every setoff it returns in a dictionary of setoff -> values. Once that
    dictionary is built, each list of values is averaged.

    Note: Largest advantage is when fob1/fob2 have smaller segments. For
    larger segments, new_fifties performs like old_fifties

    : fob1, fob2 are sequences of records with id, start, stop and value
    : returns a dict of setoff -> mean value
    Prints the elapsed wall-clock time as a side effect.
    '''
    began = time.time()
    values = defaultdict(list)
    for a, b in product(fob1, fob2):
        # NOTE: This function could be made faster by putting the different
        # record ids into different lists so that only records with identical
        # ids are crossed against one another, e.g., with 10 A records and
        # 10 B records on each side, crossing everything costs
        # 20*20 = 400 iterations, while crossing A with A and B with B costs
        # 10*10 + 10*10 = 200 iterations. Since this check runs first and the
        # distribution of ids is not known, it is uncertain how much time
        # that would save.
        if a.id == b.id:
            for setoff in find_setoffs(a, b):
                values[setoff].append(b.value)
    averages = {}
    for setoff, collected in values.items():
        averages[setoff] = np.mean(collected)
    elapsed = time.time() - began
    print('new_fifties took {}'.format(elapsed))
    return averages
def main():
    '''
    Run both implementations on the first 400 records of the sample files
    't1' and 't2', printing for each the mean of its per-setoff averages as a
    quick sanity test that the two agree.
    '''
    first_400 = slice(None, 400)
    fob1, fob2 = (get_data(name)[first_400] for name in ('t1', 't2'))

    runs = (
        ('revised average = {}', new_fifties),
        ('old_average = {}', old_fifties),
    )
    for template, fifties in runs:
        averages = fifties(fob1, fob2)
        # do an average of all averages as a sanity test
        overall = np.mean(list(averages.values()))
        print(template.format(overall))
        print()
class TestNew(unittest.TestCase):
    '''
    unit tests that check each helper function on its own
    '''

    def test_int_50(self):
        # (input, expected multiple of 50)
        cases = ((100, 100), (99, 50), (-99, -50), (251, 250))
        for given, expected in cases:
            self.assertEqual(int_50(given), expected)

    def test_find_setoffs(self):
        # (segment a, segment b, expected set offs)
        cases = (
            (Record('a', 0, 100, 10), Record('b', 100, 200, 10),
             [50, 100, 150]),
            (Record('a', -100, 0, 10), Record('b', -200, -100, 10),
             [-150, -100, -50]),
        )
        for seg_a, seg_b, expected in cases:
            self.assertEqual(find_setoffs(seg_a, seg_b), expected)

    def test_find_overlap(self):
        # (segment a, segment b, expected overlap at offset 0)
        cases = (
            (Record('a', 0, 150, 10), Record('b', 100, 200, 10), 50),
            (Record('a', -150, 0, 10), Record('b', -200, -100, 10), 50),
        )
        for seg_a, seg_b, expected in cases:
            self.assertEqual(find_offset_overlap(seg_a, seg_b, 0), expected)
# Running the file as a script performs the timing comparison in main().
# To run the unit tests instead, call unittest.main() here.
if __name__ == "__main__":
    main()
    # unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment