Skip to content

Instantly share code, notes, and snippets.

@solitarioo1
Created June 12, 2025 06:27
Show Gist options
  • Save solitarioo1/8d231f5edf08fe1c0104d8fd91b4068d to your computer and use it in GitHub Desktop.
Save solitarioo1/8d231f5edf08fe1c0104d8fd91b4068d to your computer and use it in GitHub Desktop.
bot_triangular_arbitraje
# =============================================================================
# API_KEYS.PY - Gestión segura de claves API
# =============================================================================
import os
from cryptography.fernet import Fernet
import base64
import hashlib
class APIKeyManager:
    """Encrypt the exchange API credentials to a local file and load them back.

    The Fernet key is derived from the password with a single unsalted SHA-256
    pass.
    NOTE(review): that derivation is cheap to brute-force; switching to a
    salted KDF would invalidate existing key files, so it is left unchanged.
    """

    def __init__(self, password="your_secure_password", key_file="keys.enc"):
        """Build the cipher.

        Args:
            password: Password the encryption key is derived from.
            key_file: Path of the encrypted credentials file.
        """
        digest = hashlib.sha256(password.encode()).digest()
        self.cipher = Fernet(base64.urlsafe_b64encode(digest))
        self.key_file = key_file

    def encrypt_keys(self, api_key, api_secret):
        """Encrypt both credentials and write them to ``key_file``.

        The file holds two Fernet tokens separated by a newline.
        """
        encrypted_key = self.cipher.encrypt(api_key.encode())
        encrypted_secret = self.cipher.encrypt(api_secret.encode())

        # Create the file readable by the owner only; it contains secrets.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
        fd = os.open(self.key_file, flags, 0o600)
        with os.fdopen(fd, 'wb') as f:
            f.write(encrypted_key + b'\n' + encrypted_secret)
        try:
            # The mode passed to os.open() is ignored when the file already
            # existed, so tighten it explicitly (best effort).
            os.chmod(self.key_file, 0o600)
        except OSError:
            pass
        print("✅ API keys encrypted and saved")

    def load_keys(self):
        """Read and decrypt the stored credentials.

        Returns:
            ``(api_key, api_secret)``, or ``(None, None)`` when the file is
            missing, unreadable, malformed, or cannot be decrypted.
        """
        try:
            with open(self.key_file, 'rb') as f:
                raw = f.read()
        except FileNotFoundError:
            print("❌ No encrypted keys found. Use encrypt_keys() first")
            return None, None
        except OSError as e:
            print(f"❌ Error loading keys: {e}")
            return None, None

        # Ignore blank lines / stray line endings around the two tokens.
        tokens = [line.strip() for line in raw.split(b'\n') if line.strip()]
        if len(tokens) < 2:
            print("❌ Error loading keys: key file is malformed")
            return None, None

        try:
            api_key = self.cipher.decrypt(tokens[0]).decode()
            api_secret = self.cipher.decrypt(tokens[1]).decode()
        except Exception as e:
            # Wrong password or corrupted token; report and fall back to the
            # "no keys" result instead of crashing.
            print(f"❌ Error loading keys: {e}")
            return None, None
        return api_key, api_secret
# Uso rápido para primera configuración
def setup_keys():
    """Interactively collect the Binance credentials and store them encrypted.

    The API secret and the encryption password are read with ``getpass`` so
    they are not echoed to the terminal.
    """
    import getpass  # local import: only this interactive helper needs it

    print("🔐 API Keys Setup")
    # Strip whitespace that commonly sneaks in when keys are pasted.
    api_key = input("Enter your Binance API Key: ").strip()
    api_secret = getpass.getpass("Enter your Binance API Secret: ").strip()
    password = getpass.getpass("Enter encryption password: ")
    manager = APIKeyManager(password)
    manager.encrypt_keys(api_key, api_secret)
