Skip to content

Instantly share code, notes, and snippets.

@kleptog
Created June 12, 2018 20:03
Show Gist options
  • Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
Save kleptog/572b529b84a1f0c40c3c69edaa18d670 to your computer and use it in GitHub Desktop.
A simple example of how to export data from the Enelogic API (WSSE)
# From: https://enelogic.docs.apiary.io/#introduction/option-1.-wsse-authentication
# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below
import hashlib
import json
import os
import requests
import random
import string
from datetime import datetime, timedelta
# Credentials: replace every placeholder below before running the script.
# enelogic_wsse() joins appid + "." + username into the WSSE username and
# appsecret + "." + apikey into the WSSE password.
appid = "..."
appsecret = "..."
username = "...@..."  # presumably the Enelogic account e-mail address -- verify
apikey = "..."
def wsse(username, password):
    """Build the value of an ``X-WSSE`` UsernameToken header.

    The digest is ``base64(sha1(nonce + created + password))``; the nonce is
    sent base64-encoded and ``created`` is the current UTC time.

    Args:
        username: WSSE user name, inserted verbatim into the header.
        password: WSSE password as text or bytes; text is encoded as UTF-8.

    Returns:
        The header value as a string. A new nonce and timestamp are generated
        on every call.
    """
    # Local import: the file's import block does not include base64.
    import base64

    # SystemRandom draws from the OS entropy source instead of the
    # predictable default generator; a nonce should not be guessable.
    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    nonce = ''.join(rng.choice(alphabet) for _ in range(16))
    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

    # hashlib and base64 operate on bytes, so encode once at the boundary.
    if not isinstance(password, bytes):
        password = password.encode('utf-8')
    nonce_bytes = nonce.encode('ascii')
    raw = nonce_bytes + created.encode('ascii') + password
    digest = base64.b64encode(hashlib.sha1(raw).digest()).decode('ascii')
    nonce_b64 = base64.b64encode(nonce_bytes).decode('ascii')

    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
        username, digest, nonce_b64, created)
def enelogic_wsse(appid, username, appsecret, apikey):
    """Return an X-WSSE header value built from the four Enelogic credentials.

    The WSSE user name is ``appid.username`` and the WSSE password is
    ``appsecret.apikey``; both are handed to wsse().
    """
    username = appid + "." + username
    password = appsecret + "." + apikey
    return wsse(username, password)
# One session is shared by every request made below.
s = requests.Session()
s.headers['Content-Type'] = 'application/json'

# Fetch the buildings and index them by id.
# The WSSE header has to be regenerated for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()
buildings = dict((b['id'], b) for b in res.json())
#[
# {
# "id": 1505,
# "label": "Standaard",
# "address": "Nijverheidsweg",
# "addressNo": 25,
# "addressAdd": "A",
# "zipCode": "4529PP",
# "location": "Eede",
# "region": "Zeeland",
# "country": "NL",
# "timezone": "Europe/Amsterdam",
# "main": true
# }
#]
# Fetch the measuring points and index them by id.
# A fresh WSSE header is set first, as for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()
measurepoints = dict((mp['id'], mp) for mp in res.json())
# {
# "id": 1651,
# "buildingId": 1505,
# "mdId": 1,
# "label": "Netbeheerder",
# "dayMin": "2013-03-14 00:00:00",
# "dayMax": "2014-02-27 23:45:00",
# "monthMin": "2013-02-15 00:00:00",
# "monthMax": "2014-02-27 00:00:00",
# "yearMin": "2013-02-01 00:00:00",
# "yearMax": "2014-02-27 00:00:00",
# "timezone": "Europe/Amsterdam"
# },
# Download one JSON file per measuring point per day into ./enelogic/.
output_dir = "enelogic"
if not os.path.isdir(output_dir):
    # open() does not create missing directories.
    os.makedirs(output_dir)

for measurepoint in measurepoints:
    # Only the date part (first 10 characters) of dayMin/dayMax is parsed.
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")
    # NOTE(review): the day of dayMax itself is never requested (curr < end).
    while curr < end:
        day_after = curr + timedelta(days=1)
        fname = output_dir + "/" + str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json'
        # Days already on disk are skipped, so an interrupted run can resume.
        if not os.path.exists(fname):
            print(fname)
            # The WSSE header has to be regenerated for every request.
            s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
            res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (
                measurepoint, curr.strftime("%Y-%m-%d"), day_after.strftime("%Y-%m-%d")))
            res.raise_for_status()
            # res.content is bytes, so the file is opened in binary mode.
            with open(fname, "wb") as f:
                f.write(res.content)
        curr = day_after
@jeroenf1973
Copy link

Hi gjmeer,
You have an oauth2 script working? Would you like to share the code (i can't get it working)?
regards
J

Dear J,

It has been a long time since I tried this Python script in Power BI. The OAuth2 did work, but I didn't receive the data I wanted.
I followed the instructions on enelogic.docs.apiary.io to create an appid, appsecret and apikey.

Do you have a working script beside the OAuth?

The Pythonscript is as follows:

# From: https://enelogic.docs.apiary.io/#introduction/option-2.-oauth2-authentication-(recommended)

# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below

import hashlib
import json
import os
import requests
import random
import string
import base64
from datetime import datetime, date, time, timedelta

# Credentials: replace every placeholder below before running the script.
# enelogic_wsse() joins appid + "." + username into the WSSE username and
# appsecret + "." + apikey into the WSSE password.
# NOTE(review): the header comments point to the Enelogic developer center
# for these values, while the placeholder text mentions Google -- confirm.
appid = "Generated by Google Developers"
appsecret = "Generated by Google Developers"
username = "username from Enelogic"
apikey = "Generated by Google Developers"

def wsse(username, password):
    """Build the value of an ``X-WSSE`` UsernameToken header.

    The digest is ``base64(sha1(nonce + created + password))``; the nonce is
    sent base64-encoded and ``created`` is the current UTC time.

    Args:
        username: WSSE user name, inserted verbatim into the header.
        password: WSSE password as bytes or str; str is encoded as UTF-8.

    Returns:
        The header value as a str. A new nonce and timestamp are generated
        on every call.
    """
    # SystemRandom draws from the OS entropy source instead of the
    # predictable default generator; a nonce should not be guessable.
    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    nonce = ''.join(rng.choice(alphabet) for _ in range(16)).encode('utf-8')

    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

    # sha1 needs bytes; accept a str password as well as bytes.
    if isinstance(password, str):
        password = password.encode('utf8')

    digest = base64.b64encode(hashlib.sha1(nonce + created.encode('utf-8') + password).digest())

    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
        username, str(digest, 'utf8'), str(base64.b64encode(nonce), 'utf8'), created)

def enelogic_wsse(appid, username, appsecret, apikey):
    """Return an X-WSSE header value built from the four Enelogic credentials.

    The WSSE user name is ``appid.username``; the WSSE password is the UTF-8
    encoding of ``appsecret.apikey``. Both are handed to wsse().
    """
    wsse_user = ".".join((appid, username))
    wsse_password = ".".join((appsecret, apikey)).encode('utf8')
    return wsse(wsse_user, wsse_password)

# One session is shared by every request made below.
s = requests.Session()
s.headers['Content-Type'] = 'application/json'

# Fetch the buildings and index them by id.
# The WSSE header has to be regenerated for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()
buildings = dict((b['id'], b) for b in res.json())
#[
#  {
#    "id": 1505,
#    "label": "Standaard",
#    "address": "Nijverheidsweg",
#    "addressNo": 25,
#    "addressAdd": "A",
#    "zipCode": "4529PP",
#    "location": "Eede",
#    "region": "Zeeland",
#    "country": "NL",
#    "timezone": "Europe/Amsterdam",
#    "main": true
#  }
#]

# Fetch the measuring points and index them by id.
# A fresh WSSE header is set first, as for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()
measurepoints = dict((mp['id'], mp) for mp in res.json())
#  {
#    "id": 1651,
#    "buildingId": 1505,
#    "mdId": 1,
#    "label": "Netbeheerder",
#    "dayMin": "2013-03-14 00:00:00",
#    "dayMax": "2014-02-27 23:45:00",
#    "monthMin": "2013-02-15 00:00:00",
#    "monthMax": "2014-02-27 00:00:00",
#    "yearMin": "2013-02-01 00:00:00",
#    "yearMax": "2014-02-27 00:00:00",
#    "timezone": "Europe/Amsterdam"
#  },
  
# Download one JSON file per measuring point per day into <base>/enelogic/.
# `dirname` is not defined anywhere in this script; use it when the caller
# has defined it, otherwise fall back to the current working directory.
base_dir = globals().get('dirname', os.getcwd())
output_dir = os.path.join(base_dir, "enelogic")
# open() does not create missing directories.
os.makedirs(output_dir, exist_ok=True)

for measurepoint in measurepoints:
    # Only the date part (first 10 characters) of dayMin/dayMax is parsed.
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")

    # NOTE(review): the day of dayMax itself is never requested (curr < end).
    while curr < end:
        day_after = curr + timedelta(days=1)
        fname = os.path.join(output_dir, str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json')

        # Days already on disk are skipped, so an interrupted run can resume.
        if not os.path.exists(fname):
            print(fname)
            # The WSSE header has to be regenerated for every request.
            s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
            res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (
                measurepoint, curr.strftime("%Y-%m-%d"), day_after.strftime("%Y-%m-%d")))
            res.raise_for_status()

            # res.content is bytes, so the file is opened in binary mode.
            with open(fname, "wb") as f:
                f.write(res.content)

        curr = day_after

dear gjmeer

Thanks for the reply. I already had the WSSE script working, but it seems a little unstable (authentication errors now and then). That is why I was searching for an OAuth2 solution. Mine works fine with the initial access token generated via the 'documented' Google playground method, but I can't get the token refreshed from a Python script.

Thanks for your reply, i will struggle on :-)

@HarryVerjans
Copy link

Hi gjmeer,
You have an oauth2 script working? Would you like to share the code (i can't get it working)?
regards
J

Dear J,
It has been a long time since I tried this Python script in Power BI. The OAuth2 did work, but I didn't receive the data I wanted.
I followed the instructions on enelogic.docs.apiary.io to create an appid, appsecret and apikey.
Do you have a working script beside the OAuth?
The Pythonscript is as follows:

# From: https://enelogic.docs.apiary.io/#introduction/option-2.-oauth2-authentication-(recommended)

# Go to the developer center: enelogic.com/nl/developers.
# Add your app and choose a desired redirect url (only for OAuth2 purposes, if using WSSE just fill in a URL e.g. enelogic.com)
# Fill in values below

import hashlib
import json
import os
import requests
import random
import string
import base64
from datetime import datetime, date, time, timedelta

# Credentials: replace every placeholder below before running the script.
# enelogic_wsse() joins appid + "." + username into the WSSE username and
# appsecret + "." + apikey into the WSSE password.
# NOTE(review): the header comments point to the Enelogic developer center
# for these values, while the placeholder text mentions Google -- confirm.
appid = "Generated by Google Developers"
appsecret = "Generated by Google Developers"
username = "username from Enelogic"
apikey = "Generated by Google Developers"

def wsse(username, password):
    """Build the value of an ``X-WSSE`` UsernameToken header.

    The digest is ``base64(sha1(nonce + created + password))``; the nonce is
    sent base64-encoded and ``created`` is the current UTC time.

    Args:
        username: WSSE user name, inserted verbatim into the header.
        password: WSSE password as bytes or str; str is encoded as UTF-8.

    Returns:
        The header value as a str. A new nonce and timestamp are generated
        on every call.
    """
    # SystemRandom draws from the OS entropy source instead of the
    # predictable default generator; a nonce should not be guessable.
    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    nonce = ''.join(rng.choice(alphabet) for _ in range(16)).encode('utf-8')

    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")

    # sha1 needs bytes; accept a str password as well as bytes.
    if isinstance(password, str):
        password = password.encode('utf8')

    digest = base64.b64encode(hashlib.sha1(nonce + created.encode('utf-8') + password).digest())

    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
        username, str(digest, 'utf8'), str(base64.b64encode(nonce), 'utf8'), created)

