Created
January 16, 2018 13:01
-
-
Save tejinderss/ee56f01e9a428b989bca2a0c80f64d82 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import unittest | |
from functools import partial | |
from operator import itemgetter | |
from math import sin, cos, sqrt, asin, radians | |
from cStringIO import StringIO | |
def get_records(fl):
    """
    Lazily parse customer records from an iterable of JSON lines.

    Every non-blank line is decoded as one JSON object. Its ``latitude`` and
    ``longitude`` entries are removed, converted with ``float`` and stored
    together as a ``(latitude, longitude)`` tuple under ``coordinates``.

    :param fl: iterable of text lines, e.g. an open file object
    :returns: generator of record dicts carrying a ``coordinates`` tuple
    :raises ValueError: if a non-blank line is not valid JSON, or a
        coordinate value cannot be converted to float
    :raises KeyError: if a record has no ``latitude`` or ``longitude``
    """
    for line in fl:
        # Skip blank lines (for instance a trailing newline at the end of
        # the file) rather than letting json.loads fail on an empty document.
        if not line.strip():
            continue
        record = json.loads(line)
        record['coordinates'] = (
            float(record.pop('latitude')),
            float(record.pop('longitude')))
        yield record
def search_records(records, coordinates, within_kms):
    """
    Lazily yield the records located within a distance of a reference point.

    :param records: iterable of customer records; each one is looked up
        by its 'coordinates' key
    :param coordinates: tuple providing latitude and longitude of the
        reference point
    :param within_kms: float, maximum distance in kms (the limit itself
        is included)
    """
    # Bind the reference point once so the loop only supplies the candidate.
    measure = partial(distance_between, coordinates)
    for candidate in records:
        close_enough = measure(candidate['coordinates']) <= within_kms
        if close_enough:
            yield candidate
def distance_between(coords1, coords2):
    """
    Great-circle distance between two points, using the haversine formula.

    :param coords1: tuple providing latitude and longitude (converted to
        radians before use, so degrees are expected)
    :param coords2: tuple providing latitude and longitude (same units)
    :returns: float, the distance in kms on a sphere of radius 6373 km
    """
    coords1_lat, coords1_long = to_radians(coords1)
    coords2_lat, coords2_long = to_radians(coords2)

    lat_delta = coords2_lat - coords1_lat
    long_delta = coords2_long - coords1_long

    a = (sin(lat_delta / 2) ** 2
         + cos(coords1_lat) * cos(coords2_lat) * sin(long_delta / 2) ** 2)
    # Floating-point rounding can leave `a` marginally above 1 for
    # (near-)antipodal points, which would make asin raise a math domain
    # error. Clamp it to the largest valid value instead.
    if a > 1.0:
        a = 1.0
    c = 2 * asin(sqrt(a))

    # approximate radius of earth in km
    radius = 6373.0
    return radius * c
def to_radians(coordinates):
    """
    Convert a sequence of angles from degrees to radians.

    :param coordinates: tuple providing latitude and longitude
    :returns: list of coordinates in radians, in the same order
    """
    # Build a real list: on Python 3 a bare map() is a one-shot iterator,
    # which would contradict the documented return type.
    return [radians(value) for value in coordinates]
def main():
    """
    Print the customers located within 100 km of the Dublin office.

    Reads JSON-lines customer records from 'gistfile1.txt' in the current
    working directory and prints one "<user_id> <name>" line per matching
    customer, ordered by ascending user_id.
    """
    dublin_office_coords = (53.339428, -6.257664)
    within_kms = 100.0

    with open('gistfile1.txt', 'r') as f:
        records = get_records(f)
        # The record pipeline is lazy, so sorted() has to consume it while
        # the file is still open.
        sorted_records = sorted(
            search_records(records, dublin_office_coords, within_kms),
            key=itemgetter('user_id')
        )

    # Note on sorting: sorted() holds every matching record in memory.
    # If the input dataset is huge, the matches could instead be kept in
    # order as they are read from the file, e.g. with the bisect module
    # (bisect.insort). Better yet, use this abstraction:
    # https://code.activestate.com/recipes/577197-sortedcollection/

    for record in sorted_records:
        # A single pre-formatted argument produces the same output under
        # the Python 2 print statement and the Python 3 print function.
        print('%s %s' % (record['user_id'], record['name']))
class TestSearchRecords(unittest.TestCase):
    """Exercise search_records against three sample customer records."""

    def setUp(self):
        # One JSON document per line, mirroring the input file layout.
        self.sample_data = StringIO('\n'.join((
            '{"latitude": "52.986375", "user_id": 12, "name": "Christina McArdle", "longitude": "-6.043701"}',
            '{"latitude": "51.8856167", "user_id": 2, "name": "Ian McArdle", "longitude": "-10.4240951"}',
            '{"latitude": "52.966", "user_id": 15, "name": "Michael Ahearn", "longitude": "-6.463"}',
        )))

    def _matches_within(self, kms):
        # Collect the sample records found within `kms` of the reference
        # point shared by both tests.
        found = search_records(
            get_records(self.sample_data),
            (53.339428, -6.257664),
            kms
        )
        return list(found)

    def test_search_records(self):
        # Two of the three sample customers fall inside the 100 km radius.
        matches = self._matches_within(100.0)
        self.assertEqual(len(matches), 2)

    def test_search_non_existance(self):
        # No sample customer is within 1 km of the reference point.
        matches = self._matches_within(1.0)
        self.assertEqual(len(matches), 0)
# Run the customer search only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment