# Source: GitHub Gist bc96ef15bfe42c95febef003aeccfd51 by @tiandiao123
# (created November 27, 2024 12:36).
import os
import joblib
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import precision_score, recall_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from rl_trade.rolling_train.trainers.indicator_factory import add_technical_indicators, fetch_and_process_data
# A simple multi-layer perceptron (MLP) classifier.
class MLP(nn.Module):
    """Three-layer fully connected classifier.

    Layout: Linear(input_dim -> hidden_dim) -> ReLU
            -> Linear(hidden_dim -> hidden_dim) -> ReLU
            -> Linear(hidden_dim -> output_dim) -> Softmax(dim=1)

    ``forward`` returns class probabilities (softmax is already applied),
    not raw logits. A loss such as ``nn.CrossEntropyLoss`` expects raw
    logits, so it should not be applied directly to this output.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        # These attribute names become the state_dict keys of saved checkpoints.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return per-class probabilities for input ``x``.

        The softmax is taken over dim 1, so ``x`` is expected to be a 2-D
        (batch, input_dim) tensor.
        """
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        scores = self.fc3(hidden)
        return self.softmax(scores)
def prepare_supervised_data(df, window_size=1, threshold=0.001):
    """Build supervised-learning data: indicator features and a 3-class label.

    Technical indicators are added with ``add_technical_indicators``. The
    label comes from the return between the current bar's and the next
    bar's value in column ``'c'``:

    - 2 (buy)     if next_return >  threshold
    - 0 (short)   if next_return < -threshold
    - 1 (neutral) otherwise

    Args:
        df: input DataFrame; must contain a ``'c'`` column.
        window_size: currently unused; kept for interface compatibility.
        threshold: non-negative return threshold that separates the neutral
            band from the buy/short classes. Defaults to 0.001, the value
            that was previously hard-coded.

    Returns:
        (X, y, df): the feature DataFrame, the label Series, and the
        processed DataFrame (including the ``next_return`` and ``label``
        columns, with NaN-containing rows dropped).

    Raises:
        ValueError: if ``threshold`` is negative (the buy and short bands
            would overlap).
    """
    if threshold < 0:
        raise ValueError(f"threshold must be non-negative, got {threshold}")

    df = add_technical_indicators(df)

    # Forward-looking return: this bar -> next bar. The last row has no next
    # bar (NaN), so dropna removes it, along with any other row that has a
    # NaN in any column.
    df['next_return'] = df['c'].pct_change(1).shift(-1)
    df.dropna(inplace=True)

    # Vectorized 3-way labelling. Conditions are disjoint for threshold >= 0.
    next_return = df['next_return']
    df['label'] = np.select(
        [next_return > threshold, next_return < -threshold],
        [2, 0],
        default=1,
    )

    # Every remaining column (indicator signals plus raw columns) is a
    # feature, except the target and bookkeeping columns.
    excluded = {'next_return', 'label', 'Unnamed: 0', 'confirm'}
    features = [col for col in df.columns if col not in excluded]
    X = df[features]
    y = df['label']
    return X, y, df
# Helper to compute precision and recall for a single class.
def calculate_precision_recall(y_true, y_pred, label):
    """Return ``(precision, recall)`` for the single class ``label``.

    Restricting ``labels`` to ``[label]`` with micro averaging yields that
    one class's precision and recall.
    """
    metric_kwargs = {'labels': [label], 'average': 'micro'}
    return (
        precision_score(y_true, y_pred, **metric_kwargs),
        recall_score(y_true, y_pred, **metric_kwargs),
    )
# Train the neural network model.
def _probs_nll_loss(probs, targets, eps=1e-12):
    """Mean negative log-likelihood for outputs that are already probabilities.

    ``MLP.forward`` returns softmax probabilities, so ``nn.CrossEntropyLoss``
    (which applies log-softmax itself) would normalize twice. Instead, take
    the log of the probabilities (clamped to avoid log(0)) and use NLL loss.
    """
    return nn.functional.nll_loss(torch.log(torch.clamp(probs, min=eps)), targets)


def train_nn_model(X_train, y_train, X_val, y_val, input_dim, hidden_dim=64, output_dim=3, epochs=20, batch_size=32, model_save_path='mlp_model.pth', lr=0.001, patience=5):
    """Train an ``MLP`` classifier with early stopping on validation loss.

    Args:
        X_train, X_val: feature matrices (pandas DataFrame or NumPy array-like).
        y_train, y_val: integer class labels in ``[0, output_dim)``.
        input_dim, hidden_dim, output_dim: MLP layer sizes.
        epochs: maximum number of training epochs.
        batch_size: mini-batch size for training and validation.
        model_save_path: file the best (lowest validation loss) state_dict
            is written to whenever validation loss improves.
        lr: Adam learning rate.
        patience: number of consecutive epochs without validation
            improvement before training stops early.

    Returns:
        The trained MLP, with the weights of its best validation epoch
        restored. These are the same weights as those saved to
        ``model_save_path``.

    Raises:
        ValueError: if the training or validation set is empty.
    """
    # np.asarray accepts DataFrames/Series and the plain ndarrays returned by
    # sklearn scalers (ndarray has no ``.values`` attribute).
    X_train_tensor = torch.tensor(np.asarray(X_train, dtype=np.float32))
    y_train_tensor = torch.as_tensor(np.asarray(y_train), dtype=torch.long)
    X_val_tensor = torch.tensor(np.asarray(X_val, dtype=np.float32))
    y_val_tensor = torch.as_tensor(np.asarray(y_val), dtype=torch.long)
    if len(X_train_tensor) == 0 or len(X_val_tensor) == 0:
        raise ValueError("train_nn_model requires non-empty training and validation sets")

    model = MLP(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = torch.utils.data.TensorDataset(X_val_tensor, y_val_tensor)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    best_val_loss = float('inf')
    best_state = None
    patience_counter = 0

    for epoch in range(epochs):
        # Training phase.
        model.train()
        train_loss_sum = 0.0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            loss = _probs_nll_loss(model(batch_X), batch_y)
            loss.backward()
            optimizer.step()
            # Weight by batch size so a short final batch is not over-counted.
            train_loss_sum += loss.item() * batch_X.size(0)

        # Validation phase.
        model.eval()
        val_loss_sum = 0.0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                val_loss_sum += _probs_nll_loss(model(batch_X), batch_y).item() * batch_X.size(0)

        avg_train_loss = train_loss_sum / len(train_dataset)
        avg_val_loss = val_loss_sum / len(val_dataset)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        # Early stopping check.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            # Keep an in-memory copy so the best weights can be restored below.
            best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved to {model_save_path}")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered after {epoch + 1} epochs")
                break

    # Without this, the returned model would carry the weights from the last
    # epoch run (up to `patience` epochs past the best), not the saved ones.
    if best_state is not None:
        model.load_state_dict(best_state)
    return model
# Evaluate the neural network model.
def evaluate_nn_model(model, X_test, y_test):
    """Evaluate a classifier and print accuracy plus per-class metrics.

    Args:
        model: module mapping a float tensor of features to per-class scores
            (class dimension = 1).
        X_test: feature matrix (pandas DataFrame or NumPy array-like).
        y_test: true integer labels.

    Returns:
        ``(predicted_labels, accuracy, precision_0, recall_0, precision_2,
        recall_2)``, where ``predicted_labels`` is a 1-D torch tensor of
        argmax class indices. Classes 0 and 2 are the short and buy labels.
    """
    model.eval()
    # np.asarray accepts DataFrames and the ndarrays produced by sklearn
    # scalers (ndarray has no ``.values`` attribute).
    X_test_tensor = torch.tensor(np.asarray(X_test, dtype=np.float32))
    with torch.no_grad():
        outputs = model(X_test_tensor)
        # Predicted class = index of the highest score in each row.
        _, predicted_labels = torch.max(outputs, 1)

    predicted_np = predicted_labels.numpy()

    accuracy = accuracy_score(y_test, predicted_np)
    print(f"Accuracy: {accuracy:.4f}")

    # Precision and recall for the short (0) and buy (2) classes.
    precision_0, recall_0 = calculate_precision_recall(y_test, predicted_np, 0)
    precision_2, recall_2 = calculate_precision_recall(y_test, predicted_np, 2)
    print(f"Precision for label 0: {precision_0:.4f}, Recall for label 0: {recall_0:.4f}")
    print(f"Precision for label 2: {precision_2:.4f}, Recall for label 2: {recall_2:.4f}")
    return predicted_labels, accuracy, precision_0, recall_0, precision_2, recall_2
def main_nn_pipeline(file_path, window_size=1, save_dir="ckpt", model_save_path='mlp_model.pth'):
    """Load data, train the MLP, and evaluate it on train, val and test splits.

    The rows are split in their existing order (no shuffling): the first 85%
    is train, the next 7% is validation, and the last 8% is test. A
    ``MinMaxScaler`` is fit on the training split only, applied to all three
    splits, and saved to ``<save_dir>/scaler.pkl``.

    Args:
        file_path: path passed to ``fetch_and_process_data``.
        window_size: forwarded to ``prepare_supervised_data``.
        save_dir: directory for the scaler (created if missing).
        model_save_path: where ``train_nn_model`` saves the best weights.

    Returns:
        ``(model, scaler)``.

    Raises:
        ValueError: if any of the three splits is empty.
    """
    # Load and preprocess the data.
    df = fetch_and_process_data(file_path)
    X, y, _ = prepare_supervised_data(df, window_size=window_size)

    # Positional, order-preserving split, done before scaling so the scaler
    # never sees validation/test rows.
    n_rows = len(X)
    train_end = int(n_rows * 0.85)
    val_end = int(n_rows * 0.92)
    X_train, X_val, X_test = X.iloc[:train_end], X.iloc[train_end:val_end], X.iloc[val_end:]
    y_train, y_val, y_test = y.iloc[:train_end], y.iloc[train_end:val_end], y.iloc[val_end:]
    if min(len(X_train), len(X_val), len(X_test)) == 0:
        raise ValueError(f"Not enough rows ({n_rows}) to build non-empty train/val/test splits")

    def _as_frame(values, like):
        # Re-attach index/columns: scaler output is a bare ndarray, while the
        # training/evaluation helpers read ``.values``.
        return pd.DataFrame(values, index=like.index, columns=like.columns)

    # Fit the scaler on the training data only; reuse it for the other splits.
    scaler = MinMaxScaler()
    X_train_scaled = _as_frame(scaler.fit_transform(X_train), X_train)
    X_val_scaled = _as_frame(scaler.transform(X_val), X_val)
    X_test_scaled = _as_frame(scaler.transform(X_test), X_test)

    # Save the scaler for later inference; make sure the directory exists.
    os.makedirs(save_dir, exist_ok=True)
    scaler_path = os.path.join(save_dir, 'scaler.pkl')
    joblib.dump(scaler, scaler_path)
    print(f"Scaler saved to {scaler_path}")

    print("Training NN model...")
    model = train_nn_model(
        X_train_scaled, y_train,
        X_val_scaled, y_val,
        X_train.shape[1],
        model_save_path=model_save_path
    )

    print("Evaluating model on train dataset...")
    evaluate_nn_model(model, X_train_scaled, y_train)
    print("Evaluating model on validation dataset...")
    evaluate_nn_model(model, X_val_scaled, y_val)
    print("Evaluating model on test dataset...")
    evaluate_nn_model(model, X_test_scaled, y_test)
    return model, scaler
def load_nn_model(model_save_path, input_dim=None, hidden_dim=None, output_dim=None):
    """Load an ``MLP`` state_dict checkpoint from ``model_save_path``.

    Any layer size not passed explicitly is inferred from the checkpoint's
    weight shapes, so the rebuilt model always matches the saved weights.
    nn.Linear stores ``weight`` as (out_features, in_features):
    ``fc1.weight`` is (hidden_dim, input_dim) and ``fc3.weight`` is
    (output_dim, hidden_dim). Previously ``input_dim`` was hard-coded to 5,
    which failed for any other feature count.

    Args:
        model_save_path: path to a state_dict saved with ``torch.save``.
        input_dim, hidden_dim, output_dim: optional explicit layer sizes.

    Returns:
        The loaded MLP, switched to eval mode.
    """
    # map_location="cpu" allows loading a checkpoint saved on another device.
    state_dict = torch.load(model_save_path, map_location="cpu")
    ckpt_hidden, ckpt_input = state_dict['fc1.weight'].shape
    ckpt_output = state_dict['fc3.weight'].shape[0]

    model = MLP(
        input_dim=ckpt_input if input_dim is None else input_dim,
        hidden_dim=ckpt_hidden if hidden_dim is None else hidden_dim,
        output_dim=ckpt_output if output_dim is None else output_dim,
    )
    model.load_state_dict(state_dict)
    # Ready for inference. The MLP has no dropout or batch-norm layers, so
    # this only sets the mode flag.
    model.eval()
    print(f"Model loaded from {model_save_path}")
    return model
if __name__ == "__main__":
    # The data path can be overridden with the BTC_DATA_PATH environment
    # variable; the default is the original author's local file.
    file_path = os.environ.get(
        "BTC_DATA_PATH",
        '/Users/cuiqingli/Desktop/workspace/RLTrading/data/bitcoin_data/sorted_btc_swap_hour.csv',
    )
    save_dir = "ckpt"
    os.makedirs(save_dir, exist_ok=True)
    model_save_path = os.path.join(save_dir, 'mlp_model.pth')

    # Train and save the model (weights to model_save_path, scaler under save_dir).
    main_nn_pipeline(file_path, window_size=1, save_dir=save_dir, model_save_path=model_save_path)

    # Reload the saved model; loaded_model can be used for further predictions.
    loaded_model = load_nn_model(model_save_path)
# End of gist.