def enelogic_wsse(appid, username, appsecret, apikey):
    """Return an X-WSSE header value built from the four Enelogic credentials.

    The WSSE user name is ``appid.username``; the WSSE password is the UTF-8
    encoding of ``appsecret.apikey``. Both are handed to wsse().
    """
    wsse_user = ".".join((appid, username))
    wsse_password = ".".join((appsecret, apikey)).encode('utf8')
    return wsse(wsse_user, wsse_password)

# One session is shared by every request made below.
s = requests.Session()
s.headers['Content-Type'] = 'application/json'

# Fetch the buildings and index them by id.
# The WSSE header has to be regenerated for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/buildings/")
res.raise_for_status()
buildings = dict((b['id'], b) for b in res.json())
#[
#  {
#    "id": 1505,
#    "label": "Standaard",
#    "address": "Nijverheidsweg",
#    "addressNo": 25,
#    "addressAdd": "A",
#    "zipCode": "4529PP",
#    "location": "Eede",
#    "region": "Zeeland",
#    "country": "NL",
#    "timezone": "Europe/Amsterdam",
#    "main": true
#  }
#]

# Fetch the measuring points and index them by id.
# A fresh WSSE header is set first, as for every request.
s.headers['X-WSSE'] = enelogic_wsse(appid, username, appsecret, apikey)
res = s.get("https://enelogic.com/api/measuringpoints/")
res.raise_for_status()
measurepoints = dict((mp['id'], mp) for mp in res.json())
#  {
#    "id": 1651,
#    "buildingId": 1505,
#    "mdId": 1,
#    "label": "Netbeheerder",
#    "dayMin": "2013-03-14 00:00:00",
#    "dayMax": "2014-02-27 23:45:00",
#    "monthMin": "2013-02-15 00:00:00",
#    "monthMax": "2014-02-27 00:00:00",
#    "yearMin": "2013-02-01 00:00:00",
#    "yearMax": "2014-02-27 00:00:00",
#    "timezone": "Europe/Amsterdam"
#  },
  
# Download one JSON file per measuring point per day into <base>/enelogic/.
# `dirname` is not defined anywhere in this script; use it when the caller
# has defined it, otherwise fall back to the current working directory.
base_dir = globals().get('dirname', os.getcwd())
output_dir = os.path.join(base_dir, "enelogic")
# open() does not create missing directories.
os.makedirs(output_dir, exist_ok=True)

for measurepoint in measurepoints:
    # Only the date part (first 10 characters) of dayMin/dayMax is parsed.
    curr = datetime.strptime(measurepoints[measurepoint]['dayMin'][:10], "%Y-%m-%d")
    end = datetime.strptime(measurepoints[measurepoint]['dayMax'][:10], "%Y-%m-%d")

    # NOTE(review): the day of dayMax itself is never requested (curr < end).
    while curr < end:
        day_after = curr + timedelta(days=1)
        fname = os.path.join(output_dir, str(measurepoint) + "." + curr.strftime("%Y-%m-%d") + '.json')

        # Days already on disk are skipped, so an interrupted run can resume.
        if not os.path.exists(fname):
            print(fname)
            # The WSSE header has to be regenerated for every request.
            s.headers.update({'X-WSSE': enelogic_wsse(appid, username, appsecret, apikey)})
            res = s.get("https://enelogic.com/api/measuringpoints/%s/datapoints/%s/%s" % (
                measurepoint, curr.strftime("%Y-%m-%d"), day_after.strftime("%Y-%m-%d")))
            res.raise_for_status()

            # res.content is bytes, so the file is opened in binary mode.
            with open(fname, "wb") as f:
                f.write(res.content)

        curr = day_after

dear gjmeer

Thanks for the reply. I already had the WSSE script working, but it seems a little unstable (authentication errors now and then). That is why I was searching for an OAuth2 solution. Mine works fine with the initial access token generated via the 'documented' Google playground method, but I can't get the token refreshed from a Python script.

Thanks for your reply, i will struggle on :-)

Dear Jeroen,
As you have a working version of a WSSE script, could you help me? OAuth2 is too complex for me. I would like to just execute the Python script on my Windows laptop, but I get some errors. I generated the keys with https://enelogic.com/nl/developers/
appid = "10105_327wdyv10aiow4occc8c0ds08wskodc4scsckoo8dkks4cckkk8"
appsecret = "1rjn5swy4ux3406sw0kk048kc4wwcolwcgskwkws0so0wo084g"
username = "[email protected]"
apikey = "4mqjcj35oq04occcskkssc4oc00cog0"
I also used the apikey from the Enelogic account.
Is this the correct way to obtain the parameters?
The script does not work on Windows, nor on Raspberry Pi Linux.
Could you share your script and information, I know it is a little unstable.
Thanks for your help.

@jeroenf1973
Copy link

jeroenf1973 commented Aug 10, 2022

for me this works:
I have one file pulling in all sorts of energy related data (energie_data.py) which uses my enelogic.py

energie_data.py:

import requests
import enelogic

...
...
# Request the (raw) Enelogic data outside the function, otherwise the WSSE
# authentication keeps failing...
wsse_header = enelogic.wsse_autorisatie_header()

with requests.Session() as s:
    s.headers.update({'Content-Type': 'application/json'})
    # The WSSE header must be refreshed for every request:
    s.headers.update({'X-WSSE': wsse_header})
    # `datum` is not defined in this excerpt; presumably it comes from the
    # elided part of the script -- verify.
    enelogic_url = "https://enelogic.com/api/measuringpoints/3*****0/datapoint/days/{}/{}".format(datum['enelogic_start_date'], datum['enelogic_end_date'])
    res = s.get(enelogic_url)
    # The response is JSON, so it can be parsed
    enelogic_json = res.json()
    print(enelogic_json)
......

enelogic.py:

import hashlib
import random
import string
import base64
from datetime import datetime

def wsse_autorisatie_header():
    """Return a freshly generated WSSE ``UsernameToken`` header value.

    The credentials are hard-coded below. The digest is
    ``base64(sha1(nonce + created + password))``; the nonce is sent
    base64-encoded and ``created`` is the current UTC time, so every call
    yields a different header.
    """
    username = u"955**********w4s.jer*********.com"  # format = app_id.username
    password = "14j********k8c.isf******s8k"  # format = app_secret.api_key
    password = password.encode('utf8')

    # SystemRandom draws from the OS entropy source instead of the
    # predictable default generator; a nonce should not be guessable.
    rng = random.SystemRandom()
    alphabet = string.ascii_letters + string.digits
    nonce = ''.join(rng.choice(alphabet) for _ in range(16)).encode('utf-8')

    created = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    digest = base64.b64encode(hashlib.sha1(nonce + created.encode('utf-8') + password).digest())

    return 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % (
        username, str(digest, 'utf8'), str(base64.b64encode(nonce), 'utf8'), created)

def process_enelogic_data(json_data):
    # Sort the list by keys of the dicts it contains:
    # first by date, then by rate (the id is not reliable)
    json_data = sorted(json_data, key = lambda i:(i['date'],i['rate']))
    print("json_data:",json_data)
    # Extract the data from the dictionary (rest of the body is elided):
    ......

@edsub
Copy link

edsub commented Oct 16, 2023

Based on the Enelogic API docs and some insights I got from the above, I have a running Python3 script that gets my metering data (based on the Enelogic measuringpoint_id) between 2 dates and saves the interval volumes (calculated between two consecutive readings from Enelogic) in a certain format in CSV.
The script does some basic checks but nothing special.
I use OAuth2.0, but it requires me to get a new accesstoken every 2 or 3 days. This is annoying so I'd like to learn how to refresh the accesstoken using the refreshtoken that is supplied via the Google OAuth 2.0 playground..... Any help appreciated.

#!python3
import requests
from urllib.parse import urlencode
import json
from datetime import datetime
from datetime import timedelta
from datetime import date
import operator
from decimal import Decimal
import ciso8601
import pytz
import csv

# Script configuration. Lines marked UPDATE must be edited before running.
access_token = "from_google_oauth2.0_playground" ##UPDATE BEFORE USE OF SCRIPT
base_url = "https://enelogic.com/api"
meteringpoint = "measuringpoint_id_in enelogic" ##UPDATE BEFORE USE OF SCRIPT
# First day to fetch. The stop date itself is not fetched: daterange() yields
# the days from start up to, but not including, stop.
meteringdaystart = (ciso8601.parse_datetime("2023-09-10")).astimezone()   ##UPDATE BEFORE USE OF SCRIPT
meteringdaystop =  (ciso8601.parse_datetime("2023-10-15")).astimezone()  ##UPDATE BEFORE USE OF SCRIPT
# Alternative stop date: yesterday.
#meteringdaystop = (ciso8601.parse_datetime(date.today().strftime("%Y-%m-%d"))-timedelta(days=1)).astimezone()
meterid_I="i_9999999999"  ## hardcoded meterid used in csv file, import (levering) meter
meterid_E="e_9999999999" ##hardcoded meterid used in csv file, export (teruglevering) meter


################
#Error display #
################
def show_error():
    """Print the type and value of the exception currently being handled.

    Meant to be called from inside an ``except`` block; outside of one,
    ``sys.exc_info()`` holds no exception and ``None`` is printed for both.
    """
    # sys is not among this script's top-level imports, so import it here.
    import sys

    ft, fv = sys.exc_info()[:2]
    print("Error type : %s" % ft )
    print("Error value: %s" % fv )

def daterange(start_date, end_date):
    """Yield start_date, start_date + 1 day, ... up to but excluding end_date.

    The number of days is computed with tzinfo stripped from both ends, so
    only whole days between the two wall-clock values are counted. Nothing is
    yielded when end_date is not at least one full day after start_date.
    """
    naive_span = end_date.replace(tzinfo=None) - start_date.replace(tzinfo=None)
    total_days = int(naive_span.days)
    offset = 0
    while offset < total_days:
        yield start_date + timedelta(offset)
        offset += 1

def get_interval_data(meterpointid,day):
    """Fetch the datapoints of one measuring point for a single day.

    Args:
        meterpointid: measuring point id, used as a URL path segment.
        day: date or datetime of the day to fetch; the request runs from
            ``day`` to ``day + 1 day``.

    Returns:
        The decoded JSON body when the server answers HTTP 200, otherwise
        ``None`` (non-200 status, network error, or undecodable body).
        Callers must handle the ``None`` case.
    """
    s_day=day.strftime("%Y-%m-%d")
    s_nextday = (day+timedelta(days=1)).strftime("%Y-%m-%d")
    url=base_url+"/measuringpoints/"+str(meterpointid)+"/datapoints/"+s_day+"/"+s_nextday+"?access_token="+access_token
    headers = {"Content-type": "application/json",
       "Accept": "*/*",
       "Accept-Encoding" : "gzip,deflate,br",
       "Connection" : "keep-alive"}
    # NOTE(review): this prints the access token, which is part of the URL.
    print(url)
    try:
        # Without a timeout requests can wait on a stalled server forever.
        response = requests.get(url=url, headers=headers, timeout=30)
    except requests.RequestException:
        show_error()
        print ("Error retrieveing metering data")
        return None
    if response.status_code != 200:
        print ("HTTP Error retrieving metering data. HTTP response: %s" % (response.status_code))
        return None
    print ("Metering data retrieved. HTTP response: %s" % (response.status_code))
    try:
        return response.json()
    except ValueError:
        # A 200 response whose body is not valid JSON.
        show_error()
        print ("Error retrieveing metering data")
        return None

#################################################################
print(meteringdaystart.isoformat())
print(meteringdaystop.isoformat())

_CSV_HEADER = ['MeterID', 'Resolution', 'IntervalStart', 'IsEstimated', 'EnergyWh']


def _append_interval_rows(csv_filename, meter_id, label, readings, utctz):
    """Append the interval volumes derived from *readings* to a CSV file.

    The file is created with a header row when it does not exist yet,
    otherwise rows are appended. The first reading only seeds the running
    total; every later reading produces one row holding 1000 times the
    difference with the previous reading's quantity.
    """
    # Probe for an existing file by opening it for reading.
    try:
        with open(csv_filename, 'rt'):
            is_new = False
    except IOError:
        is_new = True

    mode = 'wt' if is_new else 'at'
    # The with-block closes the file even when a row fails to convert.
    with open(csv_filename, mode, newline='', encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file, dialect='excel', delimiter=';', quoting=csv.QUOTE_NONE)
        if is_new:
            writer.writerow(_CSV_HEADER)
        old_quantity = None
        for n, reading in enumerate(readings):
            if n:
                # The interval start is written as the reading's timestamp
                # minus 15 minutes, converted to UTC.
                intervalstart = (ciso8601.parse_datetime(reading['datetime'])-timedelta(minutes=15)).astimezone()
                intervalstart = intervalstart.astimezone(utctz)
                print(reading['datetime'], label, reading['quantity'])
                writer.writerow([
                    meter_id,
                    '15min',
                    intervalstart.isoformat().replace("+00:00", "Z"),
                    'FALSE',
                    str(1000*(Decimal(reading['quantity'])-Decimal(old_quantity))),
                ])
            old_quantity = reading['quantity']


utctz = pytz.timezone("UTC")

for meteringday in daterange(meteringdaystart, meteringdaystop):
    day_records = get_interval_data(meteringpoint, meteringday)
    if day_records is None:
        # get_interval_data() returns None when the request failed; skip the
        # day instead of crashing on iteration.
        print("Warning, no metering data for %s. Day skipped." % meteringday)
        continue

    # Drop exact duplicate records while keeping the original order.
    clean_interval_data = []
    for i in day_records:
        if i not in clean_interval_data:
            clean_interval_data.append(i)
        else:
            print("Warning, duplicate record on %s. Filtered out." % meteringday)
    clean_interval_data.sort(key=operator.itemgetter('datetime','rate'))
    import_interval_data = [d for d in clean_interval_data if d['rate'] == 180]
    export_interval_data = [d for d in clean_interval_data if d['rate'] == 280]

    # For a full day Enelogic provides 97 values
    if len(import_interval_data) != 97 or len(export_interval_data) != 97:
        print("Warning, unexpected number of reads on %s (97 expected)" % meteringday)
        print("Import reads : %d"% len(import_interval_data))
        print("Export reads : %d"% len(export_interval_data))

    # NOTE(review): isoformat() puts ':' in the file names, which Windows
    # does not accept in file names -- confirm the target platform.
    period = meteringdaystart.isoformat()+"_"+meteringdaystop.isoformat()
    _append_interval_rows("I_"+meterid_I+"_"+period+".csv", meterid_I, 'Import', import_interval_data, utctz)
    _append_interval_rows("E_"+meterid_E+"_"+period+".csv", meterid_E, 'Export', export_interval_data, utctz)
 
##output format
##MeterID;Resolution;IntervalStart;IsEstimated;EnergyWh
##91998_I;15min;2021-12-31T23:00:00Z;FALSE;82.76184291

@edsub
Copy link

edsub commented Jan 25, 2024

Does anyone have the OAuth2 token refresh working?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment