Created
June 12, 2018 20:03
-
-
Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
A simple example of how to export data from the Enelogic API using WSSE authentication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# From: https://enelogic.docs.apiary.io/#introduction/option-1.-wsse-authentication
# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a redirect URL (only needed for OAuth2; when using WSSE any URL, e.g. enelogic.com, will do).
# Fill in the values below.
import hashlib | |
import json | |
import os | |
import requests | |
import random | |
import string | |
from datetime import datetime, timedelta | |
# Enelogic API credentials.
# Each value may be supplied through an environment variable so that secrets
# do not have to be stored in this file; when the variable is not set, the
# placeholder literal below is used, so filling in the values here still works.
appid = os.environ.get("ENELOGIC_APPID", "...")
appsecret = os.environ.get("ENELOGIC_APPSECRET", "...")
username = os.environ.get("ENELOGIC_USERNAME", "...@...")
apikey = os.environ.get("ENELOGIC_APIKEY", "...")
def wsse(username, password):
    """Return the value of an X-WSSE ``UsernameToken`` header.

    A fresh 16-character alphanumeric nonce and the current UTC timestamp are
    generated on every call; the password digest is
    ``base64(sha1(nonce + created + password))``.

    Args:
        username: Value placed in the ``Username`` field.
        password: Secret mixed into the digest; it is never sent itself.

    Returns:
        The header value as a string.
    """
    # Local imports: the module's import block does not provide these names.
    import base64
    import time

    # string.ascii_letters exists on Python 2 and 3 (string.letters does not),
    # and SystemRandom draws the nonce from the OS entropy source.
    alphabet = string.ascii_letters + string.digits
    rng = random.SystemRandom()
    nonce = ''.join(rng.choice(alphabet) for _ in range(16))
    created = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    # hashlib and base64 operate on bytes, so encode explicitly.
    raw_digest = hashlib.sha1((nonce + created + password).encode('utf-8')).digest()
    digest = base64.b64encode(raw_digest).decode('ascii')
    encoded_nonce = base64.b64encode(nonce.encode('ascii')).decode('ascii')

    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
        username, digest, encoded_nonce, created)
def enelogic_wsse(appid, username, appsecret, apikey):
    """Build the X-WSSE header value for one Enelogic API request.

    The WSSE username is ``<appid>.<username>`` and the WSSE password is
    ``<appsecret>.<apikey>``; both are handed to ``wsse``.
    """
    wsse_user = ".".join((appid, username))
    wsse_password = ".".join((appsecret, apikey))
    return wsse(wsse_user, wsse_password)
# One session is shared by every request in this script.
s = requests.Session()
s.headers['Content-Type'] = 'application/json'

# Get buildings
# Note: the WSSE header has to be regenerated for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()
# Index the returned records by their 'id' field.
buildings = dict((record['id'], record) for record in res.json())
#[ | |
# { | |
# "id": 1505, | |
# "label": "Standaard", | |
# "address": "Nijverheidsweg", | |
# "addressNo": 25, | |
# "addressAdd": "A", | |
# "zipCode": "4529PP", | |
# "location": "Eede", | |
# "region": "Zeeland", | |
# "country": "NL", | |
# "timezone": "Europe/Amsterdam", | |
# "main": true | |
# } | |
#] | |
# Get measuring points (a fresh WSSE header is generated for this request too).
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()
# Index the returned records by their 'id' field.
measurepoints = dict((record['id'], record) for record in res.json())
# { | |
# "id": 1651, | |
# "buildingId": 1505, | |
# "mdId": 1, | |
# "label": "Netbeheerder", | |
# "dayMin": "2013-03-14 00:00:00", | |
# "dayMax": "2014-02-27 23:45:00", | |
# "monthMin": "2013-02-15 00:00:00", | |
# "monthMax": "2014-02-27 00:00:00", | |
# "yearMin": "2013-02-01 00:00:00", | |
# "yearMax": "2014-02-27 00:00:00", | |
# "timezone": "Europe/Amsterdam" | |
# }, | |
# Download the datapoints of every measuring point, one JSON file per day.
# Files that already exist are skipped, so the script can be re-run to resume.

# open() does not create missing directories; make sure the target exists.
if not os.path.isdir("enelogic"):
    os.makedirs("enelogic")

for measurepoint, details in measurepoints.items():
    # A measuring point without a day range has nothing to download.
    if not details.get('dayMin') or not details.get('dayMax'):
        continue
    # Only the date part (first 10 characters) of the timestamps is used.
    curr = datetime.strptime(details['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(details['dayMax'][:10], "%Y-%m-%d")
    # The loop stops before the date of dayMax itself (curr < end).
    while curr < end:
        fname = "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'
        next_day = curr + timedelta(days=1)
        if not os.path.exists(fname):
            print(fname)
            # The WSSE header has to be regenerated for every request.
            s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
            res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (
                measurepoint, curr.strftime("%Y-%m-%d"), next_day.strftime("%Y-%m-%d")))
            res.raise_for_status()
            # res.content is bytes, so the file is written in binary mode.
            with open(fname, "wb") as f:
                f.write(res.content)
        curr = next_day
Based on the Enelogic API docs and some insights I got from the script above, I have a working Python 3 script that fetches my metering data (for a given Enelogic measuringpoint_id) between two dates and saves the interval volumes (calculated from each pair of consecutive Enelogic readings) to a CSV file in a fixed format.
The script does some basic checks but nothing special.
I use OAuth 2.0, but that requires me to get a new access token every 2 or 3 days. This is annoying, so I'd like to learn how to refresh the access token using the refresh token that is supplied via the Google OAuth 2.0 Playground. Any help appreciated.
#!python3
import requests
from urllib.parse import urlencode
import json
from datetime import datetime
from datetime import timedelta
from datetime import date
import operator
from decimal import Decimal
import ciso8601
import pytz
import csv
# ---- Settings: update the marked values before running the script ----
access_token = "from_google_oauth2.0_playground"  # UPDATE: OAuth2 access token
base_url = "https://enelogic.com/api"
meteringpoint = "measuringpoint_id_in enelogic"  # UPDATE: Enelogic measuring point id

# UPDATE: first and last day of the period to export. astimezone() without an
# argument attaches the system's local timezone to the parsed dates.
meteringdaystart = ciso8601.parse_datetime("2023-09-10").astimezone()
meteringdaystop = ciso8601.parse_datetime("2023-10-15").astimezone()
# Alternative stop date (yesterday):
#meteringdaystop = (ciso8601.parse_datetime(date.today().strftime("%Y-%m-%d"))-timedelta(days=1)).astimezone()

# Hardcoded meter ids used in the CSV files.
meterid_I = "i_9999999999"  # import (levering) meter
meterid_E = "e_9999999999"  # export (teruglevering) meter
################
#Error display #
################
def show_error():
    """Print the type and value of the exception currently being handled.

    Intended to be called from inside an ``except`` block; outside of one,
    both values print as ``None``.
    """
    # Imported here because the module's import block does not provide sys.
    import sys

    ft, fv = sys.exc_info()[:2]
    print("Error type : %s" % ft)
    print("Error value: %s" % fv)
    return
def daterange(start_date, end_date):
    """Yield ``start_date`` advanced by 0, 1, 2, ... whole days.

    The number of values equals the whole days between the two datetimes,
    compared with their tzinfo stripped; ``end_date`` itself is not yielded.
    Nothing is yielded when ``end_date`` is not at least one day later.
    """
    span = end_date.replace(tzinfo=None) - start_date.replace(tzinfo=None)
    offset = 0
    while offset < span.days:
        yield start_date + timedelta(days=offset)
        offset += 1
def get_interval_data(meterpointid, day):
    """Fetch the datapoints of one measuring point for a single day.

    Requests ``/measuringpoints/<id>/datapoints/<day>/<day + 1>`` below
    ``base_url``, authenticating with the module-level ``access_token``.

    Args:
        meterpointid: Enelogic measuring point id.
        day: Date or datetime of the day to fetch.

    Returns:
        The decoded JSON body on an HTTP 200 response, otherwise ``None``
        (non-200 status, network error or undecodable body).
    """
    s_day = day.strftime("%Y-%m-%d")
    s_nextday = (day + timedelta(days=1)).strftime("%Y-%m-%d")
    url = base_url + "/measuringpoints/" + str(meterpointid) + "/datapoints/" + s_day + "/" + s_nextday
    headers = {"Content-type": "application/json",
               "Accept": "*/*",
               "Accept-Encoding": "gzip,deflate,br",
               "Connection": "keep-alive"}
    # The token travels as a query parameter but is kept out of the printed
    # URL so that it does not end up in console output or logs.
    print(url)
    try:
        # A timeout keeps the script from hanging forever on a stalled server.
        response = requests.get(url=url, params={"access_token": access_token},
                                headers=headers, timeout=30)
        if response.status_code != 200:
            print("HTTP Error retrieving metering data. HTTP response: %s" % (response.status_code))
            return None
        print("Metering data retrieved. HTTP response: %s" % (response.status_code))
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # RequestException covers network failures; ValueError covers a body
        # that is not valid JSON. Reported straight from the caught exception.
        print("Error type : %s" % type(exc))
        print("Error value: %s" % exc)
        print("Error retrieveing metering data")
        return None
#################################################################
def _append_interval_volumes(csv_filename, meter_id, direction, readings, tz):
    """Append the volumes between consecutive readings to a CSV file.

    The file is created with a header row when it does not exist yet. Each
    row after that covers one interval: the difference between a reading and
    the one before it, multiplied by 1000, stamped with the interval start
    (reading time minus 15 minutes) converted to ``tz``.

    Args:
        csv_filename: Path of the CSV file to append to.
        meter_id: Value written in the MeterID column.
        direction: Label used in the progress output ('Import' or 'Export').
        readings: Readings sorted by time, each a dict with 'datetime' and
            'quantity' keys.
        tz: Timezone the interval start is converted to before writing.
    """
    # Probe for an existing file so the header is written only once.
    try:
        with open(csv_filename, 'rt'):
            write_header = False
    except IOError:
        write_header = True

    # Append mode creates the file when it is missing; the with-block closes
    # it even when a row fails to convert.
    with open(csv_filename, 'at', newline='', encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
        if write_header:
            writer.writerow([
                'MeterID',
                'Resolution',
                'IntervalStart',
                'IsEstimated',
                'EnergyWh'
            ])
        # The first reading only serves as the baseline for the second one.
        for previous, current in zip(readings, readings[1:]):
            intervalstart = (ciso8601.parse_datetime(current['datetime']) - timedelta(minutes=15)).astimezone()
            intervalstart = intervalstart.astimezone(tz)
            print(current['datetime'], direction, current['quantity'])
            writer.writerow([
                meter_id,
                '15min',
                intervalstart.isoformat().replace("+00:00", "Z"),
                'FALSE',
                str(1000 * (Decimal(current['quantity']) - Decimal(previous['quantity'])))
            ])


print(meteringdaystart.isoformat())
print(meteringdaystop.isoformat())
#meteringday = datetime.strptime("2023-08-21", "%Y-%m-%d")
#meteringday = (ciso8601.parse_datetime("2023-02-21")).astimezone()
#print(meteringday.isoformat())
utctz = pytz.timezone("UTC")
for meteringday in daterange(meteringdaystart, meteringdaystop):
    raw_interval_data = get_interval_data(meteringpoint, meteringday)
    if raw_interval_data is None:
        # The fetch failed and has already been reported; skip this day.
        print("Warning, no metering data retrieved for %s. Day skipped." % meteringday)
        continue

    # Drop exact duplicate records while keeping the original order.
    clean_interval_data = []
    for i in raw_interval_data:
        if i not in clean_interval_data:
            clean_interval_data.append(i)
        else:
            print("Warning, duplicate record on %s. Filtered out." % meteringday)
    clean_interval_data.sort(key=operator.itemgetter('datetime', 'rate'))
    # Rate 180 is treated as import and rate 280 as export.
    import_interval_data = [d for d in clean_interval_data if d['rate'] == 180]
    export_interval_data = [d for d in clean_interval_data if d['rate'] == 280]
    #For a full day Enelogic provides 97 values
    if len(import_interval_data) != 97 or len(export_interval_data) != 97:
        print("Warning, unexpected number of reads on %s (97 expected)" % meteringday)
        print("Import reads : %d" % len(import_interval_data))
        print("Export reads : %d" % len(export_interval_data))
        # exit(1)

    #Import
    _append_interval_volumes(
        "I_" + meterid_I + "_" + meteringdaystart.isoformat() + "_" + meteringdaystop.isoformat() + ".csv",
        meterid_I, 'Import', import_interval_data, utctz)
    #Export
    _append_interval_volumes(
        "E_" + meterid_E + "_" + meteringdaystart.isoformat() + "_" + meteringdaystop.isoformat() + ".csv",
        meterid_E, 'Export', export_interval_data, utctz)
##output format
##MeterID;Resolution;IntervalStart;IsEstimated;EnergyWh
##91998_I;15min;2021-12-31T23:00:00Z;FALSE;82.76184291
Does anyone have refresh-token handling with OAuth2 working?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
for me this works:
I have one file pulling in all sorts of energy related data (energie_data.py) which uses my enelogic.py
energie_data.py:
enelogic.py: