-
-
Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
# From: https://enelogic.docs.apiary.io/#introduction/option-1.-wsse-authentication | |
# Go to the developer center: enelogic.com/nl/developers. | |
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com) | |
# Fill in values below | |
import hashlib | |
import json | |
import os | |
import requests | |
import random | |
import string | |
from datetime import datetime, timedelta | |
appid = "..." | |
appsecret = "..." | |
username = "...@..." | |
apikey = "..." | |
def wsse(username, password): | |
nonce = ''.join(random.choice(string.letters + string.digits) for _ in range(16)) | |
created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | |
hash = hashlib.sha1(nonce + created + password).digest().encode('base64').strip() | |
return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % ( | |
username, hash, nonce.encode('base64').strip(), created) | |
def enelogic_wsse(appid, username, appsecret, apikey): | |
username = appid + "." + username | |
password = appsecret + "." + apikey | |
return wsse(username, password) | |
s = requests.Session() | |
s.headers.update({'Content-Type': 'application/json'}) | |
# Get buildings | |
# Note: you have to regenerate the WSSE every request... | |
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)}) | |
res = s.get("https://enelogic.com/api/buildings/") | |
res.raise_for_status() | |
buildings = {r['id']: r for r in res.json()} | |
#[ | |
# { | |
# "id": 1505, | |
# "label": "Standaard", | |
# "address": "Nijverheidsweg", | |
# "addressNo": 25, | |
# "addressAdd": "A", | |
# "zipCode": "4529PP", | |
# "location": "Eede", | |
# "region": "Zeeland", | |
# "country": "NL", | |
# "timezone": "Europe/Amsterdam", | |
# "main": true | |
# } | |
#] | |
# Get measuringpoints | |
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)}) | |
res = s.get("https://enelogic.com/api/measuringpoints/") | |
res.raise_for_status() | |
measurepoints = {r['id']: r for r in res.json()} | |
# { | |
# "id": 1651, | |
# "buildingId": 1505, | |
# "mdId": 1, | |
# "label": "Netbeheerder", | |
# "dayMin": "2013-03-14 00:00:00", | |
# "dayMax": "2014-02-27 23:45:00", | |
# "monthMin": "2013-02-15 00:00:00", | |
# "monthMax": "2014-02-27 00:00:00", | |
# "yearMin": "2013-02-01 00:00:00", | |
# "yearMax": "2014-02-27 00:00:00", | |
# "timezone": "Europe/Amsterdam" | |
# }, | |
for measurepoint in measurepoints: | |
curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d") | |
end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d") | |
while curr < end: | |
fname = "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json' | |
next = curr + timedelta(days=1) | |
if os.path.exists(fname): | |
curr = next | |
continue | |
print fname | |
s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)}) | |
res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d"))) | |
res.raise_for_status() | |
with open(fname, "w") as f: | |
f.write(res.content) | |
curr = next |
Hi gjmeer,
You have an oauth2 script working? Would you like to share the code (i can't get it working)?
regards
JDear J,
It is long time ago i tried this Pythonscript in Power BI. The OAuth2 did work, but I didn't recieve the data I want.
I followed the instruction on enelogic.docs.apiary.io to create an appid, appscret and apikey.
Do you have a working script beside the OAuth?
The Pythonscript is as follows:# From: https://enelogic.docs.apiary.io/#introduction/option-2.-oauth2-authentication-(recommended) # Go to the developer center: enelogic.com/nl/developers. # Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com) # Fill in values below import hashlib import json import os import requests import random import string import base64 from datetime import datetime, date, time, timedelta appid = "Generated by Google Developers" appsecret = "Generated by Google Developers" username = "username from Enelogic" apikey = "Generated by Google Developers" def wsse(username, password): nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16)) nonce = nonce.encode('utf-8') created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") created = created.encode('utf-8') hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip() return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8')) return(wsse) def enelogic_wsse(appid, username, appsecret, apikey): username = appid + "." + username password = appsecret + "." + apikey password = password.encode('utf8') return wsse(username, password) s = requests.Session() s.headers.update({'Content-Type': 'application/json'}) # Get buildings # Note: you have to regenerate the WSSE every request... s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)}) res = s.get("https://enelogic.com/api/buildings/") res.raise_for_status() buildings = {r['id']: r for r in res.json()} #[ # { # "id": 1505, # "label": "Standaard", # "address": "Nijverheidsweg", # "addressNo": 25, # "addressAdd": "A", # "zipCode": "4529PP", # "location": "Eede", # "region": "Zeeland", # "country": "NL", # "timezone": "Europe/Amsterdam", # "main": true # } #] # Get measuringpoints s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)}) res = s.get("https://enelogic.com/api/measuringpoints/") res.raise_for_status() measurepoints = {r['id']: r for r in res.json()} # { # "id": 1651, # "buildingId": 1505, # "mdId": 1, # "label": "Netbeheerder", # "dayMin": "2013-03-14 00:00:00", # "dayMax": "2014-02-27 23:45:00", # "monthMin": "2013-02-15 00:00:00", # "monthMax": "2014-02-27 00:00:00", # "yearMin": "2013-02-01 00:00:00", # "yearMax": "2014-02-27 00:00:00", # "timezone": "Europe/Amsterdam" # }, for measurepoint in measurepoints: curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d") end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d") while curr < end: fname = os.path.join(dirname, "enelogic/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json' next = curr + timedelta(days=1) if os.path.exists(fname): curr = next continue print (fname) s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)}) res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (measurepoint, curr.strftime("%Y-%m-%d"), next.strftime("%Y-%m-%d"))) res.raise_for_status() with open(fname, "w") as f: f.write(res.content) curr = next
dear gjmeer
Thanks for the reply. I already had the wsse script working, but it seems a little unstable (authentication errors now and then). That is why i was searching for an oauth2 solution. Mine works fine with the inintal access token generated via the 'documented' google playground method, but i can't get token refreshed from a python script.
Thanks for your reply, i will struggle on :-)
Dear Jeroen,
As ypu have a working version of a WSSE script could you help me. The OAUTH2 is too complex for me. I would like to just execute the python script on my windows laptop. But I get some errors. I generated the keys with https://enelogic.com/nl/developers/
appid = "10105_327wdyv10aiow4occc8c0ds08wskodc4scsckoo8dkks4cckkk8"
appsecret = "1rjn5swy4ux3406sw0kk048kc4wwcolwcgskwkws0so0wo084g"
username = "[email protected]"
apikey = "4mqjcj35oq04occcskkssc4oc00cog0"
And used the apikey from the enelogic account.
Is this the correct way to obtain the parameters.
The script is not working not on windows and not on raspberry linux.
Could you share your script and information, I know it is a little unstable.
Thanks for your help.
for me this works:
I have one file pulling in all sorts of energy related data (energie_data.py) which uses my enelogic.py
energie_data.py:
import requests
import enelogic
...
...
# enelogic (ruwe) data opvragen buiten de functie, anders failt de wsse authenticatie steeds...
wsse_header = enelogic.wsse_autorisatie_header()
with requests.Session() as s:
s.headers.update({'Content-Type': 'application/json'})
# WSSE header moet bij ieder request ververst worden:
s.headers.update({'X-WSSE': wsse_header})
enelogic_url = "https://enelogic.com/api/measuringpoints/3*****0/datapoint/days/{}/{}".format(datum['enelogic_start_date'], datum['enelogic_end_date'])
res = s.get(enelogic_url)
# Terug komt json, dus deze kunnen we gaan parsen
enelogic_json = res.json()
print(enelogic_json)
......
enelogic.py:
import hashlib
import random
import string
import base64
from datetime import datetime
def wsse_autorisatie_header():
username = u"955**********w4s.jer*********.com". #format = app_id.username
password = "14j********k8c.isf******s8k" # format= app_secret.api_key
password = password.encode('utf8')
nonce = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16))
nonce = nonce.encode('utf-8')
created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
created = created.encode('utf-8')
hash = base64.b64encode(hashlib.sha1(nonce+created+password).digest()).strip()
wsse = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (username, str(hash, 'utf8'), str(base64.b64encode(nonce).strip(),'utf8'), str(created,'utf8'))
return(wsse)
def process_enelogic_data(json_data):
#sorteren van de list op basis van de key uit de dicts die in de list zitten
#eerst op datum dan op rate (id is niet betrouwbaar)
json_data = sorted(json_data, key = lambda i:(i['date'],i['rate']))
print("json_data:",json_data)
#extraheren van data uit de dictionary:
......
Based on the Enelogic API docs and some insights I got from the above, I have a running Python3 script that gets my metering data (based on the Enelogic measuringpoint_id) between 2 dates and saves the interval volumes (calculated between two consecutive readings from Enelogic) in a certain format in CSV.
The script does some basic checks but nothing special.
I use OAuth2.0, but it requires me to get a new accesstoken every 2 or 3 days. This is annoying so I'd like to learn how to refresh the accesstoken using the refreshtoken that is supplied via the Google OAuth 2.0 playground..... Any help appreciated.
#!python3
import requests
from urllib.parse import urlencode
import json
from datetime import datetime
from datetime import timedelta
from datetime import date
import operator
from decimal import Decimal
import ciso8601
import pytz
import csv
access_token = "from_google_oauth2.0_playground" ##UPDATE BEFORE USE OF SCRIPT
base_url = "https://enelogic.com/api"
meteringpoint = "measuringpoint_id_in enelogic" ##UPDATE BEFORE USE OF SCRIPT
meteringdaystart = (ciso8601.parse_datetime("2023-09-10")).astimezone() ##UPDATE BEFORE USE OF SCRIPT
meteringdaystop = (ciso8601.parse_datetime("2023-10-15")).astimezone() ##UPDATE BEFORE USE OF SCRIPT
#meteringdaystop = (ciso8601.parse_datetime(date.today().strftime("%Y-%m-%d"))-timedelta(days=1)).astimezone()
meterid_I="i_9999999999" ## hardcoded meterid used in csv file, import (levering) meter
meterid_E="e_9999999999" ##hardcoded meterid used in csv file, export (teruglevering) meter
################
#Error display #
################
def show_error():
ft = sys.exc_info()[0]
fv = sys.exc_info()[1]
print("Error type : %s" % ft )
print("Error value: %s" % fv )
return
def daterange(start_date, end_date):
for n in range(int((end_date.replace(tzinfo=None) - start_date.replace(tzinfo=None)).days)):
yield start_date + timedelta(n)
def get_interval_data(meterpointid,day):
s_day=day.strftime("%Y-%m-%d")
s_nextday = (day+timedelta(days=1)).strftime("%Y-%m-%d")
url=base_url+"/measuringpoints/"+meterpointid+"/datapoints/"+s_day+"/"+s_nextday+"?access_token="+access_token
headers = {"Content-type": "application/json",
"Accept": "*/*",
"Accept-Encoding" : "gzip,deflate,br",
"Connection" : "keep-alive"}
meteringdata=None
print(url)
try:
response = requests.get(url=url, headers=headers) # , data=json.dumps(body))
if response.status_code != 200:
print ("HTTP Error retrieving metering data. HTTP response: %s" % (response.status_code))
else:
print ("Metering data retrieved. HTTP response: %s" % (response.status_code))
meteringdata=response.json()
except:
show_error()
print ("Error retrieveing metering data")
return meteringdata
#################################################################
print(meteringdaystart.isoformat())
print(meteringdaystop.isoformat())
#meteringday = datetime.strptime("2023-08-21", "%Y-%m-%d")
#meteringday = (ciso8601.parse_datetime("2023-02-21")).astimezone()
#print(meteringday.isoformat())
for meteringday in daterange(meteringdaystart, meteringdaystop):
clean_interval_data = []
for i in get_interval_data(meteringpoint,meteringday):
if i not in clean_interval_data:
clean_interval_data.append(i)
else:
print("Warning, duplicate record on %s. Filtered out." % meteringday)
clean_interval_data.sort(key=operator.itemgetter('datetime','rate'))
import_interval_data = [d for d in clean_interval_data if d['rate'] == 180]
export_interval_data = [d for d in clean_interval_data if d['rate'] == 280]
#For a full day Enelogic provides 97 values
if len(import_interval_data) != 97 or len(export_interval_data) !=97:
print("Warning, unexpected number of reads on %s (97 expected)" % meteringday)
print("Import reads : %d"% len(import_interval_data))
print("Export reads : %d"% len(export_interval_data))
# exit(1)
utctz=pytz.timezone("UTC")
n=0
#Import
csv_filename="I_"+meterid_I+"_"+meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()+".csv"
try:
#If csv_file exists: open it
csv_file=open(csv_filename, 'rt')
csv_file.close()
csv_file=open(csv_filename, 'at', newline='', encoding="utf-8")
writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
except IOError:
#Otherwise: create it
csv_file=open(csv_filename, 'wt', newline='', encoding="utf-8")
writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
writer.writerow([
'MeterID',
'Resolution',
'IntervalStart',
'IsEstimated',
'EnergyWh'
])
for i in import_interval_data:
#print(i['datetime'], 'Import', i['quantity'])
if n==0:
old_quantity = i['quantity']
else: #ciso8601.parse_datetime(i['datetime'])
intervalstart=(ciso8601.parse_datetime(i['datetime'])-timedelta(minutes=15)).astimezone()
intervalstart=intervalstart.astimezone(utctz)
print(i['datetime'], 'Import', i['quantity'])
writer.writerow([
meterid_I,
'15min',
intervalstart.isoformat().replace("+00:00", "Z"),
'FALSE',
str(1000*(Decimal(i['quantity'])-Decimal(old_quantity)))
])
old_quantity = i['quantity']
n+=1
csv_file.close()
n=0
#Export
csv_filename="E_"+meterid_E+"_"+meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()+".csv"
try:
#If csv_file exists: open it
csv_file=open(csv_filename, 'rt')
csv_file.close()
csv_file=open(csv_filename, 'at', newline='', encoding="utf-8")
writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
except IOError:
#Otherwise: create it
csv_file=open(csv_filename, 'wt', newline='', encoding="utf-8")
writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
writer.writerow([
'MeterID',
'Resolution',
'IntervalStart',
'IsEstimated',
'EnergyWh'
])
for i in export_interval_data:
#print(i['datetime'], 'Export', i['quantity'])
if n==0:
old_quantity = i['quantity']
else:
intervalstart=(ciso8601.parse_datetime(i['datetime'])-timedelta(minutes=15)).astimezone()
intervalstart=intervalstart.astimezone(utctz)
print(i['datetime'], 'Export', i['quantity'])
writer.writerow([
meterid_E,
'15min',
intervalstart.isoformat().replace("+00:00", "Z"),
'FALSE',
str(1000*(Decimal(i['quantity'])-Decimal(old_quantity)))
])
old_quantity = i['quantity']
n+=1
csv_file.close()
##output format
##MeterID;Resolution;IntervalStart;IsEstimated;EnergyWh
##91998_I;15min;2021-12-31T23:00:00Z;FALSE;82.76184291
Anyone that have the refresh token using OAuth2 running??
dear gjmeer
Thanks for the reply. I already had the wsse script working, but it seems a little unstable (authentication errors now and then). That is why i was searching for an oauth2 solution. Mine works fine with the inintal access token generated via the 'documented' google playground method, but i can't get token refreshed from a python script.
Thanks for your reply, i will struggle on :-)