@danielhamelberg
Last active March 10, 2024 01:54
unfinished
import os
import logging
import time
from pathlib import Path
from collections import deque
from random import sample

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import seaborn as sns
import tensorflow as tf
import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.registration import register
from stable_baselines3.common.vec_env import DummyVecEnv
from keras import Sequential
from keras.layers import Dense, Dropout
from keras.losses import Huber
from keras.optimizers import Adam
from keras.regularizers import l2
import talib
from python_bitvavo_api.bitvavo import Bitvavo

# Assumes the CryptoDataHandler module exposes a class of the same name providing
# generate_data(), get_current_prices() and initialize_bitvavo(), as used below.
from CryptoDataHandler import CryptoDataHandler
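
# Illustrative sketch only (not used by this script): a minimal, simulated stand-in
# for the CryptoDataHandler interface the code below assumes. Method names, argument
# names and return shapes are inferred from the calls in this file; the real handler
# may differ, so treat this purely as documentation of the expected interface.
class SimulatedCryptoDataHandler:
    def __init__(self, cryptos=('BTC', 'ETH'), periods=1000):
        self.cryptos = cryptos
        self.periods = periods

    def initialize_bitvavo(self):
        # The real handler would create a Bitvavo client from API credentials.
        pass

    def generate_data(self, use_simulated_data=True):
        # Returns a DataFrame with close_/volume_/indicator_ columns per crypto,
        # matching the column names read in _get_observation().
        index = pd.date_range('2023-01-01', periods=self.periods, freq='h')
        data = {}
        for crypto in self.cryptos:
            close = 100 + np.cumsum(np.random.randn(self.periods))
            data[f'close_{crypto.lower()}'] = close
            data[f'volume_{crypto.lower()}'] = np.random.rand(self.periods) * 1000
            data[f'sma_{crypto.lower()}'] = pd.Series(close).rolling(14, min_periods=1).mean().values
            data[f'ema_{crypto.lower()}'] = pd.Series(close).ewm(span=14).mean().values
            data[f'rsi_{crypto.lower()}'] = np.clip(50 + np.random.randn(self.periods) * 10, 0, 100)
            data[f'atr_{crypto.lower()}'] = np.abs(np.random.randn(self.periods))
        return pd.DataFrame(data, index=index)

    def get_current_prices(self, symbols):
        # Returns a {symbol: price} mapping, as expected by _get_current_prices().
        return {symbol: float(100 + np.random.rand() * 10) for symbol in symbols}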
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
# Set seaborn style for plots
sns.set_style('whitegrid')
# Define paths
BASE_PATH = Path(__file__).resolve().parent
RESULTS_PATH = BASE_PATH / 'results' / 'trading_bot'
RESULTS_PATH.mkdir(parents=True, exist_ok=True)
# Helper functions
def format_time(t):
    m_, s = divmod(t, 60)
    h, m = divmod(m_, 60)
    return '{:02.0f}:{:02.0f}:{:02.0f}'.format(h, m, s)
# Trading Environment
class MultiCryptoTradingEnvironment(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self, cryptos, trading_days=365, trading_cost_bps=2e-3, time_cost_bps=2e-4,
                 initial_cash=10000, lookback_window_size=60, indicators=['SMA', 'EMA', 'RSI', 'ATR'],
                 use_simulated_data=True):
        super(MultiCryptoTradingEnvironment, self).__init__()
        self.total_gross_profit = 0.0  # To calculate profit factor
        self.total_gross_loss = 0.0  # To calculate profit factor
        self.reward_history = []  # For risk adjustment (Sharpe ratio)
        self.cryptos = cryptos
        self.num_cryptos = len(cryptos)
        self.action_space = spaces.MultiDiscrete([3] * self.num_cryptos)  # Hold, Buy, Sell
        self.indicators = indicators
        self.num_indicators = len(indicators)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf,
                                            shape=(self.num_cryptos, lookback_window_size, 3 + len(indicators)),
                                            dtype=np.float32)
        self.initial_cash = initial_cash
        self.cash_in_hand = self.initial_cash
        self.crypto_holdings = np.zeros(self.num_cryptos)
        self.previous_crypto_holdings = np.zeros(self.num_cryptos)
        self.lookback_window_size = lookback_window_size
        self.data = None
        self.current_step = 0
        self.trading_days = trading_days
        self.trading_cost_bps = trading_cost_bps
        self.time_cost_bps = time_cost_bps
        self.previous_portfolio_value = 0.0
        self.use_simulated_data = use_simulated_data
        self.data_handler = CryptoDataHandler()
        self.initialize_environment()

    def initialize_environment(self):
        try:
            logging.info("Initializing environment with %s data.",
                         "simulated" if self.use_simulated_data else "real historical")
            self.data = self.data_handler.generate_data(use_simulated_data=self.use_simulated_data)
        except Exception as e:
            logging.error(f"Failed to initialize environment: {e}")
            raise
    def _get_observation(self):
        """
        Get the current observation of the environment.
        """
        # Channel 0 is close, channel 1 is volume, channels 2+ are indicators;
        # the final channel of the observation space is currently left unused.
        observation = np.zeros((self.num_cryptos, self.lookback_window_size, 3 + len(self.indicators)),
                               dtype=np.float32)
        start = self.current_step - self.lookback_window_size + 1
        end = self.current_step + 1
        for i, crypto in enumerate(self.cryptos):
            observation[i, :, 0] = self.data[f'close_{crypto.lower()}'][start:end]
            observation[i, :, 1] = self.data[f'volume_{crypto.lower()}'][start:end]
            for j, indicator in enumerate(self.indicators):
                observation[i, :, j + 2] = self.data[f'{indicator.lower()}_{crypto.lower()}'][start:end]
        return observation

    def _calculate_portfolio_value(self):
        # Current portfolio value: cash plus the market value of all crypto holdings
        return self.cash_in_hand + np.dot(self.crypto_holdings, self._get_current_prices())
    def _calculate_reward(self):
        current_portfolio_value = self._calculate_portfolio_value()
        profit = current_portfolio_value - self.previous_portfolio_value
        trading_costs = self.trading_cost_bps * np.sum(np.abs(self.crypto_holdings - self.previous_crypto_holdings))
        time_costs = self.time_cost_bps * np.sum(self.crypto_holdings)
        net_profit = profit - trading_costs - time_costs
        if net_profit > 0:
            self.total_gross_profit += net_profit
        else:
            self.total_gross_loss += abs(net_profit)
        profit_factor = self.total_gross_profit / (self.total_gross_loss + 1)  # Avoid division by zero
        scaled_reward = net_profit / 1000  # Scale reward for stability
        # Apply profit factor to reward
        scaled_reward *= profit_factor
        # Risk adjustment using a simplified Sharpe ratio
        # Note: a more accurate Sharpe ratio would be computed from returns over time
        if len(self.reward_history) > 0:
            mean_reward = np.mean(self.reward_history)
            std_reward = np.std(self.reward_history) if np.std(self.reward_history) > 0 else 1  # Avoid division by zero
            sharpe_ratio = mean_reward / std_reward
        else:
            sharpe_ratio = 1  # Default value if no history exists
        risk_adjusted_reward = scaled_reward * sharpe_ratio
        # Update tracking variables for the next step
        self.previous_portfolio_value = current_portfolio_value
        self.previous_crypto_holdings = np.copy(self.crypto_holdings)
        self.reward_history.append(risk_adjusted_reward)
        return risk_adjusted_reward
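
    # Summary of the reward shaping above (a restatement only, no additional logic):
    #   reward_t = (net_profit_t / 1000) * profit_factor_t * sharpe_t
    # where net_profit_t  = portfolio change - trading costs - time costs,
    #       profit_factor = total_gross_profit / (total_gross_loss + 1), and
    #       sharpe_t      = mean(reward_history) / std(reward_history).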
    def _get_current_prices(self):
        logging.info(f"Fetching current prices for {self.cryptos}...")
        try:
            symbols = [f"{crypto}-EUR" for crypto in self.cryptos]  # Adjust according to your market pairs
            prices = self.data_handler.get_current_prices(symbols)
            return np.array(list(prices.values()))
        except Exception as e:
            logging.error(f"Error fetching current prices: {str(e)}")
            raise
    def _perform_action(self, action):
        """
        Executes the given action (hold, buy, sell) for each cryptocurrency.
        """
        current_prices = self._get_current_prices()
        for i, act in enumerate(action):
            if act == 1:  # Buy
                # Buy crypto with 10% of the cash in hand
                amount_to_buy = (0.1 * self.cash_in_hand) / current_prices[i]
                self.crypto_holdings[i] += amount_to_buy
                self.cash_in_hand -= amount_to_buy * current_prices[i]
            elif act == 2:  # Sell
                # Sell 10% of the crypto holdings
                amount_to_sell = self.crypto_holdings[i] * 0.1
                self.crypto_holdings[i] -= amount_to_sell
                self.cash_in_hand += amount_to_sell * current_prices[i]
        # Ensure cash_in_hand never goes negative
        self.cash_in_hand = max(0, self.cash_in_hand)
    def reset(self, seed=None, options=None, **kwargs):
        # If extra kwargs are provided, log them
        if kwargs:
            logging.info(f"Resetting environment with parameters: {kwargs}")
        # If a seed is provided, seed the random number generator
        if seed is not None:
            np.random.seed(seed)
        # Start after a full lookback window so _get_observation never slices before index 0
        self.current_step = self.lookback_window_size - 1
        self.cash_in_hand = self.initial_cash
        self.crypto_holdings = np.zeros(self.num_cryptos)
        self.previous_crypto_holdings = np.zeros(self.num_cryptos)
        self.previous_portfolio_value = self._calculate_portfolio_value()
        self.total_gross_profit = 0.0
        self.total_gross_loss = 0.0
        self.reward_history = []
        return self._get_observation(), {}

    def step(self, action):
        self.current_step += 1
        terminated = self.current_step >= self.trading_days - 1
        truncated = False
        # Perform the action
        self._perform_action(action)
        # Calculate the reward and build the next observation
        reward = self._calculate_reward()
        observation = self._get_observation()
        info = {}
        return observation, reward, terminated, truncated, info
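
    # A minimal render() sketch for the 'human' mode declared in `metadata` above.
    # The original gist does not define one; this illustrative version only logs the
    # current portfolio state and uses no attributes beyond those already defined
    # elsewhere in this class.
    def render(self, mode='human'):
        portfolio_value = self._calculate_portfolio_value()
        holdings = dict(zip(self.cryptos, self.crypto_holdings))
        logging.info("Step %d | cash: %.2f | holdings: %s | portfolio value: %.2f",
                     self.current_step, self.cash_in_hand, holdings, portfolio_value)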
# Deep Reinforcement Learning Agent
class EnhancedDDQNAgent:
    def __init__(self, state_dim, num_actions, learning_rate=1e-3, gamma=0.99, epsilon_start=1.0,
                 epsilon_end=0.01, epsilon_decay_steps=500, epsilon_exponential_decay=0.99,
                 replay_capacity=10000, architecture=[64, 64], l2_reg=1e-6, tau=100, batch_size=64):
        self.state_dim = state_dim
        self.num_actions = num_actions
        self.experience = deque(maxlen=replay_capacity)
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.architecture = architecture
        self.l2_reg = l2_reg
        self.online_network = self.build_model()
        self.target_network = self.build_model(trainable=False)
        self.epsilon = epsilon_start
        self.epsilon_decay_steps = epsilon_decay_steps
        if epsilon_decay_steps > 0:
            self.epsilon_decay = (epsilon_start - epsilon_end) / epsilon_decay_steps
        else:
            self.epsilon_decay = 0
        self.epsilon_exponential_decay = epsilon_exponential_decay
        self.epsilon_history = []
        self.total_steps = self.train_steps = 0
        self.episodes = self.episode_length = self.train_episodes = 0
        self.steps_per_episode = []
        self.episode_reward = 0
        self.rewards_history = []
        self.batch_size = batch_size
        self.tau = tau
        self.losses = []
        self.train = True
    def build_model(self, trainable=True):
        layers = []
        for i, units in enumerate(self.architecture, 1):
            # Only the first layer needs to know the input dimension
            first_layer_kwargs = {'input_dim': self.state_dim} if i == 1 else {}
            layers.append(Dense(units=units,
                                activation='relu',
                                kernel_regularizer=l2(self.l2_reg),
                                name=f'Dense_{i}',
                                trainable=trainable,
                                **first_layer_kwargs))
        layers.append(Dropout(0.1))
        layers.append(Dense(units=self.num_actions,
                            trainable=trainable,
                            name='Output'))
        model = Sequential(layers)
        model.compile(loss=Huber(delta=1.0),
                      optimizer=Adam(learning_rate=self.learning_rate))
        return model
    def memorize_transition(self, state, action, reward, next_state, terminated):
        self.experience.append((state, action, reward, next_state, terminated))

    def experience_replay(self):
        if len(self.experience) < self.batch_size:
            return
        minibatch = sample(self.experience, self.batch_size)
        states = np.array([x[0] for x in minibatch])
        actions = np.array([x[1] for x in minibatch])
        rewards = np.array([x[2] for x in minibatch])
        next_states = np.array([x[3] for x in minibatch])
        terminated = np.array([x[4] for x in minibatch])
        # Current estimates become the training targets after the updates below
        q_values = np.array(self.online_network.predict_on_batch(states))
        # Double DQN target: the online network selects the next action,
        # the target network evaluates it
        next_q_values_online = np.array(self.online_network.predict_on_batch(next_states))
        next_q_values_target = np.array(self.target_network.predict_on_batch(next_states))
        best_next_actions = np.argmax(next_q_values_online, axis=1)
        for i in range(self.batch_size):
            if terminated[i]:
                q_values[i, actions[i]] = rewards[i]
            else:
                q_values[i, actions[i]] = rewards[i] + self.gamma * next_q_values_target[i, best_next_actions[i]]
        loss = self.online_network.train_on_batch(states, q_values)
        self.losses.append(loss)
        # Sync the target network every tau training steps rather than every batch
        self.train_steps += 1
        if self.train_steps % self.tau == 0:
            self.update_target()

    def update_target(self):
        self.target_network.set_weights(self.online_network.get_weights())

    def epsilon_greedy_policy(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.num_actions)
        q_values = self.online_network.predict(state, verbose=0)
        return np.argmax(q_values[0])
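
    # The epsilon schedule parameters stored in __init__ (epsilon_decay,
    # epsilon_decay_steps, epsilon_exponential_decay) are never applied anywhere in
    # this gist. The helper below is a hypothetical sketch of one plausible schedule
    # -- linear annealing followed by exponential decay -- and is not called by the
    # training loop as written.
    def decay_epsilon(self):
        if len(self.epsilon_history) < self.epsilon_decay_steps:
            self.epsilon = max(self.epsilon - self.epsilon_decay, 0.0)
        else:
            self.epsilon *= self.epsilon_exponential_decay
        self.epsilon_history.append(self.epsilon)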
def main():
    # Define hyperparameters
    trading_days = 365
    trading_cost_bps = 1e-3
    time_cost_bps = 1e-4
    indicators = ['SMA', 'EMA', 'RSI']
    use_simulated_data = False
    num_episodes = 1000
    cryptos = ['BTC', 'ETH']

    # Register the environment
    register(
        id='multi-crypto-trading-v0',
        entry_point='q_learning_for_trading_fixed_v2:MultiCryptoTradingEnvironment',
        max_episode_steps=trading_days,
        kwargs={'cryptos': cryptos, 'trading_days': trading_days, 'trading_cost_bps': trading_cost_bps,
                'time_cost_bps': time_cost_bps, 'indicators': indicators, 'use_simulated_data': use_simulated_data}
    )

    # Initialize the environment and agent
    env = DummyVecEnv([lambda: gym.make('multi-crypto-trading-v0',
                                        cryptos=cryptos,
                                        trading_days=trading_days,
                                        trading_cost_bps=trading_cost_bps,
                                        time_cost_bps=time_cost_bps,
                                        indicators=indicators,
                                        use_simulated_data=use_simulated_data)])
    max_episode_steps = env.envs[0].spec.max_episode_steps
    state_dim = int(np.prod(env.observation_space.shape))
    num_actions = int(env.action_space.nvec[0])
    agent = EnhancedDDQNAgent(state_dim=state_dim,
                              num_actions=num_actions,
                              learning_rate=0.001,
                              gamma=0.99,
                              epsilon_start=1.0,
                              epsilon_end=0.01,
                              epsilon_decay_steps=500,
                              epsilon_exponential_decay=0.99,
                              replay_capacity=10000,
                              architecture=[64, 64],
                              l2_reg=1e-6,
                              tau=100,
                              batch_size=64)

    # Initialize the Bitvavo API via the data handler;
    # historical data loading is assumed to be handled inside CryptoDataHandler
    api_key = os.environ.get('BITVAVO_APIKEY')
    api_secret = os.environ.get('BITVAVO_APISECRET')
    data_handler = CryptoDataHandler()
    data_handler.initialize_bitvavo()

    # Train the agent
    start_time = time.time()
    for episode in range(num_episodes):
        state = env.reset().flatten()
        terminated = False
        episode_reward = 0.0
        while not terminated:
            action = agent.epsilon_greedy_policy(state.reshape(1, -1))
            # The agent produces a single discrete action; apply it to every crypto,
            # since the environment expects one action per crypto (MultiDiscrete)
            next_state, reward, done, info = env.step(np.array([[action] * len(cryptos)]))
            next_state = next_state.flatten()
            terminated = bool(done[0])
            agent.memorize_transition(state, action, float(reward[0]), next_state, terminated)
            state = next_state
            episode_reward += float(reward[0])
            agent.experience_replay()
        agent.rewards_history.append(episode_reward)
        if episode % 50 == 0:
            total_reward = np.sum(agent.rewards_history[-10:])
            logging.info(f"Episode: {episode}, Total Reward: {total_reward}")
            agent.update_target()
    end_time = time.time()
    logging.info(f"Training completed in {format_time(end_time - start_time)}")
    env.close()


if __name__ == "__main__":
    main()