Skip to content

Instantly share code, notes, and snippets.

@testillano
Last active October 26, 2024 18:56
Show Gist options
  • Save testillano/05c59e9af7e434790bd53cab6487f7b9 to your computer and use it in GitHub Desktop.
Save testillano/05c59e9af7e434790bd53cab6487f7b9 to your computer and use it in GitHub Desktop.
Download and plot Spanish Regulated Market daily reports
%config InlineBackend.figure_format = 'svg' # para mejorar la calidad visual de las gráficas...
import pandas as pd
import matplotlib.pyplot as plt
from urllib.error import HTTPError
import time
from datetime import datetime
#from IPython.display import display
# Calculo de area bajo curva de coste unitario (finalmente multiplicaremos por los KWh, por ejemplo 10A => 2300W => 4,6KWh en 2 horas
import numpy as np
# Método del trapecio:
from scipy.interpolate import interp1d
from scipy.integrate import trapz
# Método con splin cúbico:
from scipy.interpolate import CubicSpline
from scipy.integrate import quad
from scipy.interpolate import InterpolatedUnivariateSpline
DIA = datetime.fromtimestamp(datetime.timestamp(datetime.today()))
DIA_STR = DIA.strftime('%Y%m%d')
DIA_PRETTY_STR = DIA.strftime("%A, %B %d, %Y")
# Printar opcionalmente la curva para mañana si está disponible:
HORA_PUBLICACION = datetime.strptime("13:20", "%H:%M").time()
DIA2 = None
DIA2_STR = ""
HORA_ACTUAL = datetime.now().time()
if HORA_ACTUAL > HORA_PUBLICACION:
DIA2 = datetime.fromtimestamp(datetime.timestamp(datetime.today()) + 86400)
DIA2_STR = DIA2.strftime('%Y%m%d')
# Función que consigue la fuente de datos de la previsión desde Internet...
def consigue_fuente_datos():
ret = None
try:
ret = pd.read_csv(URL_FUENTE_DATOS, sep=';', header=None, names=['year', 'month', 'day', 'hour', 'price1', 'price2', 'x'])
except HTTPError:
print(f'Error accediendo a fuente de datos: {URL_FUENTE_DATOS}\nNo se puede continuar.\n')
return ret
# Función que consigue los datos filtrados de la previsión...
def consigue_datos_filtrados(fuente_datos):
ret = None
if (isinstance(fuente_datos, pd.DataFrame)):
ret = fuente_datos.copy()
# me quedo sólo con las filas de 1 a 24, las columnas de hora y precio1...
ret = ret.drop([0, 25]).drop(columns=['year', 'month', 'day', 'price2', 'x'])
else:
print('Error: la fuente de datos no es un DataFrame.\n')
return ret
# Función que pinta la gráfica de los datos de la previsión...
def pinta_grafica_datos(datos, prices, datos2, prices2, pinta2):
if (isinstance(datos, pd.DataFrame)):
plt.figure(figsize=(10, 6))
plt.plot(datos["hour"], prices, '.-', label=f'today ({DIA_PRETTY_STR})')
#plt.plot(hdatos["hour"], prices, '.-', label='today (cubic splin)', t, ius(t), 'r')
#plt.plot(t, integral, 'g')
if pinta2: plt.plot(datos2["hour"], prices2, '.-', label='tomorrow')
plt.title(f'Daily evolution of the price per kWh in the regulated market')
plt.xlabel("hour")
plt.ylabel("Euros/kWh")
plt.xticks(datos["hour"])
plt.grid(True)
plt.tight_layout()
plt.legend()
plt.show()
else:
print('Error: los datos no son un DataFrame.\n')
def dame_hora_de_inicio_de_carga_optima(horas, precios, duracion_carga, h_inicial = 1, h_final = 24, resolucion=1000):
ius = InterpolatedUnivariateSpline(horas, precios)
t = np.linspace(h_inicial, h_final, resolucion+1)
integral = [ius.integral(h_inicial, ti) for ti in t]
dt = t[1]-t[0]
I = []
for i in range(resolucion+1):
if t[i]+duracion_carga < t[-1]:
I.append(integral[i+round(duracion_carga/dt)] - integral[i])
return [t[np.argmin(I)], np.min(I)]
def read_input(literal, default):
ret = input(literal)
if ret.strip() == "":
return default
return ret
def convertir_a_horas_minutos(decimal):
# Separar la parte entera (horas) y la parte decimal (minutos)
horas = int(decimal)
minutos = (decimal - horas) * 60
# Redondear los minutos para evitar decimales
minutos = round(minutos)
return f"{horas:02d}:{minutos:02d}"
def convertir_hora_a_decimal(hora_str):
if ":" in hora_str:
hora, minuto = map(int, hora_str.split(":"))
else:
hora = int(hora_str)
minuto = 0 # Asignamos el minuto a 0 si no se proporciona
# Convertir los minutos a una fracción de hora
fraccion_minutos = minuto / 60
# Sumar las horas y la fracción de minutos
hora_decimal = hora + fraccion_minutos
return hora_decimal
# Se procede a descargar y filtrar la fuente de datos de la previsión...
URL_FUENTE_DATOS = f'https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_{DIA_STR}.1'
datos = consigue_datos_filtrados(consigue_fuente_datos())
# (*) convierto €/MWh a €/KWh:
prices = []
for p in datos["price1"]:
prices.append(float(p)/1000)
datos2 = None
prices2 = []
pinta2 = False
if DIA2:
URL_FUENTE_DATOS = f'https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_{DIA2_STR}.1'
try:
datos2 = consigue_datos_filtrados(consigue_fuente_datos())
pinta2 = True
# (*) convierto €/MWh a €/KWh:
for p in datos2["price1"]:
prices2.append(float(p)/1000)
except:
print("WARNING: tomorrow report failed to download ! Check https://www.omie.es/es/file-access-list?parents%5B0%5D=/&parents%5B1%5D=Mercado%20Diario&parents%5B2%5D=1.%20Precios&dir=Precios%20horarios%20del%20mercado%20diario%20en%20Espa%C3%B1a&realdir=marginalpdbc")
datos2 = None
# Se pinta la gráfica de los datos del activo...
pinta_grafica_datos(datos, prices, datos2, prices2, pinta2)
TOTAL_BATTERY_LOAD_WH = 13800 # Toyota CH-R
print("\nCost in time interval (for today)\n")
duracion = input("Available time duration for load (i.e. 3, 3:00, 2:30, etc.): ")
duracion = convertir_hora_a_decimal(duracion)
pct = int(read_input("Input current battery percentage (-1: skip) [60]: ", "60")) # usualmente consumo 40% al día y me da igual en que punto
# esté pero quiero que cargue un 40% (20->60% o 60->100%)
current_hour = datetime.now().hour
optimal_init, optimal_load = dame_hora_de_inicio_de_carga_optima(np.array(datos["hour"]), np.array(prices), duracion, current_hour)
optimal_init_str = convertir_a_horas_minutos(optimal_init)
# Amperes' hints:
A_forToyotaTotalLoad = (TOTAL_BATTERY_LOAD_WH / 230) / duracion
Amperios_dflt = 15
pending_pct = 100-pct
CURRENT_BATTERY_LOAD_WH = TOTAL_BATTERY_LOAD_WH * pct/100
PENDING_BATTERY_LOAD_WH = TOTAL_BATTERY_LOAD_WH * pending_pct/100
A_forToyotaPendingLoad = (PENDING_BATTERY_LOAD_WH / 230) / duracion
if pct != -1: Amperios_dflt = A_forToyotaPendingLoad
print("\nAmperes' hints for selected duration:")
print(f' {A_forToyotaTotalLoad:.3f} A would load {TOTAL_BATTERY_LOAD_WH} Wh')
if pct != -1: print(f' {A_forToyotaPendingLoad:.3f}A would load {PENDING_BATTERY_LOAD_WH} Wh (pending load of {pending_pct}%)')
Amperios = float(read_input(f'\nInput load amperes [{Amperios_dflt:.3f}]: ', str(Amperios_dflt)))
potencia = (Amperios*230)/1000 # I * 230V / 1000 = kilovatios de potencia
carga_KWh = potencia * duracion # KWh
# Mostrar datos de optimización:
print(f'\nFor the given duration ({duracion:.3f} hours), the optimal hour to minimize the cost is {optimal_init_str}, and the optimal load area is: {optimal_load:.2f}')
print(f'That is calculated between current hour ({current_hour}) and the end of the day (24)')
# Cálculo de costes
t1 = read_input(f'\nInput t1 [{optimal_init_str}]: ', optimal_init_str)
t1 = convertir_hora_a_decimal(t1)
t2 = optimal_init + duracion
t2_str = convertir_a_horas_minutos(t2)
print(f'Estimated final time will be t2 = {t2_str}')
# Mostrar resultados
print(f'\nPower: {potencia:.3f} KW')
if pct != -1:
print(f"Initial battery load: {pct}% ({CURRENT_BATTERY_LOAD_WH} Wh)")
pct_final = pct + 100 * 1000 * carga_KWh / TOTAL_BATTERY_LOAD_WH
finished_before_str = ""
if pct_final > 100:
pct_final = 100 # ocurre cuando pongo más amperaje que el sugerido
duracion_real = (PENDING_BATTERY_LOAD_WH/1000)/potencia
real_t2 = t1 + duracion_real
real_t2_str = convertir_a_horas_minutos(real_t2)
finished_before_str = f'; indeed, the load finished before t2, at {real_t2_str}'
t2 = real_t2 # calculo real del coste3
duracion = duracion_real
carga_KWh = potencia * duracion # KWh
FINAL_BATTERY_LOAD_WH = TOTAL_BATTERY_LOAD_WH * pct_final/100
print(f"Final battery load: {pct_final:.3f}% ({FINAL_BATTERY_LOAD_WH} Wh{finished_before_str})")
print(f'Effective duration: {duracion:.3f} h')
print(f'Load incremented: {potencia:.3f} x {duracion:.3f} = {carga_KWh:.3f} KWh')
# METODO DEL TRAPECIO #################
# Interpolar los datos (today) para obtener valores intermedios3
interp_func = interp1d(datos["hour"], prices, kind='linear', fill_value="extrapolate")
# Crear una malla fina entre t1 y t2 para integrar
t_vals = np.linspace(t1, t2, 1000) # 1000 puntos entre t1 y t2 # KWh
y_vals = interp_func(t_vals) # Valores interpolados en esos puntos
# Calcular el área bajo la curva usando el método del trapecio
area = trapz(y_vals, t_vals)
# METODO SPLIN CUBICO #################
cs = CubicSpline(datos["hour"], prices)
area2, _ = quad(cs, t1, t2)
t1_str = convertir_a_horas_minutos(t1)
t2_str = convertir_a_horas_minutos(t2)
print(f"\nArea under today's function within time range [{t1_str}, {t2_str}]:")
print(f' Trapezoidal method: {area:.3f} €/kW')
print(f' Cubic splin method: {area2:.3f} €/kW <--- use this (more accurate)')
coste = area * potencia
print(f"\nCost deduced: {area2:.3f} €/KW x {potencia:.3f} KW = {coste:.3f} €")
#!/usr/bin/python3
import argparse
from datetime import datetime, timedelta
import time
import os, sys
# Download automation:
import pandas as pd
from urllib.error import HTTPError
# Plot data:
import matplotlib.pyplot as plt
# OMIE daily files: # https://www.omie.es/es/file-access-list?parents%5B0%5D=/&parents%5B1%5D=Mercado%20Diario&parents%5B2%5D=1.%20Precios&dir=Precios%20horarios%20del%20mercado%20diario%20en%20Espa%C3%B1a&realdir=marginalpdbc
# Browse URL, i.e. https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_20240705.1
COLUMNS = {'hour': 'hour', 'price1': '€/kWh'} # data comes in €/MWh (*)
KCOLUMNS = list(COLUMNS) # keys
VCOLUMNS = list(COLUMNS.values()) # values
PUBLISH_TIME_STR = "13:20"
PUBLISH_TIME = datetime.strptime(PUBLISH_TIME_STR, "%H:%M").time()
URL_SOURCE = None
DAY_PRETTY_STR = None
def get_data_source():
ret = None
global URL_SOURCE
try:
ret = pd.read_csv(URL_SOURCE, sep=';', header=None, names=['year', 'month', 'day', 'hour', 'price1', 'price2', 'x'])
except HTTPError:
print(f'ERROR: failed to access online data from OMIE ({URL_SOURCE}).\n')
sys.exit(1)
return ret
def filter_data(data):
ret = None
if (isinstance(data, pd.DataFrame)):
ret = data.copy()
# Remove first and last row, and keep only hour/price1 columns:
ret = ret.drop([ret.index[0], ret.index[-1]]).drop(columns=['year', 'month', 'day', 'price2', 'x'])
else:
print('ERROR: data source is not valid DataFrame.\n')
sys.exit(1)
return ret
def plot(data, quiet, file_path):
global DAY_PRETTY_STR
if (isinstance(data, pd.DataFrame)):
# (*) convert €/MWh to €/KWh:
prices_KWh = []
prices_MWh = data[KCOLUMNS[1]]
for p in prices_MWh:
prices_KWh.append(float(p)/1000)
plt.figure(figsize=(10, 6))
plt.plot(data[KCOLUMNS[0]], prices_KWh, '.-')
plt.title(f'Daily evolution of the price (€) per kWh in the regulated market ({DAY_PRETTY_STR})')
plt.xlabel(VCOLUMNS[0])
plt.ylabel(VCOLUMNS[1])
plt.xticks(data[KCOLUMNS[0]])
plt.grid(True)
plt.tight_layout()
# Save png file
plt.savefig(file_path)
if not quiet: plt.show()
else:
print('ERROR: data source is not valid DataFrame.\n')
def get_reports(downloads_path, quiet, idate, fdate):
global URL_SOURCE
global DAY_PRETTY_STR
auxDate = idate
dates = []
while (auxDate <= fdate):
auxDateStr = auxDate.strftime("%Y%m%d")
file_path = downloads_path + '/marginalpdbc_' + auxDateStr + '.1.png'
if os.path.exists(file_path):
print(f'INFO: skipping ' + file_path + ' ... (already cached). Use firefox or any other viewer to see.')
else:
print(f'INFO: generating ' + file_path + ' ...')
URL_SOURCE = f'https://www.omie.es/es/file-download?parents%5B0%5D=marginalpdbc&filename=marginalpdbc_{auxDateStr}.1'
DAY_PRETTY_STR = auxDate.strftime("%A, %B %d, %Y")
# Download prevision and prepare data:
myData = filter_data(get_data_source())
# Create plot from data retrieved:
plot(myData, quiet, file_path)
symlink_path = downloads_path + '/last.png'
if os.path.islink(symlink_path): os.unlink(symlink_path)
os.symlink(file_path, symlink_path)
dates.append(auxDate)
auxDate += timedelta(days=1)
def main(quiet, idate, fdate):
# Downloads path
script_dir = os.path.dirname(os.path.abspath(__file__))
downloads_path = script_dir + '/pngs'
try: os.makedirs(downloads_path) # just in case
except: pass
get_reports(downloads_path, quiet, idate, fdate)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Download/plot OMIE daily reports. Files cached on './pngs'.")
parser.add_argument('--date', help="downloaded day in format <YYYYMMDD>. If missing, current one is assumed (and after " + PUBLISH_TIME_STR + ", also tomorrow report is retrieved).")
parser.add_argument('--days', '-d', help="Download last N days. After " + PUBLISH_TIME_STR + ", also tomorrow report is retrieved. Value '-1' means tomorrow (use after publish time).")
parser.add_argument('--quiet', '-q', help="Don't show plot on pop window", action='store_true') # on/off flag
args = parser.parse_args()
date_now = datetime.now()
idate = date_now
fdate = date_now
if args.days:
daysOffset = int(args.days)
idate += timedelta(days=-daysOffset)
if daysOffset == -1: fdate = idate # tomorrow special case
else:
if args.date:
idate = datetime.strptime(args.date, '%Y%m%d')
fdate = idate
# Extra day condition:
if date_now.time() > PUBLISH_TIME:
if fdate == date_now:
print(f'It is after {PUBLISH_TIME_STR} (publish time), so tomorrow report shall be also retrieved !')
fdate += timedelta(days=1)
main(args.quiet, idate, fdate)
#!/bin/bash
# Capacidad de bateria en Wh
capacidad=13800
echo
echo "Introduce el amperaje de carga (A) [15]:"
read amp
[ -z "${amp}" ] && amp=15
potencia=$((amp*230))
echo "Potencia de carga = ${amp}A * 230V = ${potencia}W"
echo "Capacidad de bateria = ${capacidad}Wh"
t100=$(echo "scale=3; ${capacidad}/${potencia}" | bc)
echo "Tiempo de carga completa (100%): ${t100} horas"
echo
echo "Introduce el porcentaje de bateria en el inicio de la carga:"
read pct
while [ -z "${pct}" ]; do read pct ; done
trestante=$(echo "scale=3; ${t100}*(100-${pct})/100" | bc)
echo "Tiempo de carga necesaria ($((100-pct))%): ${trestante} horas"
echo
echo "Introduce el tiempo inicio de la carga (HH:MM):"
read inicio
while [ -z "${pct}" ]; do read pct ; done
trestante=$(echo "scale=3; ${t100}*(100-${pct})/100" | bc)
trestante_secs=$(echo "3600*${trestante}" | bc)
prestante=$((100-pct))
echo "Tiempo de carga necesaria (${prestante}%): ${trestante} horas"
echo
echo "Calculo de y(x)=mx + n, con y=%carga y x=timestamp unix"
x0=$(date -d "${inicio}" +"%s")
echo "Punto inicial (x0,y0): (${x0},${pct})"
x1=$(date -d "${inicio} + ${trestante_secs} seconds" +"%s")
echo "Punto final (x1,y1): (${x1},100)"
# Obtener el offset de UTC en segundos (horas * 3600 + minutos * 60)
offset_utc=$(date +%z) # Ejemplo: +0200 para UTC+2
offset_sign=${offset_utc:0:1} # Obtener el signo (+ o -)
offset_hours=${offset_utc:1:2} # Obtener las horas
offset_minutes=${offset_utc:3:2} # Obtener los minutos
offset_seconds=$(echo "$offset_hours*3600 + $offset_minutes*60" | bc) # Convertir a segundos
x0=$((x0${offset_sign}${offset_seconds}))
x1=$((x1${offset_sign}${offset_seconds}))
cat << EOF > .predict.gp
# Configurar la salida como un archivo PNG
set terminal pngcairo size 800,600 enhanced font 'Arial,12'
set output png_path
# Configuración del formato de tiempo y datos
set xdata time
set timefmt "%s" # Formato de tiempo como timestamp Unix
set format x "%H:%M" # Mostrar el eje X en formato HH:MM
set xlabel "Tiempo (HH:MM)"
set ylabel "Porcentaje (%)"
# Configurar el rango del eje Y (porcentaje de 0 a 100)
set yrange [0:100]
# Activar el grid y personalizarlo
set grid xtics ytics # Activa el grid en ambos ejes (X e Y)
set ytics 5 # Coloca tics (divisiones) en el eje Y cada 5 unidades
# Configurar el rango del eje X basado en los timestamps Unix
set xrange [x0:x1]
# Definir la ecuación de la recta con los valores de m y n
f(x) = m * x + n
# Graficar la recta
plot f(x) title "Recta: y(x) = m*x + n"
# Cerrar la salida del archivo PNG
unset output
EOF
# Calcular m y n
m=$(echo "scale=8; ${prestante} / ($x1 - $x0)" | bc)
n=$(echo "${pct} - ${m} * $x0" | bc)
# Pasar los valores a gnuplot
gnuplot -e "x0=$x0; x1=$x1; m=$m; n=$n; png_path='./predict.png'" .predict.gp
firefox ./predict.png
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment