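"""
Unfinished prototype of a multi-crypto trading bot: a custom gymnasium environment
(MultiCryptoTradingEnvironment) simulates trading BTC/ETH with transaction and holding
costs, and a Keras/TensorFlow DDQN agent (EnhancedDDQNAgent) is trained on it. Market
data and current prices are supplied by a local CryptoDataHandler backed by the Bitvavo API.
"""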
import os
import logging
import time
from pathlib import Path
from collections import deque
from random import sample

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import seaborn as sns
import tensorflow as tf
import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.registration import register
from stable_baselines3.common.vec_env import DummyVecEnv
from keras import Sequential
from keras.layers import Dense, Dropout
from keras.losses import Huber
from keras.optimizers import Adam
from keras.regularizers import l2
import talib
from python_bitvavo_api.bitvavo import Bitvavo

# Local data handler; assumed to expose a CryptoDataHandler class in a module of the same name.
from CryptoDataHandler import CryptoDataHandler
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Set seaborn style for plots
sns.set_style('whitegrid')

# Define paths
BASE_PATH = Path(__file__).resolve().parent
RESULTS_PATH = BASE_PATH / 'results' / 'trading_bot'
RESULTS_PATH.mkdir(parents=True, exist_ok=True)


# Helper functions
def format_time(t):
    m_, s = divmod(t, 60)
    h, m = divmod(m_, 60)
    return '{:02.0f}:{:02.0f}:{:02.0f}'.format(h, m, s)
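# Example: format_time(3725) returns '01:02:05' (1 hour, 2 minutes, 5 seconds).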

# Trading Environment
class MultiCryptoTradingEnvironment(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self, cryptos, trading_days=365, trading_cost_bps=2e-3, time_cost_bps=2e-4,
                 initial_cash=10000, lookback_window_size=60, indicators=['SMA', 'EMA', 'RSI', 'ATR'],
                 use_simulated_data=True):
        super().__init__()
        self.total_gross_profit = 0.0  # To calculate profit factor
        self.total_gross_loss = 0.0    # To calculate profit factor
        self.reward_history = []       # For risk adjustment (Sharpe Ratio)
        self.cryptos = cryptos
        self.num_cryptos = len(cryptos)
        self.action_space = spaces.MultiDiscrete([3] * self.num_cryptos)  # Hold, Buy, Sell
        self.indicators = indicators
        self.num_indicators = len(indicators)
        # One channel for close, one for volume, plus one per technical indicator
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf,
                                            shape=(self.num_cryptos, lookback_window_size, 2 + len(indicators)),
                                            dtype=np.float32)
        self.initial_cash = initial_cash
        self.cash_in_hand = self.initial_cash
        self.crypto_holdings = np.zeros(self.num_cryptos)
        self.previous_crypto_holdings = np.zeros(self.num_cryptos)
        self.lookback_window_size = lookback_window_size
        self.data = None
        self.trading_days = trading_days
        self.trading_cost_bps = trading_cost_bps
        self.time_cost_bps = time_cost_bps
        self.previous_portfolio_value = 0.0
        self.use_simulated_data = use_simulated_data
        self.data_handler = CryptoDataHandler()
        self.initialize_environment()

    def initialize_environment(self):
        try:
            logging.info("Initializing environment with %s data.",
                         "simulated" if self.use_simulated_data else "real historical")
            self.data = self.data_handler.generate_data(use_simulated_data=self.use_simulated_data)
        except Exception as e:
            logging.error(f"Failed to initialize environment: {e}")
            raise
    def _get_observation(self):
        """
        Get the current observation of the environment.
        """
        observation = np.zeros((self.num_cryptos, self.lookback_window_size, 2 + len(self.indicators)))
        # Fill in the observation with the most recent lookback window of data
        window = slice(self.current_step - self.lookback_window_size + 1, self.current_step + 1)
        for i, crypto in enumerate(self.cryptos):
            observation[i, :, 0] = self.data[f'close_{crypto.lower()}'][window]
            observation[i, :, 1] = self.data[f'volume_{crypto.lower()}'][window]
            for j, indicator in enumerate(self.indicators):
                observation[i, :, j + 2] = self.data[f'{indicator.lower()}_{crypto.lower()}'][window]
        # Match the dtype declared in observation_space
        return observation.astype(np.float32)

    def _calculate_portfolio_value(self):
        # Current portfolio value: cash in hand plus the market value of all crypto holdings
        return self.cash_in_hand + np.dot(self.crypto_holdings, self._get_current_prices())
    def _calculate_reward(self):
        current_portfolio_value = self._calculate_portfolio_value()
        profit = current_portfolio_value - self.previous_portfolio_value
        trading_costs = self.trading_cost_bps * np.sum(np.abs(self.crypto_holdings - self.previous_crypto_holdings))
        time_costs = self.time_cost_bps * np.sum(self.crypto_holdings)
        net_profit = profit - trading_costs - time_costs
        if net_profit > 0:
            self.total_gross_profit += net_profit
        else:
            self.total_gross_loss += abs(net_profit)
        profit_factor = self.total_gross_profit / (self.total_gross_loss + 1)  # Avoid division by zero
        scaled_reward = net_profit / 1000  # Scale reward for stability
        # Apply profit factor to reward
        scaled_reward *= profit_factor
        # Risk adjustment using a simplified Sharpe Ratio
        # Note: For a more accurate Sharpe Ratio, you would need to calculate based on returns over time
        if len(self.reward_history) > 0:
            mean_reward = np.mean(self.reward_history)
            std_reward = np.std(self.reward_history) if np.std(self.reward_history) > 0 else 1  # Avoid division by zero
            sharpe_ratio = mean_reward / std_reward
        else:
            sharpe_ratio = 1  # Default value if no history exists
        risk_adjusted_reward = scaled_reward * sharpe_ratio
        # Update tracking variables for next step
        self.previous_portfolio_value = current_portfolio_value
        self.previous_crypto_holdings = np.copy(self.crypto_holdings)
        self.reward_history.append(risk_adjusted_reward)
        return risk_adjusted_reward
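    # Worked example with hypothetical numbers: net_profit = 50, total_gross_profit = 500 and
    # total_gross_loss = 100 give profit_factor = 500 / 101 ≈ 4.95, so the scaled reward is
    # (50 / 1000) * 4.95 ≈ 0.25 before the Sharpe-style risk adjustment is applied.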
    def _get_current_prices(self):
        logging.info(f"Fetching current prices for {self.cryptos}...")
        try:
            symbols = [f"{crypto}-EUR" for crypto in self.cryptos]  # Adjust according to your market pairs
            prices = self.data_handler.get_current_prices(symbols)
            return np.array(list(prices.values()))
        except Exception as e:
            logging.error(f"Error fetching current prices: {str(e)}")
            raise

    def _perform_action(self, action):
        """
        Executes the given action (hold, buy, sell) for each cryptocurrency.
        """
        current_prices = self._get_current_prices()
        for i, act in enumerate(action):
            if act == 1:  # Buy
                # Spend 10% of the cash in hand on this cryptocurrency
                amount_to_buy = (0.1 * self.cash_in_hand) / current_prices[i]
                self.crypto_holdings[i] += amount_to_buy
                self.cash_in_hand -= amount_to_buy * current_prices[i]
            elif act == 2:  # Sell
                # Sell 10% of the crypto holdings
                amount_to_sell = self.crypto_holdings[i] * 0.1
                self.crypto_holdings[i] -= amount_to_sell
                self.cash_in_hand += amount_to_sell * current_prices[i]
        # Ensure cash_in_hand never goes negative
        self.cash_in_hand = max(0, self.cash_in_hand)
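    # Example: with cryptos = ['BTC', 'ETH'], action = [1, 0] buys BTC with 10% of the cash in
    # hand and holds ETH, while action = [0, 2] holds BTC and sells 10% of the ETH position.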
    def reset(self, **kwargs):
        # If kwargs are provided, log them
        if kwargs:
            logging.info(f"Resetting environment with parameters: {kwargs}")
            # If a seed is provided, set it
            if kwargs.get('seed') is not None:
                np.random.seed(kwargs['seed'])
        # Start after the first lookback window so the observation slice is always valid
        self.current_step = self.lookback_window_size - 1
        self.cash_in_hand = self.initial_cash
        self.crypto_holdings = np.zeros(self.num_cryptos)
        self.previous_crypto_holdings = np.zeros(self.num_cryptos)
        self.previous_portfolio_value = self._calculate_portfolio_value()
        self.total_gross_profit = 0.0
        self.total_gross_loss = 0.0
        self.reward_history = []
        # Gymnasium expects reset() to return (observation, info)
        return self._get_observation(), {}

    def step(self, action):
        done = self.current_step >= self.trading_days - 1
        self.current_step += 1
        # Perform the action
        self._perform_action(action)
        # Update the observation and calculate the reward
        observation = self._get_observation()
        reward = self._calculate_reward()
        info = {}
        # Gymnasium expects (observation, reward, terminated, truncated, info)
        return observation, reward, done, False, info
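# A minimal smoke test of the environment on its own (a sketch; it assumes CryptoDataHandler can
# serve simulated data and current prices without live API credentials):
#
#   env = MultiCryptoTradingEnvironment(cryptos=['BTC', 'ETH'], use_simulated_data=True)
#   obs, info = env.reset()
#   obs, reward, terminated, truncated, info = env.step(env.action_space.sample())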

# Deep Reinforcement Learning Agent
class EnhancedDDQNAgent:
    def __init__(self, state_dim, num_actions, learning_rate=1e-3, gamma=0.99, epsilon_start=1.0,
                 epsilon_end=0.01, epsilon_decay_steps=500, epsilon_exponential_decay=0.99,
                 replay_capacity=10000, architecture=[64, 64], l2_reg=1e-6, tau=100, batch_size=64):
        self.state_dim = state_dim
        self.num_actions = num_actions
        self.experience = deque(maxlen=replay_capacity)
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.architecture = architecture
        self.l2_reg = l2_reg
        self.online_network = self.build_model()
        self.target_network = self.build_model(trainable=False)
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay_steps = epsilon_decay_steps
        if epsilon_decay_steps > 0:
            self.epsilon_decay = (epsilon_start - epsilon_end) / epsilon_decay_steps
        else:
            self.epsilon_decay = 0
        self.epsilon_exponential_decay = epsilon_exponential_decay
        self.epsilon_history = []
        self.total_steps = self.train_steps = 0
        self.episodes = self.episode_length = self.train_episodes = 0
        self.steps_per_episode = []
        self.episode_reward = 0
        self.rewards_history = []
        self.batch_size = batch_size
        self.tau = tau
        self.losses = []
        self.train = True
    def build_model(self, trainable=True):
        layers = []
        for i, units in enumerate(self.architecture, 1):
            # Only the first layer needs the input dimension
            first_layer_kwargs = {'input_dim': self.state_dim} if i == 1 else {}
            layers.append(Dense(units=units,
                                activation='relu',
                                kernel_regularizer=l2(self.l2_reg),
                                name=f'Dense_{i}',
                                trainable=trainable,
                                **first_layer_kwargs))
        layers.append(Dropout(0.1))
        layers.append(Dense(units=self.num_actions,
                            trainable=trainable,
                            name='Output'))
        model = Sequential(layers)
        model.compile(loss=Huber(delta=1.0),
                      optimizer=Adam(learning_rate=self.learning_rate))
        return model
    def memorize_transition(self, state, action, reward, next_state, terminated):
        self.experience.append((state, action, reward, next_state, terminated))
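    # Target used in experience_replay() (double DQN): the online network selects the best next
    # action a* = argmax_a Q_online(s', a) and the target network evaluates it:
    #   y = r                              if the transition terminated the episode
    #   y = r + gamma * Q_target(s', a*)   otherwise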
    def experience_replay(self):
        if len(self.experience) < self.batch_size:
            return
        minibatch = sample(self.experience, self.batch_size)
        states = np.array([x[0] for x in minibatch])
        actions = np.array([x[1] for x in minibatch])
        rewards = np.array([x[2] for x in minibatch])
        next_states = np.array([x[3] for x in minibatch])
        terminated = np.array([x[4] for x in minibatch])
        # Current Q-value estimates from the online network, used as regression targets below
        q_targets = np.array(self.online_network.predict_on_batch(states))
        # Double DQN: the online network picks the next action, the target network evaluates it
        next_q_online = np.array(self.online_network.predict_on_batch(next_states))
        next_q_target = np.array(self.target_network.predict_on_batch(next_states))
        for i in range(self.batch_size):
            if terminated[i]:
                q_targets[i][actions[i]] = rewards[i]
            else:
                best_action = np.argmax(next_q_online[i])
                q_targets[i][actions[i]] = rewards[i] + self.gamma * next_q_target[i][best_action]
        loss = self.online_network.train_on_batch(states, q_targets)
        self.losses.append(loss)
        self.train_steps += 1
        # Sync the target network every `tau` training steps
        if self.train_steps % self.tau == 0:
            self.update_target()

    def update_target(self):
        self.target_network.set_weights(self.online_network.get_weights())
    def epsilon_greedy_policy(self, state):
        self.total_steps += 1
        if np.random.rand() < self.epsilon:
            action = np.random.choice(self.num_actions)
        else:
            action = int(np.argmax(self.online_network.predict(state, verbose=0)[0]))
        # Decay epsilon linearly for epsilon_decay_steps steps, then exponentially
        decayed = (self.epsilon - self.epsilon_decay if self.total_steps <= self.epsilon_decay_steps
                   else self.epsilon * self.epsilon_exponential_decay)
        self.epsilon = max(decayed, self.epsilon_end)
        return action
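    # With the exploration settings passed in main() (epsilon_start=1.0, epsilon_end=0.01,
    # epsilon_decay_steps=500, epsilon_exponential_decay=0.99), epsilon falls linearly by
    # (1.0 - 0.01) / 500 ≈ 0.002 per step for the first 500 steps, then shrinks by a factor
    # of 0.99 per step, never dropping below 0.01.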

def main():
    # Define hyperparameters
    trading_days = 365
    trading_cost_bps = 1e-3
    time_cost_bps = 1e-4
    indicators = ['SMA', 'EMA', 'RSI']
    use_simulated_data = False
    num_episodes = 1000

    # Register the environment
    register(
        id='multi-crypto-trading-v0',
        entry_point='q_learning_for_trading_fixed_v2:MultiCryptoTradingEnvironment',
        max_episode_steps=trading_days,
        kwargs={'cryptos': ['BTC', 'ETH'], 'trading_days': trading_days, 'trading_cost_bps': trading_cost_bps,
                'time_cost_bps': time_cost_bps, 'indicators': indicators, 'use_simulated_data': use_simulated_data}
    )

    # Initialize the environment and agent
    cryptos = ['BTC', 'ETH']
    env = DummyVecEnv([lambda: gym.make('multi-crypto-trading-v0',
                                        cryptos=cryptos,
                                        trading_days=trading_days,
                                        trading_cost_bps=trading_cost_bps,
                                        time_cost_bps=time_cost_bps,
                                        indicators=indicators,
                                        use_simulated_data=use_simulated_data)])
    max_episode_steps = env.envs[0].spec.max_episode_steps
    state_dim = int(np.prod(env.observation_space.shape))
    # The agent chooses one flat action index per step; the index is decoded into a per-crypto
    # MultiDiscrete action in the training loop below.
    num_actions = int(np.prod(env.action_space.nvec))
    agent = EnhancedDDQNAgent(state_dim=state_dim,
                              num_actions=num_actions,
                              learning_rate=0.001,
                              gamma=0.99,
                              epsilon_start=1.0,
                              epsilon_end=0.01,
                              epsilon_decay_steps=500,
                              epsilon_exponential_decay=0.99,
                              replay_capacity=10000,
                              architecture=[64, 64],
                              l2_reg=1e-6,
                              tau=100,
                              batch_size=64)

    # Bitvavo API credentials; assumed to be picked up by CryptoDataHandler internally
    api_key = os.environ.get('BITVAVO_APIKEY')
    api_secret = os.environ.get('BITVAVO_APISECRET')
    data_handler = CryptoDataHandler()
    data_handler.initialize_bitvavo()
    # Train the agent
    start_time = time.time()
    action_shape = tuple(env.action_space.nvec)  # one sub-action (hold/buy/sell) per crypto
    for episode in range(num_episodes):
        state = env.reset()[0]
        terminated = False
        episode_reward = 0.0
        while not terminated:
            flat_state = state.reshape(1, -1)
            # Decode the agent's flat action index into a per-crypto MultiDiscrete action
            # before stepping the vectorized environment.
            action = agent.epsilon_greedy_policy(flat_state)
            multi_action = np.array(np.unravel_index(action, action_shape))
            next_obs, rewards, dones, infos = env.step([multi_action])
            next_state, reward, terminated = next_obs[0], rewards[0], dones[0]
            agent.memorize_transition(flat_state.ravel(), action, reward, next_state.reshape(-1), terminated)
            agent.experience_replay()
            state = next_state
            episode_reward += reward
        agent.rewards_history.append(episode_reward)
        if episode % 50 == 0:
            total_reward = np.sum(agent.rewards_history[-10:])
            logging.info(f"Episode: {episode}, Total Reward: {total_reward}")
            agent.update_target()
    end_time = time.time()
    logging.info(f"Training completed in {format_time(end_time - start_time)}")
    env.close()


if __name__ == "__main__":
    main()
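# To run this script (a sketch; it assumes the file is saved as q_learning_for_trading_fixed_v2.py,
# matching the registered entry_point, and that the Bitvavo credentials are set in the environment):
#
#   export BITVAVO_APIKEY=...
#   export BITVAVO_APISECRET=...
#   python q_learning_for_trading_fixed_v2.py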