@titu1994
Last active May 6, 2025 00:05
import csv
import re
import inspect
from pathlib import Path
import torch
import torch.nn as nn
import numpy as np
### --- Model Function --- ###
# def equation(b: torch.Tensor, s: torch.Tensor, temp: torch.Tensor, pH: torch.Tensor, params: list) -> torch.Tensor:
#     """Mathematical function for bacterial growth rate based on the provided model.
#
#     Args:
#         b: A torch tensor of observations of population density of the bacterial species (B).
#         s: A torch tensor of observations of substrate concentration (S).
#         temp: A torch tensor of observations of temperature (T).
#         pH: A torch tensor of observations of pH level (pH).
#         params: Array of numeric constants or parameters to be optimized, expected in the
#             following order: [Mu, alpha, Ks, Kt, Kp, Kd, gamma_t, gamma_p, T_opt, pH_opt (params[9])]
#
#     Returns:
#         A torch tensor representing the bacterial growth rate obtained by applying the
#         mathematical function to the inputs.
#     """
#     # Unpack parameters using list indexing
#     # params = [Mu, alpha, Ks, Kt, Kp, Kd, gamma_t, gamma_p, T_opt, pH_opt]
#     #          [  0,     1,  2,  3,  4,  5,       6,       7,     8,      9]
#
#     # Temperature modulation factor
#     temp_factor = 1 / (1 + torch.exp(params[6] * (temp - params[8])))
#
#     # pH modulation factor
#     pH_factor = 1 / (1 + torch.exp(params[7] * (pH - params[9])))
#
#     # Population density effect
#     # Note: torch.clamp avoids issues if b is zero or negative and alpha is non-integer
#     # population_effect = torch.pow(torch.clamp(b, min=1e-9), params[1])
#     population_effect = torch.pow(b, params[1])
#
#     # Substrate concentration effect (Monod)
#     # Note: a small epsilon could be added to the denominator to avoid division by zero if s + Ks is zero
#     substrate_effect = s / (s + params[2])
#
#     # Temperature dependency
#     temperature_effect = torch.exp(-params[3] * torch.square(temp - params[8]))
#
#     # pH dependency
#     pH_effect = torch.exp(-params[4] * torch.square(pH - params[9]))
#
#     # Bacterial death rate
#     death_rate = params[5] * b
#
#     # Combine all effects to get the growth rate
#     growth_rate = (params[0] * population_effect * substrate_effect *
#                    temperature_effect * pH_effect - death_rate) * temp_factor * pH_factor
#
#     return growth_rate
def equation(b: torch.Tensor, s: torch.Tensor, temp: torch.Tensor, pH: torch.Tensor, params: list) -> torch.Tensor:
    """Mathematical function for bacterial growth rate based on the provided model.

    Args:
        b: A torch tensor of observations of population density (B).
        s: A torch tensor of observations of substrate concentration (S).
        temp: A torch tensor of observations of temperature (T).
        pH: A torch tensor of observations of pH level (pH).
        params: Array of numeric constants or parameters to be optimized.
            Expected order: [Mu, K, Ks, Kt, Kh, E, F, G, H, I, T_opt, pH_opt (params[11])]
            (The explicit params[11] index lets the evaluator's regex infer the parameter count.)

    Returns:
        A torch tensor representing the bacterial growth rate obtained by applying the
        mathematical function to the inputs.
    """
    # Unpack the parameters
    # Note: Ensure the params list has the correct number and order of parameters
    if len(params) != 12:
        raise ValueError("Expected 12 parameters in the params list.")
    Mu, K, Ks, Kt, Kh, E, F, G, H, I, T_opt, pH_opt = params

    # Temperature factor (Cardinal Temperature Model with inflexion)
    # Using torch equivalents of the numpy functions
    temp_diff = temp - T_opt
    temperature_factor = torch.exp(-Kt * torch.abs(temp_diff) / E - (temp_diff ** 2) / (2 * (E ** 2)))

    # pH factor
    pH_diff_abs = torch.abs(pH - pH_opt)
    pH_factor = 1. / (1. + Kh * torch.exp(pH_diff_abs / F))

    # Inhibition factor
    # Using torch.clamp for clipping and torch.exp for exponentiation
    inhibition_factor = 1. + I * torch.clamp(pH_diff_abs + torch.exp(-pH_diff_abs), min=0., max=10.)

    # Temperature-dependent growth factor (combining G, H, T)
    h_factor_arg = G * (temp - T_opt) + H
    # Using torch.tanh and torch.arctan (if np.arctanh was intended, ensure the domain lies
    # in (-1, 1) or use torch.atanh). Directly translating the source formula, though it
    # looks complex and possibly incorrect (tanh of arctan?).
    h_factor = 1 + torch.tanh(0.5 * (h_factor_arg + torch.arctan(h_factor_arg)))
    growth_factor_temp_ph = h_factor * pH_factor * inhibition_factor

    # Kinetic factor (Monod)
    kinetic_factor = s / (Ks + s)

    # Growth rate term combining the kinetic factor with a potential inhibition/modification
    # Using torch.where for the conditional logic
    modified_kinetic = torch.where(
        kinetic_factor > 1.,
        kinetic_factor / (1. + torch.exp(-K * (kinetic_factor - 1.))),
        kinetic_factor,
    )
    growth_rate = Mu * b * modified_kinetic  # Corrected to follow typical growth models (Mu * B * Monod term)

    # Combine all effects to get the final growth rate
    # The term np.maximum(1 - np.abs(T - T_opt) / 10, 0) looks like another temperature adjustment/limitation:
    # final_temp_adjustment = torch.maximum(torch.tensor(0.0, device=temp.device), 1 - torch.abs(temp - T_opt) / 10.)
    final_temp_adjustment = 1 - torch.abs(temp - T_opt) / 10.

    # print("TF", temperature_factor)
    # print("GF", growth_factor_temp_ph)
    # print("Tadj", final_temp_adjustment)
    growth_rate *= temperature_factor * growth_factor_temp_ph * final_temp_adjustment
    return growth_rate
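
# Usage sketch (illustrative, not part of the original run): a quick shape sanity-check
# for `equation`, assuming 8 scalar observations per feature and 12 unit-valued parameters.
# Uncomment to try:
# _b, _s, _t, _p = (torch.rand(8, 1) for _ in range(4))
# _params = [torch.tensor(1.0) for _ in range(12)]
# print(equation(_b, _s, _t, _p, _params).shape)  # -> torch.Size([8, 1])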
### --- Utility Functions --- ###
def get_function_code_as_string(func):
    try:
        source_code = inspect.getsource(func)
    except OSError as e:
        raise OSError(f"Could not retrieve source code: {e}")
    all_imports = "import torch\nimport numpy as np\n"
    source_code = f"{all_imports}{source_code}"
    return source_code
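
# Illustration (hedged): the returned string is simply the import header prepended to the
# function's source, in a form that evaluate_code_lbfgs() can pass to exec(). Uncomment to inspect:
# print(get_function_code_as_string(equation).splitlines()[0])  # -> 'import torch'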
class SkeletonModel(nn.Module):
    def __init__(self, num_params, skeleton_func_ref, params=None, param_init_fn='ones', scale=0.1):
        super().__init__()
        self.skeleton_func_ref = skeleton_func_ref

        # If params are provided, use them; otherwise, initialize new ones
        if params is not None:
            assert len(params) == num_params, "Provided params length does not match num_params"
            # Convert any non-tensor values to torch tensors
            for pidx in range(len(params)):
                if not torch.is_tensor(params[pidx]):
                    params[pidx] = torch.tensor(params[pidx], dtype=torch.float32)
            self.params = nn.ParameterList([nn.Parameter(p) for p in params])
        else:
            # Initialize parameters (e.g., randomly or with ones)
            assert param_init_fn in ['zeros', 'ones', 'randn', 'uniform']
            if param_init_fn in ['zeros', 'ones']:
                init_fn = lambda: getattr(torch, param_init_fn)(1)
            elif param_init_fn == 'uniform':
                init_fn = lambda: torch.rand(size=(1,))[0]
            elif param_init_fn == 'randn':
                init_fn = lambda: torch.randn(size=(1,))[0]
            else:
                raise ValueError(f"Invalid param init fn: {param_init_fn}")
            self.params = nn.ParameterList([nn.Parameter(init_fn() * scale) for _ in range(num_params)])

    def forward(self, *args):
        # Call the function defined via exec, passing the torch ParameterList
        try:
            # The skeleton_func_ref needs to handle this ParameterList input
            return self.skeleton_func_ref(*args, self.params)
        except Exception as e:
            # print(f"Error during model forward pass: {e}")  # Optional debug
            # Return a NaN tensor to signal the error downstream.
            # Adjust the shape based on the expected output if possible; otherwise use the input shape.
            expected_shape = args[0].shape  # Assuming N x 1 output like the input
            return torch.full(expected_shape, float('nan'))
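
# Usage sketch (illustrative; init and shapes are assumptions, not the original run):
# wrap `equation` in a SkeletonModel with 12 trainable parameters and do a forward pass.
# _model = SkeletonModel(12, equation, param_init_fn='ones', scale=1.0)
# _b, _s, _t, _p = (torch.rand(4, 1) for _ in range(4))
# print(_model(_b, _s, _t, _p).shape)  # -> torch.Size([4, 1])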
def normalized_mse(pred, target):
    mse = torch.mean(torch.square(pred - target))
    # nmse = mse
    # nmse = mse / torch.var(target)
    # nmse = mse / torch.mean(target ** 2)
    nmse = mse / (torch.max(target) - torch.min(target))
    return nmse
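
# Note for reference: despite the commented alternatives above, the active variant is a
# range-normalized MSE — the plain MSE divided by (max(target) - min(target)). A perfect
# prediction therefore yields zero, e.g.:
# normalized_mse(torch.tensor([0., 1.]), torch.tensor([0., 1.]))  # -> tensor(0.)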
### --- Data --- ###
# Read the csv files for the training and test data
dataset_path = Path('./data/bactgrow')
train_path = str(dataset_path / "train.csv")
test_id_path = str(dataset_path / "test_id.csv")
test_path = str(dataset_path / "test_ood.csv")
def read_csv(file_path):
    with open(file_path, 'r') as f:
        reader = csv.reader(f)
        _ = next(reader)  # Skip the header row
        data = [list(map(float, row)) for row in reader]
    return np.array(data)
train_data = read_csv(train_path)
x_train = train_data[:, :-1] # All columns except the last
y_train = train_data[:, -1] # Last column
test_id_data = read_csv(test_id_path)
x_test_id = test_id_data[:, :-1] # All columns except the last
y_test_id = test_id_data[:, -1] # Last column
test_data = read_csv(test_path)
x_test = test_data[:, :-1] # All columns except the last
y_test = test_data[:, -1] # Last column
TRAIN_DATA = {'inputs': x_train, 'outputs': y_train}
TEST_ID_DATA = {'inputs': x_test_id, 'outputs': y_test_id}
TEST_OOD_DATA = {'inputs': x_test, 'outputs': y_test}
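
# Data layout assumption (hedged): each CSV is expected to contain a header row followed by
# five numeric columns — the four inputs (b, s, temp, pH, in that order, matching the
# .split(1, dim=1) unpacking below) and the growth-rate target as the last column. To
# smoke-test the script without the real dataset, a synthetic train.csv could be written
# like this (illustrative only, not part of the original pipeline):
# with open(train_path, 'w', newline='') as _f:
#     _writer = csv.writer(_f)
#     _writer.writerow(['b', 's', 'temp', 'pH', 'growth_rate'])
#     _writer.writerows(np.random.rand(16, 5).tolist())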
### --- Evaluation Function --- ###
def evaluate_code_lbfgs(code_str, data=None) -> (float, list):
    """
    Evaluates the generated code string by optimizing its parameters with PyTorch's LBFGS
    optimizer, then returns the normalized MSE of the best fit on the in-domain test set
    along with the optimized parameters.
    """
    if "np." in code_str and "torch." not in code_str:
        print("--- Warning: Code appears NumPy-based. Skipping evaluation as PyTorch is required. ---")
        return float('inf'), None

    if "def equation(" not in code_str or "params" not in code_str:
        print("--- Skipping evaluation: Function 'equation' or 'params' not found/used in code ---")
        return float('inf'), None

    if data is None:
        data = TRAIN_DATA

    try:
        # Prepare the data
        x_train_np = data['inputs'].astype(np.float32)
        y_true_np = data['outputs'].astype(np.float32)
        x_test_np = TEST_ID_DATA['inputs'].astype(np.float32)
        y_test_np = TEST_ID_DATA['outputs'].astype(np.float32)

        x_tensor = torch.tensor(x_train_np, dtype=torch.float32)
        y_tensor = torch.tensor(y_true_np, dtype=torch.float32)
        x_test_tensor = torch.tensor(x_test_np, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test_np, dtype=torch.float32)

        # Infer the number of parameters from the highest params[i] index used in the code string
        param_indices = [int(i) for i in re.findall(r'params\[(\d+)\]', code_str)]
        num_params = max(param_indices) + 1 if param_indices else 0

        exec_namespace = {'torch': torch, 'nn': nn}
        try:
            exec(code_str, exec_namespace)
            skeleton_func_torch = exec_namespace.get('equation')
            if not callable(skeleton_func_torch):
                print("--- Skipping evaluation: Failed to define callable function 'equation' via exec ---")
                return float('inf'), None
        except Exception as e:
            print(f"--- Error executing generated code string: {e} ---")
            return float('inf'), None

        if num_params == 0:
            print("--- Warning: No 'params[i]' usage found. Skipping evaluation. ---")
            return float('inf'), None

        model = SkeletonModel(num_params, skeleton_func_torch, param_init_fn='ones', scale=1.0)
        criterion = nn.MSELoss()  # Mean Squared Error loss

        # Define the LBFGS optimizer
        optimizer = torch.optim.LBFGS(model.parameters(), max_iter=200, tolerance_grad=1e-7)

        # Optimization loop
        model.train()

        def closure():
            optimizer.zero_grad()
            b, s, temp, pH = x_tensor.split(1, dim=1)  # Assuming 4 features
            y_pred = model(b, s, temp, pH)
            if torch.isnan(y_pred).any() or torch.isinf(y_pred).any():
                raise ValueError("--- NaN/Inf detected in model output during optimization ---")
            if y_pred.shape != y_tensor.shape:
                if y_pred.numel() == y_tensor.numel():
                    y_pred = y_pred.reshape(y_tensor.shape)
                else:
                    raise ValueError(f"--- Shape mismatch ({y_pred.shape} vs {y_tensor.shape}) ---")
            loss = criterion(y_pred, y_tensor)
            if torch.isnan(loss) or torch.isinf(loss):
                raise ValueError("--- NaN/Inf loss detected during optimization ---")
            loss.backward()
            return loss

        try:
            optimizer.step(closure)
        except Exception as e:
            print(f"--- Optimization failed: {e} ---")
            return float('inf'), None

        # Final evaluation on the in-domain test set
        model.eval()
        with torch.no_grad():
            b, s, temp, pH = x_test_tensor.split(1, dim=1)
            y_final_pred = model(b, s, temp, pH)
            if y_final_pred.shape != y_test_tensor.shape:
                if y_final_pred.numel() == y_test_tensor.numel():
                    y_final_pred = y_final_pred.reshape(y_test_tensor.shape)
                else:
                    print(f"--- Warning: Final shape mismatch ({y_final_pred.shape} vs {y_test_tensor.shape}). ---")
                    return float('inf'), None
            final_loss = normalized_mse(y_final_pred, y_test_tensor)
            final_mse = final_loss.item()

        # Print the results
        print("\nTest In-Domain Prediction Example (using optimized params):")
        # print(f"Input x: {x_test_np}")
        print(f"Expected y: {y_test_np}")
        print(f"Predicted y: {y_final_pred.cpu().numpy()}")
        print(f"Average Expected y: {y_test_np.mean()}")
        print(f"Average Predicted y: {y_final_pred.mean().item()}")
        print(f"Prediction MSE: {final_mse:.6f}")  # Normalized MSE, consistent with the final metric

        params = [float(p.clone().detach().item()) for p in model.parameters()]
        return (final_mse, params) if not (np.isnan(final_mse) or np.isinf(final_mse)) else (float('inf'), None)
    except Exception as e:
        print("--- Error during PyTorch evaluation/optimization setup ---")
        print(f"Error: {e}")
        return float('inf'), None
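
# Side note (illustrative): the parameter count above is inferred purely from the regex
# r'params\[(\d+)\]' over the code string — which is why the docstring of `equation`
# mentions "params[11]" even though its body unpacks the list without indexing. E.g.:
# max(int(i) for i in re.findall(r'params\[(\d+)\]', "y = params[3] + params[11]")) + 1  # -> 12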
def evaluate_final(code_str, params: list):
    """
    Evaluates the generated code string with the previously optimized parameters on the
    out-of-distribution test set and returns the normalized MSE of the predictions.
    """
    try:
        # Define the skeleton function from the final best code string
        verify_exec_namespace = {'torch': torch, 'nn': nn}
        exec(code_str, verify_exec_namespace)
        final_skeleton_func = verify_exec_namespace.get('equation')

        x_test_np = TEST_OOD_DATA['inputs'].astype(np.float32)
        y_test_np = TEST_OOD_DATA['outputs'].astype(np.float32)
        test_x_tensor = torch.tensor(x_test_np, dtype=torch.float32)
        test_y_tensor = torch.tensor(y_test_np, dtype=torch.float32)

        # Build the PyTorch model wrapper with the provided parameters
        num_params = len(params) if params is not None else 0
        verification_model = SkeletonModel(num_params, final_skeleton_func, params=params)
        verification_model.eval()

        with torch.no_grad():
            # Run the verification model
            b, s, temp, pH = test_x_tensor.split(1, dim=1)
            test_pred = verification_model(b, s, temp, pH)
            if test_pred.shape != test_y_tensor.shape:
                if test_pred.numel() == test_y_tensor.numel():
                    test_pred = test_pred.reshape(test_y_tensor.shape)

            # Calculate the final loss/metric
            criterion = normalized_mse  # Normalized Mean Squared Error
            test_loss_val = criterion(test_pred.float(), test_y_tensor.float()).item()
            test_pred_np = test_pred.cpu().numpy()  # Convert for printing

        # Print the results
        print("\nTest OOD Prediction Example (using optimized params):")
        # print(f"Input x: {x_test_np}")
        print(f"Expected y: {y_test_np}")
        print(f"Predicted y: {test_pred_np}")
        print(f"Average Expected y: {y_test_np.mean()}")
        print(f"Average Predicted y: {test_pred_np.mean().item()}")
        print(f"Prediction MSE: {test_loss_val:.6f}")  # Normalized MSE, consistent with the in-domain metric
        return test_loss_val
    except Exception as e:
        import traceback
        print(f"\nCould not run final verification/prediction with PyTorch: {e}")
        print(traceback.format_exc())
### --- Main Execution --- ###
func = get_function_code_as_string(equation)
score, params = evaluate_code_lbfgs(func)
print("Test In-domain score:", score)
test_score = evaluate_final(func, params)
print("Test score:", test_score)
titu1994 commented May 6, 2025

Mixtral equation (second one)

```
Test In-Domain Prediction Example (using optimized params):
Expected y: [-2.1936168e-05 -6.7683235e-05 -4.9409105e-06 ...  1.2329100e-02
 -6.2118920e-07 -7.3119518e-05]
Predicted y: [ 1.0744512e-33  3.9555302e-20 -0.0000000e+00 ... -0.0000000e+00
  0.0000000e+00 -0.0000000e+00]
Average Expected y: 0.04015669599175453
Average Predicted y: 1.7802126421884168e-06
Prediction MSE: 0.011442
Test In-domain score: 0.01144240703433752

Test OOD Prediction Example (using optimized params):
Expected y: [ 1.9039959e-01  5.8494556e-09  3.3687285e-01 ... -1.7139068e-07
  9.1900420e-06 -1.4637854e-04]
Predicted y: [-0. -0. -0. ... -0. -0. -0.]
Average Expected y: 0.0436008982360363
Average Predicted y: 0.0002466280711814761
Prediction MSE: 0.012438
Test score: 0.012437880970537663
```
