Advanced Options Trading Analytics Tool

A comprehensive Python tool for analyzing options trading performance, with features including:

- Trade metrics calculation (ROI, efficiency, risk-adjusted returns)
- Precise timezone-aware datetime handling
- Advanced statistical aggregation with performance scoring
- Robust CSV parsing and data validation…
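At a glance, the pipeline runs raw CSV text through parse_multiline_csv, feeds each row to calculate_metrics, and summarizes the results with calculate_aggregate_statistics. A minimal sketch of driving it programmatically (csv_text stands in for your own export string, which is not defined in this gist; calculate_metrics falls back to its default contract quantity of 100 here):

    trades = parse_multiline_csv(csv_text)
    metrics = [calculate_metrics(row) for _, row in trades.iterrows()]
    summary = calculate_aggregate_statistics([m for m in metrics if m is not None])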
import pandas as pd
from datetime import datetime
from io import StringIO
import pytz
from dataclasses import dataclass
from typing import Optional, List, Dict
from collections import defaultdict
import logging
from decimal import Decimal, ROUND_HALF_UP
import numpy as np
# Configure logging with a detailed, timestamped format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
@dataclass
class OptionsTradeMetrics:
    symbol: str
    trade_duration: float  # in hours
    entry_price: float
    exit_price: float
    best_exit_price: float
    position_mfe: float
    position_mae: float
    actual_roi: float
    potential_roi: float
    lost_alpha: float
    efficiency_score: float
    risk_adjusted_return: float
    position_mfe_mae_ratio: float

    def __repr__(self):
        return (f"OptionsTradeMetrics(symbol={self.symbol}, trade_duration={self.trade_duration:.2f}h, "
                f"entry_price={self.entry_price:.4f}, exit_price={self.exit_price:.4f}, "
                f"best_exit_price={self.best_exit_price:.4f}, position_mfe={self.position_mfe:.4f}, "
                f"position_mae={self.position_mae:.4f}, actual_roi={self.actual_roi:.2f}%, "
                f"potential_roi={self.potential_roi:.2f}%, lost_alpha={self.lost_alpha:.2f}, "
                f"efficiency_score={self.efficiency_score:.2f}%, "
                f"risk_adjusted_return={self.risk_adjusted_return:.4f}, "
                f"position_mfe_mae_ratio={self.position_mfe_mae_ratio:.4f}")
def parse_datetime_with_timezone(date_str: str, time_str: str, default_tz: pytz.BaseTzInfo) -> Optional[datetime]:
    """
    Parse date and time strings with proper timezone handling.

    Parameters:
    - date_str: Date string in YYYY-MM-DD format
    - time_str: Time string that may include timezone information
    - default_tz: Default timezone to use if none is specified

    Returns:
    - Timezone-aware datetime object in UTC, or None if parsing fails
    """
    try:
        # Honor an explicit UTC marker; otherwise fall back to the default timezone
        source_tz = pytz.UTC if 'UTC' in time_str else default_tz

        # Remove timezone abbreviations and clean the strings
        time_str = time_str.replace('EST', '').replace('EDT', '').replace('UTC', '').strip()
        datetime_str = f"{date_str} {time_str}"

        # Try parsing with various formats
        formats = [
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d %H:%M:%S.%f',
            '%Y-%m-%d %H:%M'
        ]

        parsed_dt = None
        for fmt in formats:
            try:
                parsed_dt = datetime.strptime(datetime_str, fmt)
                break
            except ValueError:
                continue

        if parsed_dt is None:
            raise ValueError(f"Could not parse datetime: {datetime_str}")

        # Localize the datetime and convert to UTC
        localized_dt = source_tz.localize(parsed_dt)
        return localized_dt.astimezone(pytz.UTC)
    except Exception as e:
        logging.error(f"Error parsing datetime {date_str} {time_str}: {str(e)}")
        return None
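# A minimal illustration of the conversion above (values chosen for this example):
# on 2022-11-07, US/Eastern is in standard time (UTC-5), so an EST wall-clock
# time shifts forward five hours, while a 'UTC'-tagged time passes through as-is.
#
#   est = pytz.timezone('US/Eastern')
#   dt = parse_datetime_with_timezone('2022-11-07', '11:42:44 EST', est)
#   # dt == datetime(2022, 11, 7, 16, 42, 44, tzinfo=pytz.UTC)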
def calculate_aggregate_statistics(patterns: List[OptionsTradeMetrics]) -> pd.DataFrame:
    """
    Calculate comprehensive aggregate statistics by symbol with enhanced metrics
    and proper error handling.

    Parameters:
    - patterns: List of OptionsTradeMetrics instances

    Returns:
    - DataFrame containing detailed aggregate statistics
    """
    if not patterns:
        logging.warning("No patterns provided for aggregate statistics calculation")
        return pd.DataFrame()

    try:
        # Initialize statistics dictionary
        stats_dict: Dict[str, Dict[str, list]] = defaultdict(lambda: defaultdict(list))

        # Collect metrics by symbol
        for pattern in patterns:
            symbol = pattern.symbol
            # Basic metrics
            stats_dict[symbol]['Duration'].append(pattern.trade_duration)
            stats_dict[symbol]['Entry Price'].append(pattern.entry_price)
            stats_dict[symbol]['Exit Price'].append(pattern.exit_price)
            stats_dict[symbol]['Best Exit Price'].append(pattern.best_exit_price)
            # Performance metrics
            stats_dict[symbol]['Lost Alpha'].append(pattern.lost_alpha)
            stats_dict[symbol]['Efficiency Score'].append(pattern.efficiency_score)
            stats_dict[symbol]['Risk-Adjusted Return'].append(pattern.risk_adjusted_return)
            stats_dict[symbol]['Actual ROI'].append(pattern.actual_roi)
            stats_dict[symbol]['Potential ROI'].append(pattern.potential_roi)
            # Risk metrics
            stats_dict[symbol]['MFE'].append(pattern.position_mfe)
            stats_dict[symbol]['MAE'].append(pattern.position_mae)
            stats_dict[symbol]['MFE/MAE Ratio'].append(pattern.position_mfe_mae_ratio)

        # Process statistics for each symbol
        results = []
        for symbol, metrics in stats_dict.items():
            try:
                # Calculate basic statistics
                trade_count = len(metrics['Duration'])
                avg_duration = np.mean(metrics['Duration'])

                # Calculate performance metrics
                total_lost_alpha = sum(metrics['Lost Alpha'])
                avg_efficiency = np.mean(metrics['Efficiency Score'])
                avg_risk_adjusted = np.mean(metrics['Risk-Adjusted Return'])

                # Calculate ROI statistics
                avg_actual_roi = np.mean(metrics['Actual ROI'])
                avg_potential_roi = np.mean(metrics['Potential ROI'])
                roi_capture = (avg_actual_roi / avg_potential_roi * 100) if avg_potential_roi != 0 else 0

                # Calculate risk metrics
                avg_mfe_mae_ratio = np.mean(metrics['MFE/MAE Ratio'])
                max_drawdown = min(metrics['MAE'])
                best_gain = max(metrics['MFE'])

                # Calculate price metrics
                avg_entry = np.mean(metrics['Entry Price'])
                avg_exit = np.mean(metrics['Exit Price'])
                price_improvement = ((avg_exit - avg_entry) / avg_entry * 100) if avg_entry != 0 else 0

                # Calculate volatility metrics
                roi_volatility = np.std(metrics['Actual ROI'])
                risk_reward_ratio = abs(best_gain / max_drawdown) if max_drawdown != 0 else float('inf')

                results.append({
                    'Symbol': symbol,
                    'Trade Count': trade_count,
                    'Avg Duration (hours)': round(avg_duration, 2),
                    'Total Lost Alpha': round(total_lost_alpha, 2),
                    'Avg Efficiency Score (%)': round(avg_efficiency, 2),
                    'Avg Risk-Adjusted Return': round(avg_risk_adjusted, 4),
                    'Avg Actual ROI (%)': round(avg_actual_roi, 2),
                    'Avg Potential ROI (%)': round(avg_potential_roi, 2),
                    'ROI Capture (%)': round(roi_capture, 2),
                    'Avg MFE/MAE Ratio': round(avg_mfe_mae_ratio, 2),
                    'Max Drawdown': round(max_drawdown, 2),
                    'Best Gain': round(best_gain, 2),
                    'Risk/Reward Ratio': round(risk_reward_ratio, 2),
                    'Price Improvement (%)': round(price_improvement, 2),
                    'ROI Volatility (%)': round(roi_volatility, 2)
                })
            except Exception as e:
                logging.error(f"Error calculating statistics for symbol {symbol}: {str(e)}")
                continue

        # Create DataFrame and sort by trade count
        df = pd.DataFrame(results)
        if not df.empty:
            df = df.sort_values('Trade Count', ascending=False)

            # Add performance rating based on combined metrics
            df['Performance Score'] = (
                df['Avg Efficiency Score (%)'] * 0.3 +
                df['ROI Capture (%)'] * 0.3 +
                df['Avg Risk-Adjusted Return'] * 0.2 +
                (100 - abs(df['ROI Volatility (%)'])) * 0.2
            ).round(2)

            # Sort columns for better readability
            column_order = [
                'Symbol', 'Trade Count', 'Performance Score',
                'Avg Actual ROI (%)', 'Avg Potential ROI (%)', 'ROI Capture (%)',
                'Avg Efficiency Score (%)', 'Avg Risk-Adjusted Return',
                'Total Lost Alpha', 'ROI Volatility (%)',
                'Avg Duration (hours)', 'Risk/Reward Ratio',
                'Max Drawdown', 'Best Gain', 'Avg MFE/MAE Ratio',
                'Price Improvement (%)'
            ]
            df = df[column_order]

            # Add summary statistics
            total_row = pd.DataFrame([{
                'Symbol': 'TOTAL/AVG',
                'Trade Count': df['Trade Count'].sum(),
                'Performance Score': df['Performance Score'].mean(),
                'Avg Actual ROI (%)': df['Avg Actual ROI (%)'].mean(),
                'Avg Potential ROI (%)': df['Avg Potential ROI (%)'].mean(),
                'ROI Capture (%)': df['ROI Capture (%)'].mean(),
                'Avg Efficiency Score (%)': df['Avg Efficiency Score (%)'].mean(),
                'Avg Risk-Adjusted Return': df['Avg Risk-Adjusted Return'].mean(),
                'Total Lost Alpha': df['Total Lost Alpha'].sum(),
                'ROI Volatility (%)': df['ROI Volatility (%)'].mean(),
                'Avg Duration (hours)': df['Avg Duration (hours)'].mean(),
                'Risk/Reward Ratio': df['Risk/Reward Ratio'].mean(),
                'Max Drawdown': df['Max Drawdown'].min(),
                'Best Gain': df['Best Gain'].max(),
                'Avg MFE/MAE Ratio': df['Avg MFE/MAE Ratio'].mean(),
                'Price Improvement (%)': df['Price Improvement (%)'].mean()
            }])
            df = pd.concat([df, total_row], ignore_index=True)
            return df

        return pd.DataFrame()
    except Exception as e:
        logging.error(f"Error calculating aggregate statistics: {str(e)}")
        return pd.DataFrame()
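# Worked example of the ROI-capture arithmetic above, using the single sample
# trade shipped in main() (entry 8.065, exit 10.744, best exit 12.27, 14 contracts):
#
#   actual ROI    = (10.744 - 8.065) * 14 / (8.065 * 14) * 100 ≈ 33.22%
#   potential ROI = (12.27  - 8.065) * 14 / (8.065 * 14) * 100 ≈ 52.14%
#   ROI capture   = 33.22 / 52.14 * 100 ≈ 63.71%
#
# Note that the contract quantity cancels out, so for a symbol with a single
# trade, ROI capture reduces to the same ratio as that trade's efficiency score.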
def calculate_metrics(row: pd.Series, quantity: int = 100) -> Optional[OptionsTradeMetrics]:
    """
    Calculate advanced performance metrics for a single options trade with improved
    error handling and more accurate calculations.

    Parameters:
    - row: A pandas Series containing trade data
    - quantity: The number of contracts traded (default: 100)

    Returns:
    - An instance of OptionsTradeMetrics with calculated metrics, or None if calculation fails
    """
    required_columns = {
        'Open Date', 'Open Time', 'Close Date', 'Close Time',
        'Best Exit Time', 'Avg Buy Price', 'Avg Sell Price',
        'Best Exit Price', 'Position MFE', 'Position MAE', 'Symbol'
    }

    # Verify all required columns exist and contain non-null values
    missing_columns = [col for col in required_columns if col not in row.index]
    null_columns = [col for col in required_columns if col in row.index and pd.isna(row[col])]
    if missing_columns or null_columns:
        logging.warning(f"Missing or null columns for Symbol {row.get('Symbol', 'Unknown')}: "
                        f"Missing: {missing_columns}, Null: {null_columns}")
        return None

    try:
        # Parse timestamps with improved timezone handling
        est_tz = pytz.timezone('US/Eastern')
        open_time = parse_datetime_with_timezone(row['Open Date'], row['Open Time'], est_tz)
        close_time = parse_datetime_with_timezone(row['Close Date'], row['Close Time'], est_tz)
        best_exit_time = parse_datetime_with_timezone(
            row['Best Exit Time'].split()[0],
            ' '.join(row['Best Exit Time'].split()[1:]),
            est_tz
        )

        if any(dt is None for dt in [open_time, close_time, best_exit_time]):
            raise ValueError("Failed to parse one or more timestamps")

        # Calculate trade duration in hours using Decimal for precise calculation
        trade_duration = Decimal(str((close_time - open_time).total_seconds())) / Decimal('3600')
        trade_duration = float(trade_duration.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP))

        # Extract and validate prices
        entry_price = float(Decimal(str(row['Avg Buy Price'])))
        exit_price = float(Decimal(str(row['Avg Sell Price'])))
        best_exit_price = float(Decimal(str(row['Best Exit Price'])))

        if any(p <= 0 for p in [entry_price, exit_price, best_exit_price]):
            raise ValueError("Invalid price values detected")

        # Calculate P&L metrics with Decimal for accuracy
        actual_pnl = float((Decimal(str(exit_price)) - Decimal(str(entry_price))) * Decimal(str(quantity)))
        potential_pnl = float((Decimal(str(best_exit_price)) - Decimal(str(entry_price))) * Decimal(str(quantity)))
        lost_alpha = potential_pnl - actual_pnl

        # Calculate ROI percentages
        investment = Decimal(str(entry_price)) * Decimal(str(quantity))
        actual_roi = float((Decimal(str(actual_pnl)) / investment * Decimal('100')).quantize(Decimal('0.01')))
        potential_roi = float((Decimal(str(potential_pnl)) / investment * Decimal('100')).quantize(Decimal('0.01')))

        # Calculate Efficiency Score with proper handling of edge cases
        price_range = Decimal(str(best_exit_price)) - Decimal(str(entry_price))
        if abs(price_range) < Decimal('0.0001'):  # Avoid division by very small numbers
            efficiency_score = 0.0
        else:
            actual_change = Decimal(str(exit_price)) - Decimal(str(entry_price))
            efficiency_score = float((actual_change / price_range * Decimal('100')).quantize(Decimal('0.01')))

        # Extract and validate MFE/MAE values
        position_mfe = float(Decimal(str(row['Position MFE'])))
        position_mae = float(Decimal(str(row['Position MAE'])))

        # Calculate MFE/MAE ratio with safety checks
        if abs(position_mae) < 0.0001:
            position_mfe_mae_ratio = 0.0
        else:
            position_mfe_mae_ratio = position_mfe / position_mae

        # Calculate Risk-Adjusted Return with safety checks
        risk_factor = Decimal(str(trade_duration)) * Decimal(str(abs(position_mae)))
        if risk_factor < Decimal('0.0001'):
            risk_adjusted_return = 0.0
        else:
            risk_adjusted_return = float((Decimal(str(actual_pnl)) / risk_factor).quantize(Decimal('0.0001')))

        return OptionsTradeMetrics(
            symbol=row['Symbol'],
            trade_duration=trade_duration,
            entry_price=entry_price,
            exit_price=exit_price,
            best_exit_price=best_exit_price,
            position_mfe=position_mfe,
            position_mae=position_mae,
            actual_roi=actual_roi,
            potential_roi=potential_roi,
            lost_alpha=lost_alpha,
            efficiency_score=efficiency_score,
            risk_adjusted_return=risk_adjusted_return,
            position_mfe_mae_ratio=position_mfe_mae_ratio
        )
    except Exception as e:
        logging.error(f"Error calculating metrics for Symbol {row.get('Symbol', 'Unknown')}: {str(e)}")
        return None
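# Quick sanity check of the efficiency score on the sample row in main():
# entry 8.065, exit 10.744, best exit 12.27 gives
#
#   efficiency = (10.744 - 8.065) / (12.27 - 8.065) * 100
#              = 2.679 / 4.205 * 100 ≈ 63.71%
#
# i.e. the trade captured roughly two thirds of the best available move.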
def parse_multiline_csv(data: str, skip_blank_lines: bool = True) -> pd.DataFrame:
    """
    Parse multiline CSV data with enhanced error handling and validation.

    Parameters:
    - data: String containing multiline CSV data
    - skip_blank_lines: Whether to skip blank lines in the CSV data

    Returns:
    - Parsed DataFrame or empty DataFrame if parsing fails
    """
    try:
        # Remove any BOM characters and normalize line endings
        cleaned_data = data.replace('\ufeff', '').replace('\r\n', '\n').replace('\r', '\n')

        # Create StringIO object with cleaned data
        data_io = StringIO(cleaned_data)

        # Read CSV with additional error handling parameters
        df = pd.read_csv(
            data_io,
            skip_blank_lines=skip_blank_lines,
            on_bad_lines='warn',
            dtype={
                'Symbol': str,
                'Quantity': 'Int64',  # Nullable integer dtype allows for NA values
                'Avg Buy Price': float,
                'Avg Sell Price': float,
                'Best Exit Price': float,
                'Position MFE': float,
                'Position MAE': float
            }
        )

        # Validate required columns
        required_columns = {
            'Symbol', 'Open Date', 'Open Time', 'Close Date', 'Close Time',
            'Best Exit Time', 'Avg Buy Price', 'Avg Sell Price', 'Best Exit Price',
            'Position MFE', 'Position MAE'
        }
        missing_columns = required_columns - set(df.columns)
        if missing_columns:
            logging.error(f"Missing required columns in CSV: {missing_columns}")
            return pd.DataFrame()

        # Remove any completely empty rows
        df = df.dropna(how='all')

        # Validate date formats
        date_columns = ['Open Date', 'Close Date']
        for col in date_columns:
            try:
                pd.to_datetime(df[col], format='%Y-%m-%d')
            except Exception as e:
                logging.error(f"Invalid date format in column {col}: {str(e)}")
                return pd.DataFrame()

        return df
    except Exception as e:
        logging.error(f"Error parsing CSV data: {str(e)}")
        return pd.DataFrame()
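# Behavior sketch (illustrative input, not a real export): a CSV missing any
# required column comes back as an empty frame rather than raising, e.g.:
#
#   df = parse_multiline_csv("Symbol,Open Date\nQQQ,2022-11-07\n")
#   # df.empty is True, and an error is logged listing the missing columns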
def main():
    """
    Main function with improved error handling and data validation.
    """
    # Sample data: a single exported trade row (header plus one record)
    data = """Account Name,Adjusted Cost,Adjusted Proceeds,Avg Buy Price,Avg Sell Price,Exit Efficiency,Best Exit,Best Exit Price,Best Exit Time,Close Date,Close Time,Commission,Custom Tags,Duration,Entry Price,Executions,Exit Price,Gross P&L,Trade Risk,Initial Target,Instrument,Spread Type,Market Cap,Mistakes,Net P&L,Net ROI,Open Date,Open Time,Pips,Reward Ratio,Playbook,Points,Position MAE,Position MFE,Price MAE,Price MFE,Realized RR,Return Per Pip,Reviewed,Sector,Setups,Side,Status,Symbol,Ticks Value,Ticks Per Contract,Fee,Swap,Rating,Quantity,Zella Score
Robinhood 0281,11291.0,15041.6,8.065,10.744,112.21,3342.0,12.27,2022-11-09 20:25:00 UTC,2022-11-09,10:26:49 EST,0.0,"",168245.717,11.51,27,8.19,3750.0,0.0,0.0,2022-11-09 276 PUT,single,64891144500.0,"",3750.0,0.33212293,2022-11-07,11:42:44 EST,,,"",,-2957.0,0.0,4.28,10.33,,,false,No sector,"",short,Win,QQQ,,,0.0,0.0,,14.0,126.81772066"""

    try:
        # Parse the CSV data
        df = parse_multiline_csv(data)
        if df.empty:
            logging.error("Failed to parse CSV data or no valid data found")
            return

        # Process each row with error handling
        results = []
        for index, row in df.iterrows():
            try:
                # Validate and extract quantity (pandas yields numpy scalar types,
                # so accept those alongside built-in int/float)
                quantity = row.get('Quantity', 100)
                if pd.isna(quantity) or not isinstance(quantity, (int, float, np.integer, np.floating)) or quantity <= 0:
                    logging.warning(f"Invalid quantity ({quantity}) for row {index}, using default value 100")
                    quantity = 100

                # Calculate metrics
                metrics = calculate_metrics(row, quantity=int(quantity))
                if metrics:
                    results.append(metrics)
                    print(f"\nProcessed trade {index + 1}:")
                    print(metrics)
            except Exception as e:
                logging.error(f"Error processing row {index}: {str(e)}")
                continue

        if not results:
            logging.warning("No valid trades processed")
            return

        # Calculate and display aggregate statistics
        try:
            aggregate_stats = calculate_aggregate_statistics(results)
            if not aggregate_stats.empty:
                print("\nAggregate Statistics by Symbol:")
                print(aggregate_stats.to_string())
        except Exception as e:
            logging.error(f"Error calculating aggregate statistics: {str(e)}")
    except Exception as e:
        logging.error(f"Unexpected error in main function: {str(e)}")


if __name__ == "__main__":
    main()
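# To analyze your own export rather than the embedded sample row, one option is
# to swap the data literal in main() for the contents of a file (the file name
# here is illustrative, not part of this gist):
#
#   with open('trade_history.csv', 'r', encoding='utf-8') as f:
#       data = f.read()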