Created
November 27, 2024 12:36
-
-
Save tiandiao123/bc96ef15bfe42c95febef003aeccfd51 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import joblib | |
import pandas as pd | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
from sklearn.metrics import precision_score, recall_score, accuracy_score | |
from sklearn.model_selection import train_test_split | |
from sklearn.preprocessing import MinMaxScaler, StandardScaler | |
from rl_trade.rolling_train.trainers.indicator_factory import add_technical_indicators, fetch_and_process_data | |
# Define a simple MLP (multi-layer perceptron) neural network model
class MLP(nn.Module):
    """Fully connected classifier with two ReLU hidden layers.

    ``forward`` returns softmax-normalised scores (each row sums to 1),
    not raw logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        """Build the layers.

        Args:
            input_dim: Number of input features per sample.
            hidden_dim: Width of both hidden layers.
            output_dim: Number of output classes.
        """
        super().__init__()
        # Attribute names double as state-dict keys (fc1.*, fc2.*, fc3.*).
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Map a 2-D batch ``x`` to per-class scores normalised over dim 1."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = self.relu(layer(hidden))
        scores = self.fc3(hidden)
        return self.softmax(scores)
def prepare_supervised_data(df, window_size=1):
    """Build a supervised-learning dataset with three-class labels.

    The frame is first passed through ``add_technical_indicators``. The
    target is derived from ``next_return``: the percentage change of column
    ``'c'`` from the current row to the following row.

    Labels:
        0 -- ``next_return`` below -0.001 (short / sell signal)
        1 -- otherwise (neutral / flat)
        2 -- ``next_return`` above 0.001 (buy signal)

    Args:
        df: Input frame; must contain a ``'c'`` column
            (presumably the close price -- TODO confirm).
        window_size: Accepted for interface compatibility; not used here.

    Returns:
        Tuple ``(X, y, df)``: the feature frame, the label series, and the
        fully processed frame (rows containing NaN are dropped).
    """
    df = add_technical_indicators(df)

    # Shift by -1 so each row carries the return realised on the NEXT row.
    df['next_return'] = df['c'].pct_change(1).shift(-1)
    df.dropna(inplace=True)

    rises = df['next_return'] > 0.001
    falls = df['next_return'] < -0.001

    # Start from neutral, then overwrite the two directional classes.
    df['label'] = 1
    df.loc[falls, 'label'] = 0
    df.loc[rises, 'label'] = 2

    # Every remaining column except the target and bookkeeping ones is a feature.
    non_features = ('next_return', 'label', 'Unnamed: 0', 'confirm')
    features = [name for name in df.columns if name not in non_features]

    return df[features], df['label'], df
# Helper for computing the precision and recall of a single label
def calculate_precision_recall(y_true, y_pred, label):
    """Return ``(precision, recall)`` restricted to a single class.

    Both metrics are computed by scikit-learn with ``labels=[label]`` and
    micro averaging, so only the requested class contributes.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        label: The class whose precision and recall are wanted.
    """
    metric_options = {'labels': [label], 'average': 'micro'}
    precision = precision_score(y_true, y_pred, **metric_options)
    recall = recall_score(y_true, y_pred, **metric_options)
    return precision, recall
# Train the neural network model
def train_nn_model(X_train, y_train, X_val, y_val, input_dim, hidden_dim=64, output_dim=3, epochs=20, batch_size=32, model_save_path='mlp_model.pth'):
    """Train an ``MLP`` classifier with early stopping on validation loss.

    Args:
        X_train, X_val: Feature matrices; pandas DataFrames or NumPy arrays
            (anything ``np.asarray`` can turn into a numeric 2-D array).
        y_train, y_val: Integer class labels; pandas Series or NumPy arrays.
        input_dim: Number of feature columns.
        hidden_dim: Width of the hidden layers.
        output_dim: Number of classes.
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size for both loaders.
        model_save_path: File that receives the best state dict each time the
            validation loss improves.

    Returns:
        The trained model, carrying the weights of the epoch with the lowest
        validation loss (when at least one epoch improved on infinity).

    Raises:
        ValueError: If the training or validation set is empty.
    """
    model = MLP(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)
    # The model already applies softmax, so the loss must be NLL on
    # log-probabilities. CrossEntropyLoss expects raw logits and would apply a
    # second softmax, flattening the gradients.
    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    eps = 1e-12  # keeps log() finite when a probability underflows to 0

    # np.asarray accepts both DataFrame/Series and ndarray inputs; the scaled
    # arrays produced by scikit-learn scalers have no ``.values`` attribute.
    X_train_tensor = torch.tensor(np.asarray(X_train), dtype=torch.float32)
    y_train_tensor = torch.tensor(np.asarray(y_train), dtype=torch.long)
    X_val_tensor = torch.tensor(np.asarray(X_val), dtype=torch.float32)
    y_val_tensor = torch.tensor(np.asarray(y_val), dtype=torch.long)

    if len(X_train_tensor) == 0 or len(X_val_tensor) == 0:
        raise ValueError("Training and validation sets must both be non-empty")

    train_dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = torch.utils.data.TensorDataset(X_val_tensor, y_val_tensor)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    best_val_loss = float('inf')
    best_state = None
    patience = 5
    patience_counter = 0

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            probs = model(batch_X)
            loss = criterion(torch.log(probs.clamp_min(eps)), batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Validation phase
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                probs = model(batch_X)
                val_loss += criterion(torch.log(probs.clamp_min(eps)), batch_y).item()

        # Losses are averaged per batch, not per sample.
        avg_train_loss = train_loss / len(train_loader)
        avg_val_loss = val_loss / len(val_loader)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

        # Early stopping check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            # Snapshot the weights so the best epoch can be restored later.
            best_state = {key: value.detach().clone() for key, value in model.state_dict().items()}
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved to {model_save_path}")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered after {epoch + 1} epochs")
                break

    # Return the best-validation weights, matching what was written to disk,
    # rather than whatever the final (possibly worse) epoch produced.
    if best_state is not None:
        model.load_state_dict(best_state)
    return model
# Evaluate the neural network model
def evaluate_nn_model(model, X_test, y_test):
    """Evaluate a classifier and print accuracy plus per-class metrics.

    Args:
        model: Trained module whose forward pass returns one score per class.
        X_test: Feature matrix; pandas DataFrame or NumPy array.
        y_test: True integer labels; pandas Series or NumPy array.

    Returns:
        Tuple ``(predicted_labels, accuracy, precision_0, recall_0,
        precision_2, recall_2)`` where ``predicted_labels`` is a tensor of
        predicted class indices and the precision/recall pairs refer to
        labels 0 and 2.
    """
    model.eval()

    # np.asarray accepts DataFrame/Series and ndarray alike; scaled arrays
    # coming out of scikit-learn scalers have no ``.values`` attribute.
    X_test_tensor = torch.tensor(np.asarray(X_test), dtype=torch.float32)
    y_true = np.asarray(y_test)

    with torch.no_grad():
        # Predicted class = index of the highest score in each row.
        outputs = model(X_test_tensor)
        _, predicted_labels = torch.max(outputs, 1)
    y_pred = predicted_labels.numpy()

    accuracy = accuracy_score(y_true, y_pred)
    print(f"Accuracy: {accuracy:.4f}")

    # Precision and recall for label 0 and label 2.
    precision_0, recall_0 = calculate_precision_recall(y_true, y_pred, 0)
    precision_2, recall_2 = calculate_precision_recall(y_true, y_pred, 2)
    print(f"Precision for label 0: {precision_0:.4f}, Recall for label 0: {recall_0:.4f}")
    print(f"Precision for label 2: {precision_2:.4f}, Recall for label 2: {recall_2:.4f}")

    return predicted_labels, accuracy, precision_0, recall_0, precision_2, recall_2
def main_nn_pipeline(file_path, window_size=1, save_dir="ckpt", model_save_path='mlp_model.pth'):
    """Load data, train the network, and evaluate it on train/val/test splits.

    The rows are split in their existing order (no shuffling): the first 85%
    for training, the next 7% for validation and the final 8% for testing.
    A ``MinMaxScaler`` is fitted on the training split only and then applied
    to all three splits.

    Args:
        file_path: Path handed to ``fetch_and_process_data``.
        window_size: Forwarded to ``prepare_supervised_data``.
        save_dir: Directory that receives ``scaler.pkl``; created if missing.
        model_save_path: File that receives the best model weights.

    Returns:
        Tuple ``(model, scaler)``.
    """
    # Load and preprocess the data
    df = fetch_and_process_data(file_path)
    X, y, _ = prepare_supervised_data(df, window_size=window_size)

    # Split BEFORE scaling so the scaler never sees validation/test rows.
    train_split = int(len(X) * 0.85)
    val_split = int(len(X) * 0.92)

    X_train = X.iloc[:train_split]
    X_val = X.iloc[train_split:val_split]
    X_test = X.iloc[val_split:]

    y_train = y.iloc[:train_split]
    y_val = y.iloc[train_split:val_split]
    y_test = y.iloc[val_split:]

    scaler = MinMaxScaler()
    scaler.fit(X_train)

    def _scale(frame):
        # The scaler returns a bare ndarray; wrap it back into a DataFrame so
        # downstream helpers that read ``.values`` keep working.
        return pd.DataFrame(scaler.transform(frame), columns=frame.columns, index=frame.index)

    X_train_scaled = _scale(X_train)
    X_val_scaled = _scale(X_val)
    X_test_scaled = _scale(X_test)

    # Save scaler for future use; make sure the target directory exists first.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    scaler_path = os.path.join(save_dir, 'scaler.pkl')
    joblib.dump(scaler, scaler_path)
    print(f"Scaler saved to {scaler_path}")

    print("Training NN model...")
    model = train_nn_model(
        X_train_scaled, y_train,
        X_val_scaled, y_val,
        X_train.shape[1],
        model_save_path=model_save_path
    )

    print("Evaluating model on train dataset...")
    evaluate_nn_model(model, X_train_scaled, y_train)

    print("Evaluating model on validation dataset...")
    evaluate_nn_model(model, X_val_scaled, y_val)

    print("Evaluating model on test dataset...")
    evaluate_nn_model(model, X_test_scaled, y_test)

    return model, scaler
def load_nn_model(model_save_path):
    """Load a saved ``MLP`` from a state-dict checkpoint.

    The layer sizes are read from the checkpoint itself instead of being
    hard-coded, so a model trained with any number of input features can be
    restored without a size-mismatch error.

    Args:
        model_save_path: Path to a file written by
            ``torch.save(model.state_dict(), ...)``.

    Returns:
        An ``MLP`` instance with the saved weights loaded.
    """
    state_dict = torch.load(model_save_path)

    # nn.Linear stores its weight as (out_features, in_features).
    hidden_dim, input_dim = state_dict['fc1.weight'].shape
    output_dim = state_dict['fc3.weight'].shape[0]

    model = MLP(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)
    model.load_state_dict(state_dict)
    print(f"Model loaded from {model_save_path}")
    return model
if __name__ == "__main__":
    # Input CSV (machine-specific absolute path) and checkpoint locations.
    data_csv = '/Users/cuiqingli/Desktop/workspace/RLTrading/data/bitcoin_data/sorted_btc_swap_hour.csv'
    ckpt_dir = "ckpt"
    weights_path = os.path.join(ckpt_dir, 'mlp_model.pth')
    os.makedirs(ckpt_dir, exist_ok=True)

    # Train the model; the scaler and best weights are written to ckpt_dir.
    main_nn_pipeline(data_csv, window_size=1, save_dir=ckpt_dir, model_save_path=weights_path)

    # Reload the saved weights; loaded_model can then be used for prediction.
    loaded_model = load_nn_model(weights_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment