Skip to content

Instantly share code, notes, and snippets.

@Justasic
Created October 3, 2018 08:04
Show Gist options
  • Save Justasic/95225f92867efa835434d42ed91f986a to your computer and use it in GitHub Desktop.
Save Justasic/95225f92867efa835434d42ed91f986a to your computer and use it in GitHub Desktop.
This is my script I use for my IRC channel that announces different earthquakes around the world
#!/usr/bin/env python3
import requests, json, time, feedparser, re
from datetime import datetime
from dateutil.parser import parse
from humanize import naturaltime
import shorturl
# This script parses the JSON data from https://earthquake.usgs.gov/fdsnws/event/1/
# and data from https://www.emsc-csem.org/service/rss/rss.php?typ=emsc
# and displays two feeds, one is to the #Earthquakes.all channel and the other
# is in the #Earthquakes.major channel.
# This script should be executed every minute
lastfeed = None
etag = None
def NormalizeSpaces(stringdata):
re.sub(' +', ' ', stringdata)
return stringdata
def feed_modified_date(feed):
# this is the last-modified value in the response header
# do not confuse this with the time that is in each feed as the server
# may be using a different timezone for last-resposne headers than it
# uses for the publish date
modified = feed.get('modified')
if modified is not None:
return modified
return None
def max_entry_date(feed):
entry_pub_dates = (e.get('updated_parsed') for e in feed.entries)
entry_pub_dates = tuple(e for e in entry_pub_dates if e is not None)
if len(entry_pub_dates) > 0:
return max(entry_pub_dates)
return None
def entries_with_dates_after(feed, date):
response = []
for entry in feed.entries:
if entry.get('updated_parsed') > date:
response.append(entry)
return response
def GetQuakeData():
global lastfeed
# Enter an event loop
while True:
if not lastfeed:
f = feedparser.parse("https://www.emsc-csem.org/service/rss/rss.php?typ=emsc")
else:
etag = lastfeed.get('etag', None)
modified = feed_modified_date(lastfeed)
f = feedparser.parse("https://www.emsc-csem.org/service/rss/rss.php?typ=emsc", etag=etag, modified=modified)
# Skip if this is first start, we don't need to spam a fuckton of data.
if len(f.entries) > 0:
if not lastfeed:
lastfeed = f
# sleep for exactly 1 minute
time.sleep(60)
continue
entries = entries_with_dates_after(f, max_entry_date(f))
for e in f.entries:
if e not in lastfeed.entries:
entries.append(e)
if len(entries) > 0:
print("%d new earthquake(s) were announced" % len(entries))
for e in entries:
PostAllQuakes(e)
PostMajorQuakes(e)
lastfeed = f
#else:
#print("Feed has no etag or modified support...")
time.sleep(60)
def CommonPost(channel, data):
t = parse(data['emsc_time'], ignoretz=True)
short = shorturl.minimize(data['id'])
message = "[\00302{0}\017 | \002\0034{1}\017] \0038DEPTH {3} KM\017 {2} -- {4}".format(naturaltime(datetime.utcnow() - t), data['emsc_magnitude'], " ".join(NormalizeSpaces(data['title']).split(' ')[2:]), data['emsc_depth'], short)
data = [{"channel": channel, 'message': message}]
r = requests.post("http://localhost:8099/", json=data)
if r.status_code != 200:
print("There was an HTTP error: %d" % r.status_code)
def PostAllQuakes(data):
CommonPost('#Earthquakes.all', data)
def PostMajorQuakes(data):
try:
print(data['emsc_magnitude'])
mag = float(NormalizeSpaces(data['emsc_magnitude']).split(' ')[1])
if mag >= 6.0:
CommonPost('#Earthquakes.major', data)
except ValueError:
print("Caught value error, ignoring...")
if __name__ == "__main__":
GetQuakeData()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment