Created
March 5, 2014 20:41
-
-
Save lamchau/9376151 to your computer and use it in GitHub Desktop.
USGS Earthquake Data - https://www.google.com/fusiontables/DataSource?snapid=S327323orMC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import csv | |
import math | |
import sys | |
import time | |
class Record(object): | |
EARTH_RADIUS_IN_KM = 6371 | |
# cheney | |
START_COORDINATE = (47.4875, 117.5747) | |
date = None | |
coordinate = None | |
magnitude = None | |
depth = None | |
source = None | |
impact = None | |
distance = None | |
def __init__(self): | |
super(Record, self).__init__() | |
# def __getitem__(self, attr): | |
# return self[attr] | |
@staticmethod | |
def calculate_impact(record): | |
if not isinstance(record, Record): | |
raise TypeError("Invalid type '%s'" % type(record)) | |
return max(0, (1000 - record.depth) * record.magnitude) | |
@staticmethod | |
def convert_date(elements): | |
# pad zeros for time (hhmmss.mm) | |
elements[3] = elements[3].zfill(6) | |
date_str = ",".join(elements[0:4]) | |
formatter = Record.get_date_format(date_str) | |
date_object = time.strptime(date_str, formatter) | |
return datetime.fromtimestamp(time.mktime(date_object)) | |
@staticmethod | |
def get_date_format(s): | |
if "." in s: | |
return "%Y,%m,%d,%H%M%S.%f" | |
return "%Y,%m,%d,%H%M%S" | |
@staticmethod | |
def get_distance(start_coordinate, end_coordinate): | |
lat1, long1 = map(math.radians, start_coordinate) | |
lat2, long2 = map(math.radians, end_coordinate) | |
dlat = lat2 - lat1 | |
dlong = long2 - long1 | |
a = math.sin(dlat / 2) ** 2 + \ | |
math.cos(lat1) * math.cos(lat2) * \ | |
math.sin(dlong / 2) ** 2 | |
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) | |
return Record.EARTH_RADIUS_IN_KM * c | |
@staticmethod | |
def load(filename, limit=None, progress=2500): | |
all_magnitudes = [] | |
data = [] | |
sys.stdout.write("Loading file") | |
with open(filename, "r") as f: | |
# skip headers | |
f.readline() | |
for index, line in enumerate(csv.reader(f)): | |
if limit is not None and index > limit: | |
break | |
if index % progress == 0: | |
sys.stdout.write(".") | |
sys.stdout.flush() | |
try: | |
record = Record.parse(line) | |
if record: | |
data.append(record) | |
all_magnitudes.append(record.magnitude) | |
except: | |
pass | |
sys.stdout.write("\n") | |
return (data, all_magnitudes) | |
@staticmethod | |
def parse(elements): | |
if len(elements) != 10: | |
raise Exception("Invalid number of elements") | |
record = Record() | |
record.date = Record.convert_date(elements) | |
record.coordinate = tuple(map(float, elements[4:6])) | |
# coordinates as a string | |
# elements[6] | |
record.magnitude = float(elements[7]) | |
record.depth = int(elements[8]) if elements[8] else 0 | |
record.source = elements[9] | |
record.impact = Record.calculate_impact(record) | |
record.distance = Record.get_distance(Record.START_COORDINATE, | |
record.coordinate) | |
return record |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
class Statistics(object): | |
@staticmethod | |
def statistics(elements): | |
result = (Statistics.mean(elements), Statistics.median(elements), Statistics.std_dev(elements)) | |
print(""" | |
mean: %.2f | |
median: %.2f | |
std. dev: %.2f\n""" % result) | |
@staticmethod | |
def mean(elements): | |
return sum(elements) / len(elements) | |
@staticmethod | |
def median(elements): | |
elements = sorted(elements) | |
midpoint = int((1 + (len(elements) - 1)) / 2) | |
if len(elements) % 2 == 0: | |
return (elements[midpoint] + elements[midpoint - 1]) / 2.0 | |
return elements[midpoint / 2] | |
@staticmethod | |
def std_dev(elements): | |
diff = 0.0 | |
average = Statistics.mean(elements) | |
for x in elements: | |
diff += (x - average) ** 2 | |
return math.sqrt(diff / len(elements)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import sys | |
import operator | |
from Record import Record | |
from Statistics import Statistics | |
class Application(object): | |
data = None | |
all_magnitudes = None | |
"""docstring for Application""" | |
def __init__(self, filename, limit=None, progress=2500): | |
super(Application, self).__init__() | |
self.menu = Menu() | |
self.data, self.all_magnitudes = Record.load(filename, limit, progress) | |
def select_option(self): | |
while True: | |
choice = raw_input(self.menu.display) | |
if choice: | |
item = self.menu.get(choice.upper()) | |
if item: | |
item.comparator(self.data if choice != "4" else self.all_magnitudes, item) | |
class Menu(object): | |
class Item(object): | |
"""docstring for Item""" | |
def __init__(self, option, title, comparator=None, renderer=None): | |
super(Menu.Item, self).__init__() | |
self.option = option | |
self.title = title | |
self.comparator = comparator | |
self.renderer = renderer | |
def render(self): | |
return " %s. %s" % (self.option, self.title) | |
"""docstring for Menu""" | |
def __init__(self): | |
super(Menu, self).__init__() | |
items = [ | |
self.Item("1", "Top 100 Magnitude Earthquakes (date, magnitude, depth)", | |
lambda data, item: Menu.sort("magnitude", data, item, 100), | |
lambda x: "%s\t%.2f\t%d" % (str(x.date), x.magnitude, x.depth)), | |
self.Item("2", "Top 50 Shallowest Earthquakes (date, magnitude, depth)", | |
lambda data, item: Menu.sort("depth", data, item, 50, True), | |
lambda x: "%s\t%.2f\t%d" % (str(x.date), x.magnitude, x.depth)), | |
self.Item("3", "Top 50 Earthquakes closest to Cheney, WA (date, coordinates, distance from Cheney)", | |
lambda data, item: Menu.sort("distance", data, item, 50, True), | |
lambda x: "%s\t%s\t%.2f" % (str(x.date), x.coordinate, x.distance)), | |
self.Item("4", "Mean, Midpoint, Median and Standard Deviation of Earthquake Magnitude", | |
lambda data, item: Statistics.statistics(data), | |
None), | |
self.Item("5", "Top 50 Earthquakes by Impact (date, magnitude, depth, impact)", | |
lambda data, item: Menu.sort("impact", data, item, 50), | |
lambda x: "%s\t%.2f\t%d\t%.2f" % (str(x.date), x.magnitude, x.depth, x.impact)), | |
self.Item("Q", "Exit", | |
lambda x, y: sys.exit(0)) | |
] | |
self.options = dict((x.option, x) for x in items) | |
self.display = "\n".join([x.render() for x in items]) + "\n\nSelect an option: " | |
def get(self, option): | |
return self.options.get(option) | |
def display(self): | |
return self.display | |
# todo: separate | |
@staticmethod | |
def sort(key_name, data, item, count=50, descending=False): | |
print(item.title) | |
if not item.renderer: | |
return | |
direction = False if descending else True | |
key_name = key_name.lower() | |
data.sort(key=operator.attrgetter(key_name), reverse=direction) | |
for x in data[:count]: | |
print(item.renderer(x)) | |
# return data[:count] | |
def main(): | |
app = Application("eqdata.csv", progress=20000) | |
os.system("cls" if os.name == "nt" else "clear") | |
app.select_option() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment