Last active
April 22, 2016 18:13
-
-
Save claws/5482174 to your computer and use it in GitHub Desktop.
Publish Australian weather observations to MQTT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Chris Laws 2013 | |
Publish Australian weather observations over MQTT. | |
This script periodically retrieves weather observations from the
Australian Bureau of Meteorology. The source data comes from a file
that has the designator IDY03021 - which represents the National | |
Meteorological Operations Centre. This file contains observation | |
data for all Australian states and territories. | |
The information is parsed into station data for each state and | |
published on topics constructed from state name and station name. | |
An example is: bomau/South Australia/Adelaide | |
Requires: | |
- https://github.com/adamvr/MQTT-For-Twisted-Python | |
''' | |
import datetime | |
import json | |
import logging | |
from MQTT import MQTTProtocol | |
from twisted.application.service import Service | |
from twisted.internet import reactor, defer | |
from twisted.internet.protocol import ClientCreator | |
from twisted.internet.task import LoopingCall | |
from twisted.python.log import PythonLoggingObserver | |
from twisted.web.client import getPage | |
# Public MQTT test broker used by default. An alternative public test
# broker is "m2m.eclipse.org".
mqtt_broker = "test.mosquitto.org"

# Location of the national observations file (designator IDY03021).
observations_url = 'http://www.bom.gov.au/cgi-bin/wrap_fwo.pl?IDY03021.txt'

# Attempting to comply with the BoM secondary distribution guidelines.
# These items accompany any data published by this application.
copyright_statement = "Copyright Commonwealth of Australia , Bureau of Meteorology (ABN 92 637 533 532)"
data_source = 'http://www.bom.gov.au/'

# Marker values stored in a station's data.
NA = 'N/A'
CALM = 'Calm'

# Maps the abbreviated precis codes found in the source file to
# readable descriptions.
Precis_Expansion = {
    '----': NA,
    'Cldy': 'Cloudy',
    'Clr': 'Clear',
    'Drzl': 'Drizzle',
    'Fine': 'Fine',
    'Haze': 'Haze',
    'Fog': 'Foggy',
    'Ocst': 'Overcast',
    'Rain': 'Rain',
    'Smke': 'Smoky',
    'Snow': 'Snow',
    'Wndy': 'Windy',
}

# Columns that are cracked from a station line but never published
# under their own name: the station name becomes part of the topic and
# the wind column is split into direction and speed.
WIND = 'wind'
NAME = 'name'

# Keys used in the published station data.
CLOUD_COVER = 'cloud_cover'
DAY = 'day'
HOUR = 'hour'
LATITUDE = 'latitude'
LONGITUDE = 'longitude'
PRECIS = 'precis'
PRESSURE = 'pressure_hpa'
RAIN = 'rain_mm'
RELATIVE_HUMIDITY = 'rel_humidity'
TEMPERATURE = 'temperature'
TEMPERATURE_MAX = 'temperature_max'
TEMPERATURE_MIN = 'temperature_min'
UPDATED = 'updated'
VISIBILITY = 'visibility_km'
WIND_DIRECTION = 'wind_direction'
WIND_SPEED = 'wind_speed_kmh'

# Weather stations are sometimes offline, in which case their fields
# hold placeholder data. The fields listed here are expected to change
# from update to update, so a station whose every listed field is
# unavailable can be filtered out as carrying no data.
CHANGING_DATA_FIELDS = [
    DAY,
    HOUR,
    VISIBILITY,
    CLOUD_COVER,
    WIND_DIRECTION,
    WIND_SPEED,
    TEMPERATURE,
    RELATIVE_HUMIDITY,
    PRESSURE,
    RAIN,
    PRECIS,
    TEMPERATURE_MAX,
    TEMPERATURE_MIN,
]

# Used to crack each fixed-width station line. Every entry maps a field
# name to the (start, stop) slice indices of the characters holding
# that field's value.
FIELDS = {
    NAME: (0, 10),
    LATITUDE: (11, 16),
    LONGITUDE: (17, 22),
    DAY: (23, 26),
    HOUR: (26, 30),
    VISIBILITY: (31, 33),
    CLOUD_COVER: (35, 36),
    WIND: (37, 44),
    TEMPERATURE: (44, 47),
    RELATIVE_HUMIDITY: (47, 51),
    PRESSURE: (51, 58),
    RAIN: (59, 65),
    PRECIS: (67, 71),
    TEMPERATURE_MAX: (72, 75),
    TEMPERATURE_MIN: (75, 78),
}
class MQTTPublisher(MQTTProtocol):
    """
    MQTT protocol used by the publisher.

    Performs the MQTT connect handshake once the TCP connection is up,
    keeps the session alive with periodic ping requests and notifies its
    owner, through a callback, when the broker accepts the connection.
    """

    # Seconds between a ping response and the next ping request.
    pingPeriod = 60

    def __init__(self, client_id, onBrokerConnected):
        """
        @param client_id: client identifier presented to the broker.
        @param onBrokerConnected: zero-argument callable invoked when the
            broker acknowledges the connection with status 0.
        """
        self.client_id = client_id
        self.onBrokerConnected = onBrokerConnected
        # Pending delayed call for the next ping request, kept so that it
        # can be cancelled when the connection goes away.
        self._pingCall = None

    def connectionMade(self):
        ''' Physical connection made to broker, now perform protocol connection '''
        log.info('Connected to MQTT Broker')
        # NOTE(review): the factor of 1000 suggests the library's connect()
        # expects keepalive in milliseconds - confirm against MQTT.py.
        self.connect(self.client_id, keepalive=self.pingPeriod * 1000)
        self._schedulePing()

    def connectionLost(self, reason):
        ''' Connection closed; stop the ping cycle. '''
        log.info("Disconnected from MQTT Broker: %s" % reason)
        self._cancelPing()

    def pingrespReceived(self):
        ''' Broker answered the last ping; queue up the next one. '''
        log.info('Ping received from MQTT broker')
        self._schedulePing()

    def connackReceived(self, status):
        ''' Handle the broker's reply to the connect request. '''
        if status == 0:
            self.onBrokerConnected()
        else:
            log.error('Connection to MQTT broker failed')

    def _schedulePing(self):
        ''' Arrange for a ping request to be sent after pingPeriod seconds. '''
        # Never keep more than one ping outstanding.
        self._cancelPing()
        self._pingCall = reactor.callLater(self.pingPeriod, self.pingreq)

    def _cancelPing(self):
        ''' Cancel the pending ping request, if there is one. '''
        if self._pingCall is not None and self._pingCall.active():
            self._pingCall.cancel()
        self._pingCall = None
class BomauObservationsClient(object):
    """
    Retrieve BoM weather observations and publish them over MQTT.

    The client can be used in two ways. Calling get_observations fetches
    and parses the observations page once. Calling start connects to the
    MQTT broker; once the broker accepts the connection the client
    retrieves the observations, publishes the selected stations and keeps
    rescheduling itself. Each retrieval inspects the update time reported
    by the page and schedules the next retrieval shortly after the
    following update is due, so that few requests are needed to stay
    current.
    """

    # One international knot is exactly 1.852 km/h.
    KMH_PER_KNOT = 1.852

    # Minutes after the reported update time at which the next retrieval
    # is attempted: one hour plus five minutes to allow the BoM website
    # to be refreshed.
    updateIntervalMinutes = 65

    # Lower bound, in seconds, on the computed delay. The computed delay
    # is negative whenever the page is older than updateIntervalMinutes,
    # and reactor.callLater rejects negative delays.
    minimumRetrievalDelay = 60

    # Seconds to wait before trying again after a failed retrieval or
    # when the page carries no update time.
    retryDelay = 300

    # For this demonstration only the Adelaide station in the state of
    # South Australia is published.
    publishStates = ('South Australia',)
    publishStations = ('Adelaide',)

    def __init__(self, observation_url):
        """
        @param observation_url: URL of the BoM observations page.
        """
        self.observation_url = observation_url
        # Delayed call for the next observation retrieval. Kept so that
        # it can be cancelled later.
        self.periodicRetrievalTask = None
        self.mqttConnection = None
        # (topic, payload) tuples waiting to be published.
        self.publishQueue = []

    @defer.inlineCallbacks
    def start(self):
        ''' Start the weather observation MQTT publisher '''
        log.info('BoM Observation Client starting')
        clientCreator = ClientCreator(reactor,
                                      MQTTPublisher,
                                      "BomAuPub",
                                      self.onMQTTBrokerCommunicationEstablished)
        log.info('Creating MQTT client')
        self.mqttConnection = yield clientCreator.connectTCP(mqtt_broker, 1883)
        defer.returnValue(True)

    def stop(self):
        """
        Stop the publisher: close the MQTT connection and cancel any
        pending observation retrieval.
        """
        log.info('BoM Observation Client stopping')
        if self.mqttConnection:
            self.mqttConnection.transport.loseConnection()
        if self._retrievalPending():
            self.periodicRetrievalTask.cancel()

    def onMQTTBrokerCommunicationEstablished(self):
        '''
        Upon connection to MQTT Broker begin periodic weather
        observation retrievals.
        '''
        self.retrieveObservations()

    @defer.inlineCallbacks
    def retrieveObservations(self):
        """
        Retrieve the latest BoM observations and queue the selected
        stations for publishing.

        If the retrieval failed and nothing has been scheduled, another
        attempt is scheduled after retryDelay seconds so that a transient
        error does not end the periodic updates.
        """
        observations = yield self.get_observations()
        if observations:
            log.info("BoM observations for %i regions retrieved successfully" % len(observations))
            # Convert observations into MQTT messages and queue them.
            for state, stations in observations.items():
                if state not in self.publishStates:
                    continue
                log.info("%s has %i stations" % (state, len(stations)))
                for station, station_data in stations.items():
                    if station in self.publishStations:
                        topic = 'bomau/%s/%s' % (state, station)
                        payload = json.dumps(station_data)
                        self.publishQueue.append((topic, payload))
            if self.publishQueue:
                log.info("Beginning to publish %i updates to MQTT Broker" % len(self.publishQueue))
                self.publishUpdates()
        elif observations is None and not self._retrievalPending():
            log.info("Retrying observation retrieval in %i seconds" % self.retryDelay)
            self._scheduleRetrieval(self.retryDelay)
        defer.returnValue(None)

    @defer.inlineCallbacks
    def get_observations(self):
        """
        Retrieve and parse the latest observations page from the BoM.

        As a side effect the next call to retrieveObservations is
        scheduled, based on the update time found in the page.

        @return: A deferred that fires with a dict keyed by state name.
            Each value is a dict keyed by station name whose value is a
            dict of observation data. Fires with None if the page could
            not be retrieved or parsed.
        @rtype: defer.Deferred
        """
        try:
            log.debug("Requesting new observation data from: %s" % self.observation_url)
            pageHtml = yield getPage(self.observation_url)
            log.debug("Retrieved new observation data")

            # Only the preformatted section of the page holds data.
            lines = pageHtml.split('\n')
            first = lines.index('<pre style="font: Courier;">')
            last = lines.index('</pre>')
            lines = lines[first + 1:last]

            update_time_utc = self._findUpdateTime(lines)
            self._scheduleRetrieval(self._delayUntilNextUpdate(update_time_utc))
            observations = self._parseObservations(lines, update_time_utc)
        except Exception as ex:
            log.error("Unable to retrieve observations data:")
            log.exception(ex)
            observations = None
        defer.returnValue(observations)

    def publishUpdates(self):
        '''
        Publish updates to MQTT Broker.

        Publish one message from the queue at a time, oldest first.
        Reschedule calls back to this function until all messages are
        sent. This approach gives some time back to the reactor to handle
        other things.
        '''
        if self.publishQueue:
            topic, message = self.publishQueue.pop(0)
            self.mqttConnection.publish(topic, message)
        if self.publishQueue:
            reactor.callLater(0.1, self.publishUpdates)
        else:
            log.info("Completed publishing observation updates.")

    def _retrievalPending(self):
        ''' Return True if a retrieval is scheduled and has not yet run. '''
        task = self.periodicRetrievalTask
        return task is not None and task.active()

    def _scheduleRetrieval(self, delay_in_seconds):
        ''' Schedule the next retrieval, replacing any pending one. '''
        if self._retrievalPending():
            self.periodicRetrievalTask.cancel()
        self.periodicRetrievalTask = reactor.callLater(delay_in_seconds,
                                                       self.retrieveObservations)

    @staticmethod
    def _findUpdateTime(lines):
        '''
        Return the update time text found in the page, or None.

        The lines are searched from bottom to top so that the Antarctica
        section, which reports its time in UTC, is encountered first.
        '''
        for line in reversed(lines):
            if 'Updated: ' in line:
                # Eg. | Updated: Mon Apr 29 04:10:24 UTC 2013 (0410 UTC) |
                text = line[1:-1].replace('Updated: ', '')
                return text.split('(')[0].strip()
        return None

    def _delayUntilNextUpdate(self, update_time_utc):
        '''
        Return the number of seconds to wait before the next retrieval.

        Raises ValueError if the update time text has an unexpected format.
        '''
        if update_time_utc is None:
            log.info("No update time found in observation data")
            return self.retryDelay
        updated_utc = datetime.datetime.strptime(update_time_utc,
                                                 "%a %b %d %H:%M:%S UTC %Y")
        next_update_time = updated_utc + datetime.timedelta(minutes=self.updateIntervalMinutes)
        remaining = next_update_time - datetime.datetime.utcnow()
        # Stale data gives a negative delay; never go below the minimum.
        delay_in_seconds = max(remaining.total_seconds(), self.minimumRetrievalDelay)
        log.info("Next observation retrieval will occur after delay of: %s"
                 % datetime.timedelta(seconds=delay_in_seconds))
        return delay_in_seconds

    def _parseObservations(self, lines, update_time_utc):
        ''' Parse the data lines into a {state: {station: data}} dict. '''
        observations = {}
        # Recombine lines so they can be split on region (state)
        # boundaries.
        state_sections = '\n'.join(lines).split('\nIDY')[1:]
        log.debug("detected %i state sections" % len(state_sections))
        for state_section in state_sections:
            state = None
            # The first line is the remainder of the IDY designator.
            for line in state_section.split('\n')[1:]:
                if line.startswith(('+', '|')):
                    # This is a header banner line.
                    if 'Weather Observations : ' in line:
                        state = line[1:-1].replace('Weather Observations : ', '').strip()
                        observations[state] = {}
                    continue
                if state is None or not line.strip():
                    # Blank line, or data before any state header.
                    continue
                station, station_data = self._parseStationLine(line, update_time_utc)
                if not station:
                    continue
                # Only add station data if there was some useful data.
                if any(station_data[k] != NA for k in CHANGING_DATA_FIELDS):
                    self._mergeStation(observations[state], station, station_data)
                else:
                    log.debug("Ignoring %s - no data available" % station)
            if state is not None:
                log.debug("%s contained %i stations" % (state, len(observations[state])))
        return observations

    def _parseStationLine(self, line, update_time_utc):
        '''
        Crack one fixed-width station line.

        Returns a (station name, data dict) tuple. The station name is
        None when the name column holds no data.
        '''
        # Initialise station data to contain some mandatory fields.
        station_data = {UPDATED: NA if update_time_utc is None else update_time_utc,
                        'copyright': copyright_statement,
                        'source': data_source}
        station = None
        for field, (start, stop) in FIELDS.items():
            value = line[start:stop].strip()
            # A run of dashes marks a field that has no data.
            no_data = value.startswith('-') and value.endswith('-')
            if field == WIND:
                # The wind field is always separated into two fields
                # representing direction and speed.
                if no_data:
                    wind_dir, wind_speed_kmh = NA, NA
                else:
                    wind_dir, wind_speed_kmh = self._parseWind(value)
                station_data[WIND_DIRECTION] = wind_dir
                station_data[WIND_SPEED] = wind_speed_kmh
            elif field == NAME:
                if no_data:
                    continue
                if value.endswith(('AP', 'Ap')):
                    value = value[:-2].rstrip() + ' Airport'
                station = value
            elif no_data:
                station_data[field] = NA
            elif field == PRECIS:
                station_data[field] = Precis_Expansion.get(value, value)
            else:
                station_data[field] = value
        return station, station_data

    def _parseWind(self, value):
        ''' Split a "direction/knots" wind value into (direction, km/h text). '''
        if value == CALM:
            return CALM, '0'
        wind_dir, _, wind_speed_kt = value.partition('/')
        wind_dir = wind_dir.strip()
        if wind_dir == '---':
            wind_dir = NA
        try:
            # convert kt to km/h
            wind_speed_kmh = "%.2f" % (int(wind_speed_kt) * self.KMH_PER_KNOT)
        except ValueError:
            # '---', missing or otherwise non-numeric speed.
            wind_speed_kmh = NA
        return wind_dir, wind_speed_kmh

    def _mergeStation(self, stations, station, station_data):
        ''' Add station data to a state, reconciling duplicate listings. '''
        existing = stations.get(station)
        if existing is None:
            stations[station] = station_data
            return
        # Stations are sometimes listed twice.
        identical_day = existing[DAY] == station_data[DAY]
        identical_hour = existing[HOUR] == station_data[HOUR]
        if identical_day and identical_hour:
            # Blend the values if the times are identical. Only take
            # values from the second instance if the existing value
            # is 'N/A'.
            for key, value in list(existing.items()):
                if value == NA:
                    existing[key] = station_data[key]
        elif not identical_day:
            # If the days do not match then take the most recent day,
            # which is generally the larger number, except at month
            # rollover.
            if self._isNewerDay(station_data[DAY], existing[DAY]):
                stations[station] = station_data
        # Same day but a different hour: the first listing is kept.

    @staticmethod
    def _dayNumber(value):
        ''' Return the day of month held in a day field, or None. '''
        digits = ''.join(ch for ch in value if ch.isdigit())
        return int(digits) if digits else None

    def _isNewerDay(self, new_day, old_day):
        ''' Return True if new_day should replace old_day. '''
        new_number = self._dayNumber(new_day)
        old_number = self._dayNumber(old_day)
        if new_number is None:
            return False
        if old_number is None:
            return True
        # Day 1 following a larger day number is the month rollover.
        return new_number > old_number or new_number == 1
# The classes in this module log through this module-level name, so it is
# bound unconditionally; binding it only when run as a script would make
# the classes raise NameError when the module is imported.
log = logging.getLogger(__name__)

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)-15s: %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
                        level=logging.DEBUG)

    # Send twisted log output to common logger too
    PythonLoggingObserver().start()

    bomau_weather = BomauObservationsClient(observations_url)
    reactor.callWhenRunning(bomau_weather.start)
    # Close the broker connection and cancel the pending retrieval when
    # the reactor shuts down.
    reactor.addSystemEventTrigger('before', 'shutdown', bomau_weather.stop)
    reactor.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
Chris Laws 2013 | |
This script subscribes for MQTT publications of weather observations | |
for Adelaide, South Australia. | |
''' | |
import logging | |
from MQTT import MQTTProtocol | |
from twisted.internet import reactor | |
from twisted.internet.protocol import ClientCreator | |
from twisted.python.log import PythonLoggingObserver | |
class MQTTListener(MQTTProtocol):
    """
    MQTT protocol that subscribes to the Adelaide weather observation
    topic and logs every update it receives.
    """

    pingPeriod = 60  # seconds

    def connectionMade(self):
        ''' TCP connection is up; perform the MQTT connect handshake. '''
        log.info('Connected to MQTT Broker')
        keepalive = self.pingPeriod * 1000
        self.connect("BomAuSub", keepalive=keepalive)
        reactor.callLater(self.pingPeriod, self.pingreq)

    def pingrespReceived(self):
        ''' Broker answered the last ping; queue up the next one. '''
        log.info('Ping response received from MQTT broker')
        reactor.callLater(self.pingPeriod, self.pingreq)

    def connackReceived(self, status):
        ''' Subscribe to the observation topic once the broker accepts us. '''
        if status != 0:
            log.error('Connection to MQTT broker failed')
            return
        topic = "bomau/South Australia/Adelaide"
        self.subscribe(topic)
        log.info("Subscribed for topic: %s" % topic)

    def publishReceived(self, topic, message, qos, dup, retain, messageId):
        ''' Log a publication received on a subscribed topic. '''
        details = (topic, message)
        log.info('Update received. Topic: %s, Message: %s' % details)
# MQTTListener logs through this module-level name, so it is bound
# unconditionally; binding it only when run as a script would make the
# class raise NameError when the module is imported.
log = logging.getLogger(__name__)

if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)-15s: %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
                        level=logging.DEBUG)

    # Send twisted log output to common logger too
    PythonLoggingObserver().start()

    # Public MQTT test broker. An alternative public test broker is
    # "m2m.eclipse.org".
    mqtt_broker = "test.mosquitto.org"

    clientCreator = ClientCreator(reactor,
                                  MQTTListener)
    log.info('Creating MQTT client')
    reactor.callWhenRunning(clientCreator.connectTCP, mqtt_broker, 1883)
    reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment