Last active
October 26, 2024 18:56
-
-
Save testillano/05c59e9af7e434790bd53cab6487f7b9 to your computer and use it in GitHub Desktop.
Download and plot Spanish Regulated Market daily reports
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%config InlineBackend.figure_format = 'svg' # para mejorar la calidad visual de las gráficas... | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
from urllib.error import HTTPError | |
import time | |
from datetime import datetime | |
#from IPython.display import display | |
# Calculo de area bajo curva de coste unitario (finalmente multiplicaremos por los KWh, por ejemplo 10A => 2300W => 4,6KWh en 2 horas | |
import numpy as np | |
# Método del trapecio: | |
from scipy.interpolate import interp1d | |
from scipy.integrate import trapz | |
# Método con splin cúbico: | |
from scipy.interpolate import CubicSpline | |
from scipy.integrate import quad | |
from scipy.interpolate import InterpolatedUnivariateSpline | |
DIA = datetime.fromtimestamp(datetime.timestamp(datetime.today())) | |
DIA_STR = DIA.strftime('%Y%m%d') | |
DIA_PRETTY_STR = DIA.strftime("%A, %B %d, %Y") | |
# Printar opcionalmente la curva para mañana si está disponible: | |
HORA_PUBLICACION = datetime.strptime("13:20", "%H:%M").time() | |
DIA2 = None | |
DIA2_STR = "" | |
HORA_ACTUAL = datetime.now().time() | |
if HORA_ACTUAL > HORA_PUBLICACION: | |
DIA2 = datetime.fromtimestamp(datetime.timestamp(datetime.today()) + 86400) | |
DIA2_STR = DIA2.strftime('%Y%m%d') | |
# Función que consigue la fuente de datos de la previsión desde Internet... | |
def consigue_fuente_datos(): | |
ret = None | |
try: | |
ret = pd.read_csv(URL_FUENTE_DATOS, sep=';', header=None, names=['year', 'month', 'day', 'hour', 'price1', 'price2', 'x']) | |
except HTTPError: | |
print(f'Error accediendo a fuente de datos: {URL_FUENTE_DATOS}\nNo se puede continuar.\n') | |
return ret | |
# Función que consigue los datos filtrados de la previsión... | |
def consigue_datos_filtrados(fuente_datos): | |
ret = None | |
if (isinstance(fuente_datos, pd.DataFrame)): | |
ret = fuente_datos.copy() | |
# me quedo sólo con las filas de 1 a 24, las columnas de hora y precio1... | |
ret = ret.drop([0, 25]).drop(columns=['year', 'month', 'day', 'price2', 'x']) | |
else: | |
print('Error: la fuente de datos no es un DataFrame.\n') | |
return ret | |
# Función que pinta la gráfica de los datos de la previsión... | |
def pinta_grafica_datos(datos, prices, datos2, prices2, pinta2): | |
if (isinstance(datos, pd.DataFrame)): | |
plt.figure(figsize=(10, 6)) | |
plt.plot(datos["hour"], prices, '.-', label=f'today ({DIA_PRETTY_STR})') | |
#plt.plot(hdatos["hour"], prices, '.-', label='today (cubic splin)', t, ius(t), 'r') | |
#plt.plot(t, integral, 'g') | |
if pinta2: plt.plot(datos2["hour"], prices2, '.-', label='tomorrow') | |
plt.title(f'Daily evolution of the price per kWh in the regulated market') | |
plt.xlabel("hour") | |
plt.ylabel("Euros/kWh") | |
plt.xticks(datos["hour"]) | |
plt.grid(True) | |
plt.tight_layout() | |
plt.legend() | |
plt.show() | |
else: | |
print('Error: los datos no son un DataFrame.\n') | |
def dame_hora_de_inicio_de_carga_optima(horas, precios, duracion_carga, h_inicial = 1, h_final = 24, resolucion=1000): | |
ius = InterpolatedUnivariateSpline(horas, precios) | |
t = np.linspace(h_inicial, h_final, resolucion+1) | |
integral = [ius.integral(h_inicial, ti) for ti in t] | |
dt = t[1]-t[0] | |
I = [] | |
for i in range(resolucion+1): | |
if t[i]+duracion_carga < t[-1]: | |
I.append(integral[i+round(duracion_carga/dt)] - integral[i]) | |
return [t[np.argmin(I)], np.min(I)] | |
def read_input(literal, default): | |
ret = input(literal) | |
if ret.strip() == "": | |
return default | |
return ret | |
def convertir_a_horas_minutos(decimal): | |
# Separar la parte entera (horas) y la parte decimal (minutos) | |
horas = int(decimal) | |
minutos = (decimal - horas) * 60 | |
# Redondear los minutos para evitar decimales | |
minutos = round(minutos) | |
return f"{horas:02d}:{minutos:02d}" | |
def convertir_hora_a_decimal(hora_str): | |
if ":" in hora_str: | |
hora, minuto = map(int, hora_str.split(":")) | |
else: | |
hora = int(hora_str) | |
minuto = 0 # Asignamos el minuto a 0 si no se proporciona | |
# Convertir los minutos a una fracción de hora | |
fraccion_minutos = minuto / 60 | |
# Sumar las horas y la fracción de minutos | |
hora_decimal = hora + fraccion_minutos | |
return hora_decimal | |
# Se procede a descargar y filtrar la fuente de datos de la previsión... | |
URL_FUENTE_DATOS = f'https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_{DIA_STR}.1' | |
datos = consigue_datos_filtrados(consigue_fuente_datos()) | |
# (*) convierto €/MWh a €/KWh: | |
prices = [] | |
for p in datos["price1"]: | |
prices.append(float(p)/1000) | |
datos2 = None | |
prices2 = [] | |
pinta2 = False | |
if DIA2: | |
URL_FUENTE_DATOS = f'https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_{DIA2_STR}.1' | |
try: | |
datos2 = consigue_datos_filtrados(consigue_fuente_datos()) | |
pinta2 = True | |
# (*) convierto €/MWh a €/KWh: | |
for p in datos2["price1"]: | |
prices2.append(float(p)/1000) | |
except: | |
print("WARNING: tomorrow report failed to download ! Check https://www.omie.es/es/file-access-list?parents%5B0%5D=/&parents%5B1%5D=Mercado%20Diario&parents%5B2%5D=1.%20Precios&dir=Precios%20horarios%20del%20mercado%20diario%20en%20Espa%C3%B1a&realdir=marginalpdbc") | |
datos2 = None | |
# Se pinta la gráfica de los datos del activo... | |
pinta_grafica_datos(datos, prices, datos2, prices2, pinta2) | |
TOTAL_BATTERY_LOAD_WH = 13800 # Toyota CH-R | |
print("\nCost in time interval (for today)\n") | |
duracion = input("Available time duration for load (i.e. 3, 3:00, 2:30, etc.): ") | |
duracion = convertir_hora_a_decimal(duracion) | |
pct = int(read_input("Input current battery percentage (-1: skip) [60]: ", "60")) # usualmente consumo 40% al día y me da igual en que punto | |
# esté pero quiero que cargue un 40% (20->60% o 60->100%) | |
current_hour = datetime.now().hour | |
optimal_init, optimal_load = dame_hora_de_inicio_de_carga_optima(np.array(datos["hour"]), np.array(prices), duracion, current_hour) | |
optimal_init_str = convertir_a_horas_minutos(optimal_init) | |
# Amperes' hints: | |
A_forToyotaTotalLoad = (TOTAL_BATTERY_LOAD_WH / 230) / duracion | |
Amperios_dflt = 15 | |
pending_pct = 100-pct | |
CURRENT_BATTERY_LOAD_WH = TOTAL_BATTERY_LOAD_WH * pct/100 | |
PENDING_BATTERY_LOAD_WH = TOTAL_BATTERY_LOAD_WH * pending_pct/100 | |
A_forToyotaPendingLoad = (PENDING_BATTERY_LOAD_WH / 230) / duracion | |
if pct != -1: Amperios_dflt = A_forToyotaPendingLoad | |
print("\nAmperes' hints for selected duration:") | |
print(f' {A_forToyotaTotalLoad:.3f} A would load {TOTAL_BATTERY_LOAD_WH} Wh') | |
if pct != -1: print(f' {A_forToyotaPendingLoad:.3f}A would load {PENDING_BATTERY_LOAD_WH} Wh (pending load of {pending_pct}%)') | |
Amperios = float(read_input(f'\nInput load amperes [{Amperios_dflt:.3f}]: ', str(Amperios_dflt))) | |
potencia = (Amperios*230)/1000 # I * 230V / 1000 = kilovatios de potencia | |
carga_KWh = potencia * duracion # KWh | |
# Mostrar datos de optimización: | |
print(f'\nFor the given duration ({duracion:.3f} hours), the optimal hour to minimize the cost is {optimal_init_str}, and the optimal load area is: {optimal_load:.2f}') | |
print(f'That is calculated between current hour ({current_hour}) and the end of the day (24)') | |
# Cálculo de costes | |
t1 = read_input(f'\nInput t1 [{optimal_init_str}]: ', optimal_init_str) | |
t1 = convertir_hora_a_decimal(t1) | |
t2 = optimal_init + duracion | |
t2_str = convertir_a_horas_minutos(t2) | |
print(f'Estimated final time will be t2 = {t2_str}') | |
# Mostrar resultados | |
print(f'\nPower: {potencia:.3f} KW') | |
if pct != -1: | |
print(f"Initial battery load: {pct}% ({CURRENT_BATTERY_LOAD_WH} Wh)") | |
pct_final = pct + 100 * 1000 * carga_KWh / TOTAL_BATTERY_LOAD_WH | |
finished_before_str = "" | |
if pct_final > 100: | |
pct_final = 100 # ocurre cuando pongo más amperaje que el sugerido | |
duracion_real = (PENDING_BATTERY_LOAD_WH/1000)/potencia | |
real_t2 = t1 + duracion_real | |
real_t2_str = convertir_a_horas_minutos(real_t2) | |
finished_before_str = f'; indeed, the load finished before t2, at {real_t2_str}' | |
t2 = real_t2 # calculo real del coste3 | |
duracion = duracion_real | |
carga_KWh = potencia * duracion # KWh | |
FINAL_BATTERY_LOAD_WH = TOTAL_BATTERY_LOAD_WH * pct_final/100 | |
print(f"Final battery load: {pct_final:.3f}% ({FINAL_BATTERY_LOAD_WH} Wh{finished_before_str})") | |
print(f'Effective duration: {duracion:.3f} h') | |
print(f'Load incremented: {potencia:.3f} x {duracion:.3f} = {carga_KWh:.3f} KWh') | |
# METODO DEL TRAPECIO ################# | |
# Interpolar los datos (today) para obtener valores intermedios3 | |
interp_func = interp1d(datos["hour"], prices, kind='linear', fill_value="extrapolate") | |
# Crear una malla fina entre t1 y t2 para integrar | |
t_vals = np.linspace(t1, t2, 1000) # 1000 puntos entre t1 y t2 # KWh | |
y_vals = interp_func(t_vals) # Valores interpolados en esos puntos | |
# Calcular el área bajo la curva usando el método del trapecio | |
area = trapz(y_vals, t_vals) | |
# METODO SPLIN CUBICO ################# | |
cs = CubicSpline(datos["hour"], prices) | |
area2, _ = quad(cs, t1, t2) | |
t1_str = convertir_a_horas_minutos(t1) | |
t2_str = convertir_a_horas_minutos(t2) | |
print(f"\nArea under today's function within time range [{t1_str}, {t2_str}]:") | |
print(f' Trapezoidal method: {area:.3f} €/kW') | |
print(f' Cubic splin method: {area2:.3f} €/kW <--- use this (more accurate)') | |
coste = area * potencia | |
print(f"\nCost deduced: {area2:.3f} €/KW x {potencia:.3f} KW = {coste:.3f} €") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import argparse | |
from datetime import datetime, timedelta | |
import time | |
import os, sys | |
# Download automation: | |
import pandas as pd | |
from urllib.error import HTTPError | |
# Plot data: | |
import matplotlib.pyplot as plt | |
# OMIE daily files: # https://www.omie.es/es/file-access-list?parents%5B0%5D=/&parents%5B1%5D=Mercado%20Diario&parents%5B2%5D=1.%20Precios&dir=Precios%20horarios%20del%20mercado%20diario%20en%20Espa%C3%B1a&realdir=marginalpdbc | |
# Browse URL, i.e. https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_20240705.1 | |
COLUMNS = {'hour': 'hour', 'price1': '€/kWh'} # data comes in €/MWh (*) | |
KCOLUMNS = list(COLUMNS) # keys | |
VCOLUMNS = list(COLUMNS.values()) # values | |
PUBLISH_TIME_STR = "13:20" | |
PUBLISH_TIME = datetime.strptime(PUBLISH_TIME_STR, "%H:%M").time() | |
URL_SOURCE = None | |
DAY_PRETTY_STR = None | |
def get_data_source(): | |
ret = None | |
global URL_SOURCE | |
try: | |
ret = pd.read_csv(URL_SOURCE, sep=';', header=None, names=['year', 'month', 'day', 'hour', 'price1', 'price2', 'x']) | |
except HTTPError: | |
print(f'ERROR: failed to access online data from OMIE ({URL_SOURCE}).\n') | |
sys.exit(1) | |
return ret | |
def filter_data(data): | |
ret = None | |
if (isinstance(data, pd.DataFrame)): | |
ret = data.copy() | |
# Remove first and last row, and keep only hour/price1 columns: | |
ret = ret.drop([ret.index[0], ret.index[-1]]).drop(columns=['year', 'month', 'day', 'price2', 'x']) | |
else: | |
print('ERROR: data source is not valid DataFrame.\n') | |
sys.exit(1) | |
return ret | |
def plot(data, quiet, file_path): | |
global DAY_PRETTY_STR | |
if (isinstance(data, pd.DataFrame)): | |
# (*) convert €/MWh to €/KWh: | |
prices_KWh = [] | |
prices_MWh = data[KCOLUMNS[1]] | |
for p in prices_MWh: | |
prices_KWh.append(float(p)/1000) | |
plt.figure(figsize=(10, 6)) | |
plt.plot(data[KCOLUMNS[0]], prices_KWh, '.-') | |
plt.title(f'Daily evolution of the price (€) per kWh in the regulated market ({DAY_PRETTY_STR})') | |
plt.xlabel(VCOLUMNS[0]) | |
plt.ylabel(VCOLUMNS[1]) | |
plt.xticks(data[KCOLUMNS[0]]) | |
plt.grid(True) | |
plt.tight_layout() | |
# Save png file | |
plt.savefig(file_path) | |
if not quiet: plt.show() | |
else: | |
print('ERROR: data source is not valid DataFrame.\n') | |
def get_reports(downloads_path, quiet, idate, fdate): | |
global URL_SOURCE | |
global DAY_PRETTY_STR | |
auxDate = idate | |
dates = [] | |
while (auxDate <= fdate): | |
auxDateStr = auxDate.strftime("%Y%m%d") | |
file_path = downloads_path + '/marginalpdbc_' + auxDateStr + '.1.png' | |
if os.path.exists(file_path): | |
print(f'INFO: skipping ' + file_path + ' ... (already cached). Use firefox or any other viewer to see.') | |
else: | |
print(f'INFO: generating ' + file_path + ' ...') | |
URL_SOURCE = f'https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_{auxDateStr}.1' | |
DAY_PRETTY_STR = auxDate.strftime("%A, %B %d, %Y") | |
# Download prevision and prepare data: | |
myData = filter_data(get_data_source()) | |
# Create plot from data retrieved: | |
plot(myData, quiet, file_path) | |
symlink_path = downloads_path + '/last.png' | |
if os.path.islink(symlink_path): os.unlink(symlink_path) | |
os.symlink(file_path, symlink_path) | |
dates.append(auxDate) | |
auxDate += timedelta(days=1) | |
def main(quiet, idate, fdate): | |
# Downloads path | |
script_dir = os.path.dirname(os.path.abspath(__file__)) | |
downloads_path = script_dir + '/pngs' | |
try: os.makedirs(downloads_path) # just in case | |
except: pass | |
get_reports(downloads_path, quiet, idate, fdate) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Download/plot OMIE daily reports. Files cached on './pngs'.") | |
parser.add_argument('--date', help="downloaded day in format <YYYYMMDD>. If missing, current one is assumed (and after " + PUBLISH_TIME_STR + ", also tomorrow report is retrieved).") | |
parser.add_argument('--days', '-d', help="Download last N days. After " + PUBLISH_TIME_STR + ", also tomorrow report is retrieved. Value '-1' means tomorrow (use after publish time).") | |
parser.add_argument('--quiet', '-q', help="Don't show plot on pop window", action='store_true') # on/off flag | |
args = parser.parse_args() | |
date_now = datetime.now() | |
idate = date_now | |
fdate = date_now | |
if args.days: | |
daysOffset = int(args.days) | |
idate += timedelta(days=-daysOffset) | |
if daysOffset == -1: fdate = idate # tomorrow special case | |
else: | |
if args.date: | |
idate = datetime.strptime(args.date, '%Y%m%d') | |
fdate = idate | |
# Extra day condition: | |
if date_now.time() > PUBLISH_TIME: | |
if fdate == date_now: | |
print(f'It is after {PUBLISH_TIME_STR} (publish time), so tomorrow report shall be also retrieved !') | |
fdate += timedelta(days=1) | |
main(args.quiet, idate, fdate) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
# Capacidad de bateria en Wh | |
capacidad=13800 | |
echo | |
echo "Introduce el amperaje de carga (A) [15]:" | |
read amp | |
[ -z "${amp}" ] && amp=15 | |
potencia=$((amp*230)) | |
echo "Potencia de carga = ${amp}A * 230V = ${potencia}W" | |
echo "Capacidad de bateria = ${capacidad}Wh" | |
t100=$(echo "scale=3; ${capacidad}/${potencia}" | bc) | |
echo "Tiempo de carga completa (100%): ${t100} horas" | |
echo | |
echo "Introduce el porcentaje de bateria en el inicio de la carga:" | |
read pct | |
while [ -z "${pct}" ]; do read pct ; done | |
trestante=$(echo "scale=3; ${t100}*(100-${pct})/100" | bc) | |
echo "Tiempo de carga necesaria ($((100-pct))%): ${trestante} horas" | |
echo | |
echo "Introduce el tiempo inicio de la carga (HH:MM):" | |
read inicio | |
while [ -z "${pct}" ]; do read pct ; done | |
trestante=$(echo "scale=3; ${t100}*(100-${pct})/100" | bc) | |
trestante_secs=$(echo "3600*${trestante}" | bc) | |
prestante=$((100-pct)) | |
echo "Tiempo de carga necesaria (${prestante}%): ${trestante} horas" | |
echo | |
echo "Calculo de y(x)=mx + n, con y=%carga y x=timestamp unix" | |
x0=$(date -d "${inicio}" +"%s") | |
echo "Punto inicial (x0,y0): (${x0},${pct})" | |
x1=$(date -d "${inicio} + ${trestante_secs} seconds" +"%s") | |
echo "Punto final (x1,y1): (${x1},100)" | |
# Obtener el offset de UTC en segundos (horas * 3600 + minutos * 60) | |
offset_utc=$(date +%z) # Ejemplo: +0200 para UTC+2 | |
offset_sign=${offset_utc:0:1} # Obtener el signo (+ o -) | |
offset_hours=${offset_utc:1:2} # Obtener las horas | |
offset_minutes=${offset_utc:3:2} # Obtener los minutos | |
offset_seconds=$(echo "$offset_hours*3600 + $offset_minutes*60" | bc) # Convertir a segundos | |
x0=$((x0${offset_sign}${offset_seconds})) | |
x1=$((x1${offset_sign}${offset_seconds})) | |
cat << EOF > .predict.gp | |
# Configurar la salida como un archivo PNG | |
set terminal pngcairo size 800,600 enhanced font 'Arial,12' | |
set output png_path | |
# Configuración del formato de tiempo y datos | |
set xdata time | |
set timefmt "%s" # Formato de tiempo como timestamp Unix | |
set format x "%H:%M" # Mostrar el eje X en formato HH:MM | |
set xlabel "Tiempo (HH:MM)" | |
set ylabel "Porcentaje (%)" | |
# Configurar el rango del eje Y (porcentaje de 0 a 100) | |
set yrange [0:100] | |
# Activar el grid y personalizarlo | |
set grid xtics ytics # Activa el grid en ambos ejes (X e Y) | |
set ytics 5 # Coloca tics (divisiones) en el eje Y cada 5 unidades | |
# Configurar el rango del eje X basado en los timestamps Unix | |
set xrange [x0:x1] | |
# Definir la ecuación de la recta con los valores de m y n | |
f(x) = m * x + n | |
# Graficar la recta | |
plot f(x) title "Recta: y(x) = m*x + n" | |
# Cerrar la salida del archivo PNG | |
unset output | |
EOF | |
# Calcular m y n | |
m=$(echo "scale=8; ${prestante} / ($x1 - $x0)" | bc) | |
n=$(echo "${pct} - ${m} * $x0" | bc) | |
# Pasar los valores a gnuplot | |
gnuplot -e "x0=$x0; x1=$x1; m=$m; n=$n; png_path='./predict.png'" .predict.gp | |
firefox ./predict.png |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment