import csv
import re
import inspect
from pathlib import Path

import torch
import torch.nn as nn
import numpy as np
### --- Model Function --- ###
# def equation(b: torch.Tensor, s: torch.Tensor, temp: torch.Tensor, pH: torch.Tensor, params: list) -> torch.Tensor:
#     """Mathematical function for bacterial growth rate based on the provided model.
#
#     Args:
#         b: A torch tensor representing observations of population density of the bacterial species (B).
#         s: A torch tensor representing observations of substrate concentration (S).
#         temp: A torch tensor representing observations of temperature (T).
#         pH: A torch tensor representing observations of pH level (pH).
#         params: Array of numeric constants or parameters to be optimized, expected in the following order:
#             [Mu, alpha, Ks, Kt, Kp, Kd, gamma_t, gamma_p, T_opt, pH_opt (params[9])]
#
#     Return:
#         A torch tensor representing the bacterial growth rate obtained by applying the mathematical function to the inputs.
#     """
#     # Parameter layout (indices into `params`):
#     # [Mu, alpha, Ks, Kt, Kp, Kd, gamma_t, gamma_p, T_opt, pH_opt]
#     # [ 0,     1,  2,  3,  4,  5,       6,       7,     8,      9]
#
#     # Temperature modulation factor
#     temp_factor = 1 / (1 + torch.exp(params[6] * (temp - params[8])))
#
#     # pH modulation factor
#     pH_factor = 1 / (1 + torch.exp(params[7] * (pH - params[9])))
#
#     # Population density effect
#     # Note: torch.clamp would avoid issues if b is zero or negative and alpha is non-integer:
#     # population_effect = torch.pow(torch.clamp(b, min=1e-9), params[1])
#     population_effect = torch.pow(b, params[1])
#
#     # Substrate concentration effect (Monod)
#     # Note: a small epsilon could be added to the denominator to avoid division by zero if s + Ks is zero
#     substrate_effect = s / (s + params[2])
#
#     # Temperature dependency
#     temperature_effect = torch.exp(-params[3] * torch.square(temp - params[8]))
#
#     # pH dependency
#     pH_effect = torch.exp(-params[4] * torch.square(pH - params[9]))
#
#     # Bacterial death rate
#     death_rate = params[5] * b
#
#     # Combine all effects to get the growth rate
#     growth_rate = (params[0] * population_effect * substrate_effect *
#                    temperature_effect * pH_effect - death_rate) * temp_factor * pH_factor
#
#     return growth_rate
# Mixtral equation (second one)
def equation(b: torch.Tensor, s: torch.Tensor, temp: torch.Tensor, pH: torch.Tensor, params: list) -> torch.Tensor:
    """Mathematical function for bacterial growth rate based on the provided model.

    Args:
        b: A torch tensor representing observations of population density (B).
        s: A torch tensor representing observations of substrate concentration (S).
        temp: A torch tensor representing observations of temperature (T).
        pH: A torch tensor representing observations of pH level (pH).
        params: Array of numeric constants or parameters to be optimized.
            Expected order: [Mu, K, Ks, Kt, Kh, E, F, G, H, I, T_opt, pH_opt (params[11])]

    Return:
        A torch tensor representing the bacterial growth rate obtained by applying the mathematical function to the inputs.
    """
    # Unpack the parameters
    # Note: ensure the params list has the correct number and order of parameters
    if len(params) != 12:
        raise ValueError("Expected 12 parameters in the params list.")
    Mu, K, Ks, Kt, Kh, E, F, G, H, I, T_opt, pH_opt = params

    # Temperature factor (Cardinal Temperature Model with inflexion), using torch equivalents of the numpy functions
    temp_diff = temp - T_opt
    temperature_factor = torch.exp(-Kt * torch.abs(temp_diff) / E - (temp_diff**2) / (2 * (E**2)))

    # pH factor
    pH_diff_abs = torch.abs(pH - pH_opt)
    pH_factor = 1. / (1. + Kh * torch.exp(pH_diff_abs / F))

    # Inhibition factor, using torch.clamp for clipping
    inhibition_factor = 1. + I * torch.clamp(pH_diff_abs + torch.exp(-pH_diff_abs), min=0., max=10.)

    # Temperature-dependent growth factor (combining G, H, T)
    h_factor_arg = G * (temp - T_opt) + H
    # Note: torch.arctan is used here; if np.arctanh was intended, use torch.atanh instead and
    # ensure its argument stays in (-1, 1). This directly translates the source formula, though
    # the construction (tanh wrapping arctan) looks suspect.
    h_factor = 1 + torch.tanh(0.5 * (h_factor_arg + torch.arctan(h_factor_arg)))
    growth_factor_temp_ph = h_factor * pH_factor * inhibition_factor

    # Kinetic factor (Monod)
    kinetic_factor = s / (Ks + s)

    # Growth-rate term combining the kinetic factor with a potential inhibition/modification,
    # using torch.where for the conditional logic
    modified_kinetic = torch.where(
        kinetic_factor > 1.,
        kinetic_factor / (1. + torch.exp(-K * (kinetic_factor - 1.))),
        kinetic_factor
    )
    growth_rate = Mu * b * modified_kinetic  # Corrected to match typical growth models (Mu * B * Monod term)

    # Final temperature adjustment: the source term np.maximum(1 - np.abs(T - T_opt) / 10, 0)
    # looks like another temperature limitation; the clamped variant is kept here for reference:
    # final_temp_adjustment = torch.maximum(torch.tensor(0.0, device=temp.device), 1 - torch.abs(temp - T_opt) / 10.)
    final_temp_adjustment = 1 - torch.abs(temp - T_opt) / 10.

    # print("TF", temperature_factor)
    # print("GF", growth_factor_temp_ph)
    # print("Tadj", final_temp_adjustment)

    # Combine all calculated factors with the base rate
    growth_rate *= temperature_factor * growth_factor_temp_ph * final_temp_adjustment
    return growth_rate
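
# A minimal, commented-out sanity check for `equation` in isolation (assumed, not from
# the original: N x 1 column tensors and 12 unit parameters; the values are arbitrary,
# not fitted). Uncomment to smoke-test the function:
# _b, _s = torch.rand(5, 1), torch.rand(5, 1)
# _t, _ph = torch.full((5, 1), 25.0), torch.full((5, 1), 7.0)
# print(equation(_b, _s, _t, _ph, [torch.tensor(1.0)] * 12).shape)  # -> torch.Size([5, 1])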
### --- Utility Functions --- ###
def get_function_code_as_string(func):
    try:
        source_code = inspect.getsource(func)
    except OSError as e:
        raise OSError(f"Could not retrieve source code: {e}") from e
    all_imports = "import torch\nimport numpy as np\n"
    source_code = f"{all_imports}{source_code}"
    return source_code
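
# Example (commented out): the returned string is self-contained, so it can be exec()'d
# into a fresh namespace, which is what the evaluation functions below do.
# _src = get_function_code_as_string(equation)
# assert _src.startswith("import torch")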
class SkeletonModel(nn.Module):
    def __init__(self, num_params, skeleton_func_ref, params=None, param_init_fn='ones', scale=0.1):
        super().__init__()
        self.skeleton_func_ref = skeleton_func_ref
        # If params are provided, use them; otherwise, initialize new ones
        if params is not None:
            assert len(params) == num_params, "Provided params length does not match num_params"
            # Convert any non-tensor entries to torch tensors
            for pidx in range(len(params)):
                if not torch.is_tensor(params[pidx]):
                    params[pidx] = torch.tensor(params[pidx], dtype=torch.float32)
            self.params = nn.ParameterList([nn.Parameter(p) for p in params])
        else:
            # Initialize parameters (e.g., randomly or with ones)
            assert param_init_fn in ['zeros', 'ones', 'randn', 'uniform']
            if param_init_fn in ['zeros', 'ones']:
                init_fn = lambda: getattr(torch, param_init_fn)(1)
            elif param_init_fn == 'uniform':
                init_fn = lambda: torch.rand(size=(1,))[0]
            elif param_init_fn == 'randn':
                init_fn = lambda: torch.randn(size=(1,))[0]
            else:
                raise ValueError(f"Invalid param init fn: {param_init_fn}")
            self.params = nn.ParameterList([nn.Parameter(init_fn() * scale) for _ in range(num_params)])

    def forward(self, *args):
        # Call the exec-defined function, passing the torch ParameterList as the last argument
        try:
            # The skeleton_func_ref needs to handle this ParameterList input
            return self.skeleton_func_ref(*args, self.params)
        except Exception as e:
            # print(f"Error during model forward pass: {e}")  # Optional debug
            # Return a NaN tensor to signal the error downstream; the shape follows the
            # first input, assuming an N x 1 output like the inputs
            expected_shape = args[0].shape
            return torch.full(expected_shape, float('nan'))
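
# Example (commented out) of wrapping `equation` directly; the inputs here are
# illustrative placeholders, not fitted values:
# _model = SkeletonModel(12, equation, param_init_fn='ones', scale=1.0)
# _cols = [torch.rand(3, 1) for _ in range(4)]  # b, s, temp, pH
# print(_model(*_cols).shape)  # -> torch.Size([3, 1])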
def normalized_mse(pred, target):
    # Note: this normalizes the MSE by the target's range (max - min), not by its variance.
    mse = torch.mean(torch.square(pred - target))
    # Alternative normalizations, kept for reference:
    # nmse = mse
    # nmse = mse / torch.var(target)
    # nmse = mse / torch.mean(target ** 2)
    nmse = mse / (torch.max(target) - torch.min(target))
    return nmse
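
# Worked example: pred = [1., 2.], target = [1., 3.]
# -> mse = mean([0, 1]) = 0.5, range = 3 - 1 = 2, nmse = 0.25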
# --- Data ---
# Read the csv files for the training/testing data
dataset_path = Path('./data/bactgrow')
train_path = str(dataset_path / "train.csv")
test_id_path = str(dataset_path / "test_id.csv")
test_path = str(dataset_path / "test_ood.csv")

def read_csv(file_path):
    with open(file_path, 'r') as f:
        reader = csv.reader(f)
        _ = next(reader)  # Skip header
        data = [list(map(float, row)) for row in reader]
    return np.array(data)
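
# Assumed CSV layout (not verified against the data files): a header row followed by
# numeric columns with the four inputs (presumably B, S, T, pH) first and the growth
# rate as the last column; the splits below rely only on "last column is the target".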
train_data = read_csv(train_path)
x_train = train_data[:, :-1]  # All columns except the last
y_train = train_data[:, -1]   # Last column

test_id_data = read_csv(test_id_path)
x_test_id = test_id_data[:, :-1]
y_test_id = test_id_data[:, -1]

test_data = read_csv(test_path)
x_test = test_data[:, :-1]
y_test = test_data[:, -1]

TRAIN_DATA = {'inputs': x_train, 'outputs': y_train}
TEST_ID_DATA = {'inputs': x_test_id, 'outputs': y_test_id}
TEST_OOD_DATA = {'inputs': x_test, 'outputs': y_test}
### --- Evaluation Function --- ###
def evaluate_code_lbfgs(code_str, data=None) -> tuple:
    """
    Evaluates the generated code string by optimizing its parameters with PyTorch's LBFGS,
    and returns the range-normalized MSE on the in-domain test split along with the fitted parameters.
    """
    if "np." in code_str and "torch." not in code_str:
        print("--- Warning: Code appears NumPy-based. Skipping evaluation as PyTorch is required. ---")
        return float('inf'), None
    if "def equation(" not in code_str or "params" not in code_str:
        print("--- Skipping evaluation: Function 'equation' or 'params' not found/used in code ---")
        return float('inf'), None
    if data is None:
        data = TRAIN_DATA
    try:
        # Prepare data
        x_train_np = data['inputs'].astype(np.float32)
        y_true_np = data['outputs'].astype(np.float32)
        x_test_np = TEST_ID_DATA['inputs'].astype(np.float32)
        y_test_np = TEST_ID_DATA['outputs'].astype(np.float32)
        x_tensor = torch.tensor(x_train_np, dtype=torch.float32)
        y_tensor = torch.tensor(y_true_np, dtype=torch.float32)
        x_test_tensor = torch.tensor(x_test_np, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test_np, dtype=torch.float32)

        # Infer the number of parameters from the highest params[i] index used
        param_indices = [int(i) for i in re.findall(r'params\[(\d+)\]', code_str)]
        num_params = max(param_indices) + 1 if param_indices else 0
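        # e.g. a body referencing params[0] and params[3] yields num_params = 4; any
        # unreferenced intermediate indices still get (unused) trainable parameters.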
        exec_namespace = {'torch': torch, 'nn': nn}
        try:
            exec(code_str, exec_namespace)
            skeleton_func_torch = exec_namespace.get('equation')
            if not callable(skeleton_func_torch):
                print("--- Skipping evaluation: Failed to define callable function 'equation' via exec ---")
                return float('inf'), None
        except Exception as e:
            print(f"--- Error executing generated code string: {e} ---")
            return float('inf'), None

        if num_params == 0:
            print("--- Skipping evaluation: No 'params[i]' usage found, so there is nothing to optimize. ---")
            return float('inf'), None

        model = SkeletonModel(num_params, skeleton_func_torch, param_init_fn='ones', scale=1.0)
        criterion = nn.MSELoss()  # Mean Squared Error loss for optimization

        # Define the LBFGS optimizer
        optimizer = torch.optim.LBFGS(model.parameters(), max_iter=200, tolerance_grad=1e-7)

        # Optimization loop (LBFGS re-evaluates the closure internally)
        model.train()

        def closure():
            optimizer.zero_grad()
            b, s, temp, pH = x_tensor.split(1, dim=1)  # Assuming 4 features
            y_pred = model(b, s, temp, pH)
            if torch.isnan(y_pred).any() or torch.isinf(y_pred).any():
                raise ValueError("--- NaN/Inf detected in model output during optimization ---")
            if y_pred.shape != y_tensor.shape:
                if y_pred.numel() == y_tensor.numel():
                    y_pred = y_pred.reshape(y_tensor.shape)
                else:
                    raise ValueError(f"--- Shape mismatch ({y_pred.shape} vs {y_tensor.shape}) ---")
            loss = criterion(y_pred, y_tensor)
            if torch.isnan(loss) or torch.isinf(loss):
                raise ValueError("--- NaN/Inf loss detected during optimization ---")
            loss.backward()
            return loss

        try:
            optimizer.step(closure)
        except Exception as e:
            print(f"--- Optimization failed: {e} ---")
            return float('inf'), None

        # Final evaluation on the in-domain test split
        model.eval()
        with torch.no_grad():
            b, s, temp, pH = x_test_tensor.split(1, dim=1)
            y_final_pred = model(b, s, temp, pH)
            if y_final_pred.shape != y_test_tensor.shape:
                if y_final_pred.numel() == y_test_tensor.numel():
                    y_final_pred = y_final_pred.reshape(y_test_tensor.shape)
                else:
                    print(f"--- Warning: Final shape mismatch ({y_final_pred.shape} vs {y_test_tensor.shape}). ---")
                    return float('inf'), None
            final_loss = normalized_mse(y_final_pred, y_test_tensor)
            final_mse = final_loss.item()

        # Print results
        print("\nTest In-Domain Prediction Example (using optimized params):")
        # print(f"Input x: {x_test_np}")
        print(f"Expected y: {y_test_np}")
        print(f"Predicted y: {y_final_pred.cpu().numpy()}")
        print(f"Average Expected y: {y_test_np.mean()}")
        print(f"Average Predicted y: {y_final_pred.mean().item()}")
        print(f"Prediction NMSE: {final_mse:.6f}")  # Range-normalized MSE, matching the final metric

        params = [float(p.clone().detach().item()) for p in model.parameters()]
        return (final_mse, params) if not (np.isnan(final_mse) or np.isinf(final_mse)) else (float('inf'), None)
    except Exception as e:
        print("--- Error during PyTorch evaluation/optimization setup ---")
        print(f"Error: {e}")
        return float('inf'), None
def evaluate_final(code_str, params: list):
    """
    Evaluates the generated code string on the out-of-distribution test split using the
    already-fitted parameters (no further optimization), and returns the normalized MSE.
    """
    try:
        # 1. Define the skeleton function from the final best code string
        verify_exec_namespace = {'torch': torch, 'nn': nn}
        exec(code_str, verify_exec_namespace)
        final_skeleton_func = verify_exec_namespace.get('equation')

        x_test_np = TEST_OOD_DATA['inputs'].astype(np.float32)
        y_test_np = TEST_OOD_DATA['outputs'].astype(np.float32)
        test_x_tensor = torch.tensor(x_test_np, dtype=torch.float32)
        test_y_tensor = torch.tensor(y_test_np, dtype=torch.float32)

        # 2. Build the PyTorch model wrapper around the fitted parameters
        num_params = len(params) if params is not None else 0
        verification_model = SkeletonModel(num_params, final_skeleton_func, params=params)
        verification_model.eval()

        with torch.no_grad():
            # 3. Run the verification model
            b, s, temp, pH = test_x_tensor.split(1, dim=1)
            test_pred = verification_model(b, s, temp, pH)
            if test_pred.shape != test_y_tensor.shape:
                if test_pred.numel() == test_y_tensor.numel():
                    test_pred = test_pred.reshape(test_y_tensor.shape)

            # Calculate the final loss/metric (range-normalized MSE)
            criterion = normalized_mse
            test_loss_val = criterion(test_pred.float(), test_y_tensor.float()).item()
            test_pred_np = test_pred.cpu().numpy()  # Convert for printing

        # 4. Print results
        print("\nTest OOD Prediction Example (using optimized params):")
        # print(f"Input x: {x_test_np}")
        print(f"Expected y: {y_test_np}")
        print(f"Predicted y: {test_pred_np}")
        print(f"Average Expected y: {y_test_np.mean()}")
        print(f"Average Predicted y: {test_pred_np.mean()}")
        print(f"Prediction NMSE: {test_loss_val:.6f}")
        return test_loss_val
    except Exception as e:
        import traceback
        print(f"\nCould not run final verification/prediction with PyTorch: {e}")
        print(traceback.format_exc())
        return float('inf')
### --- Main Execution --- ###
func = get_function_code_as_string(equation)
score, params = evaluate_code_lbfgs(func)
print("Test In-domain score:", score)

test_score = evaluate_final(func, params)
print("Test score:", test_score)