import time
import logging
from datetime import datetime
from typing import Dict, List, Tuple, Optional
import math
class ArbitrageEngine:
    """Find and execute triangular-arbitrage cycles that start and end in USDT.

    A route is three symbols traded in sequence. For every leg the side is
    ``BUY`` when the asset currently held is the symbol's quote asset and
    ``SELL`` when it is the symbol's base asset.
    """

    # Quote assets recognised when splitting a symbol such as "ETHBTC" into
    # base ("ETH") and quote ("BTC"); checked in this order.
    _QUOTE_ASSETS = ('USDT', 'BTC', 'ETH')

    def __init__(self, binance_client, min_profit_threshold=0.008, trade_amount=10.0):
        """Create the engine.

        Args:
            binance_client: Connected client used for prices, balances and orders.
            min_profit_threshold: Minimum profit as a fraction (0.008 = 0.8%).
            trade_amount: Starting amount of each cycle, in USDT.
        """
        self.client = binance_client
        self.min_profit_threshold = min_profit_threshold
        self.trade_amount = trade_amount
        # Fee charged per trade, as a fraction (0.001 = 0.1%).
        self.trading_fee = 0.001

        # The middle leg converts between the two non-USDT assets: going from
        # the cross pair's quote to its base is a BUY, the reverse is a SELL.
        self.triangular_routes = [
            self._route('USDT-BTC-ETH-USDT', ['BTCUSDT', 'ETHBTC', 'ETHUSDT'], ['BUY', 'BUY', 'SELL']),
            self._route('USDT-ETH-BTC-USDT', ['ETHUSDT', 'ETHBTC', 'BTCUSDT'], ['BUY', 'SELL', 'SELL']),
            self._route('USDT-BTC-BNB-USDT', ['BTCUSDT', 'BNBBTC', 'BNBUSDT'], ['BUY', 'BUY', 'SELL']),
            self._route('USDT-BNB-BTC-USDT', ['BNBUSDT', 'BNBBTC', 'BTCUSDT'], ['BUY', 'SELL', 'SELL']),
            self._route('USDT-ETH-BNB-USDT', ['ETHUSDT', 'BNBETH', 'BNBUSDT'], ['BUY', 'BUY', 'SELL']),
            self._route('USDT-BNB-ETH-USDT', ['BNBUSDT', 'BNBETH', 'ETHUSDT'], ['BUY', 'SELL', 'SELL']),
        ]

        self.logger = logging.getLogger(__name__)

        # Running statistics
        self.opportunities_found = 0
        self.trades_executed = 0
        self.total_profit = 0.0
        self.last_opportunity_time = None

    @staticmethod
    def _route(name: str, path: List[str], sides: List[str]) -> Dict:
        """Build one route definition in the dict shape used by the engine."""
        return {'name': name, 'path': path, 'sides': sides, 'base_asset': 'USDT'}

    def calculate_triangular_arbitrage(self, prices: Dict[str, float]) -> List[Dict]:
        """Evaluate every route against ``prices``.

        Args:
            prices: Current prices as ``{symbol: price}``.

        Returns:
            Opportunities whose profit exceeds ``min_profit_threshold``, most
            profitable first. Empty when any required price is missing.
        """
        opportunities = []

        required_symbols = set()
        for route in self.triangular_routes:
            required_symbols.update(route['path'])
        missing_symbols = required_symbols - set(prices.keys())
        if missing_symbols:
            self.logger.warning(f"Missing prices for: {missing_symbols}")
            return opportunities

        for route in self.triangular_routes:
            opportunity = self._calculate_route_profit(route, prices)
            if opportunity and opportunity['profit_percentage'] > self.min_profit_threshold:
                opportunities.append(opportunity)
                self.opportunities_found += 1
                self.last_opportunity_time = datetime.now()

        opportunities.sort(key=lambda x: x['profit_percentage'], reverse=True)
        return opportunities

    def _calculate_route_profit(self, route: Dict, prices: Dict[str, float]) -> Optional[Dict]:
        """Simulate the three legs of ``route`` with ``trade_amount`` as input.

        Args:
            route: Route definition (``name``, ``path``, ``sides``).
            prices: Current prices as ``{symbol: price}``.

        Returns:
            A dict describing the opportunity, or ``None`` when the route is
            not profitable or cannot be evaluated.
        """
        try:
            path = route['path']
            sides = route['sides']

            amount = self.trade_amount
            prices_used = {}
            for symbol, side in zip(path, sides):
                price = prices[symbol]
                # The fee is taken on every leg before the conversion.
                net = amount * (1 - self.trading_fee)
                # BUY: spend quote, receive base. SELL: spend base, receive quote.
                amount = net / price if side == 'BUY' else net * price
                prices_used[symbol] = price

            final_amount = amount
            profit = final_amount - self.trade_amount
            profit_percentage = profit / self.trade_amount

            if profit_percentage > 0:
                return {
                    'route_name': route['name'],
                    'path': path,
                    'sides': sides,
                    'initial_amount': self.trade_amount,
                    'final_amount': final_amount,
                    'profit': profit,
                    'profit_percentage': profit_percentage,
                    'prices_used': prices_used,
                    'estimated_execution_time': self._estimate_execution_time(),
                    'timestamp': datetime.now()
                }
            return None
        except (KeyError, IndexError, TypeError, ZeroDivisionError) as e:
            self.logger.error(f"Error calculating route {route.get('name')}: {e}")
            return None

    def _estimate_execution_time(self) -> float:
        """Return a fixed estimate, in seconds, for three market orders."""
        return 2.5

    def validate_opportunity(self, opportunity: Dict) -> bool:
        """Check whether ``opportunity`` is still worth executing.

        Rejects opportunities older than 5 seconds, an insufficient free USDT
        balance, a disconnected WebSocket, or a symbol whose order book comes
        back without bids or asks.

        Returns:
            True when every check passes; False otherwise, including when a
            check raises.
        """
        try:
            time_since_calc = (datetime.now() - opportunity['timestamp']).total_seconds()
            if time_since_calc > 5:
                self.logger.warning("Opportunity too old, prices may have changed")
                return False

            balance = self.client.get_balance('USDT')
            if not balance or balance['free'] < self.trade_amount:
                self.logger.warning(f"Insufficient USDT balance: {balance['free'] if balance else 0}")
                return False

            if not self.client.is_websocket_connected():
                self.logger.warning("WebSocket not connected")
                return False

            # Basic liquidity check: both sides of the book must be present.
            for symbol in opportunity['path']:
                order_book = self.client.get_order_book(symbol, limit=5)
                if not order_book or not order_book.get('bids') or not order_book.get('asks'):
                    self.logger.warning(f"No liquidity data for {symbol}")
                    return False
            return True
        except Exception as e:
            self.logger.error(f"Error validating opportunity: {e}")
            return False

    def execute_arbitrage(self, opportunity: Dict) -> Dict:
        """Place the three market orders of ``opportunity`` in sequence.

        The first leg spends ``trade_amount``. Each later leg spends the asset
        received from the previous leg: the quote asset on a BUY, the base
        asset on a SELL. It is capped at the amount that leg reported as
        filled, so pre-existing holdings are not traded away.

        NOTE(review): if a later leg fails, earlier legs are not unwound and
        the account is left holding the intermediate asset.

        Returns:
            A result dict with ``success``, ``orders``, ``actual_profit`` (USDT
            balance change), ``error`` and ``execution_time``.
        """
        execution_result = {
            'success': False,
            'opportunity': opportunity,
            'orders': [],
            'actual_profit': 0.0,
            'error': None,
            'execution_time': 0.0,
            'timestamp': datetime.now()
        }
        tracked_assets = ('USDT', 'BTC', 'ETH', 'BNB')
        start_time = time.time()
        try:
            self.logger.info(f"🚀 Executing arbitrage: {opportunity['route_name']}")
            self.logger.info(f"Expected profit: {opportunity['profit_percentage']*100:.3f}% (${opportunity['profit']:.4f})")
            path = opportunity['path']
            sides = opportunity['sides']

            initial_balances = self._snapshot_balances(tracked_assets)

            held_amount = self.trade_amount
            for i, (symbol, side) in enumerate(zip(path, sides)):
                self.logger.info(f"Order {i+1}/3: {side} {symbol} amount={held_amount:.6f}")
                if i == 0:
                    quantity = self._calculate_order_quantity(symbol, side, held_amount)
                else:
                    # A BUY is paid for with the quote asset, a SELL with the base asset.
                    spend_asset = (self._get_quote_asset(symbol) if side == 'BUY'
                                   else self._get_base_asset(symbol))
                    balance = self.client.get_balance(spend_asset)
                    available = balance['free'] if balance else 0.0
                    if held_amount > 0:
                        # Never commit more than the previous leg produced.
                        available = min(available, held_amount)
                    quantity = self._calculate_order_quantity(symbol, side, available, use_balance=True)

                if quantity <= 0:
                    raise Exception(f"Invalid quantity for {symbol}: {quantity}")

                order = self.client.place_market_order(symbol, side, quantity)
                if not order:
                    raise Exception(f"Failed to place order: {symbol} {side} {quantity}")
                execution_result['orders'].append(order)

                # 0.0 means the response carried no fill data; the next leg
                # then relies on the reported balance alone.
                held_amount = self._received_amount(order, side)
                time.sleep(0.1)  # short pause between orders

            time.sleep(1)  # give the exchange time to update balances
            final_balances = self._snapshot_balances(tracked_assets)

            usdt_change = final_balances['USDT'] - initial_balances['USDT']
            execution_result['actual_profit'] = usdt_change
            execution_result['success'] = True
            execution_result['initial_balances'] = initial_balances
            execution_result['final_balances'] = final_balances
            self.trades_executed += 1
            self.total_profit += usdt_change
            self.logger.info(f"✅ Arbitrage completed! Actual profit: ${usdt_change:.4f}")
        except Exception as e:
            execution_result['error'] = str(e)
            self.logger.error(f"❌ Arbitrage execution failed: {e}")
        execution_result['execution_time'] = time.time() - start_time
        return execution_result

    def _snapshot_balances(self, assets) -> Dict[str, float]:
        """Return ``{asset: free balance}``; 0.0 when a balance is unavailable."""
        snapshot = {}
        for asset in assets:
            balance = self.client.get_balance(asset)
            snapshot[asset] = balance['free'] if balance else 0.0
        return snapshot

    @staticmethod
    def _received_amount(order, side: str) -> float:
        """Amount obtained from a filled order, or 0.0 when it is not reported.

        Reads ``executedQty`` (base received) for a BUY and
        ``cummulativeQuoteQty`` (quote received) for a SELL.
        NOTE(review): assumes the client returns Binance's order response
        unchanged; commission is not subtracted here.
        """
        field = 'executedQty' if side == 'BUY' else 'cummulativeQuoteQty'
        try:
            return float(order.get(field, 0.0))
        except (AttributeError, TypeError, ValueError):
            return 0.0

    def _calculate_order_quantity(self, symbol: str, side: str, amount: float, use_balance: bool = False) -> float:
        """Convert the amount to spend into a valid base-asset order quantity.

        Args:
            symbol: Trading pair.
            side: ``'BUY'`` or ``'SELL'``.
            amount: Amount of the asset being spent (quote asset for a BUY,
                base asset for a SELL).
            use_balance: When True, ``amount`` is an account balance and a
                0.1% buffer is held back.

        Returns:
            Quantity rounded down to the symbol's LOT_SIZE step, or 0.0 when
            it cannot be computed or is below the minimum quantity.
        """
        try:
            symbol_info = self.client.get_symbol_info(symbol)
            if not symbol_info:
                return 0.0

            lot_size_filter = next(
                (f for f in symbol_info['filters'] if f['filterType'] == 'LOT_SIZE'),
                None,
            )
            if not lot_size_filter:
                return 0.0
            step_size = float(lot_size_filter['stepSize'])
            min_qty = float(lot_size_filter['minQty'])

            spendable = amount * 0.999 if use_balance else amount

            # A BUY is funded in the quote asset, so it needs the price to be
            # converted into a base quantity.
            needs_price = side == 'BUY' or not use_balance
            current_price = self.client.get_live_prices().get(symbol) if needs_price else None
            if needs_price and not current_price:
                return 0.0
            quantity = spendable / current_price if side == 'BUY' else spendable

            # Round down to a whole number of steps, then round away the float
            # noise left by the multiplication (e.g. 0.00033000000000000005).
            quantity = math.floor(quantity / step_size) * step_size
            quantity = round(quantity, self._step_decimals(lot_size_filter['stepSize']))

            if quantity < min_qty:
                return 0.0
            return quantity
        except Exception as e:
            self.logger.error(f"Error calculating quantity for {symbol}: {e}")
            return 0.0

    @staticmethod
    def _step_decimals(step) -> int:
        """Number of decimal places in a step size such as ``'0.00010000'``."""
        text = str(step)
        if 'e' in text.lower():
            return 8  # exponent notation: fall back to 8 decimal places
        text = text.rstrip('0')
        return len(text.split('.')[1]) if '.' in text else 0

    def _get_base_asset(self, symbol: str) -> str:
        """Return the base asset of ``symbol`` (``'ETHBTC'`` -> ``'ETH'``)."""
        for quote in self._QUOTE_ASSETS:
            if symbol.endswith(quote):
                return symbol[:-len(quote)]
        return symbol[:3]  # fallback for unknown quote assets

    def _get_quote_asset(self, symbol: str) -> str:
        """Return the quote asset of ``symbol`` (``'ETHBTC'`` -> ``'BTC'``)."""
        for quote in self._QUOTE_ASSETS:
            if symbol.endswith(quote):
                return quote
        return symbol[3:]  # fallback, mirrors _get_base_asset

    def get_statistics(self) -> Dict:
        """Return the engine's counters and configuration as a dict."""
        return {
            'opportunities_found': self.opportunities_found,
            'trades_executed': self.trades_executed,
            'total_profit': self.total_profit,
            'success_rate': self.trades_executed / max(self.opportunities_found, 1) * 100,
            'last_opportunity': self.last_opportunity_time.isoformat() if self.last_opportunity_time else None,
            'min_profit_threshold': self.min_profit_threshold * 100,
            'trade_amount': self.trade_amount
        }

    def scan_for_opportunities(self) -> List[Dict]:
        """Evaluate all routes using the client's live WebSocket prices."""
        prices = self.client.get_live_prices()
        if not prices:
            self.logger.warning("No live prices available")
            return []
        return self.calculate_triangular_arbitrage(prices)
# Ejemplo de uso
if __name__ == "__main__":
    # Informational banner only: the engine needs a BinanceClient to do
    # anything, so running this module directly just lists its features.
    banner_lines = (
        "ArbitrageEngine ready for integration with BinanceClient",
        "Features implemented:",
        "✅ 6 triangular routes calculation",
        "✅ Profit validation with fees",
        "✅ Order execution logic",
        "✅ Real-time opportunity scanning",
        "✅ Risk management validations",
    )
    for banner_line in banner_lines:
        print(banner_line)
import websocket
import json
import threading
import time
import requests
import hmac
import hashlib
from urllib.parse import urlencode
from datetime import datetime
import logging
class BinanceClient:
    """Small Binance Spot client: signed REST calls plus a ticker WebSocket feed."""

    def __init__(self, api_key, api_secret, testnet=True):
        """Store credentials and pick the REST/WebSocket endpoints.

        Args:
            api_key: Binance API key, sent in the ``X-MBX-APIKEY`` header.
            api_secret: Secret used to sign authenticated requests.
            testnet: Use the testnet endpoints when True, production otherwise.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.testnet = testnet

        if testnet:
            self.base_url = "https://testnet.binance.vision/api"
            self.ws_url = "wss://testnet.binance.vision/ws"
        else:
            self.base_url = "https://api.binance.com/api"
            self.ws_url = "wss://stream.binance.com:9443/ws"

        # Connection state
        self.ws = None
        self.prices = {}
        self.is_connected = False
        self.reconnect_attempts = 0
        self.max_reconnect = 5
        self.update_count = 0
        # Set by disconnect() so on_close() does not reconnect a socket that
        # was closed on purpose.
        self._closing = False
        # Symbol metadata fetched so far, keyed by symbol.
        self._symbol_info_cache = {}

        # Pairs monitored for triangular arbitrage
        self.symbols = [
            'BTCUSDT', 'ETHUSDT', 'BNBUSDT',  # Base pairs
            'ETHBTC', 'BNBBTC', 'BNBETH'      # Cross pairs
        ]

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def _generate_signature(self, params):
        """Return the hex HMAC-SHA256 of the URL-encoded ``params``."""
        query_string = urlencode(params)
        return hmac.new(
            self.api_secret.encode('utf-8'),
            query_string.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

    def _make_request(self, method, endpoint, params=None, signed=False):
        """Send one HTTP request to the REST API.

        Args:
            method: ``'GET'`` or ``'POST'``.
            endpoint: Path appended to ``base_url``.
            params: Request parameters; the dict passed in is not modified.
            signed: Add ``timestamp`` and ``signature`` when True.

        Returns:
            The decoded JSON body, or ``None`` on any request/decoding error
            or unsupported method.
        """
        if method not in ('GET', 'POST'):
            self.logger.error(f"API Request error: unsupported HTTP method {method}")
            return None

        url = f"{self.base_url}{endpoint}"
        headers = {'X-MBX-APIKEY': self.api_key}
        # Work on a copy so the signature fields never leak into the caller's dict.
        payload = dict(params) if params else {}
        if signed:
            payload['timestamp'] = int(time.time() * 1000)
            payload['signature'] = self._generate_signature(payload)
        try:
            if method == 'GET':
                response = requests.get(url, headers=headers, params=payload, timeout=10)
            else:
                response = requests.post(url, headers=headers, data=payload, timeout=10)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older requests versions.
            self.logger.error(f"API Request error: {e}")
            return None

    def get_account_info(self):
        """Fetch the account information (signed request)."""
        return self._make_request('GET', '/v3/account', signed=True)

    def get_balance(self, asset='USDT'):
        """Return ``{'free', 'locked', 'total'}`` for ``asset``, or ``None``."""
        account_info = self.get_account_info()
        if not account_info:
            return None
        for balance in account_info.get('balances', []):
            if balance['asset'] == asset:
                free = float(balance['free'])
                locked = float(balance['locked'])
                return {'free': free, 'locked': locked, 'total': free + locked}
        return None

    @staticmethod
    def _format_quantity(quantity):
        """Render ``quantity`` as a plain decimal string with up to 8 places.

        ``str()`` of a small float gives exponent notation (``'1e-05'``), so
        the value is formatted explicitly and trailing zeros are trimmed.
        """
        text = f"{float(quantity):.8f}".rstrip('0').rstrip('.')
        return text or '0'

    def place_market_order(self, symbol, side, quantity):
        """Place a MARKET order.

        Args:
            symbol: Trading pair.
            side: ``'BUY'`` or ``'SELL'``.
            quantity: Order size in the base asset.

        Returns:
            The API response, or ``None`` when the request failed.
        """
        params = {
            'symbol': symbol,
            'side': side,
            'type': 'MARKET',
            'quantity': self._format_quantity(quantity)
        }
        result = self._make_request('POST', '/v3/order', params, signed=True)
        if result:
            self.logger.info(f"Order placed: {symbol} {side} {quantity}")
        return result

    def get_symbol_info(self, symbol):
        """Return the exchange metadata (filters, precision, ...) for ``symbol``.

        Only the requested symbol is fetched, and the result is cached for the
        lifetime of the client so order sizing does not repeat the download.
        """
        cached = self._symbol_info_cache.get(symbol)
        if cached is not None:
            return cached
        exchange_info = self._make_request('GET', '/v3/exchangeInfo', {'symbol': symbol})
        if exchange_info:
            for entry in exchange_info.get('symbols', []):
                if entry.get('symbol') == symbol:
                    self._symbol_info_cache[symbol] = entry
                    return entry
        return None

    def get_current_prices(self):
        """Fetch the latest REST price of every monitored symbol."""
        current_prices = {}
        for symbol in self.symbols:
            ticker = self._make_request('GET', '/v3/ticker/price', {'symbol': symbol})
            if ticker:
                current_prices[symbol] = float(ticker['price'])
        return current_prices

    def on_message(self, ws, message):
        """WebSocket callback: store the price carried by a ticker message.

        Handles both a bare ticker payload and the combined-stream wrapper
        (``{"stream": ..., "data": {...}}``); anything else is ignored.
        """
        try:
            data = json.loads(message)
            if not isinstance(data, dict):
                return
            ticker = data
            if not ('s' in ticker and 'c' in ticker):
                ticker = data.get('data') if 'stream' in data else None
                if not isinstance(ticker, dict) or 's' not in ticker or 'c' not in ticker:
                    return

            symbol = ticker['s']
            price = float(ticker['c'])
            self.prices[symbol] = price

            # Log only every 100th update to keep the log readable.
            self.update_count += 1
            if self.update_count % 100 == 0:
                self.logger.info(f"Price update #{self.update_count}: {symbol}={price}")
        except json.JSONDecodeError as e:
            self.logger.error(f"JSON decode error: {e}")
        except Exception as e:
            self.logger.error(f"Message processing error: {e}")

    def on_error(self, ws, error):
        """WebSocket callback: log the error and mark the feed as down."""
        self.logger.error(f"WebSocket error: {error}")
        self.is_connected = False

    def on_close(self, ws, close_status_code, close_msg):
        """WebSocket callback: reconnect unless the close was requested."""
        self.logger.warning("WebSocket connection closed")
        self.is_connected = False
        if self._closing:
            # disconnect() was called; do not resurrect the connection.
            return
        if self.reconnect_attempts < self.max_reconnect:
            self.reconnect_attempts += 1
            self.logger.info(f"Attempting reconnect #{self.reconnect_attempts}")
            time.sleep(5)
            self.connect_websocket()
        else:
            self.logger.error("Max reconnection attempts reached")

    def on_open(self, ws):
        """WebSocket callback: subscribe to the ticker stream of every symbol."""
        self.logger.info("WebSocket connection opened")
        self.is_connected = True
        self.reconnect_attempts = 0
        streams = [f"{symbol.lower()}@ticker" for symbol in self.symbols]
        subscribe_msg = {
            "method": "SUBSCRIBE",
            "params": streams,
            "id": 1
        }
        ws.send(json.dumps(subscribe_msg))
        self.logger.info(f"Subscribed to {len(streams)} price streams")

    def connect_websocket(self):
        """Open the price feed in a daemon thread and wait up to 10 seconds.

        Returns:
            True once the connection is open, False on timeout or error.
        """
        try:
            self._closing = False
            websocket.enableTrace(False)
            self.ws = websocket.WebSocketApp(
                self.ws_url,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close,
                on_open=self.on_open
            )
            worker = threading.Thread(target=self.ws.run_forever)
            worker.daemon = True
            worker.start()

            deadline = time.time() + 10
            while not self.is_connected and time.time() < deadline:
                time.sleep(0.5)

            if self.is_connected:
                self.logger.info("Successfully connected to Binance WebSocket")
                return True
            self.logger.error("Failed to connect to WebSocket")
            return False
        except Exception as e:
            self.logger.error(f"WebSocket connection error: {e}")
            return False

    def disconnect(self):
        """Close the WebSocket for good (no automatic reconnect)."""
        self._closing = True
        if self.ws:
            self.ws.close()
        self.is_connected = False
        self.logger.info("WebSocket disconnected")

    def get_live_prices(self):
        """Return a copy of the latest prices received over the WebSocket."""
        return self.prices.copy()

    def is_websocket_connected(self):
        """Return True while the WebSocket is considered connected."""
        return self.is_connected

    def get_order_book(self, symbol, limit=5):
        """Fetch the order book of ``symbol`` with ``limit`` levels per side."""
        params = {'symbol': symbol, 'limit': limit}
        return self._make_request('GET', '/v3/depth', params)
# Ejemplo de uso y testing
if __name__ == "__main__":
    # Manual smoke test: stream prices for 30 seconds, then show the USDT
    # balance. IMPORTANT: replace the placeholders with your own API keys.
    API_KEY = "your_api_key_here"
    API_SECRET = "your_api_secret_here"

    # testnet=True keeps this demo away from real funds
    client = BinanceClient(API_KEY, API_SECRET, testnet=True)

    print("Connecting to Binance WebSocket...")
    if not client.connect_websocket():
        print("❌ Failed to connect")
    else:
        print("✅ Connected successfully!")

        # Print a price snapshot every 2 seconds for 30 seconds
        start_time = time.time()
        while True:
            if not (time.time() - start_time < 30):
                break
            snapshot = client.get_live_prices()
            if snapshot:
                print(f"\n🔄 Live Prices ({len(snapshot)} symbols):")
                for pair in snapshot:
                    print(f" {pair}: ${snapshot[pair]:,.4f}")
            time.sleep(2)

        # Account information
        print("\n📊 Account Info:")
        usdt = client.get_balance('USDT')
        if usdt:
            print(f" USDT Balance: ${usdt['total']:.2f}")

    # Always close the feed before exiting
    client.disconnect()
    print("🔌 Disconnected")
# =============================================================================
# ORDER_MANAGER.PY - Gestor avanzado de órdenes
# =============================================================================
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class OrderManager:
    """Place market orders through the client with retries, a local rate limit
    and basic bookkeeping."""

    def __init__(self, binance_client):
        """Wrap ``binance_client``, which must provide ``place_market_order``."""
        self.client = binance_client
        self.active_orders = {}
        self.order_history = []
        self.max_orders_per_minute = 50
        self.order_timestamps = []
        # Attempts that raised or came back empty; feeds the success rate.
        self.failed_attempts = 0
        self.logger = logging.getLogger(__name__)

    def place_order_with_retry(self, symbol, side, quantity, order_type='MARKET', retries=3):
        """Place a market order, retrying on failure.

        Args:
            symbol: Trading pair.
            side: ``'BUY'`` or ``'SELL'``.
            quantity: Order size; must be positive.
            order_type: Only ``'MARKET'`` is supported. Other values are
                rejected rather than silently sent as market orders.
            retries: Maximum number of attempts. An attempt skipped because of
                the local rate limit counts towards this number.

        Returns:
            The order response, or ``None`` when the parameters are invalid or
            every attempt failed.

        NOTE(review): an attempt that fails with a timeout may still have been
        executed by the exchange; retrying can then place a duplicate order.
        """
        if order_type != 'MARKET':
            self.logger.error(f"Unsupported order type: {order_type}")
            return None
        # The parameters do not change between attempts, so validate once.
        if not self._validate_order_params(symbol, side, quantity):
            return None

        for attempt in range(retries):
            try:
                if not self._check_rate_limit():
                    self.logger.warning("Rate limit exceeded, waiting...")
                    time.sleep(2)
                    continue

                order = self.client.place_market_order(symbol, side, quantity)
                if order:
                    self._register_order(order)
                    self.logger.info(f"✅ Order placed: {symbol} {side} {quantity}")
                    return order
                self.failed_attempts += 1
                self.logger.warning(f"⚠️ Order failed, attempt {attempt + 1}/{retries}")
            except Exception as e:
                self.failed_attempts += 1
                self.logger.error(f"❌ Order error (attempt {attempt + 1}): {e}")
            if attempt < retries - 1:
                time.sleep(1)
        return None

    def _check_rate_limit(self):
        """Return True while fewer than ``max_orders_per_minute`` orders were
        registered during the last minute."""
        minute_ago = datetime.now() - timedelta(minutes=1)
        # Drop timestamps that fell out of the one-minute window.
        self.order_timestamps = [ts for ts in self.order_timestamps if ts > minute_ago]
        return len(self.order_timestamps) < self.max_orders_per_minute

    def _validate_order_params(self, symbol, side, quantity):
        """Return True when ``quantity`` is positive and ``side`` is BUY/SELL."""
        try:
            positive = quantity > 0
        except TypeError:
            positive = False  # None or a non-numeric quantity
        if not positive:
            self.logger.error(f"Invalid quantity: {quantity}")
            return False
        if side not in ['BUY', 'SELL']:
            self.logger.error(f"Invalid side: {side}")
            return False
        return True

    def _register_order(self, order):
        """Record an accepted order for rate limiting and statistics."""
        now = datetime.now()
        self.order_timestamps.append(now)
        # Keep the status reported in the response; default to FILLED when
        # the response does not carry one.
        status = order.get('status', 'FILLED') if isinstance(order, dict) else 'FILLED'
        self.order_history.append({
            'order': order,
            'timestamp': now,
            'status': status
        })

    def get_order_statistics(self):
        """Return order counts and the share of attempts that succeeded.

        ``success_rate`` is ``registered / (registered + failed attempts)`` in
        percent, or 0.0 before any attempt.
        """
        hour_ago = datetime.now() - timedelta(hours=1)
        placed = len(self.order_history)
        attempts = placed + self.failed_attempts
        return {
            'total_orders': placed,
            'orders_last_hour': len([o for o in self.order_history
                                     if o['timestamp'] > hour_ago]),
            'success_rate': placed / attempts * 100 if attempts else 0.0
        }
# ===================================================================
# utils/logger.py
import logging
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler
class BotLogger:
    """Centralised logging setup for the bot.

    Configures the root logger (console and, optionally, a rotating file) and
    exposes one named logger per concern.
    """

    def __init__(self, log_level="INFO", log_to_file=True, log_file="data/bot.log"):
        """Configure logging.

        Args:
            log_level: Name of a ``logging`` level, case-insensitive.
            log_to_file: Also write to ``log_file`` when True.
            log_file: Path of the rotating log file.
        """
        self.log_level = getattr(logging, log_level.upper())
        self.log_to_file = log_to_file
        self.log_file = log_file

        # A bare file name has no directory part; os.makedirs('') would raise.
        log_dir = os.path.dirname(log_file)
        if log_to_file and log_dir:
            os.makedirs(log_dir, exist_ok=True)

        self.setup_logger()

        # Specialised loggers
        self.main_logger = logging.getLogger('bot.main')
        self.trade_logger = logging.getLogger('bot.trades')
        self.risk_logger = logging.getLogger('bot.risk')
        self.websocket_logger = logging.getLogger('bot.websocket')
        self.performance_logger = logging.getLogger('bot.performance')

    def setup_logger(self):
        """Attach the console/file handlers to the root logger.

        Handlers installed by an earlier ``BotLogger`` are reused, so creating
        several instances does not duplicate every log line.
        """
        formatter = logging.Formatter(
            '%(asctime)s | %(name)-15s | %(levelname)-8s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

        root_logger = logging.getLogger()
        root_logger.setLevel(self.log_level)

        # Console handler, tagged so it can be recognised later.
        console_handler = next(
            (h for h in root_logger.handlers if getattr(h, '_bot_logger_console', False)),
            None,
        )
        if console_handler is None:
            console_handler = logging.StreamHandler()
            console_handler._bot_logger_console = True
            root_logger.addHandler(console_handler)
        console_handler.setLevel(self.log_level)
        console_handler.setFormatter(formatter)

        # File handler: at most one per target file.
        if self.log_to_file:
            target = os.path.abspath(self.log_file)
            file_handler = next(
                (h for h in root_logger.handlers
                 if isinstance(h, RotatingFileHandler) and h.baseFilename == target),
                None,
            )
            if file_handler is None:
                file_handler = RotatingFileHandler(
                    self.log_file,
                    maxBytes=10*1024*1024,  # 10MB
                    backupCount=5
                )
                root_logger.addHandler(file_handler)
            file_handler.setLevel(self.log_level)
            file_handler.setFormatter(formatter)

        # Quieten chatty third-party loggers.
        logging.getLogger('websocket').setLevel(logging.WARNING)
        logging.getLogger('urllib3').setLevel(logging.WARNING)
        logging.getLogger('requests').setLevel(logging.WARNING)

    def log_trade_opportunity(self, opportunity: dict):
        """Log a detected opportunity (route, profit, amount)."""
        self.trade_logger.info(
            f"OPPORTUNITY | {opportunity['route_name']} | "
            f"Profit: {opportunity['profit_percentage']*100:.3f}% "
            f"(${opportunity['profit']:.4f}) | "
            f"Amount: ${opportunity['initial_amount']:.2f}"
        )

    def log_trade_execution(self, trade_id: str, opportunity: dict, result: dict):
        """Log the outcome of an executed trade: info on success, error otherwise."""
        if result['success']:
            self.trade_logger.info(
                f"TRADE_SUCCESS | {trade_id} | {opportunity['route_name']} | "
                f"Expected: ${opportunity['profit']:.4f} | "
                f"Actual: ${result['actual_profit']:.4f} | "
                f"Time: {result['execution_time']:.2f}s"
            )
        else:
            self.trade_logger.error(
                f"TRADE_FAILED | {trade_id} | {opportunity['route_name']} | "
                f"Error: {result['error']} | "
                f"Time: {result['execution_time']:.2f}s"
            )

    def log_risk_event(self, event_type: str, message: str):
        """Log a risk event at WARNING level."""
        self.risk_logger.warning(f"RISK_EVENT | {event_type} | {message}")

    def log_performance_metrics(self, metrics: dict):
        """Log performance metrics.

        Reads the keys ``total_trades``, ``success_rate``, ``total_profit``
        and ``roi_today`` from ``metrics``.
        """
        self.performance_logger.info(
            f"PERFORMANCE | Trades: {metrics['total_trades']} | "
            f"Success Rate: {metrics['success_rate']:.1f}% | "
            f"Total Profit: ${metrics['total_profit']:.4f} | "
            f"ROI: {metrics['roi_today']:.2f}%"
        )

    def log_websocket_event(self, event: str, details: str = ""):
        """Log a WebSocket event."""
        self.websocket_logger.info(f"WS_EVENT | {event} | {details}")
# =============================================================================
# MAIN.PY - Ejecutor principal del bot
# =============================================================================
import time
import logging
import schedule
from datetime import datetime
import signal
import sys
# Importar componentes (asumiendo que están en archivos separados)
# from binance_client import BinanceClient
# from arbitrage_engine import ArbitrageEngine
# from api_keys import APIKeyManager
class TriangularArbitrageBot:
def __init__(self):
self.setup_logging()
self.running = False
self.client = None
self.engine = None
self.order_manager = None
self.risk_manager = None
self.calculator = None
# Estadísticas
self.start_time = None
self.opportunities_scanned = 0
self.trades_executed = 0
def setup_logging(self):
"""Configura sistema de logging"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('arbitrage_bot.log'),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger(__name__)
def initialize_components(self):
"""Inicializa todos los componentes del bot"""
try:
# Cargar API keys
key_manager = APIKeyManager("your_password")
api_key, api_secret = key_manager.load_keys()
if not api_key or not api_secret:
self.logger.error("❌ Failed to load API keys")
return False
# Inicializar cliente Binance
self.client = BinanceClient(api_key, api_secret, testnet=True)
# Conectar WebSocket
if not self.client.connect_websocket():
self.logger.error("❌ Failed to connect WebSocket")
return False
# Inicializar componentes
self.engine = ArbitrageEngine(self.client, min_profit_threshold=0.008, trade_amount=10.0)
self.order_manager = OrderManager(self.client)
self.risk_manager = RiskManager(max_daily_loss=5.0, max_position_size=15.0)
self.calculator = ArbitrageCalculator()
self.logger.info("✅ All components initialized successfully")
return True
except Exception as e:
self.logger.error(f"❌ Initialization failed: {e}")
return False
def run_trading_cycle(self):
"""Ejecuta un ciclo completo de trading"""
try:
# Verificar conexión
if not self.client.is_websocket_connected():
self.logger.warning("⚠️ WebSocket disconnected, attempting reconnect...")
self.client.connect_websocket()
return
# Escanear oportunidades
opportunities = self.engine.scan_for_opportunities()
self.opportunities_scanned += len(opportunities)
if not opportunities:
return
# Procesar mejor oportunidad
best_opportunity = opportunities[0]
self.logger.info(f"🎯 Best opportunity: {best_opportunity['route_name']}")
self.logger.info(f"💰 Expected profit: {best_opportunity['profit_percentage']*100:.3f}%")
# Validar riesgos
if not self.risk_manager.check_risk_limits(best_opportunity['initial_amount']):
self.logger.warning("⚠️ Risk limits exceeded, skipping trade")
return
# Validar oportunidad
if not self.engine.validate_opportunity(best_opportunity):
self.logger.warning("⚠️ Opportunity validation failed")
return
# Ejecutar arbitraje
result = self.engine.execute_arbitrage(best_opportunity)
if result['success']:
profit = result['actual_profit']
self.risk_manager.record_trade(profit)
self.trades_executed += 1
self.logger.info(f"✅ Trade executed successfully!")
self.logger.info(f"💵 Actual profit: ${profit:.4f}")
else:
self.logger.error(f"❌ Trade failed: {result['error']}")
self.risk_manager.record_trade(-0.5) # Penalizar trades fallidos
except Exception as e:
self.logger.error(f"❌ Trading cycle error: {e}")
def print_status(self):
"""Imprime estado actual del bot"""
if not self.running:
return
uptime = datetime.now() - self.start_time if self.start_time else timedelta(0)
print("\n" + "="*50)
print("🤖 TRIANGULAR ARBITRAGE BOT STATUS")
print("="*50)
print(f"⏰ Uptime: {uptime}")
print(f"🔍 Opportunities scanned: {self.opportunities_scanned}")
print(f"📈 Trades executed: {self.trades_executed}")
if self.risk_manager:
risk_status = self.risk_manager.get_risk_status()
print(f"💰 Daily P&L: ${risk_status['daily_pnl']:.2f}")
print(f"🛡️ Risk status: {'🚨 STOP' if risk_status['emergency_stop'] else '✅ OK'}")
if self.client:
prices = self.client.get_live_prices()
print(f"📊 Live prices: {len(prices)} symbols")
print("="*50)
def start(self):
"""Inicia el bot"""
self.logger.info("🚀 Starting Triangular Arbitrage Bot...")
# Manejar señales para cierre limpio
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
# Inicializar componentes
if not self.initialize_components():
self.logger.error("❌ Failed to initialize, exiting...")
return
# Configurar tareas programadas
schedule.every(10).minutes.do(self.print_status)
schedule.every().day.at("00:00").do(self.risk_manager.reset_daily_limits)
self.running = True
self.start_time = datetime.now()
self.logger.info("✅ Bot started successfully!")
self.logger.info("🔄 Entering main trading loop...")
# Loop principal
try:
while self.running:
# Ejecutar ciclo de trading
self.run_trading_cycle()
# Ejecutar tareas programadas
schedule.run_pending()
# Pausa corta
time.sleep(1)
except KeyboardInterrupt:
self.logger.info("🛑 Shutdown requested by user")
except Exception as e:
self.logger.error(f"❌ Fatal error: {e}")
finally:
self.stop()
def stop(self):
"""Detiene el bot limpiamente"""
self.logger.info("🛑 Stopping bot...")
self.running = False
if self.client:
self.client.disconnect()
# Imprimir estadísticas finales
if self.start_time:
total_time = datetime.now() - self.start_time
self.logger.info(f"📊 Final stats - Uptime: {total_time}")
self.logger.info(f"📊 Opportunities: {self.opportunities_scanned}")
self.logger.info(f"📊 Trades: {self.trades_executed}")
self.logger.info("✅ Bot stopped cleanly")
def _signal_handler(self, signum, frame):
    """Handle an OS signal by logging it and clearing the running flag.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame supplied by the signal machinery (unused).
    """
    notice = f"🛑 Received signal {signum}"
    self.logger.info(notice)
    # The main loop checks this flag on every iteration
    self.running = False
# =============================================================================
# MAIN ENTRY POINT
# =============================================================================
if __name__ == "__main__":
    # Interactive entry point: optional first-time key setup, then run the bot.
    print("🤖 Triangular Arbitrage Bot")
    print("=" * 50)

    # First-time setup stores the API keys and exits so the next run trades
    wants_setup = input("First time setup? (y/n): ").lower() == 'y'
    if wants_setup:
        setup_keys()
        print("✅ Setup complete! Run the bot again to start trading.")
        sys.exit(0)

    # Confirm the mode
    testnet = input("Run in testnet mode? (y/n): ").lower() == 'y'
    mode_label = 'TESTNET' if testnet else 'LIVE'
    print(f"🚀 Starting bot in {mode_label} mode...")
    print("Press Ctrl+C to stop")

    # NOTE(review): `testnet` is only used for the banner above; the bot is
    # constructed without it — confirm how the bot actually selects its mode.
    bot = TriangularArbitrageBot()
    bot.start()
# utils/monitor.py
import time
import json
from datetime import datetime
from typing import Dict, List
import threading
class PerformanceMonitor:
    """Real-time performance monitor.

    Holds a dictionary of live counters (``current_metrics``) that callers
    update through ``record_opportunity`` / ``record_trade``. While
    monitoring is started, a daemon thread refreshes the derived metrics and
    appends a snapshot to ``metrics_history`` every ``update_interval``
    seconds; snapshots older than 24 hours are discarded.
    """

    def __init__(self, update_interval=300):  # 5 minutes
        """Create the monitor.

        Args:
            update_interval: Seconds between two metric snapshots.
        """
        self.update_interval = update_interval
        self.metrics_history = []
        self.is_running = False
        self.monitor_thread = None
        # Set by stop_monitoring() so the monitor thread wakes up at once
        # instead of finishing a full update_interval sleep.
        self._stop_event = threading.Event()
        # Trades that reported a positive execution time; denominator of the
        # running mean kept in 'average_execution_time'.
        self._timed_trades = 0

        # Current metrics
        self.current_metrics = {
            'start_time': datetime.now().isoformat(),
            'opportunities_detected': 0,
            'opportunities_per_hour': 0.0,
            'trades_executed': 0,
            'trades_successful': 0,
            'total_profit': 0.0,
            'average_execution_time': 0.0,
            'websocket_uptime': 0.0,
            'last_opportunity_time': None,
            'last_trade_time': None
        }

    def start_monitoring(self):
        """Start the background monitoring thread (no-op if already running)."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            if self.is_running:
                return
            # A thread flagged to stop is still winding down: reap it first
            self.stop_monitoring()
        self._stop_event.clear()
        self.is_running = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Stop the monitoring thread and wait for it to finish."""
        self.is_running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join()

    def _monitor_loop(self):
        """Monitoring loop: wait one interval, then update and snapshot."""
        while self.is_running:
            # wait() returns True as soon as stop_monitoring() sets the event
            if self._stop_event.wait(self.update_interval):
                break
            self._update_metrics()
            self._save_metrics_snapshot()

    def _update_metrics(self):
        """Recompute the derived metrics (opportunities per hour)."""
        now = datetime.now()
        start_time = datetime.fromisoformat(self.current_metrics['start_time'])
        hours_running = (now - start_time).total_seconds() / 3600
        if hours_running > 0:
            self.current_metrics['opportunities_per_hour'] = (
                self.current_metrics['opportunities_detected'] / hours_running
            )

    def _save_metrics_snapshot(self):
        """Append a timestamped copy of the metrics and prune old snapshots."""
        now = datetime.now()
        snapshot = self.current_metrics.copy()
        snapshot['timestamp'] = now.isoformat()
        self.metrics_history.append(snapshot)

        # Keep only the last 24 hours of data
        cutoff_time = now.timestamp() - 86400
        self.metrics_history = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m['timestamp']).timestamp() > cutoff_time
        ]

    def record_opportunity(self, opportunity: Dict):
        """Record a detected opportunity (only the count and time are kept)."""
        self.current_metrics['opportunities_detected'] += 1
        self.current_metrics['last_opportunity_time'] = datetime.now().isoformat()

    def record_trade(self, trade_result: Dict):
        """Record an executed trade.

        Args:
            trade_result: Dict with a required ``'success'`` key and optional
                ``'actual_profit'`` and ``'execution_time'`` keys.

        Raises:
            KeyError: If ``trade_result`` has no ``'success'`` key.
        """
        self.current_metrics['trades_executed'] += 1
        self.current_metrics['last_trade_time'] = datetime.now().isoformat()

        if trade_result['success']:
            self.current_metrics['trades_successful'] += 1
            self.current_metrics['total_profit'] += trade_result.get('actual_profit', 0)

        # Update the running mean over the trades that reported a time.
        # Dividing by the total trade count would drag the mean towards zero
        # whenever a trade arrives without an execution time.
        execution_time = trade_result.get('execution_time') or 0
        if execution_time > 0:
            self._timed_trades += 1
            previous_avg = self.current_metrics['average_execution_time']
            self.current_metrics['average_execution_time'] = (
                previous_avg + (execution_time - previous_avg) / self._timed_trades
            )

    def get_current_metrics(self) -> Dict:
        """Return a shallow copy of the current metrics."""
        return self.current_metrics.copy()

    def get_metrics_history(self) -> List[Dict]:
        """Return a shallow copy of the snapshot history."""
        return self.metrics_history.copy()

    def get_performance_summary(self) -> Dict:
        """Return a summary of the performance since the monitor was created.

        ``profit_per_hour`` divides by at least one hour, so during the first
        hour it equals the total profit.
        """
        metrics = self.current_metrics

        success_rate = 0.0
        if metrics['trades_executed'] > 0:
            success_rate = (metrics['trades_successful'] / metrics['trades_executed']) * 100

        started = datetime.fromisoformat(metrics['start_time'])
        uptime_hours = (datetime.now() - started).total_seconds() / 3600

        return {
            'uptime_hours': uptime_hours,
            'opportunities_detected': metrics['opportunities_detected'],
            'opportunities_per_hour': metrics['opportunities_per_hour'],
            'trades_executed': metrics['trades_executed'],
            'success_rate': success_rate,
            'total_profit': metrics['total_profit'],
            'average_execution_time': metrics['average_execution_time'],
            'profit_per_hour': metrics['total_profit'] / max(uptime_hours, 1)
        }
# =============================================================================
# ORDER_MANAGER.PY - Advanced order manager
# =============================================================================
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
class OrderManager:
    """Places orders through a client object with retries and a local rate limit.

    Successful orders are appended to ``order_history`` and their timestamps
    feed a sliding one-minute window capped at ``max_orders_per_minute``.
    ``active_orders`` is initialized for callers but not used by this class.
    """

    def __init__(self, binance_client):
        """Create the manager.

        Args:
            binance_client: Object exposing
                ``place_market_order(symbol, side, quantity)``.
        """
        self.client = binance_client
        self.active_orders = {}
        self.order_history = []
        self.max_orders_per_minute = 50
        self.order_timestamps = []
        # Placement attempts that raised or returned nothing; used to report
        # a real success rate.
        self._failed_attempts = 0
        self.logger = logging.getLogger(__name__)

    def place_order_with_retry(self, symbol, side, quantity, order_type='MARKET', retries=3):
        """Place an order, retrying failed attempts.

        Args:
            symbol: Trading pair symbol.
            side: ``'BUY'`` or ``'SELL'``.
            quantity: Positive order quantity.
            order_type: Kept for interface compatibility; only market orders
                are placed because the client call is ``place_market_order``.
            retries: Maximum number of loop iterations. An iteration spent
                waiting on the local rate limit counts towards this number.

        Returns:
            The order returned by the client, or ``None`` when the parameters
            are invalid or every attempt failed.
        """
        # The parameters never change between attempts, so validate once and
        # keep invalid input out of the retry/back-off path entirely.
        if not self._validate_order_params(symbol, side, quantity):
            return None

        if order_type != 'MARKET':
            self.logger.warning(
                f"Order type {order_type} is not supported, placing a MARKET order"
            )

        for attempt in range(retries):
            # Respect the local rate limit
            if not self._check_rate_limit():
                self.logger.warning("Rate limit exceeded, waiting...")
                time.sleep(2)
                continue

            # Only the client call is guarded: an error in the bookkeeping
            # below must never cause an already-placed order to be re-sent.
            try:
                order = self.client.place_market_order(symbol, side, quantity)
            except Exception as e:
                self.logger.error(f"❌ Order error (attempt {attempt + 1}): {e}")
                order = None
            else:
                if not order:
                    self.logger.warning(f"⚠️ Order failed, attempt {attempt + 1}/{retries}")

            if order:
                self._register_order(order)
                self.logger.info(f"✅ Order placed: {symbol} {side} {quantity}")
                return order

            self._failed_attempts += 1
            # Back off before the next attempt, but not after the last one
            if attempt < retries - 1:
                time.sleep(1)

        return None

    def _check_rate_limit(self):
        """Return True while fewer than the allowed orders were placed in the last minute."""
        now = datetime.now()
        minute_ago = now - timedelta(minutes=1)

        # Drop timestamps that left the one-minute window
        self.order_timestamps = [ts for ts in self.order_timestamps if ts > minute_ago]

        return len(self.order_timestamps) < self.max_orders_per_minute

    def _validate_order_params(self, symbol, side, quantity):
        """Validate order parameters; log and return False on the first problem."""
        if not symbol:
            self.logger.error(f"Invalid symbol: {symbol}")
            return False

        # `quantity > 0` is False for NaN and raises for non-numeric input,
        # both of which must be rejected.
        try:
            positive = bool(quantity > 0)
        except TypeError:
            positive = False
        if not positive:
            self.logger.error(f"Invalid quantity: {quantity}")
            return False

        if side not in ['BUY', 'SELL']:
            self.logger.error(f"Invalid side: {side}")
            return False

        return True

    def _register_order(self, order):
        """Record a placed order in the history and the rate-limit window."""
        placed_at = datetime.now()
        self.order_timestamps.append(placed_at)
        self.order_history.append({
            'order': order,
            'timestamp': placed_at,
            'status': 'FILLED'
        })

    def get_order_statistics(self):
        """Return order counts and the share of placement attempts that succeeded.

        ``success_rate`` is 0.0 when nothing has been attempted yet.
        """
        placed = len(self.order_history)
        attempts = placed + self._failed_attempts
        hour_ago = datetime.now() - timedelta(hours=1)
        return {
            'total_orders': placed,
            'orders_last_hour': sum(1 for o in self.order_history if o['timestamp'] > hour_ago),
            'success_rate': placed / attempts * 100 if attempts else 0.0
        }
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
import os
class RiskManager:
    """Risk manager for the arbitrage bot.

    Tracks today's trades, realized PnL and consecutive losses, enforces the
    per-trade, hourly and daily limits, and latches an emergency stop when
    the daily loss limit or the consecutive-loss limit is reached.

    The daily counters are persisted as JSON in ``self.state_file`` so a
    restart on the same day resumes with them. The emergency-stop flag and
    its reasons are not part of the persisted state.
    """

    def __init__(self, initial_capital=50.0, max_daily_loss=5.0, max_trade_size=10.0):
        """Create the risk manager and load today's persisted state, if any.

        Args:
            initial_capital: Starting capital in USDT.
            max_daily_loss: Maximum loss allowed per day.
            max_trade_size: Maximum size of a single trade.
        """
        # The logger must exist before load_state() runs below: that method
        # logs on both its success path and its error path.
        self.logger = logging.getLogger(__name__)

        self.initial_capital = initial_capital
        self.max_daily_loss = max_daily_loss
        self.max_trade_size = max_trade_size

        # Operating limits
        self.max_concurrent_trades = 1
        self.max_trades_per_hour = 10
        self.max_trades_per_day = 50
        self.min_balance_buffer = 5.0  # minimum USDT that must stay available

        # Trade tracking
        self.daily_trades = []
        self.daily_pnl = 0.0
        self.current_trades = 0
        self.consecutive_losses = 0
        self.max_consecutive_losses = 3

        # API rate limiting
        self.api_requests_count = 0
        self.api_requests_window_start = time.time()
        self.max_api_requests_per_minute = 1000  # conservative limit

        # Emergency stop
        self.emergency_stop_active = False
        self.emergency_reasons = []

        # State file
        self.state_file = "data/risk_state.json"
        self.ensure_data_directory()
        self.load_state()

    def ensure_data_directory(self):
        """Create the directory that holds the state file if it is missing."""
        os.makedirs(os.path.dirname(self.state_file) or ".", exist_ok=True)

    def load_state(self):
        """Load the persisted state, ignoring files written on another day.

        Any error (unreadable file, invalid JSON) is logged and the in-memory
        defaults are kept.
        """
        try:
            if os.path.exists(self.state_file):
                with open(self.state_file, 'r') as f:
                    state = json.load(f)

                # Only data from the current day is restored
                today = datetime.now().date()
                if state.get('date') == today.isoformat():
                    self.daily_trades = state.get('daily_trades', [])
                    self.daily_pnl = state.get('daily_pnl', 0.0)
                    self.consecutive_losses = state.get('consecutive_losses', 0)
                    self.logger.info(f"Loaded risk state: {len(self.daily_trades)} trades today, PnL: ${self.daily_pnl:.2f}")
        except Exception as e:
            self.logger.error(f"Error loading risk state: {e}")

    def save_state(self):
        """Persist the daily counters; errors are logged, not raised."""
        try:
            state = {
                'date': datetime.now().date().isoformat(),
                'daily_trades': self.daily_trades,
                'daily_pnl': self.daily_pnl,
                'consecutive_losses': self.consecutive_losses,
                'last_update': datetime.now().isoformat()
            }
            with open(self.state_file, 'w') as f:
                json.dump(state, f, indent=2)
        except Exception as e:
            self.logger.error(f"Error saving risk state: {e}")

    def _trades_today(self, now):
        """Return the recorded trades whose timestamp falls on ``now``'s date."""
        today = now.date()
        return [
            t for t in self.daily_trades
            if datetime.fromisoformat(t['timestamp']).date() == today
        ]

    def can_trade(self, trade_amount: float) -> Dict[str, object]:
        """Check whether a trade of ``trade_amount`` may be executed.

        The checks run in order and the first failing one decides the result.
        Hitting the daily loss limit or the consecutive-loss limit also
        activates the emergency stop. A passing call consumes one slot of the
        per-minute API request budget.

        Args:
            trade_amount: Amount to trade.

        Returns:
            Dict with ``'allowed'`` (bool), ``'reasons'`` (why it was
            blocked) and ``'warnings'`` (non-blocking alerts).
        """
        result = {
            'allowed': True,
            'reasons': [],
            'warnings': []
        }

        # Emergency stop
        if self.emergency_stop_active:
            result['allowed'] = False
            result['reasons'].append(f"Emergency stop active: {', '.join(self.emergency_reasons)}")
            return result

        # Maximum daily loss
        if self.daily_pnl <= -self.max_daily_loss:
            result['allowed'] = False
            result['reasons'].append(f"Daily loss limit reached: ${self.daily_pnl:.2f} <= -${self.max_daily_loss}")
            self.activate_emergency_stop("Daily loss limit exceeded")
            return result

        # Trade size
        if trade_amount > self.max_trade_size:
            result['allowed'] = False
            result['reasons'].append(f"Trade size too large: ${trade_amount} > ${self.max_trade_size}")
            return result

        # Concurrent trades
        if self.current_trades >= self.max_concurrent_trades:
            result['allowed'] = False
            result['reasons'].append(f"Max concurrent trades reached: {self.current_trades}")
            return result

        # Daily / hourly trade counts
        now = datetime.now()
        today_trades = self._trades_today(now)
        hour_trades = [
            t for t in today_trades
            if (now - datetime.fromisoformat(t['timestamp'])).total_seconds() < 3600
        ]

        if len(today_trades) >= self.max_trades_per_day:
            result['allowed'] = False
            result['reasons'].append(f"Daily trade limit reached: {len(today_trades)}")
            return result

        if len(hour_trades) >= self.max_trades_per_hour:
            result['allowed'] = False
            result['reasons'].append(f"Hourly trade limit reached: {len(hour_trades)}")
            return result

        # Consecutive losses
        if self.consecutive_losses >= self.max_consecutive_losses:
            result['allowed'] = False
            result['reasons'].append(f"Max consecutive losses reached: {self.consecutive_losses}")
            self.activate_emergency_stop("Too many consecutive losses")
            return result

        # API rate limiting
        if not self.check_api_rate_limit():
            result['allowed'] = False
            result['reasons'].append("API rate limit exceeded")
            return result

        # Warnings (they alert but do not block)
        if self.daily_pnl < -self.max_daily_loss * 0.7:
            result['warnings'].append(f"Approaching daily loss limit: ${self.daily_pnl:.2f}")

        if self.consecutive_losses >= 2:
            result['warnings'].append(f"Consecutive losses: {self.consecutive_losses}")

        return result

    def validate_balance(self, binance_client, required_amount: float) -> Dict[str, object]:
        """Check that the USDT balance covers ``required_amount``.

        Args:
            binance_client: Client exposing ``get_balance('USDT')``; the
                result is read through its ``'free'`` key.
            required_amount: Amount required, in USDT.

        Returns:
            Dict with ``'sufficient'``, ``'available'``, ``'required'`` and
            ``'buffer_respected'``, plus ``'error'`` when the balance could
            not be fetched or an exception occurred.
        """
        result = {
            'sufficient': False,
            'available': 0.0,
            'required': required_amount,
            'buffer_respected': False
        }

        try:
            balance = binance_client.get_balance('USDT')
            if not balance:
                result['error'] = "Could not fetch balance"
                return result

            available = balance['free']
            result['available'] = available

            # Enough balance for the trade itself
            if available >= required_amount:
                result['sufficient'] = True

            # Minimum buffer left after the trade
            if available - required_amount >= self.min_balance_buffer:
                result['buffer_respected'] = True
            else:
                self.logger.warning(f"Balance buffer at risk: {available - required_amount:.2f} < {self.min_balance_buffer}")

        except Exception as e:
            result['error'] = str(e)
            self.logger.error(f"Error validating balance: {e}")

        return result

    def register_trade_start(self, opportunity: Dict) -> str:
        """Record the start of a trade and persist the state.

        Args:
            opportunity: Opportunity about to be executed; its
                ``'route_name'``, ``'profit'`` and ``'initial_amount'`` keys
                are read.

        Returns:
            The generated trade id (millisecond timestamp based).
        """
        trade_id = f"trade_{int(time.time() * 1000)}"

        trade_record = {
            'id': trade_id,
            'timestamp': datetime.now().isoformat(),
            'opportunity': opportunity['route_name'],
            'expected_profit': opportunity['profit'],
            'trade_amount': opportunity['initial_amount'],
            'status': 'STARTED'
        }

        self.daily_trades.append(trade_record)
        self.current_trades += 1

        self.logger.info(f"Trade started: {trade_id} - {opportunity['route_name']}")
        self.save_state()

        return trade_id

    def register_trade_completion(self, trade_id: str, execution_result: Dict):
        """Record the outcome of a trade and update PnL and loss streak.

        Args:
            trade_id: Id returned by ``register_trade_start``.
            execution_result: Dict with a required ``'success'`` key and
                optional ``'actual_profit'``, ``'execution_time'`` and
                ``'error'`` keys. A missing or ``None`` profit counts as 0.
        """
        actual_profit = execution_result.get('actual_profit') or 0.0

        # Find and update the matching trade record
        for trade in self.daily_trades:
            if trade['id'] == trade_id:
                trade['status'] = 'COMPLETED' if execution_result['success'] else 'FAILED'
                trade['actual_profit'] = actual_profit
                trade['execution_time'] = execution_result.get('execution_time', 0.0)
                trade['error'] = execution_result.get('error')
                break

        self.current_trades = max(0, self.current_trades - 1)

        # Daily PnL
        self.daily_pnl += actual_profit

        # Consecutive-loss streak
        if actual_profit < 0:
            self.consecutive_losses += 1
        else:
            self.consecutive_losses = 0

        self.logger.info(f"Trade completed: {trade_id} - Profit: ${actual_profit:.4f}")
        self.save_state()

        # A single loss above $2 is only reported; the stop is triggered by
        # the daily limit below.
        if actual_profit < -2.0:
            self.logger.warning("Large loss detected, monitoring closely")

        if self.daily_pnl <= -self.max_daily_loss:
            self.activate_emergency_stop("Daily loss limit exceeded")

    def check_api_rate_limit(self) -> bool:
        """Count one API request; return False when the minute budget is used up."""
        now = time.time()

        # The counter restarts every minute
        if now - self.api_requests_window_start > 60:
            self.api_requests_count = 0
            self.api_requests_window_start = now

        if self.api_requests_count >= self.max_api_requests_per_minute:
            self.logger.warning("API rate limit reached")
            return False

        self.api_requests_count += 1
        return True

    def activate_emergency_stop(self, reason: str):
        """Activate the emergency stop and remember ``reason`` (once)."""
        self.emergency_stop_active = True
        if reason not in self.emergency_reasons:
            self.emergency_reasons.append(reason)

        self.logger.error(f"🚨 EMERGENCY STOP ACTIVATED: {reason}")
        self.save_state()

    def deactivate_emergency_stop(self):
        """Clear the emergency stop and its reasons (manual action)."""
        self.emergency_stop_active = False
        self.emergency_reasons = []
        self.logger.info("Emergency stop deactivated")
        self.save_state()

    def reset_daily_limits(self):
        """Reset the daily counters (meant to be called at midnight).

        The emergency stop is left as it is; it is only cleared by
        ``deactivate_emergency_stop``.
        """
        self.daily_trades = []
        self.daily_pnl = 0.0
        self.consecutive_losses = 0
        self.logger.info("Daily limits reset")
        self.save_state()

    def get_risk_status(self) -> Dict:
        """Return the current risk figures as a dict."""
        trades_today = len(self._trades_today(datetime.now()))

        return {
            'emergency_stop_active': self.emergency_stop_active,
            'emergency_reasons': self.emergency_reasons,
            'daily_pnl': self.daily_pnl,
            'daily_trades_count': trades_today,
            'consecutive_losses': self.consecutive_losses,
            'current_trades': self.current_trades,
            'daily_loss_remaining': self.max_daily_loss + self.daily_pnl,
            'trades_remaining_today': self.max_trades_per_day - trades_today,
            'can_trade': not self.emergency_stop_active and self.daily_pnl > -self.max_daily_loss
        }

    def get_performance_report(self) -> Dict:
        """Return today's performance report.

        Trades still in progress count towards ``total_trades`` but not as
        failures. ``largest_win`` is never negative and ``largest_loss`` is
        never positive; both are 0 when there is no such trade.
        """
        today_trades = self._trades_today(datetime.now())
        profits = [(t.get('actual_profit') or 0) for t in today_trades]

        successful = sum(
            1 for t, profit in zip(today_trades, profits)
            if t.get('status') == 'COMPLETED' and profit > 0
        )
        # Only finished trades can have failed; a trade that merely started
        # has no profit yet and must not be counted against the bot.
        finished = sum(1 for t in today_trades if t.get('status') in ('COMPLETED', 'FAILED'))

        total_count = len(today_trades)
        total_profit = sum(profits)

        return {
            'total_trades': total_count,
            'successful_trades': successful,
            'failed_trades': finished - successful,
            'success_rate': successful / max(total_count, 1) * 100,
            'total_profit': total_profit,
            'average_profit_per_trade': total_profit / max(total_count, 1),
            'largest_win': max(0, max(profits, default=0)),
            'largest_loss': min(0, min(profits, default=0)),
            'current_drawdown': min(0, self.daily_pnl),
            'roi_today': (total_profit / self.initial_capital) * 100 if self.initial_capital else 0.0
        }
# Usage example
if __name__ == "__main__":
    # Build a manager with the demo limits and print both of its reports
    demo_manager = RiskManager(initial_capital=50.0, max_daily_loss=5.0)
    print("RiskManager initialized")

    risk_status = demo_manager.get_risk_status()
    print("Risk Status:", risk_status)

    performance_report = demo_manager.get_performance_report()
    print("Performance Report:", performance_report)
# config/settings.py
import os
from typing import Dict, List
class BotConfig:
    """Centralized bot configuration, read from environment variables.

    Every setting backed by an environment variable falls back to the
    default shown in ``__init__``. Numeric settings are converted with
    ``float``/``int``, so a malformed value raises ``ValueError`` while the
    object is being constructed.
    """

    # Spellings (case-insensitive, surrounding whitespace ignored) that turn
    # a boolean environment variable on; anything else turns it off.
    _TRUE_VALUES = frozenset({'1', 'true', 'yes', 'on'})

    @staticmethod
    def _env_flag(name, default):
        """Read the boolean environment variable ``name``."""
        return os.getenv(name, default).strip().lower() in BotConfig._TRUE_VALUES

    def __init__(self):
        # API configuration
        self.API_KEY = os.getenv('BINANCE_API_KEY', 'your_api_key_here')
        self.API_SECRET = os.getenv('BINANCE_API_SECRET', 'your_api_secret_here')
        # Accepting "1"/"yes"/"on" matters here: with a strict "true" check
        # USE_TESTNET=1 would silently select live trading.
        self.USE_TESTNET = self._env_flag('USE_TESTNET', 'True')

        # Trading configuration
        self.INITIAL_CAPITAL = float(os.getenv('INITIAL_CAPITAL', '50.0'))
        self.TRADE_AMOUNT = float(os.getenv('TRADE_AMOUNT', '10.0'))
        self.MIN_PROFIT_THRESHOLD = float(os.getenv('MIN_PROFIT_THRESHOLD', '0.008'))  # 0.8%

        # Risk management
        self.MAX_DAILY_LOSS = float(os.getenv('MAX_DAILY_LOSS', '5.0'))
        self.MAX_TRADE_SIZE = float(os.getenv('MAX_TRADE_SIZE', '10.0'))
        self.MAX_CONCURRENT_TRADES = int(os.getenv('MAX_CONCURRENT_TRADES', '1'))
        self.MAX_TRADES_PER_HOUR = int(os.getenv('MAX_TRADES_PER_HOUR', '10'))
        self.MAX_TRADES_PER_DAY = int(os.getenv('MAX_TRADES_PER_DAY', '50'))

        # Symbols to monitor
        self.SYMBOLS = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'ETHBTC', 'BNBBTC', 'BNBETH']

        # WebSocket configuration
        self.WS_RECONNECT_ATTEMPTS = int(os.getenv('WS_RECONNECT_ATTEMPTS', '5'))
        self.WS_PING_INTERVAL = int(os.getenv('WS_PING_INTERVAL', '30'))

        # Logging configuration
        self.LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
        self.LOG_TO_FILE = self._env_flag('LOG_TO_FILE', 'True')
        self.LOG_FILE_PATH = os.getenv('LOG_FILE_PATH', 'data/bot.log')

        # Performance monitoring
        self.ENABLE_PERFORMANCE_TRACKING = True
        self.PERFORMANCE_LOG_INTERVAL = 300  # 5 minutes

        # Emergency settings
        self.EMERGENCY_STOP_ON_LARGE_LOSS = True
        self.LARGE_LOSS_THRESHOLD = 2.0  # $2 loss triggers alert

    def validate_config(self) -> List[str]:
        """Validate the configuration.

        Returns:
            A list of human-readable error messages; empty when the
            configuration is valid.
        """
        errors = []

        if not self.API_KEY or self.API_KEY == 'your_api_key_here':
            errors.append("API_KEY not configured")

        if not self.API_SECRET or self.API_SECRET == 'your_api_secret_here':
            errors.append("API_SECRET not configured")

        if self.TRADE_AMOUNT <= 0:
            errors.append("TRADE_AMOUNT must be positive")

        if self.TRADE_AMOUNT > self.INITIAL_CAPITAL:
            errors.append("TRADE_AMOUNT cannot be larger than INITIAL_CAPITAL")

        # NOTE(review): presumably MAX_TRADE_SIZE is the per-trade cap applied
        # by the risk checks, in which case a larger TRADE_AMOUNT could never
        # be traded — confirm against the code that consumes this config.
        if self.TRADE_AMOUNT > self.MAX_TRADE_SIZE:
            errors.append("TRADE_AMOUNT cannot be larger than MAX_TRADE_SIZE")

        if self.MIN_PROFIT_THRESHOLD <= 0:
            errors.append("MIN_PROFIT_THRESHOLD must be positive")

        if self.MAX_DAILY_LOSS <= 0:
            errors.append("MAX_DAILY_LOSS must be positive")

        return errors

    def get_config_summary(self) -> Dict:
        """Return a summary of the configuration without the API secrets."""
        return {
            'use_testnet': self.USE_TESTNET,
            'initial_capital': self.INITIAL_CAPITAL,
            'trade_amount': self.TRADE_AMOUNT,
            'min_profit_threshold': self.MIN_PROFIT_THRESHOLD * 100,  # as a percentage
            'max_daily_loss': self.MAX_DAILY_LOSS,
            'symbols_monitored': len(self.SYMBOLS),
            'max_concurrent_trades': self.MAX_CONCURRENT_TRADES
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment