Skip to content

Instantly share code, notes, and snippets.

@Kurry
Created October 29, 2024 02:55
Show Gist options
  • Save Kurry/09863b696ef62003f0a6d7898e0b03ca to your computer and use it in GitHub Desktop.
Save Kurry/09863b696ef62003f0a6d7898e0b03ca to your computer and use it in GitHub Desktop.
Advanced Options Trading Analytics Tool A comprehensive Python tool for analyzing options trading performance with features including: - Trade metrics calculation (ROI, efficiency, risk-adjusted returns) - Precise timezone-aware datetime handling - Advanced statistical aggregation with performance scoring - Robust CSV parsing and data validation…
import pandas as pd
from datetime import datetime
from io import StringIO
import pytz
from dataclasses import dataclass
from typing import Optional, List, Tuple, Dict
from collections import defaultdict
import logging
from decimal import Decimal, ROUND_HALF_UP
import numpy as np
# Configure logging with more detailed format
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
@dataclass
class OptionsTradeMetrics:
symbol: str
trade_duration: float # in hours
entry_price: float
exit_price: float
best_exit_price: float
position_mfe: float
position_mae: float
actual_roi: float
potential_roi: float
lost_alpha: float
efficiency_score: float
risk_adjusted_return: float
position_mfe_mae_ratio: float
def __repr__(self):
return (f"OptionsTradeMetrics(symbol={self.symbol}, trade_duration={self.trade_duration:.2f}h, "
f"entry_price={self.entry_price:.4f}, exit_price={self.exit_price:.4f}, "
f"best_exit_price={self.best_exit_price:.4f}, position_mfe={self.position_mfe:.4f}, "
f"position_mae={self.position_mae:.4f}, actual_roi={self.actual_roi:.2f}%, "
f"potential_roi={self.potential_roi:.2f}%, lost_alpha={self.lost_alpha:.2f}, "
f"efficiency_score={self.efficiency_score:.2f}%, "
f"risk_adjusted_return={self.risk_adjusted_return:.4f}, "
f"position_mfe_mae_ratio={self.position_mfe_mae_ratio:.4f}")
def parse_datetime_with_timezone(date_str: str, time_str: str, default_tz: pytz.timezone) -> Optional[datetime]:
"""
Parse date and time strings with proper timezone handling.
Parameters:
- date_str: Date string in YYYY-MM-DD format
- time_str: Time string that may include timezone information
- default_tz: Default timezone to use if none is specified
Returns:
- Timezone-aware datetime object in UTC, or None if parsing fails
"""
try:
# Remove timezone abbreviations and clean the strings
time_str = time_str.replace('EST', '').replace('EDT', '').replace('UTC', '').strip()
datetime_str = f"{date_str} {time_str}"
# Try parsing with various formats
formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%Y-%m-%d %H:%M'
]
parsed_dt = None
for fmt in formats:
try:
parsed_dt = datetime.strptime(datetime_str, fmt)
break
except ValueError:
continue
if parsed_dt is None:
raise ValueError(f"Could not parse datetime: {datetime_str}")
# Localize the datetime and convert to UTC
localized_dt = default_tz.localize(parsed_dt)
return localized_dt.astimezone(pytz.UTC)
except Exception as e:
logging.error(f"Error parsing datetime {date_str} {time_str}: {str(e)}")
return None
def calculate_aggregate_statistics(patterns: List[OptionsTradeMetrics]) -> pd.DataFrame:
"""
Calculate comprehensive aggregate statistics by symbol with enhanced metrics
and proper error handling.
Parameters:
- patterns: List of OptionsTradeMetrics instances
Returns:
- DataFrame containing detailed aggregate statistics
"""
if not patterns:
logging.warning("No patterns provided for aggregate statistics calculation")
return pd.DataFrame()
try:
# Initialize statistics dictionary
stats_dict: Dict[str, Dict[str, list]] = defaultdict(lambda: defaultdict(list))
# Collect metrics by symbol
for pattern in patterns:
symbol = pattern.symbol
# Basic metrics
stats_dict[symbol]['Duration'].append(pattern.trade_duration)
stats_dict[symbol]['Entry Price'].append(pattern.entry_price)
stats_dict[symbol]['Exit Price'].append(pattern.exit_price)
stats_dict[symbol]['Best Exit Price'].append(pattern.best_exit_price)
# Performance metrics
stats_dict[symbol]['Lost Alpha'].append(pattern.lost_alpha)
stats_dict[symbol]['Efficiency Score'].append(pattern.efficiency_score)
stats_dict[symbol]['Risk-Adjusted Return'].append(pattern.risk_adjusted_return)
stats_dict[symbol]['Actual ROI'].append(pattern.actual_roi)
stats_dict[symbol]['Potential ROI'].append(pattern.potential_roi)
# Risk metrics
stats_dict[symbol]['MFE'].append(pattern.position_mfe)
stats_dict[symbol]['MAE'].append(pattern.position_mae)
stats_dict[symbol]['MFE/MAE Ratio'].append(pattern.position_mfe_mae_ratio)
# Process statistics for each symbol
results = []
for symbol, metrics in stats_dict.items():
try:
# Calculate basic statistics
trade_count = len(metrics['Duration'])
avg_duration = np.mean(metrics['Duration'])
# Calculate performance metrics
total_lost_alpha = sum(metrics['Lost Alpha'])
avg_efficiency = np.mean(metrics['Efficiency Score'])
avg_risk_adjusted = np.mean(metrics['Risk-Adjusted Return'])
# Calculate ROI statistics
avg_actual_roi = np.mean(metrics['Actual ROI'])
avg_potential_roi = np.mean(metrics['Potential ROI'])
roi_capture = (avg_actual_roi / avg_potential_roi * 100) if avg_potential_roi != 0 else 0
# Calculate risk metrics
avg_mfe_mae_ratio = np.mean(metrics['MFE/MAE Ratio'])
max_drawdown = min(metrics['MAE'])
best_gain = max(metrics['MFE'])
# Calculate price metrics
avg_entry = np.mean(metrics['Entry Price'])
avg_exit = np.mean(metrics['Exit Price'])
price_improvement = ((avg_exit - avg_entry) / avg_entry * 100) if avg_entry != 0 else 0
# Calculate volatility metrics
roi_volatility = np.std(metrics['Actual ROI'])
risk_reward_ratio = abs(best_gain / max_drawdown) if max_drawdown != 0 else float('inf')
results.append({
'Symbol': symbol,
'Trade Count': trade_count,
'Avg Duration (hours)': round(avg_duration, 2),
'Total Lost Alpha': round(total_lost_alpha, 2),
'Avg Efficiency Score (%)': round(avg_efficiency, 2),
'Avg Risk-Adjusted Return': round(avg_risk_adjusted, 4),
'Avg Actual ROI (%)': round(avg_actual_roi, 2),
'Avg Potential ROI (%)': round(avg_potential_roi, 2),
'ROI Capture (%)': round(roi_capture, 2),
'Avg MFE/MAE Ratio': round(avg_mfe_mae_ratio, 2),
'Max Drawdown': round(max_drawdown, 2),
'Best Gain': round(best_gain, 2),
'Risk/Reward Ratio': round(risk_reward_ratio, 2),
'Price Improvement (%)': round(price_improvement, 2),
'ROI Volatility (%)': round(roi_volatility, 2)
})
except Exception as e:
logging.error(f"Error calculating statistics for symbol {symbol}: {str(e)}")
continue
# Create DataFrame and sort by trade count
df = pd.DataFrame(results)
if not df.empty:
df = df.sort_values('Trade Count', ascending=False)
# Add performance rating based on combined metrics
df['Performance Score'] = (
df['Avg Efficiency Score (%)'] * 0.3 +
df['ROI Capture (%)'] * 0.3 +
df['Avg Risk-Adjusted Return'] * 0.2 +
(100 - abs(df['ROI Volatility (%)'])) * 0.2
).round(2)
# Sort columns for better readability
column_order = [
'Symbol', 'Trade Count', 'Performance Score',
'Avg Actual ROI (%)', 'Avg Potential ROI (%)', 'ROI Capture (%)',
'Avg Efficiency Score (%)', 'Avg Risk-Adjusted Return',
'Total Lost Alpha', 'ROI Volatility (%)',
'Avg Duration (hours)', 'Risk/Reward Ratio',
'Max Drawdown', 'Best Gain', 'Avg MFE/MAE Ratio',
'Price Improvement (%)'
]
df = df[column_order]
# Add summary statistics
total_row = pd.DataFrame([{
'Symbol': 'TOTAL/AVG',
'Trade Count': df['Trade Count'].sum(),
'Performance Score': df['Performance Score'].mean(),
'Avg Actual ROI (%)': df['Avg Actual ROI (%)'].mean(),
'Avg Potential ROI (%)': df['Avg Potential ROI (%)'].mean(),
'ROI Capture (%)': df['ROI Capture (%)'].mean(),
'Avg Efficiency Score (%)': df['Avg Efficiency Score (%)'].mean(),
'Avg Risk-Adjusted Return': df['Avg Risk-Adjusted Return'].mean(),
'Total Lost Alpha': df['Total Lost Alpha'].sum(),
'ROI Volatility (%)': df['ROI Volatility (%)'].mean(),
'Avg Duration (hours)': df['Avg Duration (hours)'].mean(),
'Risk/Reward Ratio': df['Risk/Reward Ratio'].mean(),
'Max Drawdown': df['Max Drawdown'].min(),
'Best Gain': df['Best Gain'].max(),
'Avg MFE/MAE Ratio': df['Avg MFE/MAE Ratio'].mean(),
'Price Improvement (%)': df['Price Improvement (%)'].mean()
}])
df = pd.concat([df, total_row])
return df
return pd.DataFrame()
except Exception as e:
logging.error(f"Error calculating aggregate statistics: {str(e)}")
return pd.DataFrame()
def calculate_metrics(row: pd.Series, quantity: int = 100) -> Optional[OptionsTradeMetrics]:
"""
Calculate advanced performance metrics for a single options trade with improved error handling
and more accurate calculations.
Parameters:
- row: A pandas Series containing trade data
- quantity: The number of contracts traded (default: 100)
Returns:
- An instance of OptionsTradeMetrics with calculated metrics, or None if calculation fails
"""
required_columns = {
'Open Date', 'Open Time', 'Close Date', 'Close Time',
'Best Exit Time', 'Avg Buy Price', 'Avg Sell Price',
'Best Exit Price', 'Position MFE', 'Position MAE', 'Symbol'
}
# Verify all required columns exist and contain non-null values
missing_columns = [col for col in required_columns if col not in row.index]
null_columns = [col for col in required_columns if col in row.index and pd.isna(row[col])]
if missing_columns or null_columns:
logging.warning(f"Missing or null columns for Symbol {row.get('Symbol', 'Unknown')}: "
f"Missing: {missing_columns}, Null: {null_columns}")
return None
try:
# Parse timestamps with improved timezone handling
est_tz = pytz.timezone('US/Eastern')
open_time = parse_datetime_with_timezone(row['Open Date'], row['Open Time'], est_tz)
close_time = parse_datetime_with_timezone(row['Close Date'], row['Close Time'], est_tz)
best_exit_time = parse_datetime_with_timezone(
row['Best Exit Time'].split()[0],
' '.join(row['Best Exit Time'].split()[1:]),
est_tz
)
if any(dt is None for dt in [open_time, close_time, best_exit_time]):
raise ValueError("Failed to parse one or more timestamps")
# Calculate trade duration in hours using Decimal for precise calculation
trade_duration = Decimal(str((close_time - open_time).total_seconds())) / Decimal('3600')
trade_duration = float(trade_duration.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP))
# Extract and validate prices
entry_price = float(Decimal(str(row['Avg Buy Price'])))
exit_price = float(Decimal(str(row['Avg Sell Price'])))
best_exit_price = float(Decimal(str(row['Best Exit Price'])))
if any(p <= 0 for p in [entry_price, exit_price, best_exit_price]):
raise ValueError("Invalid price values detected")
# Calculate P&L metrics with Decimal for accuracy
actual_pnl = float((Decimal(str(exit_price)) - Decimal(str(entry_price))) * Decimal(str(quantity)))
potential_pnl = float((Decimal(str(best_exit_price)) - Decimal(str(entry_price))) * Decimal(str(quantity)))
lost_alpha = potential_pnl - actual_pnl
# Calculate ROI percentages
investment = Decimal(str(entry_price)) * Decimal(str(quantity))
actual_roi = float((Decimal(str(actual_pnl)) / investment * Decimal('100')).quantize(Decimal('0.01')))
potential_roi = float((Decimal(str(potential_pnl)) / investment * Decimal('100')).quantize(Decimal('0.01')))
# Calculate Efficiency Score with proper handling of edge cases
price_range = Decimal(str(best_exit_price)) - Decimal(str(entry_price))
if abs(price_range) < Decimal('0.0001'): # Avoid division by very small numbers
efficiency_score = 0.0
else:
actual_change = Decimal(str(exit_price)) - Decimal(str(entry_price))
efficiency_score = float((actual_change / price_range * Decimal('100')).quantize(Decimal('0.01')))
# Extract and validate MFE/MAE values
position_mfe = float(Decimal(str(row['Position MFE'])))
position_mae = float(Decimal(str(row['Position MAE'])))
# Calculate MFE/MAE ratio with safety checks
if abs(position_mae) < 0.0001:
position_mfe_mae_ratio = 0.0
else:
position_mfe_mae_ratio = position_mfe / position_mae
# Calculate Risk-Adjusted Return with safety checks
risk_factor = Decimal(str(trade_duration)) * Decimal(str(abs(position_mae)))
if risk_factor < Decimal('0.0001'):
risk_adjusted_return = 0.0
else:
risk_adjusted_return = float((Decimal(str(actual_pnl)) / risk_factor).quantize(Decimal('0.0001')))
return OptionsTradeMetrics(
symbol=row['Symbol'],
trade_duration=trade_duration,
entry_price=entry_price,
exit_price=exit_price,
best_exit_price=best_exit_price,
position_mfe=position_mfe,
position_mae=position_mae,
actual_roi=actual_roi,
potential_roi=potential_roi,
lost_alpha=lost_alpha,
efficiency_score=efficiency_score,
risk_adjusted_return=risk_adjusted_return,
position_mfe_mae_ratio=position_mfe_mae_ratio
)
except Exception as e:
logging.error(f"Error calculating metrics for Symbol {row.get('Symbol', 'Unknown')}: {str(e)}")
return None
def parse_multiline_csv(data: str, skip_blank_lines: bool = True) -> pd.DataFrame:
"""
Parse multiline CSV data with enhanced error handling and validation.
Parameters:
- data: String containing multiline CSV data
- skip_blank_lines: Whether to skip blank lines in the CSV data
Returns:
- Parsed DataFrame or empty DataFrame if parsing fails
"""
try:
# Remove any BOM characters and normalize line endings
cleaned_data = data.replace('\ufeff', '').replace('\r\n', '\n').replace('\r', '\n')
# Create StringIO object with cleaned data
data_io = StringIO(cleaned_data)
# Read CSV with additional error handling parameters
df = pd.read_csv(
data_io,
skip_blank_lines=skip_blank_lines,
on_bad_lines='warn',
dtype={
'Symbol': str,
'Quantity': 'Int64', # Allows for NA values
'Avg Buy Price': float,
'Avg Sell Price': float,
'Best Exit Price': float,
'Position MFE': float,
'Position MAE': float
}
)
# Validate required columns
required_columns = {
'Symbol', 'Open Date', 'Open Time', 'Close Date', 'Close Time',
'Best Exit Time', 'Avg Buy Price', 'Avg Sell Price', 'Best Exit Price',
'Position MFE', 'Position MAE'
}
missing_columns = required_columns - set(df.columns)
if missing_columns:
logging.error(f"Missing required columns in CSV: {missing_columns}")
return pd.DataFrame()
# Remove any completely empty rows
df = df.dropna(how='all')
# Validate date formats
date_columns = ['Open Date', 'Close Date']
for col in date_columns:
try:
pd.to_datetime(df[col], format='%Y-%m-%d')
except Exception as e:
logging.error(f"Invalid date format in column {col}: {str(e)}")
return pd.DataFrame()
return df
except Exception as e:
logging.error(f"Error parsing CSV data: {str(e)}")
return pd.DataFrame()
def main():
"""
Main function with improved error handling and data validation.
"""
# Sample data (your existing data string here)
data = """Account Name,Adjusted Cost,Adjusted Proceeds,Avg Buy Price,Avg Sell Price,Exit Efficiency,Best Exit,Best Exit Price,Best Exit Time,Close Date,Close Time,Commission,Custom Tags,Duration,Entry Price,Executions,Exit Price,Gross P&L,Trade Risk,Initial Target,Instrument,Spread Type,Market Cap,Mistakes,Net P&L,Net ROI,Open Date,Open Time,Pips,Reward Ratio,Playbook,Points,Position MAE,Position MFE,Price MAE,Price MFE,Realized RR,Return Per Pip,Reviewed,Sector,Setups,Side,Status,Symbol,Ticks Value,Ticks Per Contract,Fee,Swap,Rating,Quantity,Zella Score
Robinhood 0281,11291.0,15041.6,8.065,10.744,112.21,3342.0,12.27,2022-11-09 20:25:00 UTC,2022-11-09,10:26:49 EST,0.0,"",168245.717,11.51,27,8.19,3750.0,0.0,0.0,2022-11-09 276 PUT,single,64891144500.0,"",3750.0,0.33212293,2022-11-07,11:42:44 EST,,,"",,-2957.0,0.0,4.28,10.33,,,false,No sector,"",short,Win,QQQ,,,0.0,0.0,,14.0,126.81772066"""
try:
# Parse the CSV data
df = parse_multiline_csv(data)
if df.empty:
logging.error("Failed to parse CSV data or no valid data found")
return
# Process each row with error handling
results = []
for index, row in df.iterrows():
try:
# Validate and extract quantity
quantity = row.get('Quantity', 100)
if pd.isna(quantity) or not isinstance(quantity, (int, float)) or quantity <= 0:
logging.warning(f"Invalid quantity ({quantity}) for row {index}, using default value 100")
quantity = 100
# Calculate metrics
metrics = calculate_metrics(row, quantity=int(quantity))
if metrics:
results.append(metrics)
print(f"\nProcessed trade {index + 1}:")
print(metrics)
except Exception as e:
logging.error(f"Error processing row {index}: {str(e)}")
continue
if not results:
logging.warning("No valid trades processed")
return
# Calculate and display aggregate statistics
try:
aggregate_stats = calculate_aggregate_statistics(results)
if not aggregate_stats.empty:
print("\nAggregate Statistics by Symbol:")
print(aggregate_stats.to_string())
except Exception as e:
logging.error(f"Error calculating aggregate statistics: {str(e)}")
except Exception as e:
logging.error(f"Unexpected error in main function: {str(e)}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment