Last active
August 24, 2020 12:10
-
-
Save Ending2015a/257bc5dfa2a8745ed5fc14fa6724641b to your computer and use it in GitHub Desktop.
This example shows how to use Python to retrieve PM2.5 data from a specific measuring station by using the OpenData.epa API, and save it to a CSV file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # --- built in --- | |
| import os | |
| import sys | |
| import time | |
| import json | |
| import logging | |
| import urllib.parse as urlparse | |
| from typing import List | |
| from datetime import datetime | |
| from datetime import timezone | |
| from collections import OrderedDict | |
| # --- 3rd party --- | |
| import requests | |
| import pandas as pd | |
| import dateutil.parser as dateparser | |
# polling interval between two API requests, in seconds
UPDATE_PERIOD = 60
| # === utils === | |
def enc(string: str) -> str:
    '''Percent-encode (URL-escape) any special characters in `string`.'''
    escaped = urlparse.quote(string)
    return escaped
def make_datetime(fmt='%Y-%m-%d_%H-%M-%S') -> str:
    '''Return the current local date/time formatted with `fmt`.'''
    now = datetime.now()
    return now.strftime(fmt)
def parse_iso8601(date_and_time: str, to_local=False, fmt: str = '%Y-%m-%d %H:%M:%S') -> 'str | float':
    '''Parse an ISO8601 datetime string into another representation.

    Args:
        * date_and_time: (str) datetime string in ISO8601 format
        * to_local: (bool) whether to convert the timezone to the local timezone
        * fmt: (str) strftime format string, or the special value 'timestamp'
          to get a Unix timestamp instead of a formatted string

    Returns:
        (str) the formatted datetime string, or (float) a Unix timestamp
        when fmt == 'timestamp'. (The original annotation claimed `-> str`,
        but the 'timestamp' branch has always returned a float.)
    '''
    dt = dateparser.parse(date_and_time)
    # convert to local timezone
    if to_local:
        dt = to_local_timezone(dt)
    if fmt == 'timestamp':
        # Unix timestamp: seconds since the epoch, as float
        return dt.timestamp()
    return dt.strftime(fmt)
def to_local_timezone(date_and_time: datetime) -> datetime:
    '''Treat a naive datetime as UTC and convert it to the local timezone.'''
    as_utc = date_and_time.replace(tzinfo=timezone.utc)
    return as_utc.astimezone(tz=None)
| # === PM25 CSV Writer === | |
class PM25_CSVWriter(object):
    '''Periodically poll the EPA IoT SensorThings API for the latest PM2.5
    reading of each tracked station and append new readings to daily CSV files.
    '''
    # === static variables ===
    SCHEME: str = 'https'
    NETLOC: str = 'sta.ci.taiwan.gov.tw'
    PATH: str = '/STA_AirQuality_EPAIoT/v1.0/Datastreams'
    # === properties ===
    @property
    def url(self):
        '''Full request URL (scheme + netloc + path + query).'''
        return self.query_url
    @property
    def query(self):
        '''Query string built from the tracked station IDs.'''
        return self.query_string
    # === main api ===
    def __init__(self, stationIDs: List[str],
                       period=60,
                       root_path='./data',
                       log_path='./log',
                       log_level=logging.INFO):
        '''Initialize PM2.5 CSV Writer

        Args:
            * stationIDs: (List[str]) station IDs to track
            * period: (int) polling interval in seconds
            * root_path: (str) directory where CSV files are written
            * log_path: (str) directory where log files are written
            * log_level: logging verbosity level
        '''
        self.stationIDs = stationIDs
        self.period = period
        # directory to store sensor data
        self.root_path = root_path
        # logging path: one log file per run, named by start time
        self.log_path = os.path.join(log_path, make_datetime() + '.log')
        # make query string
        self.query_string = self.make_queries(stationIDs)
        # make request url
        self.query_url = self.make_url(self.query_string)
        # last observation timestamp seen per station (0 == never seen)
        self.station_last_timestamp = {k: 0 for k in stationIDs}
        # create logger writing to both the log file and the console
        os.makedirs(log_path, exist_ok=True)
        logging.basicConfig(level=log_level,
                            handlers=[logging.FileHandler(self.log_path),
                                      logging.StreamHandler()])
        self.LOG = logging.getLogger('main')

    def start(self):
        '''Run the poll-and-write loop forever (blocks the caller).'''
        while True:
            # create directory if it does not exist
            os.makedirs(self.root_path, exist_ok=True)
            try:
                # send request
                self.LOG.info('Request for new sensor data')
                sensor_data = self.request()
                self.LOG.info('Sensor data retrieved')
                for data in sensor_data:
                    # get last timestamp
                    last_timestamp = self.station_last_timestamp[data['stationID']]
                    # If the retrieved data is newer than the last data, write to csv
                    if data['timestamp'] > last_timestamp:
                        # write to csv
                        self.write_to_csv(data)
                        # update last timestamp
                        self.station_last_timestamp[data['stationID']] = data['timestamp']
                    else:
                        self.LOG.debug('Duplicated data')
            except Exception:
                # FIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit and made the loop
                # impossible to stop with Ctrl-C.
                self.LOG.exception('Failed to request data')
            # sleep for self.period seconds
            self.LOG.debug('Sleep for {} sec'.format(self.period))
            time.sleep(self.period)

    def request(self) -> List[OrderedDict]:
        '''Fetch the latest observation of every tracked station.

        Returns:
            A list of OrderedDicts with keys 'stationID', 'PM2.5',
            'phenomenonTime' (local-time string) and 'timestamp' (Unix
            timestamp, float). (FIX: was annotated `-> dict`, but the
            method has always returned a list.)

        Raises:
            requests.HTTPError: when the API responds with an error status.
        '''
        # send request
        self.LOG.debug('Sending request: ' + self.query_url)
        r = requests.get(self.query_url)
        # raise exception if status code != 200
        r.raise_for_status()
        # parse json to dict
        j = json.loads(r.text)
        sensor_data = []
        for sensor in j.get('value', []):
            # retrieve sensor data, stationID, observed time
            stationID = sensor['Thing']['properties']['stationID']
            # Observations are ordered by phenomenonTime desc with $top=1
            # (see make_queries), so index 0 is the most recent reading
            phenomenonTime = sensor['Observations'][0]['phenomenonTime']
            result = sensor['Observations'][0]['result']
            # create dict / append to list
            sensor_data.append(
                OrderedDict([('stationID', stationID),
                             ('PM2.5', result),
                             ('phenomenonTime', parse_iso8601(phenomenonTime, to_local=True)),  # use local timezone
                             ('timestamp', parse_iso8601(phenomenonTime, to_local=False, fmt='timestamp'))]))
        self.LOG.debug('Sensor data retrieved: {}'.format(sensor_data))
        return sensor_data

    # === sub api ===
    def write_to_csv(self, data):
        '''Append one observation row to the daily CSV file.'''
        csv_path = self.make_csv(data['timestamp'])
        # write the header row only when the file does not exist yet
        header = not os.path.isfile(csv_path)
        pd.DataFrame(data, index=[0]).to_csv(csv_path,
                                             index=False,
                                             header=header,
                                             mode='a',
                                             encoding='utf-8')

    def make_queries(self, stationIDs: List[str]) -> str:
        '''Make the OData query string for the tracked stations.

        Args:
            * stationIDs: (List[str]) A list of station IDs to track

        Returns:
            (str) query string ($expand, $filter and $count clauses)
        '''
        filter_stationIDs = ' or '.join(["Thing/properties/stationID eq '{}'".format(ID)
                                         for ID in stationIDs])
        # expand each Datastream with its Thing and its single newest Observation
        Expand = '$expand=Thing,Observations({})'.format(
            ';'.join([
                '$orderby={}'.format(enc('phenomenonTime desc')),
                '$top={}'.format(1)
            ])
        )
        # keep only PM2.5 datastreams belonging to the requested stations
        Filter = '$filter={}'.format(enc("name eq 'PM2.5' and ({})".format(filter_stationIDs)))
        Count = '$count=true'
        query = '&'.join([Expand, Filter, Count])
        return query

    def make_url(self, query_string: str) -> str:
        '''Compose the full request URL from the query string.'''
        return urlparse.urlunparse((self.SCHEME,   # scheme: 'https'
                                    self.NETLOC,   # netloc: 'domain name'
                                    self.PATH,     # path
                                    None,          # params
                                    query_string,  # query
                                    None))         # fragment

    def make_csv(self, timestamp) -> str:
        '''Make the per-day CSV file path for a given Unix timestamp.'''
        local_datetime = datetime.fromtimestamp(timestamp)
        csv_name = 'pm2.5_' + local_datetime.strftime('%Y-%m-%d') + '.csv'
        return os.path.join(self.root_path, csv_name)
if __name__ == '__main__':
    # Track three EPA measuring stations, polling every UPDATE_PERIOD seconds.
    station_ids = ['10112501197', '10112850902', '10116382792']
    csv_writer = PM25_CSVWriter(stationIDs=station_ids,
                                period=UPDATE_PERIOD,
                                log_level=logging.DEBUG)
    csv_writer.start()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| requests | |
| pandas | |
| python-dateutil |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Install dependencies: