Created
January 10, 2017 11:11
-
-
Save koma5/499bf51442ebf23bac718c3c041c15a7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pymongo, mosquitto, os | |
from pymongo import MongoClient | |
from datetime import datetime, time | |
mqttHost = 'mqtt' | |
mongoHost = '127.0.0.1' | |
pidFile = '/var/run/record2MongodbPid' | |
def isRunning(pid): | |
try: | |
pid = int(pid) | |
os.kill(pid, 0) | |
return True | |
except (OSError, ValueError): | |
return False | |
def castValue(value): | |
try: | |
42 + value # Test for a number (float or int) | |
r = value | |
except TypeError: | |
try: | |
r = int(value) | |
except ValueError: # it is eventually a float | |
try: | |
r = float(value) | |
except ValueError: | |
r = value | |
return r | |
def on_message(a, b, msg): | |
jetzt = datetime.now() | |
if (msg.topic != "vw/gps"): | |
collMessages.insert( | |
{ 'time': jetzt, | |
'tpc': msg.topic, | |
'msg': castValue(msg.payload) | |
}) | |
# update 5 minute average | |
# update 10 minute average | |
# update 15 minute average | |
# update 30 minute average | |
# update daily average | |
valueToAverage = castValue(msg.payload) | |
if not isinstance(valueToAverage, str): | |
currentHour = datetime.combine(jetzt, time(jetzt.hour)) | |
hourlyData = { | |
'$inc': { 'msgCount': 1, 'msgSum': valueToAverage } | |
} | |
query = { | |
'tpc': msg.topic, | |
'time': currentHour, | |
'rate': 'hourly', | |
'currentHour': jetzt.hour | |
} | |
collAverages.update(query, hourlyData, True) | |
# update daily average | |
midnight = datetime.combine(jetzt, time(0)) | |
dayOfWeek = jetzt.isoweekday() | |
dailyData = { | |
'$inc': { 'msgCount': 1, 'msgSum': valueToAverage } | |
} | |
query = { | |
'tpc': msg.topic, | |
'time': midnight, | |
'rate': 'daily', | |
'dayOfWeek': dayOfWeek | |
} | |
collAverages.update(query, dailyData, True) | |
print(str(jetzt) + " : " + msg.topic + ": " + msg.payload) | |
mqttc = mosquitto.Mosquitto('mqttRecord-' + str(os.getpid())) | |
mqttc.on_message = on_message | |
try: | |
f = open(pidFile,'r') | |
runningPid = f.read() | |
f.close() | |
pidFileExists = True | |
except IOError: | |
pidFileExists = False | |
#not allready running | |
if (not pidFileExists or not isRunning(runningPid)): | |
mqttc.connect(mqttHost, 1883, 60) | |
topics = ['#'] | |
[mqttc.subscribe(t, 0) for t in topics] | |
mongo = MongoClient(mongoHost) | |
db = mongo.mqttRecord | |
collMessages = db.messages | |
collAverages = db.averages | |
# create pid file | |
f = open(pidFile,'w') | |
f.write(str(os.getpid())) | |
f.close() | |
else: | |
print "not supposed to run, allready running here: " + runningPid | |
raise SystemExit(0) | |
try: | |
while True: | |
mqttc.loop() | |
except (KeyboardInterrupt): | |
print "\ntime to die.." | |
mqttc.disconnect() | |
os.remove(pidFile) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment