Created
June 5, 2014 14:43
-
-
Save ssut/d438bb42c3991a39983c to your computer and use it in GitHub Desktop.
Get South Korea river status from koreawqi.go.kr
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
import contextlib
import re
import urllib
from datetime import datetime
from textwrap import dedent as trim

from bs4 import BeautifulSoup
def now():
    """Return the current local time as a ``YYYYMMDDHH`` string.

    The stamp has hour resolution.  ``RiverStatus`` compares it with the
    version it extracted from the downloaded page to decide whether a new
    download is needed.
    """
    return datetime.now().strftime("%Y%m%d%H")
class RiverStatusElement(object):
    """Water-quality readings for a single river.

    A plain value holder: the constructor stores its arguments unchanged and
    performs no validation or conversion.
    """

    # Measurement attributes, in the order the constructor accepts them.
    # ``name`` is deliberately not part of this list: ``to_dict`` reports
    # measurements only.
    _FIELDS = ("temperature", "ion", "conductivity", "oxygen", "carbon",
               "nitrogen", "phosphorus")

    def __init__(self, name, temperature, ion, conductivity, oxygen, carbon,
                 nitrogen, phosphorus):
        """Store the readings for the river called *name*.

        Parameters:
            name: river name.
            temperature: water temperature.
            ion: hydrogen ion reading.
            conductivity: electrical conductivity (EC).
            oxygen: dissolved oxygen.
            carbon: total organic carbon.
            nitrogen: total nitrogen.
            phosphorus: total phosphorus (a nonmetallic element).
        """
        self.name = name
        self.temperature = temperature
        self.ion = ion
        self.conductivity = conductivity
        self.oxygen = oxygen
        self.carbon = carbon
        self.nitrogen = nitrogen
        self.phosphorus = phosphorus

    def to_dict(self):
        """Return the seven measurements as a new dict (``name`` excluded)."""
        return dict((field, getattr(self, field)) for field in self._FIELDS)

    def __unicode__(self):
        """Return the river name as text (used by Python 2's ``unicode()``)."""
        return u"%s" % (self.name)

    def __repr__(self):
        """Return a debugging representation listing the name and readings."""
        # ``%r`` keeps the result ASCII-only on Python 2 even when the name
        # contains non-ASCII characters.
        readings = ", ".join(
            "%s=%r" % (field, getattr(self, field)) for field in self._FIELDS)
        return "%s(name=%r, %s)" % (type(self).__name__, self.name, readings)
class RiverStatus(object):
    """Download, cache and query river readings scraped from koreawqi.go.kr.

    Constructing an instance immediately downloads and parses the page (see
    ``update``).  Parsed readings are kept in ``data``, a dict mapping each
    river name to a ``RiverStatusElement``.

    NOTE: fetching relies on ``urllib.urlopen``, which is the Python 2
    spelling of that API.
    """

    URL = "http://www.koreawqi.go.kr/wQSCHomeLayout_D.wq?action_type=L"

    # DOTALL is given as a compile flag.  An inline ``(?s)`` that is not at
    # the very start of the pattern is deprecated since Python 3.6 and is an
    # error from Python 3.11 on.
    RE_SCRIPT = re.compile(r'<(script).*?</\1>', re.M | re.S)

    # Types accepted by ``get`` as a single river name.  ``basestring``
    # (``str`` and ``unicode``) only exists on Python 2.
    try:
        _STRING_TYPES = (basestring,)
    except NameError:
        _STRING_TYPES = (str,)

    # Number of <td> cells a table row must have to describe a river:
    # the name followed by seven measurements.
    _CELLS_PER_ROW = 8

    def __init__(self):
        """Create the instance and perform the first download and parse."""
        self.version = None   # digits of the page's ``search_time`` value
        self._data = None     # whitespace-normalised page source
        self.data = {}        # river name -> RiverStatusElement
        self.update()

    @staticmethod
    def _normalize(data):
        """Return *data* dedented with tabs, CRLFs and two-space runs removed.

        Only runs of two spaces are dropped.  Removing every single space
        would fuse tag names with their attributes (``<div id=...>`` would
        become ``<divid=...>``) and the id selector used by ``_parse_data``
        could no longer match.
        """
        return trim(data).replace("\t", "").replace("\r\n", "") \
            .replace("  ", "")

    @staticmethod
    def _to_number(text, cast):
        """Convert cell *text* with *cast* (``int``/``float``); blank -> zero.

        Surrounding whitespace is ignored.  Non-numeric text still raises
        ``ValueError`` so that unexpected page content is not hidden.
        """
        text = text.strip()
        return cast(text) if text else cast()

    def _update_data(self):
        """Download the page and store its normalised source in ``_data``.

        Does nothing when the stored version already equals ``now()``.

        Raises:
            Exception: if the server returned an empty body.
        """
        # pass if local data version is up-to-date
        if self.version and self.version == now():
            return

        with contextlib.closing(urllib.urlopen(self.URL)) as u:
            data = u.read()

        if not data:
            raise Exception("Cannot get data from {0}".format(self.URL))

        self._data = self._normalize(data)

    def _update_version(self):
        """Set ``version`` to the digits following ``search_time`` in the page.

        The value is read from just after the ``search_time`` marker up to
        the next ``;`` and everything that is not a digit is discarded.

        Raises:
            ValueError: if no page has been downloaded, or the marker or the
                terminating ``;`` is missing.
            Exception: if the extracted value contains no digits.
        """
        if not self._data:
            raise ValueError("Data not exists")

        marker = "search_time"
        start = self._data.index(marker) + len(marker)
        end = self._data.index(";", start)
        version = "".join(ch for ch in self._data[start:end] if ch.isdigit())
        if version == "":
            raise Exception("Cannot get version from data")
        self.version = version

    def _parse_data(self):
        """Parse the downloaded page into ``data`` using BeautifulSoup.

        Rows that do not have at least eight ``<td>`` cells (for example a
        header row) are skipped.

        Raises:
            ValueError: if the expected ``<!DOCTYPE`` / ``</html>`` markers
                are missing or a cell holds non-numeric text.
        """
        # Remove all scripts
        html = self.RE_SCRIPT.sub('', self._data)

        # Reassemble invalid HTML to valid HTML.  The search starts at
        # offset 1, so a doctype at the very beginning is skipped and the
        # document is cut to start at a later one.
        # NOTE(review): presumably the page embeds a second document - confirm.
        html = html[html.index('<!DOCTYPE', 1):]
        closing_tag = '</html>'
        html = html[:html.index(closing_tag) + len(closing_tag)]

        # No parser is named, so bs4 picks the best one that is installed.
        soup = BeautifulSoup(html)

        final = {}
        for row in soup.select("#div_layer_btn2_r0 table tr"):
            cells = row.select("td")
            if len(cells) < self._CELLS_PER_ROW:
                continue
            status = RiverStatusElement(
                name=cells[0].text,
                temperature=self._to_number(cells[1].text, float),
                ion=self._to_number(cells[2].text, float),
                conductivity=self._to_number(cells[3].text, int),
                oxygen=self._to_number(cells[4].text, float),
                carbon=self._to_number(cells[5].text, float),
                nitrogen=self._to_number(cells[6].text, float),
                phosphorus=self._to_number(cells[7].text, float),
            )
            final[status.name] = status

        self.data = final

    def update(self):
        """Refresh the cached readings if they may be out of date.

        Steps: check version, download data, update version, parse data.

        Returns:
            False if the stored version already equals ``now()`` and nothing
            was done, True after a download and parse.
        """
        # pass if local data version is up-to-date
        if self.version and self.version == now():
            return False

        self._update_data()
        self._update_version()
        self._parse_data()
        return True

    @property
    def keys(self):
        """Names of all rivers in the parsed data."""
        return self.data.keys()

    def exists(self, name):
        """Return True if a river called *name* is in the parsed data.

        Raises:
            ValueError: if no page has been downloaded yet.
        """
        if not self._data:
            raise ValueError("Data not exists")
        return (name in self.data)

    def get(self, arg):
        """Look up one river or several rivers in the parsed data.

        Parameters:
            arg: a river name, or a list of river names.

        Returns:
            For a list: a dict mapping each name that exists to its status
            instance (unknown names are left out).  For a single name: the
            status instance, or None if the river is unknown.  None for any
            other argument type.
        """
        if isinstance(arg, list):
            found = {}
            for name in arg:
                if self.exists(name):
                    found[name] = self.data[name]
            # The dict used to be built and then dropped, so list lookups
            # always returned None.
            return found
        if isinstance(arg, self._STRING_TYPES):
            return (self.data[arg] if self.exists(arg) else None)
        return None
def _main():
    """Download the current readings and print those of one sample river.

    Writes a line saying whether the river named "평창강" is present and, if
    it is, pretty-prints its measurements.
    """
    import pprint
    import sys

    river_name = u"평창강"
    river_status = RiverStatus()
    found = river_status.exists(river_name)

    # The message reads "<river name> exists: Yes/No".  It is written through
    # sys.stdout.write because the ``print >> stream`` chevron form is
    # Python-2-only syntax; the emitted text is unchanged.
    message = "평창강 존재 여부: {0}".format("Yes" if found else "No")
    sys.stdout.write(message + "\n")

    if found:
        pp = pprint.PrettyPrinter(indent=4)
        pp.pprint(river_status.get(river_name).to_dict())


if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment