Skip to content

Instantly share code, notes, and snippets.

@ale-rinaldi
Last active September 4, 2024 16:11
Show Gist options
  • Save ale-rinaldi/0ea0bc7eaf730e3448ef1829d5a57705 to your computer and use it in GitHub Desktop.
Save ale-rinaldi/0ea0bc7eaf730e3448ef1829d5a57705 to your computer and use it in GitHub Desktop.
Python script that parses CSV files with 15-minute resolution, downloaded from the private area of e-distribuzione (the Italian energy distributor), and imports them into an InfluxDB database. It accounts for Daylight Saving Time changes and computes the time slot (F1, F2 or F3) for each data point.
import datetime
import os
import dateutil
import pytz
from influxdb import InfluxDBClient
# Fixed-date holidays as (month, day) pairs.
_FIXED_HOLIDAYS = frozenset({
    (1, 1),    # New Year's Day
    (1, 6),    # Epiphany
    (4, 25),   # Liberation Day
    (5, 1),    # Labour Day
    (6, 2),    # Republic Day
    (8, 15),   # Assumption
    (11, 1),   # All Saints' Day
    (12, 8),   # Immaculate Conception
    (12, 25),  # Christmas Day
    (12, 26),  # St. Stephen's Day
})


def is_festivo(date: datetime.datetime) -> bool:
    """Return True if *date* falls on a public holiday.

    A holiday is either one of the fixed (month, day) pairs listed in
    ``_FIXED_HOLIDAYS`` or the Monday after Easter of ``date.year``.
    Only ``date.year``, ``date.month`` and ``date.day`` are read, so both
    naive and timezone-aware values are accepted.
    """
    if (date.month, date.day) in _FIXED_HOLIDAYS:
        return True

    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.easter`` is loaded, so attribute access on
    # the package can raise AttributeError depending on the installed version.
    from dateutil.easter import easter

    easter_monday = easter(date.year) + datetime.timedelta(days=1)
    return (date.month, date.day) == (easter_monday.month, easter_monday.day)
def get_fascia_oraria(start_date: datetime.datetime) -> int:
    """Return the time slot (1, 2 or 3, i.e. F1/F2/F3) for *start_date*.

    Rules, evaluated on the weekday and hour carried by *start_date*:

    * Sundays and holidays (per ``is_festivo``): always 3.
    * Any other day before 07:00 or from 23:00 onwards: 3.
    * Saturdays between 07:00 and 23:00: 2.
    * Monday to Friday 07:00-08:00 and 19:00-23:00: 2.
    * Monday to Friday 08:00-19:00: 1.
    """
    weekday = start_date.weekday()  # Monday == 0 ... Sunday == 6
    hour = start_date.hour

    if weekday == 6 or is_festivo(start_date):
        return 3
    # Night hours fall in slot 3 on every remaining day, Saturday included.
    if hour < 7 or hour >= 23:
        return 3
    if weekday == 5:
        return 2
    if hour < 8 or hour >= 19:
        return 2
    return 1
def get_dst_change(day: datetime.date) -> int:
    """Tell whether the Europe/Rome DST offset changes during *day*.

    The DST offset at 01:00 local time is compared with the one at 04:00
    local time on the same calendar day.

    Args:
        day: any object exposing ``year``, ``month`` and ``day``
            (a ``date`` or a ``datetime``); the time part is ignored.

    Returns:
        ``1`` if the DST offset is larger at 04:00 than at 01:00 (clocks
        moved forward), ``-1`` if it is smaller (clocks moved back),
        ``0`` if it is unchanged.
    """
    rome = pytz.timezone("Europe/Rome")
    start = rome.localize(datetime.datetime(day.year, day.month, day.day, 1))
    end = rome.localize(
        datetime.datetime(day.year, day.month, day.day, 4), is_dst=False
    )
    if start.dst() == end.dst():
        return 0
    return 1 if start.dst() < end.dst() else -1
# Timezone used to localize every timestamp read from the CSV files.
tz = pytz.timezone("Europe/Rome")


def parse_date(day: str, time: str, already_occurred: bool):
    """Build a Europe/Rome-aware datetime from a day and a wall-clock time.

    Args:
        day: calendar day formatted as ``YYYY-MM-DD``.
        time: wall-clock time formatted as ``HH:MM``.
        already_occurred: whether a row for this same *day* has already
            been seen by the caller. Only used on the day the clocks move
            back, to pick which of the two occurrences of the repeated
            hour is meant.

    Returns:
        A ``(localized_datetime, dst_change)`` tuple, where ``dst_change``
        is the value returned by ``get_dst_change`` for that day.
    """
    parsed = datetime.datetime.fromisoformat(f"{day}T{time}:00")
    dst_change = get_dst_change(parsed)

    if dst_change >= 0:
        return tz.localize(parsed), dst_change

    # Clocks move back today: the first row for the day is read as DST,
    # the second one as standard time.
    res = tz.localize(parsed, is_dst=not already_occurred)
    if time == "03:00" and not already_occurred:
        # 03:00 is not ambiguous, so it is localized in standard time; in
        # the first (DST) row it is shifted back by one hour.
        res = res - datetime.timedelta(hours=1)
    return res, dst_change
def parse_file(filename):
    """Yield ``(start_time, end_time, value)`` for each sample in a CSV file.

    The file is ``;``-separated. The first line is a header whose cells
    (after the first one) are ``HH:MM-HH:MM`` intervals; every following
    line starts with a ``DD/MM/YYYY`` day (optionally double-quoted)
    followed by one value per interval, with ``,`` as decimal separator.

    Blank lines and empty cells (including quoted empty cells) are skipped.
    A day may appear on two lines; the second occurrence is passed to
    ``parse_date`` with ``already_occurred=True``.

    Yields:
        Tuples of two timezone-aware datetimes and a float.
    """
    with open(filename) as f:
        header_cells = f.readline().split(";")[1:]
        hour_intervals = {
            i: cell.strip().split("-") for i, cell in enumerate(header_cells)
        }

        occurred_days = set()
        for line in f:
            # A trailing newline or stray blank line carries no day field.
            if not line.strip():
                continue

            cells = line.split(";")
            day_parts = cells[0].strip().strip("\"").split("/")
            day = f"{day_parts[2]}-{day_parts[1]}-{day_parts[0]}"
            already_occurred = day in occurred_days
            occurred_days.add(day)

            for i, cell in enumerate(cells[1:]):
                # Strip the quotes before testing for emptiness so that a
                # quoted empty cell is skipped instead of reaching float().
                val = cell.strip().strip("\"")
                if not val:
                    continue

                start_time, _ = parse_date(day, hour_intervals[i][0], already_occurred)
                end_time, dst_change = parse_date(day, hour_intervals[i][1], already_occurred)

                # Columns 8..11 are presumably the 02:00-03:00 quarter-hours
                # (assumes consecutive 15-minute columns starting at 00:00).
                if dst_change > 0 and 8 <= i < 12:
                    # Clocks move forward: these columns are dropped.
                    continue
                if dst_change < 0:
                    # Clocks move back: the first row for the day keeps only
                    # columns 0..11, the second row only columns 8 onwards.
                    if not already_occurred and i > 11:
                        continue
                    if already_occurred and i < 8:
                        continue

                if end_time < start_time:
                    # The interval wraps past midnight: move the end to the
                    # next day and localize it again.
                    end_time = tz.localize(
                        end_time.replace(tzinfo=None) + datetime.timedelta(days=1)
                    )
                yield (start_time, end_time, float(val.replace(",", ".")))
# Maximum number of points sent to InfluxDB in a single request.
_WRITE_BATCH_SIZE = 5000


def import_folder(client, folder, start_time_measurement_name, end_time_measurement_name, database_name):
    """Import every ``.csv`` file found in *folder* into InfluxDB.

    Files are processed in sorted name order. Each sample produced by
    ``parse_file`` is printed and turned into two points carrying the same
    ``value`` and ``fascia`` fields: one in *start_time_measurement_name*
    stamped with the interval start, one in *end_time_measurement_name*
    stamped with the interval end.

    Points are written once per file (split by the client into chunks of
    ``_WRITE_BATCH_SIZE``) instead of issuing one request per sample.

    Args:
        client: an ``InfluxDBClient`` (anything exposing ``write_points``).
        folder: directory to scan; other entries are ignored.
        start_time_measurement_name: measurement for interval-start points.
        end_time_measurement_name: measurement for interval-end points.
        database_name: target database passed to ``write_points``.
    """
    for file in sorted(os.listdir(folder)):
        if not file.endswith(".csv"):
            continue

        points = []
        for start_time, end_time, val in parse_file(os.path.join(folder, file)):
            fascia = get_fascia_oraria(start_time)
            print(start_time, end_time, val, f"F{fascia}")
            points.append({
                "measurement": start_time_measurement_name,
                "time": start_time,
                "fields": {
                    "value": val,
                    "fascia": fascia,
                },
            })
            points.append({
                "measurement": end_time_measurement_name,
                "time": end_time,
                "fields": {
                    "value": val,
                    "fascia": fascia,
                },
            })

        if points:
            client.write_points(
                points, database=database_name, batch_size=_WRITE_BATCH_SIZE
            )
def main():
    """Connect to the local InfluxDB and import the three data folders.

    The "casa" database is created first, then the ``Prelevata``,
    ``Prodotta`` and ``Immessa`` folders (relative to the current working
    directory) are imported into measurements named
    ``<folder-lowercase>_start_time`` and ``<folder-lowercase>_end_time``.
    """
    client = InfluxDBClient(host='localhost', port=8086)
    client.create_database("casa")
    for folder, prefix in (
        ("Prelevata", "prelevata"),
        ("Prodotta", "prodotta"),
        ("Immessa", "immessa"),
    ):
        import_folder(client, folder, f"{prefix}_start_time", f"{prefix}_end_time", "casa")


# Run the import only when executed as a script, so that importing this
# module does not open a database connection or write data.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment