Skip to content

Instantly share code, notes, and snippets.

@Ending2015a
Last active August 24, 2020 12:10
Show Gist options
  • Select an option

  • Save Ending2015a/257bc5dfa2a8745ed5fc14fa6724641b to your computer and use it in GitHub Desktop.

Select an option

Save Ending2015a/257bc5dfa2a8745ed5fc14fa6724641b to your computer and use it in GitHub Desktop.
This example shows how to use Python to retrieve PM2.5 data from a specific measuring station via the OpenData.epa API and save it to a CSV file.
# --- built in ---
import os
import sys
import time
import json
import logging
import urllib.parse as urlparse
from typing import List, Union
from datetime import datetime
from datetime import timezone
from collections import OrderedDict
# --- 3rd party ---
import requests
import pandas as pd
import dateutil.parser as dateparser
# (second)
UPDATE_PERIOD = 60
# === utils ===
def enc(string: str) -> str:
'''
Encode special characters in string using the %xx escape (URL form)
'''
return urlparse.quote(string)
def make_datetime(fmt='%Y-%m-%d_%H-%M-%S') -> str:
'''Make current datetime'''
return datetime.now().strftime(fmt)
def parse_iso8601(date_and_time: str, to_local=False, fmt: str ='%Y-%m-%d %H:%M:%S') -> str:
'''Parse ISO8601 datetime format to other datetime format
Args:
* date_and_time: (str) datetime string in ISO8601 format
* to_local: (bool) whether convert timezone to local timezone or not
* fmt: (str) format string
'''
dt = dateparser.parse(date_and_time)
# convert to local timezone
if to_local:
dt = to_local_timezone(dt)
if fmt == 'timestamp':
# unix timestamp
return dt.timestamp()
return dt.strftime(fmt)
def to_local_timezone(date_and_time: datetime) -> datetime:
'''
Convert zero timezone to local timezone
'''
return date_and_time.replace(tzinfo=timezone.utc).astimezone(tz=None)
# === PM25 CSV Writer ===
class PM25_CSVWriter(object):
# === static variables ===
SCHEME: str = 'https'
NETLOC: str = 'sta.ci.taiwan.gov.tw'
PATH: str = '/STA_AirQuality_EPAIoT/v1.0/Datastreams'
# === properties ===
@property
def url(self):
return self.query_url
@property
def query(self):
return self.query_string
# === main api ===
def __init__(self, stationIDs: List[str],
period=60,
root_path='./data',
log_path='./log',
log_level=logging.INFO):
'''Initialize PM2.5 CSV Writer'''
self.stationIDs = stationIDs
self.period = period
# directory to store sensor data
self.root_path = root_path
# logging path
self.log_path = os.path.join(log_path, make_datetime()+'.log')
# make query string
self.query_string = self.make_queries(stationIDs)
# make request url
self.query_url = self.make_url(self.query_string)
self.station_last_timestamp = {k:0 for k in stationIDs}
# create logger
os.makedirs(log_path, exist_ok=True)
logging.basicConfig(level=log_level,
handlers=[logging.FileHandler(self.log_path),
logging.StreamHandler()])
self.LOG = logging.getLogger('main')
def start(self):
while True:
# create directory if it does not exist
os.makedirs(self.root_path, exist_ok=True)
try:
# send request
self.LOG.info('Request for new sensor data')
sensor_data = self.request()
self.LOG.info('Sensor data retrieved')
for data in sensor_data:
# get last timestamp
last_timestamp = self.station_last_timestamp[data['stationID']]
# If the retrived data is newer than the last data, write to csv
if data['timestamp'] > last_timestamp:
# write to csv
self.write_to_csv(data)
# update last timestamp
self.station_last_timestamp[data['stationID']] = data['timestamp']
else:
self.LOG.debug('Duplicated data')
except:
self.LOG.exception('Failed to request data')
# sleep for self.period seconds
self.LOG.debug('Sleep for {} sec'.format(self.period))
time.sleep(self.period)
def request(self) -> dict:
# send request
self.LOG.debug('Sending request: ' + self.query_url)
r = requests.get(self.query_url)
# raise exception if status code != 200
r.raise_for_status()
# parse json to dict
j = json.loads(r.text)
sensor_data = []
for sensor in j.get('value', []):
# retrieve sensor data, stationID, observed time
stationID = sensor['Thing']['properties']['stationID']
phenomenonTime = sensor['Observations'][0]['phenomenonTime']
result = sensor['Observations'][0]['result']
# create dict/ append to list
sensor_data.append(
OrderedDict([('stationID', stationID),
('PM2.5', result),
('phenomenonTime', parse_iso8601(phenomenonTime, to_local=True)), # use local timezone
('timestamp', parse_iso8601(phenomenonTime, to_local=False, fmt='timestamp'))]))
self.LOG.debug('Sensor data retrieved: {}'.format(sensor_data))
return sensor_data
# === sub api ===
def write_to_csv(self, data):
csv_path = self.make_csv(data['timestamp'])
header = not os.path.isfile(csv_path)
pd.DataFrame(data, index=[0]).to_csv(csv_path,
index=False,
header=header,
mode='a',
encoding='utf-8')
def make_queries(self, stationIDs: List[str]) -> str:
'''Make query string
Args:
* stationIDs: (List[str]) A list of station IDs to track
Returns:
(str) query url
'''
filter_stationIDs = ' or '.join(["Thing/properties/stationID eq '{}'".format(ID)
for ID in stationIDs])
Expand = '$expand=Thing,Observations({})'.format(
';'.join([
'$orderby={}'.format(enc('phenomenonTime desc')),
'$top={}'.format(1)
])
)
Filter = '$filter={}'.format(enc("name eq 'PM2.5' and ({})".format(filter_stationIDs)))
Count = '$count=true'
query = '&'.join([Expand, Filter, Count])
return query
def make_url(self, query_string: str) -> str:
'''Make url with query string'''
return urlparse.urlunparse((self.SCHEME, # scheme: 'https'
self.NETLOC, # netloc: 'domain name'
self.PATH, # path
None, # params
query_string, # query
None)) # fragment
def make_csv(self, timestamp) -> str:
'''Make CSV file path'''
local_datetime = datetime.fromtimestamp(timestamp)
csv_name = 'pm2.5_' + local_datetime.strftime('%Y-%m-%d') + '.csv'
return os.path.join(self.root_path, csv_name)
if __name__ == '__main__':
writer = PM25_CSVWriter(stationIDs=['10112501197', '10112850902', '10116382792'],
period=UPDATE_PERIOD,
log_level=logging.DEBUG)
writer.start()
requests
pandas
python-dateutil
@Ending2015a
Copy link
Copy Markdown
Author

Install dependencies:

pip install -r requirements.txt

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment