Last active
May 29, 2025 13:35
-
-
Save 18182324/c3cd48a9a5e11056e1efba16bfa3f42c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt
import logging
import os
import time

import alpaca_trade_api as tradeapi
import numpy as np
import pandas as pd
from ta.trend import MACD
# ============ CONFIGURATION ============
# Alpaca API credentials. They are read from the environment so that real
# keys never have to be committed with the script; the original placeholder
# values remain as fallbacks, so editing them in place still works.
API_KEY = os.environ.get("APCA_API_KEY_ID", "YOUR_API_KEY")
API_SECRET = os.environ.get("APCA_API_SECRET_KEY", "YOUR_API_SECRET")
# Paper-trading endpoint by default; use the live URL for real trading.
BASE_URL = os.environ.get("APCA_API_BASE_URL", "https://paper-api.alpaca.markets")

# Define your trading parameters and universe
SYMBOLS = ["AAPL", "AMD", "TSLA", "NVDA"]  # Watchlist
RISK_PER_TRADE = 0.01  # Max portfolio percentage at risk per trade
STOP_LOSS_BUFFER = 0.05  # 5% below entry price
TAKE_PROFIT_BUFFER = 0.10  # 10% above entry price
TRADE_START_DELAY = 15  # Minutes after market open to begin trading
TRADE_END_DELAY = 60  # Stop buying after 1 hour of open
DAY_END_BUFFER = 15  # Close positions 15 minutes before market close
POSITION_SIZE = 1000  # Fixed dollar amount per trade

# ============ INITIAL SETUP ============
# Initialize logger and Alpaca API client
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
api = tradeapi.REST(API_KEY, API_SECRET, BASE_URL, api_version="v2")
# ============ HELPER FUNCTIONS ============ | |
def get_market_open_close():
    """Fetch today's market open and close as timezone-aware timestamps.

    The trading date is taken from the broker clock and the session times
    from the broker calendar for that single day.

    Returns:
        tuple: ``(open, close)`` as ``pandas.Timestamp`` values localized to
        ``America/New_York``, so they can be compared with aware datetimes
        such as ``datetime.now(timezone.utc)``.

    Raises:
        IndexError: if the calendar returns no session for today (for
            example on a weekend or market holiday).
    """
    clock = api.get_clock()
    today = clock.timestamp.date()
    day = today.isoformat()
    sessions = api.get_calendar(start=day, end=day)
    if not sessions:
        raise IndexError(f"No trading session scheduled for {day}")
    session = sessions[0]

    def _localize(time_of_day):
        # NOTE(review): assumes the calendar entry exposes open/close as a
        # time of day ("09:30" or datetime.time), which must be combined with
        # the session date before it is a usable timestamp -- confirm against
        # the SDK version in use.
        return pd.Timestamp(f"{day} {time_of_day}", tz="America/New_York")

    return _localize(session.open), _localize(session.close)
def get_data(symbol, limit=100):
    """Retrieve intraday one-minute bars for a given stock.

    Args:
        symbol: Ticker symbol to request bars for.
        limit: Maximum number of bars to request.

    Returns:
        pandas.DataFrame: columns ``open``, ``high``, ``low``, ``close`` and
        ``volume`` indexed by bar timestamp (index name ``"t"``). The frame
        is empty when no bars are returned.
    """
    columns = ["open", "high", "low", "close", "volume"]
    barset = api.get_bars(symbol, tradeapi.TimeFrame.Minute, limit=limit)
    if not barset:
        empty = pd.DataFrame(columns=columns)
        empty.index.name = "t"
        return empty
    # Building the frame from ``bar.__dict__`` only exposes the SDK's private
    # raw-payload wrapper, not the o/h/l/c/v fields. The bar set's ``df``
    # property is the supported conversion and already uses long column names.
    df = barset.df[columns].copy()
    df.index = pd.to_datetime(df.index)
    df.index.name = "t"
    return df
def apply_macd(df):
    """Attach MACD line, signal line and histogram columns to ``df``.

    The indicator is computed from the ``close`` column with the 12/26/9
    windows. ``df`` is modified in place and also returned.
    """
    indicator = MACD(
        close=df["close"], window_slow=26, window_fast=12, window_sign=9
    )
    outputs = {
        "macd": indicator.macd,
        "macd_signal": indicator.macd_signal,
        "macd_diff": indicator.macd_diff,
    }
    for column, compute in outputs.items():
        df[column] = compute()
    return df
def should_buy(df):
    """Determine whether conditions for buying are met.

    A buy is signalled only when all of the following hold for the latest bar:
      * its close is more than 4% above the previous bar's close,
      * its close is above the highest high of the first 15 rows of ``df``,
      * its MACD value is positive and higher than the previous bar's.

    Args:
        df: DataFrame with ``close``, ``high`` and ``macd`` columns, ordered
            oldest to newest.

    Returns:
        bool: True when every condition holds. False otherwise, including
        when ``df`` has fewer than two rows or the previous close is not a
        positive number.
    """
    # Two bars are needed to measure change; bail out instead of raising.
    if len(df) < 2:
        return False
    current = df.iloc[-1]
    prev = df.iloc[-2]
    # ``not x > 0`` also rejects NaN, which would otherwise poison the ratio.
    if not prev["close"] > 0:
        return False
    price_change = (current["close"] - prev["close"]) / prev["close"]
    breakout = current["close"] > df.iloc[:15]["high"].max()  # High of first 15 rows
    macd_rising = current["macd"] > 0 and current["macd"] > prev["macd"]
    return bool(price_change > 0.04 and breakout and macd_rising)
def calculate_qty(price, position_size=None):
    """Compute how many whole shares to buy for a fixed dollar allocation.

    Args:
        price: Current price per share.
        position_size: Dollar amount to allocate. Defaults to the module
            constant ``POSITION_SIZE`` when omitted.

    Returns:
        int: ``position_size / price`` truncated to a whole number, or 0 when
        ``price`` is zero, negative or NaN (no sensible quantity exists).
    """
    budget = POSITION_SIZE if position_size is None else position_size
    # ``not price > 0`` is also True for NaN, which int() cannot convert.
    if not price > 0:
        return 0
    return int(budget / price)
def place_bracket_order(symbol, qty, entry_price):
    """Place a market bracket order with stop-loss and take-profit levels.

    The stop is set ``STOP_LOSS_BUFFER`` below and the take-profit
    ``TAKE_PROFIT_BUFFER`` above ``entry_price``, both rounded to cents.
    Failures are logged rather than raised.

    Args:
        symbol: Ticker to buy.
        qty: Number of shares; orders for fewer than one share are skipped.
        entry_price: Reference price used to derive the bracket levels.

    Returns:
        bool: True if the order was submitted, False if it was skipped or the
        submission failed.
    """
    if qty < 1:
        # A zero-share order can only be rejected; skip the round trip.
        logger.warning(f"Skipping bracket order for {symbol}: quantity {qty} is below one share")
        return False

    stop_price = round(entry_price * (1 - STOP_LOSS_BUFFER), 2)
    take_profit_price = round(entry_price * (1 + TAKE_PROFIT_BUFFER), 2)
    try:
        api.submit_order(
            symbol=symbol,
            qty=qty,
            side="buy",
            type="market",
            time_in_force="day",
            order_class="bracket",
            stop_loss={"stop_price": stop_price},
            take_profit={"limit_price": take_profit_price},
        )
    except Exception as e:
        # Best-effort by design: one failed order must not stop the strategy.
        logger.error(f"Failed to place bracket order for {symbol}: {e}")
        return False
    logger.info(f"Order placed: {symbol} | Entry: {entry_price} | SL: {stop_price} | TP: {take_profit_price}")
    return True
def liquidate_positions():
    """Close every open position at market price.

    Open orders are cancelled first: the stop-loss and take-profit legs of the
    bracket orders this script places stay open against the position, and
    while they are open a separate sell for the same shares can be rejected.
    Each position is then closed through the broker's close-position call,
    which picks the correct side for long and short positions alike. Failures
    are logged per step so one bad symbol does not block the rest.
    """
    try:
        api.cancel_all_orders()
    except Exception as e:
        logger.error(f"Error cancelling open orders before liquidation: {e}")

    for pos in api.list_positions():
        try:
            api.close_position(pos.symbol)
        except Exception as e:
            logger.error(f"Error liquidating {pos.symbol}: {e}")
        else:
            logger.info(f"Liquidated: {pos.symbol}")
# ============ MAIN EXECUTION LOOP ============ | |
if __name__ == "__main__":
    # Derive the day's schedule: buys are allowed from TRADE_START_DELAY to
    # TRADE_END_DELAY minutes after the open, and everything is liquidated
    # DAY_END_BUFFER minutes before the close.
    open_time, close_time = get_market_open_close()
    start_buy = open_time + dt.timedelta(minutes=TRADE_START_DELAY)
    end_buy = open_time + dt.timedelta(minutes=TRADE_END_DELAY)
    final_sell = close_time - dt.timedelta(minutes=DAY_END_BUFFER)
    bought = set()  # Track purchased tickers so each is bought at most once

    logger.info(f"Trading window: {start_buy.time()} to {end_buy.time()}")
    logger.info(f"Liquidation time: {final_sell.time()} | Market close: {close_time.time()}")

    while dt.datetime.now(dt.timezone.utc) < close_time:
        now = dt.datetime.now(dt.timezone.utc)

        # Check for buys during entry window
        if start_buy <= now <= end_buy:
            for symbol in SYMBOLS:
                if symbol in bought:
                    continue
                try:
                    df = apply_macd(get_data(symbol, limit=50))
                    if not should_buy(df):
                        continue
                    price = df.iloc[-1]["close"]
                    qty = calculate_qty(price)
                except Exception as e:
                    # A data or indicator failure for one symbol must not
                    # abort the loop: the end-of-day liquidation below still
                    # has to run.
                    logger.error(f"Skipping {symbol} this cycle: {e}")
                    continue
                if qty < 1:
                    # Price exceeds the per-trade allocation; nothing to buy.
                    logger.info(f"Skipping {symbol}: position size buys no whole share at {price}")
                    continue
                place_bracket_order(symbol, qty, price)
                bought.add(symbol)

        # Liquidate positions before close
        elif now >= final_sell:
            logger.info("Closing all positions before end of day...")
            liquidate_positions()
            break

        time.sleep(30)  # Reduce API calls with delay

    logger.info("Session ended. Strategy execution complete.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#requirements | |
#alpaca-trade-api>=0.25 | |
#ta | |
#sklearn | |
import alpaca_trade_api as tradeapi | |
import requests | |
import time | |
from ta import macd | |
import numpy as np | |
from datetime import datetime, timedelta | |
from pytz import timezone | |
# Connection settings: fill these in with the values shown on your dashboard.
base_url = 'Your API URL'
api_key_id = 'Your API Key'
api_secret = 'Your API Secret'

# REST client shared by every function in this script.
api = tradeapi.REST(key_id=api_key_id, secret_key=api_secret, base_url=base_url)

session = requests.session()

# Price band (per share) a stock must trade in to be considered.
min_share_price = 2.0
max_share_price = 13.0

# Lowest acceptable previous-day dollar volume for a candidate stock.
min_last_dv = 500_000

# Fallback stop, expressed as a fraction of the reference price.
default_stop = 0.95

# Fraction of the portfolio allocated to any single position.
risk = 0.001
def get_1000m_history_data(symbols):
    """Download up to 1000 one-minute aggregate bars for each symbol.

    Progress is printed after every symbol. Returns a dict mapping each
    symbol to the DataFrame exposed by the aggregate response's ``df``.
    """
    print('Getting historical data...')
    history_by_symbol = {}
    for done, symbol in enumerate(symbols, start=1):
        aggregates = api.polygon.historic_agg(
            size="minute", symbol=symbol, limit=1000
        )
        history_by_symbol[symbol] = aggregates.df
        print('{}/{}'.format(done, len(symbols)))
    print('Success.')
    return history_by_symbol
def get_tickers():
    """Return the tickers that pass the strategy's screening filters.

    A ticker is kept when its symbol is tradable, its last trade price lies
    within ``[min_share_price, max_share_price]``, its previous-day volume
    times last price exceeds ``min_last_dv``, and it is up at least 3.5%
    today.

    Returns:
        list: the matching ticker objects, in the order the data source
        returned them.
    """
    print('Getting current ticker data...')
    tickers = api.polygon.all_tickers()
    print('Success.')
    # A set makes the per-ticker membership test O(1); the asset list can be
    # large, and a list lookup here would make the filter quadratic.
    tradable = {asset.symbol for asset in api.list_assets() if asset.tradable}
    return [
        ticker for ticker in tickers
        if (
            ticker.ticker in tradable
            and min_share_price <= ticker.lastTrade['p'] <= max_share_price
            and ticker.prevDay['v'] * ticker.lastTrade['p'] > min_last_dv
            and ticker.todaysChangePerc >= 3.5
        )
    ]
def find_stop(current_value, minute_history, now):
    """Pick a stop price just under the most recent local low.

    The last 100 rows of the ``low`` column are resampled to 5-minute minima
    and restricted to the current day. The most recent bucket that is a local
    minimum (preceded by a non-rising step and followed by a rise) sets the
    stop, one cent below it.

    Args:
        current_value: Current price, used for the fallback stop.
        minute_history: DataFrame with a ``low`` column and a datetime index.
        now: ``pandas.Timestamp`` for the current bar; its day start bounds
            the search.

    Returns:
        float: the local low minus 0.01, or ``current_value * default_stop``
        when no local low is found.
    """
    lows = minute_history['low'][-100:] \
        .dropna().resample('5min').min()
    lows = lows[now.floor('1D'):]
    diff = np.diff(lows.values)
    low_index = np.where((diff[:-1] <= 0) & (diff[1:] > 0))[0] + 1
    if len(low_index) > 0:
        # low_index holds positions, not labels: index positionally. Plain
        # ``series[int]`` on a datetime-indexed Series relies on a deprecated
        # fallback.
        return lows.iloc[low_index[-1]] - 0.01
    return current_value * default_stop
def _whole_minutes(delta):
    """Return a timedelta as signed whole minutes (floored).

    ``timedelta.seconds`` is always non-negative and wraps around for negative
    durations, so it cannot be used to compare against minute thresholds.
    """
    return delta.total_seconds() // 60


def run(tickers, market_open_dt, market_close_dt):
    """Stream bars for the screened tickers and trade the momentum strategy.

    Seeds per-symbol state from ``tickers`` and the account, registers three
    stream handlers (trade updates, second bars, minute bars), then blocks in
    ``run_ws`` until the stream ends.

    Args:
        tickers: Ticker objects exposing ``ticker``, ``prevDay`` and ``day``.
        market_open_dt: Timezone-aware datetime of today's market open.
        market_close_dt: Timezone-aware datetime of today's market close.
    """
    # Establish streaming connection
    conn = tradeapi.StreamConn(base_url=base_url, key_id=api_key_id, secret_key=api_secret)

    # Update initial state with information from tickers
    volume_today = {}
    prev_closes = {}
    for ticker in tickers:
        symbol = ticker.ticker
        prev_closes[symbol] = ticker.prevDay['c']
        volume_today[symbol] = ticker.day['v']

    symbols = [ticker.ticker for ticker in tickers]
    print('Tracking {} symbols.'.format(len(symbols)))
    minute_history = get_1000m_history_data(symbols)

    portfolio_value = float(api.get_account().portfolio_value)

    open_orders = {}
    positions = {}

    # Cancel any existing open orders on watched symbols
    existing_orders = api.list_orders(limit=500)
    for order in existing_orders:
        if order.symbol in symbols:
            api.cancel_order(order.id)

    stop_prices = {}
    latest_cost_basis = {}

    # Track any positions bought during previous executions
    existing_positions = api.list_positions()
    for position in existing_positions:
        if position.symbol in symbols:
            positions[position.symbol] = float(position.qty)
            # Recalculate cost basis and stop price
            latest_cost_basis[position.symbol] = float(position.cost_basis)
            stop_prices[position.symbol] = (
                float(position.cost_basis) * default_stop
            )

    # Keep track of what we're buying/selling
    target_prices = {}
    partial_fills = {}

    # Use trade updates to keep track of our portfolio
    @conn.on(r'trade_update')
    async def handle_trade_update(conn, channel, data):
        symbol = data.order['symbol']
        last_order = open_orders.get(symbol)
        if last_order is None:
            # Not an order this run is tracking.
            return
        event = data.event
        if event == 'partial_fill':
            qty = int(data.order['filled_qty'])
            if data.order['side'] == 'sell':
                qty = qty * -1
            # filled_qty is cumulative: back out the previously recorded
            # partial fill before adding the new total.
            positions[symbol] = (
                positions.get(symbol, 0) - partial_fills.get(symbol, 0)
            )
            partial_fills[symbol] = qty
            positions[symbol] += qty
            # Keep the originally submitted order object in open_orders. The
            # update payload is a plain dict, and the staleness check in the
            # bar handler needs attribute access to .submitted_at and .id.
        elif event == 'fill':
            qty = int(data.order['filled_qty'])
            if data.order['side'] == 'sell':
                qty = qty * -1
            positions[symbol] = (
                positions.get(symbol, 0) - partial_fills.get(symbol, 0)
            )
            partial_fills[symbol] = 0
            positions[symbol] += qty
            open_orders[symbol] = None
        elif event == 'canceled' or event == 'rejected':
            partial_fills[symbol] = 0
            open_orders[symbol] = None

    @conn.on(r'A$')
    async def handle_second_bar(conn, channel, data):
        symbol = data.symbol

        # First, aggregate 1s bars for up-to-date MACD calculations
        ts = data.start
        ts -= timedelta(seconds=ts.second, microseconds=ts.microsecond)
        try:
            current = minute_history[data.symbol].loc[ts]
        except KeyError:
            current = None
        if current is None:
            new_data = [
                data.open,
                data.high,
                data.low,
                data.close,
                data.volume
            ]
        else:
            new_data = [
                current.open,
                data.high if data.high > current.high else current.high,
                data.low if data.low < current.low else current.low,
                data.close,
                current.volume + data.volume
            ]
        minute_history[symbol].loc[ts] = new_data

        # Next, check for existing orders for the stock
        existing_order = open_orders.get(symbol)
        if existing_order is not None:
            # Make sure the order's not too old
            submission_ts = existing_order.submitted_at.astimezone(
                timezone('America/New_York')
            )
            # ts is floored to the minute, so it can precede the submission
            # time; signed minutes keep a fresh order from looking a day old.
            order_lifetime = ts - submission_ts
            if _whole_minutes(order_lifetime) > 1:
                # Cancel it so we can try again for a fill
                api.cancel_order(existing_order.id)
            return

        # Now we check to see if it might be time to buy or sell
        minutes_open = _whole_minutes(ts - market_open_dt)
        minutes_to_close = _whole_minutes(market_close_dt - ts)
        if 15 < minutes_open < 60:
            # Check for buy signals

            # See if we've already bought in first
            position = positions.get(symbol, 0)
            if position > 0:
                return

            # See how high the price went during the first 15 minutes
            lbound = market_open_dt
            ubound = lbound + timedelta(minutes=15)
            high_15m = 0
            try:
                high_15m = minute_history[symbol][lbound:ubound]['high'].max()
            except Exception:
                # Because we're aggregating on the fly, sometimes the datetime
                # index can get messy until it's healed by the minute bars
                return

            # Get the change since yesterday's market close
            daily_pct_change = (
                (data.close - prev_closes[symbol]) / prev_closes[symbol]
            )
            if (
                daily_pct_change > .04 and
                data.close > high_15m and
                volume_today[symbol] > 30000
            ):
                # check for a positive, increasing MACD
                hist = macd(
                    minute_history[symbol]['close'].dropna(),
                    n_fast=12,
                    n_slow=26
                )
                if (
                    hist.iloc[-1] < 0 or
                    not (hist.iloc[-3] < hist.iloc[-2] < hist.iloc[-1])
                ):
                    return
                hist = macd(
                    minute_history[symbol]['close'].dropna(),
                    n_fast=40,
                    n_slow=60
                )
                if hist.iloc[-1] < 0 or np.diff(hist)[-1] < 0:
                    return

                # Stock has passed all checks; figure out how much to buy
                stop_price = find_stop(
                    data.close, minute_history[symbol], ts
                )
                risk_per_share = data.close - stop_price
                if risk_per_share <= 0:
                    # A stop at or above the price makes sizing meaningless
                    # (and a zero divisor would raise); skip this signal.
                    return
                stop_prices[symbol] = stop_price
                # Target a reward of three times the per-share risk.
                target_prices[symbol] = data.close + risk_per_share * 3
                shares_to_buy = portfolio_value * risk // risk_per_share
                if shares_to_buy == 0:
                    shares_to_buy = 1
                shares_to_buy -= positions.get(symbol, 0)
                if shares_to_buy <= 0:
                    return

                print('Submitting buy for {} shares of {} at {}'.format(
                    shares_to_buy, symbol, data.close
                ))
                try:
                    o = api.submit_order(
                        symbol=symbol, qty=str(shares_to_buy), side='buy',
                        type='limit', time_in_force='day',
                        limit_price=str(data.close)
                    )
                    open_orders[symbol] = o
                    latest_cost_basis[symbol] = data.close
                except Exception as e:
                    print(e)
                return
        if minutes_open >= 24 and minutes_to_close > 15:
            # Check for liquidation signals

            # We can't liquidate if there's no position
            position = positions.get(symbol, 0)
            if position == 0:
                return

            # Sell for a loss if it's fallen below our stop price
            # Sell for a loss if it's below our cost basis and MACD < 0
            # Sell for a profit if it's above our target price
            hist = macd(
                minute_history[symbol]['close'].dropna(),
                n_fast=13,
                n_slow=21
            )
            macd_weak = hist.iloc[-1] <= 0
            # Positions carried over from an earlier run have a stop and cost
            # basis but no target, so the target must be looked up defensively.
            target_price = target_prices.get(symbol)
            hit_target = (
                target_price is not None and data.close >= target_price
            )
            if (
                data.close <= stop_prices[symbol] or
                (hit_target and macd_weak) or
                (data.close <= latest_cost_basis[symbol] and macd_weak)
            ):
                print('Submitting sell for {} shares of {} at {}'.format(
                    position, symbol, data.close
                ))
                try:
                    o = api.submit_order(
                        symbol=symbol, qty=str(position), side='sell',
                        type='limit', time_in_force='day',
                        limit_price=str(data.close)
                    )
                    open_orders[symbol] = o
                    latest_cost_basis[symbol] = data.close
                except Exception as e:
                    print(e)
            return
        elif minutes_to_close <= 15:
            # Liquidate remaining positions on watched symbols at market
            try:
                position = api.get_position(symbol)
            except Exception:
                # Exception here indicates that we have no position
                return
            print('Trading over, liquidating remaining position in {}'.format(
                symbol)
            )
            api.submit_order(
                symbol=symbol, qty=position.qty, side='sell',
                type='market', time_in_force='day'
            )
            # Bars can keep arriving after the first liquidation, so the
            # symbol may already have been removed.
            if symbol in symbols:
                symbols.remove(symbol)
            if len(symbols) <= 0:
                # NOTE(review): if close()/deregister() are coroutines in the
                # SDK version in use, these calls need awaiting -- confirm.
                conn.close()
            conn.deregister([
                'A.{}'.format(symbol),
                'AM.{}'.format(symbol)
            ])

    # Replace aggregated 1s bars with incoming 1m bars
    @conn.on(r'AM$')
    async def handle_minute_bar(conn, channel, data):
        ts = data.start
        ts -= timedelta(microseconds=ts.microsecond)
        minute_history[data.symbol].loc[ts] = [
            data.open,
            data.high,
            data.low,
            data.close,
            data.volume
        ]
        volume_today[data.symbol] += data.volume

    channels = ['trade_updates']
    for symbol in symbols:
        symbol_channels = ['A.{}'.format(symbol), 'AM.{}'.format(symbol)]
        channels += symbol_channels
    print('Watching {} symbols.'.format(len(symbols)))
    run_ws(conn, channels)
# Handle failed websocket connections by reconnecting
def run_ws(conn, channels, retry_delay=1.0):
    """Run the stream on ``channels``, reconnecting whenever it fails.

    Retries in a loop rather than by recursion, so a long outage cannot
    exhaust the call stack, and pauses between attempts so a persistent
    failure does not spin at full speed.

    Args:
        conn: Stream connection exposing ``run(channels)`` and ``close()``.
        channels: Channel names to subscribe to.
        retry_delay: Seconds to wait before reconnecting; 0 retries
            immediately.

    Returns:
        None, once ``conn.run`` returns without raising.
    """
    while True:
        try:
            conn.run(channels)
        except Exception as e:
            print(e)
            conn.close()
            if retry_delay > 0:
                time.sleep(retry_delay)
        else:
            return
if __name__ == "__main__":
    # Get when the market opens or opened today
    nyc = timezone('America/New_York')
    today = datetime.now(nyc)
    today_str = today.strftime('%Y-%m-%d')
    calendar = api.get_calendar(start=today_str, end=today_str)[0]

    def _session_time(time_of_day):
        # Build the timestamp from scratch so no seconds or microseconds from
        # "now" leak in, and let pytz pick the correct UTC offset.
        return nyc.localize(datetime(
            today.year, today.month, today.day,
            time_of_day.hour, time_of_day.minute
        ))

    market_open = _session_time(calendar.open)
    market_close = _session_time(calendar.close)

    # Wait until just before we might want to trade. The current time is
    # re-read on every pass, and total_seconds() keeps the comparison correct
    # before the open (where timedelta.seconds would wrap around).
    while (datetime.now(nyc) - market_open).total_seconds() // 60 <= 14:
        time.sleep(1)

    run(get_tickers(), market_open, market_close)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment