Last active
September 4, 2024 16:11
-
-
Save ale-rinaldi/0ea0bc7eaf730e3448ef1829d5a57705 to your computer and use it in GitHub Desktop.
Python script to parse CSV files with 15-minute precision, downloaded from the e-distribuzione (Italian energy distributor) private area, and import them into an InfluxDB database. It handles Daylight Saving Time changes and calculates the time slot (F1, F2 or F3) for each point.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import os | |
import dateutil | |
import pytz | |
from influxdb import InfluxDBClient | |
def is_festivo(date: datetime.datetime): | |
if date.month == 1 and date.day == 1: | |
return True | |
if date.month == 1 and date.day == 6: | |
return True | |
if date.month == 4 and date.day == 25: | |
return True | |
if date.month == 5 and date.day == 1: | |
return True | |
if date.month == 6 and date.day == 2: | |
return True | |
if date.month == 8 and date.day == 15: | |
return True | |
if date.month == 11 and date.day == 1: | |
return True | |
if date.month == 12 and date.day == 8: | |
return True | |
if date.month == 12 and date.day == 25: | |
return True | |
if date.month == 12 and date.day == 26: | |
return True | |
easter = dateutil.easter.easter(date.year) | |
day_after_easter = easter + datetime.timedelta(days=1) | |
if date.month == day_after_easter.month and date.day == day_after_easter.day: | |
return True | |
return False | |
def get_fascia_oraria(start_date: datetime.datetime):
    """Return the Italian time-of-use slot for *start_date*: 1 (F1), 2 (F2) or 3 (F3).

    Sundays and national holidays are entirely F3.  Saturdays are F2 during
    the day and F3 at night.  Weekdays are F1 in core hours (08-19), F2 in
    the shoulder hours and F3 at night (before 07:00 / from 23:00).
    """
    day_of_week = start_date.weekday()
    hour = start_date.hour
    is_night = hour < 7 or hour >= 23
    # Keep the short-circuit: is_festivo() is only consulted for non-Sundays.
    if day_of_week == 6 or is_festivo(start_date):
        return 3
    if day_of_week == 5:  # Saturday
        return 3 if is_night else 2
    # Monday-Friday
    if is_night:
        return 3
    return 1 if 8 <= hour < 19 else 2
def get_dst_change(day: datetime.date) -> int:
    """Return the DST transition happening on *day* in Europe/Rome.

    Compares the DST offset at 01:00 and 04:00 local time, which bracket
    the Italian change hours (02:00/03:00).

    Returns:
        1  if clocks spring forward on this day,
        -1 if clocks fall back,
        0  if there is no transition.
    """
    start = datetime.datetime(day.year, day.month, day.day, 1)
    end = datetime.datetime(day.year, day.month, day.day, 4)
    tz = pytz.timezone("Europe/Rome")
    # pytz's localize() defaults to is_dst=False; 01:00 and 04:00 are never
    # ambiguous in Europe/Rome, so the explicit flag is only defensive.
    start = tz.localize(start)
    end = tz.localize(end, is_dst=False)
    if start.dst() == end.dst():
        return 0
    return 1 if start.dst() < end.dst() else -1
# Timezone all CSV timestamps are expressed in (Italy: CET/CEST).
tz = pytz.timezone("Europe/Rome")
def parse_date(day: str, time: str, already_occurred: bool):
    """Localize a CSV wall-clock timestamp to an aware Europe/Rome datetime.

    day: date as "YYYY-MM-DD"; time: time as "HH:MM"; already_occurred:
    True when this calendar day has been seen before in the file (the
    CSV repeats the fall-back day: first pass = DST hours, second pass =
    standard-time hours).

    Returns (aware_datetime, dst_change) where dst_change is
    get_dst_change()'s value for the day.
    """
    time_str = f"{day}T{time}:00"
    parsed = datetime.datetime.fromisoformat(time_str)
    dst_change = get_dst_change(parsed)
    if dst_change < 0:
        # Fall-back day: the same wall-clock hours occur twice.  Pick the
        # DST instant on the first pass and the standard one on the second.
        res = tz.localize(parsed, is_dst=not already_occurred)
        if time == "03:00" and not already_occurred:
            # NOTE(review): appears to pull the first pass's 03:00 boundary
            # back one hour so it abuts the repeated 02:00 block — TODO
            # confirm against a real fall-back-day CSV.
            res = res - datetime.timedelta(hours=1)
        return res, dst_change
    return tz.localize(parsed), dst_change
def parse_file(filename: str):
    """Yield (start_time, end_time, value) tuples from an e-distribuzione CSV.

    Expected layout: a header row whose first cell is a label followed by
    "HH:MM-HH:MM" interval columns, then one row per day starting with a
    (possibly quoted) "DD/MM/YYYY" date and one reading per interval.
    Values use a comma as decimal separator.  Both yielded datetimes are
    timezone-aware (Europe/Rome); value is a float.
    """
    with open(filename) as f:
        first_line = f.readline()
        splitted = first_line.split(";")
        # Drop the leading label cell; the rest are "HH:MM-HH:MM" columns.
        splitted = splitted[1:]
        hour_intervals = {}
        for i in range(len(splitted)):
            hour_intervals[i] = splitted[i].strip().split("-")
        # Days already seen: the DST fall-back day appears twice in the file.
        occurred_days = set()
        for line in f:
            splitted = line.split(";")
            # First cell is the date as "DD/MM/YYYY"; rebuild it as ISO.
            day_parts = splitted[0].strip("\"").split("/")
            day = f"{day_parts[2]}-{day_parts[1]}-{day_parts[0]}"
            if day in occurred_days:
                already_occurred = True
            else:
                already_occurred = False
                occurred_days.add(day)
            splitted = splitted[1:]
            for i in range(len(splitted)):
                val = splitted[i].strip()
                if val == "":
                    continue
                start_time, _ = parse_date(day, hour_intervals[i][0], already_occurred)
                end_time, dst_change = parse_date(day, hour_intervals[i][1], already_occurred)
                # NOTE(review): the index cut-offs below assume 15-minute
                # columns, so i == 8..11 spans wall-clock 02:00-03:00 — TODO
                # confirm against an actual transition-day file.
                if dst_change > 0 and 8 <= i < 12:
                    # Spring forward: 02:00-03:00 never happened; skip it.
                    continue
                if dst_change < 0 and not already_occurred and i > 11:
                    # Fall back, first row for the day: only columns up to 03:00.
                    continue
                if dst_change < 0 and already_occurred and i < 8:
                    # Fall back, repeated row: only columns from 02:00 onward.
                    continue
                if end_time < start_time:
                    # Interval crosses midnight (e.g. "23:45-00:00"): the end
                    # belongs to the following day.
                    end_time = tz.localize(end_time.replace(tzinfo=None) + datetime.timedelta(days=1))
                # Readings use a comma decimal separator and may be quoted.
                yield (start_time, end_time, float(val.strip("\"").replace(",", ".")))
def import_folder(client, folder, start_time_measurement_name, end_time_measurement_name, database_name):
    """Parse every .csv file in *folder* and write its points into InfluxDB.

    Each reading is written twice: once timestamped at the interval start
    (under *start_time_measurement_name*) and once at the interval end
    (under *end_time_measurement_name*), both carrying the value and the
    F1/F2/F3 slot computed from the start time.
    """
    csv_files = [name for name in sorted(os.listdir(folder)) if name.endswith(".csv")]
    for csv_file in csv_files:
        path = os.path.join(folder, csv_file)
        for start_time, end_time, val in parse_file(path):
            fascia = get_fascia_oraria(start_time)
            print(start_time, end_time, val, f"F{fascia}")
            points = [
                {
                    "measurement": start_time_measurement_name,
                    "time": start_time,
                    "fields": {"value": val, "fascia": fascia},
                },
                {
                    "measurement": end_time_measurement_name,
                    "time": end_time,
                    "fields": {"value": val, "fascia": fascia},
                },
            ]
            client.write_points(points, database=database_name)
# Script entry point: connect to the local InfluxDB, make sure the target
# database exists, then import the three CSV folders: energy drawn from the
# grid (Prelevata), produced (Prodotta) and fed into the grid (Immessa).
client = InfluxDBClient(host='localhost', port=8086)
client.create_database("casa")
import_folder(client, "Prelevata", "prelevata_start_time", "prelevata_end_time", "casa")
import_folder(client, "Prodotta", "prodotta_start_time", "prodotta_end_time", "casa")
import_folder(client, "Immessa", "immessa_start_time", "immessa_end_time", "casa")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment