Skip to content

Instantly share code, notes, and snippets.

@18182324
Last active May 29, 2025 13:35
Show Gist options
  • Save 18182324/c3cd48a9a5e11056e1efba16bfa3f42c to your computer and use it in GitHub Desktop.
Save 18182324/c3cd48a9a5e11056e1efba16bfa3f42c to your computer and use it in GitHub Desktop.
import alpaca_trade_api as tradeapi
from ta.trend import MACD
import pandas as pd
import numpy as np
import time
import datetime as dt
import logging
# ============ CONFIGURATION ============
# Alpaca API credentials. These are placeholders: replace them with your own
# keys before running, and avoid committing real keys to source control.
API_KEY = "YOUR_API_KEY"
API_SECRET = "YOUR_API_SECRET"
BASE_URL = "https://paper-api.alpaca.markets" # Paper-trading endpoint; use the live URL for real trading
# Trading parameters and universe
SYMBOLS = ["AAPL", "AMD", "TSLA", "NVDA"] # Watchlist scanned on every loop pass
RISK_PER_TRADE = 0.01 # Max portfolio fraction at risk per trade (not referenced by the code visible in this file)
STOP_LOSS_BUFFER = 0.05 # Stop-loss is placed 5% below the entry price
TAKE_PROFIT_BUFFER = 0.10 # Take-profit is placed 10% above the entry price
TRADE_START_DELAY = 15 # Minutes after market open before buying may begin
TRADE_END_DELAY = 60 # Minutes after market open at which buying stops
DAY_END_BUFFER = 15 # Minutes before market close at which positions are liquidated
POSITION_SIZE = 1000 # Fixed dollar amount allocated to each trade
# ============ INITIAL SETUP ============
# Module-level logger and the shared Alpaca REST client used by every helper below
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
api = tradeapi.REST(API_KEY, API_SECRET, BASE_URL, api_version="v2")
# ============ HELPER FUNCTIONS ============
def get_market_open_close():
    """Fetch today's market open and close as timezone-aware timestamps.

    The trading day is taken from the broker clock and the matching calendar
    entry supplies the session bounds.

    Returns:
        tuple: ``(open, close)`` as timezone-aware ``pandas.Timestamp``
        objects, so they can be compared with ``datetime.now(timezone.utc)``.

    Raises:
        RuntimeError: If the calendar lookup returns no session for today.
    """
    clock = api.get_clock()
    today = clock.timestamp.date()
    sessions = api.get_calendar(start=today, end=today)
    if not sessions:
        raise RuntimeError(f"No market session found for {today}")
    session = sessions[0]

    def _to_aware(value):
        """Turn a calendar open/close value into a tz-aware timestamp."""
        # A bare time-of-day carries no date; anchor it to the trading day.
        if isinstance(value, dt.time):
            value = dt.datetime.combine(today, value)
        stamp = pd.Timestamp(value)
        # NOTE(review): naive values are assumed to be exchange-local
        # (US/Eastern) wall-clock times -- confirm against the API docs.
        if stamp.tzinfo is None:
            stamp = stamp.tz_localize("America/New_York")
        return stamp

    return _to_aware(session.open), _to_aware(session.close)
def get_data(symbol, limit=100):
    """Retrieve intraday minute bars for a symbol as an OHLCV dataframe.

    Args:
        symbol: Ticker to request bars for.
        limit: Maximum number of bars to request.

    Returns:
        pandas.DataFrame: Indexed by bar timestamp (``t``) with columns
        ``open``, ``high``, ``low``, ``close``, ``volume``. Empty (but with
        those columns) when the API returns no bars.
    """
    # NOTE(review): no start/end is passed, so which bars ``limit`` selects
    # depends on the API defaults -- confirm it yields the most recent bars.
    barset = api.get_bars(symbol, tradeapi.TimeFrame.Minute, limit=limit)

    # NOTE(review): alpaca_trade_api entities appear to keep the raw payload
    # (keys "t", "o", "h", "l", "c", "v") in ``_raw``; ``__dict__`` alone would
    # only expose that wrapper attribute. Fall back to ``__dict__`` otherwise.
    records = [getattr(bar, "_raw", None) or bar.__dict__ for bar in barset]

    renames = {"o": "open", "h": "high", "l": "low", "c": "close", "v": "volume"}
    if not records:
        return pd.DataFrame(
            columns=list(renames.values()),
            index=pd.DatetimeIndex([], name="t"),
        )

    df = pd.DataFrame(records)
    df["t"] = pd.to_datetime(df["t"])
    df = df.set_index("t")[list(renames)].rename(columns=renames)
    return df
def apply_macd(df):
    """Attach MACD line, signal line and histogram columns to ``df``.

    The frame is modified in place (columns ``macd``, ``macd_signal`` and
    ``macd_diff`` are added) and the same object is returned.
    """
    indicator = MACD(
        close=df["close"],
        window_slow=26,
        window_fast=12,
        window_sign=9,
    )
    # Column name -> indicator method producing that column.
    outputs = {
        "macd": indicator.macd,
        "macd_signal": indicator.macd_signal,
        "macd_diff": indicator.macd_diff,
    }
    for column, compute in outputs.items():
        df[column] = compute()
    return df
def should_buy(df):
    """Decide whether the latest bar satisfies every entry condition.

    A buy is signalled only when all of the following hold for the last row:
    the close rose more than 4% versus the previous row, the close is above
    the highest ``high`` of the first 15 rows, and ``macd`` is positive and
    higher than on the previous row.

    Args:
        df: Dataframe with ``close``, ``high`` and ``macd`` columns, oldest
            row first.

    Returns:
        bool: ``True`` when all conditions hold; ``False`` otherwise,
        including when fewer than two rows are available.
    """
    # Two rows are needed to compare the latest bar with the previous one.
    if len(df) < 2:
        return False

    current = df.iloc[-1]
    prev = df.iloc[-2]
    price_change = (current["close"] - prev["close"]) / prev["close"]
    # NOTE(review): this is the opening-15-minute high only if ``df`` starts
    # at the session open -- confirm with the caller's data window.
    breakout = current["close"] > df.iloc[:15]["high"].max()
    # MACD must be positive and rising.
    macd_rising = current["macd"] > 0 and current["macd"] > prev["macd"]
    # Normalise numpy booleans to a plain Python bool for callers.
    return bool(price_change > 0.04 and breakout and macd_rising)
def calculate_qty(price, position_size=None):
    """Compute how many whole shares a fixed dollar budget buys.

    Args:
        price: Price per share.
        position_size: Dollar budget for the trade. Defaults to the
            module-level ``POSITION_SIZE`` when omitted.

    Returns:
        int: Number of whole shares (rounded down). ``0`` when the price is
        zero, negative or NaN, so callers can skip the trade instead of
        hitting a division error.
    """
    if position_size is None:
        position_size = POSITION_SIZE
    # ``not price > 0`` also rejects NaN, which fails every comparison.
    if not price > 0:
        return 0
    return int(position_size / price)
def place_bracket_order(symbol, qty, entry_price):
    """Place a market bracket order with stop-loss and take-profit legs.

    The stop-loss is set ``STOP_LOSS_BUFFER`` below ``entry_price`` and the
    take-profit ``TAKE_PROFIT_BUFFER`` above it, both rounded to cents.

    Args:
        symbol: Ticker to buy.
        qty: Number of shares to buy.
        entry_price: Reference price used to derive the bracket levels.

    Returns:
        bool: ``True`` if the order was submitted, ``False`` if it was skipped
        (non-positive quantity) or the submission raised. Failures are logged
        rather than propagated.
    """
    # A zero-share order can never fill; skip it rather than send it.
    if qty <= 0:
        logger.warning(f"Skipping order for {symbol}: non-positive quantity {qty}")
        return False

    stop_price = round(entry_price * (1 - STOP_LOSS_BUFFER), 2)
    take_profit_price = round(entry_price * (1 + TAKE_PROFIT_BUFFER), 2)
    try:
        api.submit_order(
            symbol=symbol,
            qty=qty,
            side="buy",
            type="market",
            time_in_force="day",
            order_class="bracket",
            stop_loss={"stop_price": stop_price},
            take_profit={"limit_price": take_profit_price},
        )
    except Exception as e:
        logger.error(f"Failed to place bracket order for {symbol}: {e}")
        return False

    logger.info(f"Order placed: {symbol} | Entry: {entry_price} | SL: {stop_price} | TP: {take_profit_price}")
    return True
def liquidate_positions():
    """Sell every open position at market price.

    Open orders are cancelled first: the stop-loss / take-profit legs created
    by ``place_bracket_order`` would otherwise still be working against the
    same shares when the market sell is submitted.
    Errors are logged per step so one failure does not block the rest.
    """
    try:
        # NOTE(review): cancellation may take a moment to release the shares
        # on the broker side -- confirm whether a short wait is needed.
        api.cancel_all_orders()
    except Exception as e:
        logger.error(f"Error cancelling open orders before liquidation: {e}")

    for pos in api.list_positions():
        try:
            api.submit_order(
                symbol=pos.symbol,
                qty=pos.qty,
                side="sell",
                type="market",
                time_in_force="day",
            )
            logger.info(f"Liquidated: {pos.symbol}")
        except Exception as e:
            logger.error(f"Error liquidating {pos.symbol}: {e}")
# ============ MAIN EXECUTION LOOP ============
if __name__ == "__main__":
    # Derive the session schedule: when buying may start/stop and when any
    # remaining positions must be closed.
    open_time, close_time = get_market_open_close()
    start_buy = open_time + dt.timedelta(minutes=TRADE_START_DELAY)
    end_buy = open_time + dt.timedelta(minutes=TRADE_END_DELAY)
    final_sell = close_time - dt.timedelta(minutes=DAY_END_BUFFER)
    bought = set()  # Tickers already purchased this session
    logger.info(f"Trading window: {start_buy.time()} to {end_buy.time()}")
    logger.info(f"Liquidation time: {final_sell.time()} | Market close: {close_time.time()}")

    while dt.datetime.now(dt.timezone.utc) < close_time:
        now = dt.datetime.now(dt.timezone.utc)
        # Check for buys during entry window
        if start_buy <= now <= end_buy:
            for symbol in SYMBOLS:
                if symbol in bought:
                    continue
                # A data or indicator failure for one symbol must not abort
                # the loop, or open positions would never be liquidated.
                try:
                    df = apply_macd(get_data(symbol, limit=50))
                    signal = should_buy(df)
                except Exception as e:
                    logger.error(f"Skipping {symbol} this pass: {e}")
                    continue
                if not signal:
                    continue
                price = df.iloc[-1]["close"]
                qty = calculate_qty(price)
                if qty <= 0:
                    logger.warning(f"Skipping {symbol}: position size buys no shares at {price}")
                    continue
                # Only an explicit False marks a failed order; a helper that
                # reports nothing (None) is treated as placed.
                if place_bracket_order(symbol, qty, price) is not False:
                    bought.add(symbol)
        # Liquidate positions before close
        elif now >= final_sell:
            logger.info("Closing all positions before end of day...")
            liquidate_positions()
            break
        time.sleep(30)  # Reduce API calls with delay

    logger.info("Session ended. Strategy execution complete.")
# requirements
# alpaca-trade-api>=0.25
# ta
# scikit-learn  (PyPI package name; "sklearn" is only the import name)
import alpaca_trade_api as tradeapi
import requests
import time
from ta import macd
import numpy as np
from datetime import datetime, timedelta
from pytz import timezone
# Alpaca connection settings. These are placeholders: replace them with the
# API URL and key pair from your dashboard before running.
base_url = 'Your API URL'
api_key_id = 'Your API Key'
api_secret = 'Your API Secret'
# Shared REST client used by the helpers below for account, order, position
# and market-data requests.
api = tradeapi.REST(
base_url=base_url,
key_id=api_key_id,
secret_key=api_secret
)
# NOTE(review): this HTTP session is not referenced by the code visible here.
session = requests.session()
# We only consider stocks with per-share prices inside this range
min_share_price = 2.0
max_share_price = 13.0
# Minimum previous-day dollar volume for a stock we might consider
min_last_dv = 500000
# Fallback stop, as a fraction of the price, used when no better stop level is found
default_stop = .95
# Fraction of portfolio value used to size a position: run() divides
# portfolio_value * risk by the per-share distance between price and stop
risk = 0.001
def get_1000m_history_data(symbols):
    """Download up to 1000 minute bars for each symbol.

    Progress is printed after every symbol.

    Args:
        symbols: Sequence of ticker symbols to fetch.

    Returns:
        dict: Maps each symbol to the ``.df`` dataframe of its minute
        aggregates.
    """
    print('Getting historical data...')
    history = {}
    for done, symbol in enumerate(symbols, start=1):
        aggregate = api.polygon.historic_agg(
            size="minute", symbol=symbol, limit=1000
        )
        history[symbol] = aggregate.df
        print('{}/{}'.format(done, len(symbols)))
    print('Success.')
    return history
def get_tickers():
    """Return the ticker snapshots that pass the momentum screen.

    A ticker is kept when its symbol is tradable, its last trade price lies
    within ``[min_share_price, max_share_price]``, its previous-day volume
    times the last price exceeds ``min_last_dv``, and it is up at least 3.5%
    on the day.

    Returns:
        list: The matching ticker snapshot objects.
    """
    print('Getting current ticker data...')
    tickers = api.polygon.all_tickers()
    print('Success.')
    # A set keeps the per-ticker membership test O(1); the asset list can be
    # large and is checked once for every ticker.
    tradable = {asset.symbol for asset in api.list_assets() if asset.tradable}
    return [
        ticker for ticker in tickers
        if ticker.ticker in tradable
        and min_share_price <= ticker.lastTrade['p'] <= max_share_price
        and ticker.prevDay['v'] * ticker.lastTrade['p'] > min_last_dv
        and ticker.todaysChangePerc >= 3.5
    ]
def find_stop(current_value, minute_history, now):
    """Pick a stop price just under the most recent local low.

    The last 100 rows of the ``low`` column are resampled to 5-minute minima
    and restricted to the current day. The latest point where the minima stop
    falling and start rising is treated as support, and the stop is placed
    one cent below it.

    Args:
        current_value: Current price, used for the fallback stop.
        minute_history: Dataframe with a datetime index and a ``low`` column.
        now: Timestamp whose day (``now.floor('1D')``) bounds the search.

    Returns:
        float: The support-based stop, or ``current_value * default_stop``
        when no local low is found.
    """
    lows = minute_history['low'][-100:].dropna().resample('5min').min()
    lows = lows[now.floor('1D'):]
    diff = np.diff(lows.values)
    # Index i + 1 is a local low when the series was not rising into it
    # (diff[i] <= 0) and rises out of it (diff[i + 1] > 0).
    low_index = np.where((diff[:-1] <= 0) & (diff[1:] > 0))[0] + 1
    if len(low_index) > 0:
        # ``low_index`` holds positions, not labels: the series has a
        # datetime index, so positional access must go through ``.iloc``.
        return lows.iloc[low_index[-1]] - 0.01
    return current_value * default_stop
def run(tickers, market_open_dt, market_close_dt):
    """Stream bars for the screened tickers and trade them intraday.

    Sets up local state (prior closes, volumes, minute history, existing
    positions), registers handlers for trade updates, per-second bars and
    per-minute bars on a streaming connection, then blocks in ``run_ws``.

    Args:
        tickers: Ticker snapshots exposing ``.ticker``, ``.prevDay['c']`` and
            ``.day['v']``.
        market_open_dt: Session open; bar timestamps are measured against it.
        market_close_dt: Session close; bar timestamps are measured against it.
    """
    # Establish streaming connection
    conn = tradeapi.StreamConn(base_url=base_url, key_id=api_key_id, secret_key=api_secret)

    # Update initial state with information from tickers
    volume_today = {}
    prev_closes = {}
    for ticker in tickers:
        symbol = ticker.ticker
        prev_closes[symbol] = ticker.prevDay['c']
        volume_today[symbol] = ticker.day['v']

    symbols = [ticker.ticker for ticker in tickers]
    print('Tracking {} symbols.'.format(len(symbols)))
    minute_history = get_1000m_history_data(symbols)

    portfolio_value = float(api.get_account().portfolio_value)

    open_orders = {}
    positions = {}

    # Cancel any existing open orders on watched symbols
    existing_orders = api.list_orders(limit=500)
    for order in existing_orders:
        if order.symbol in symbols:
            api.cancel_order(order.id)

    stop_prices = {}
    latest_cost_basis = {}

    # Track any positions bought during previous executions
    existing_positions = api.list_positions()
    for position in existing_positions:
        if position.symbol in symbols:
            positions[position.symbol] = float(position.qty)
            # Both values are later compared with per-share bar prices, so
            # they must be per-share too.
            # NOTE(review): ``cost_basis`` is understood to be the position's
            # total cost, ``avg_entry_price`` the per-share figure -- confirm.
            entry_price = float(position.avg_entry_price)
            latest_cost_basis[position.symbol] = entry_price
            stop_prices[position.symbol] = entry_price * default_stop

    # Keep track of what we're buying/selling
    target_prices = {}
    partial_fills = {}

    # Use trade updates to keep track of our portfolio
    @conn.on(r'trade_update')
    async def handle_trade_update(conn, channel, data):
        symbol = data.order['symbol']
        last_order = open_orders.get(symbol)
        if last_order is not None:
            event = data.event
            if event == 'partial_fill':
                qty = int(data.order['filled_qty'])
                if data.order['side'] == 'sell':
                    qty = qty * -1
                # Replace the previously counted partial fill with the new
                # cumulative filled quantity.
                positions[symbol] = (
                    positions.get(symbol, 0) - partial_fills.get(symbol, 0)
                )
                partial_fills[symbol] = qty
                positions[symbol] += qty
                # Keep the originally submitted order object in open_orders:
                # the bar handler reads ``.submitted_at`` and ``.id`` from it,
                # which the dict-style ``data.order`` payload does not offer.
            elif event == 'fill':
                qty = int(data.order['filled_qty'])
                if data.order['side'] == 'sell':
                    qty = qty * -1
                positions[symbol] = (
                    positions.get(symbol, 0) - partial_fills.get(symbol, 0)
                )
                partial_fills[symbol] = 0
                positions[symbol] += qty
                open_orders[symbol] = None
            elif event == 'canceled' or event == 'rejected':
                partial_fills[symbol] = 0
                open_orders[symbol] = None

    @conn.on(r'A$')
    async def handle_second_bar(conn, channel, data):
        symbol = data.symbol

        # First, aggregate 1s bars for up-to-date MACD calculations
        ts = data.start
        ts -= timedelta(seconds=ts.second, microseconds=ts.microsecond)
        try:
            current = minute_history[data.symbol].loc[ts]
        except KeyError:
            current = None
        if current is None:
            new_data = [
                data.open,
                data.high,
                data.low,
                data.close,
                data.volume
            ]
        else:
            # Fold this second into the minute bar already being built.
            new_data = [
                current.open,
                data.high if data.high > current.high else current.high,
                data.low if data.low < current.low else current.low,
                data.close,
                current.volume + data.volume
            ]
        minute_history[symbol].loc[ts] = new_data

        # Next, check for existing orders for the stock
        existing_order = open_orders.get(symbol)
        if existing_order is not None:
            # Make sure the order's not too old
            submission_ts = existing_order.submitted_at.astimezone(
                timezone('America/New_York')
            )
            order_lifetime = ts - submission_ts
            if order_lifetime.seconds // 60 > 1:
                # Cancel it so we can try again for a fill
                api.cancel_order(existing_order.id)
            return

        # Now we check to see if it might be time to buy or sell
        since_market_open = ts - market_open_dt
        until_market_close = market_close_dt - ts
        if (
            since_market_open.seconds // 60 > 15 and
            since_market_open.seconds // 60 < 60
        ):
            # Check for buy signals

            # See if we've already bought in first
            # NOTE(review): returning here also skips the liquidation checks
            # below while a position is held inside the buy window -- confirm
            # that this is intended.
            position = positions.get(symbol, 0)
            if position > 0:
                return

            # See how high the price went during the first 15 minutes
            lbound = market_open_dt
            ubound = lbound + timedelta(minutes=15)
            high_15m = 0
            try:
                high_15m = minute_history[symbol][lbound:ubound]['high'].max()
            except Exception as e:
                # Because we're aggregating on the fly, sometimes the datetime
                # index can get messy until it's healed by the minute bars
                return

            # Get the change since yesterday's market close
            daily_pct_change = (
                (data.close - prev_closes[symbol]) / prev_closes[symbol]
            )
            if (
                daily_pct_change > .04 and
                data.close > high_15m and
                volume_today[symbol] > 30000
            ):
                # check for a positive, increasing MACD
                hist = macd(
                    minute_history[symbol]['close'].dropna(),
                    n_fast=12,
                    n_slow=26
                )
                # ``.iloc`` makes the positional access explicit: the series
                # carries a datetime index, not integer labels.
                if (
                    hist.iloc[-1] < 0 or
                    not (hist.iloc[-3] < hist.iloc[-2] < hist.iloc[-1])
                ):
                    return
                hist = macd(
                    minute_history[symbol]['close'].dropna(),
                    n_fast=40,
                    n_slow=60
                )
                if hist.iloc[-1] < 0 or np.diff(hist)[-1] < 0:
                    return

                # Stock has passed all checks; figure out how much to buy
                stop_price = find_stop(
                    data.close, minute_history[symbol], ts
                )
                risk_per_share = data.close - stop_price
                # A stop at or above the price gives no risk distance to size
                # against (and a zero distance would divide by zero).
                if risk_per_share <= 0:
                    return
                stop_prices[symbol] = stop_price
                target_prices[symbol] = data.close + (risk_per_share * 3)
                shares_to_buy = portfolio_value * risk // risk_per_share
                if shares_to_buy == 0:
                    shares_to_buy = 1
                shares_to_buy -= positions.get(symbol, 0)
                if shares_to_buy <= 0:
                    return

                print('Submitting buy for {} shares of {} at {}'.format(
                    shares_to_buy, symbol, data.close
                ))
                try:
                    o = api.submit_order(
                        symbol=symbol, qty=str(shares_to_buy), side='buy',
                        type='limit', time_in_force='day',
                        limit_price=str(data.close)
                    )
                    open_orders[symbol] = o
                    latest_cost_basis[symbol] = data.close
                except Exception as e:
                    print(e)
                return
        if (
            since_market_open.seconds // 60 >= 24 and
            until_market_close.seconds // 60 > 15
        ):
            # Check for liquidation signals

            # We can't liquidate if there's no position
            position = positions.get(symbol, 0)
            if position == 0:
                return

            # Sell for a loss if it's fallen below our stop price
            # Sell for a loss if it's below our cost basis and MACD < 0
            # Sell for a profit if it's above our target price
            hist = macd(
                minute_history[symbol]['close'].dropna(),
                n_fast=13,
                n_slow=21
            )
            # Positions carried over from an earlier run have no target
            # price recorded; skip the target check for them.
            target_price = target_prices.get(symbol)
            if (
                data.close <= stop_prices[symbol] or
                (
                    target_price is not None and
                    data.close >= target_price and
                    hist.iloc[-1] <= 0
                ) or
                (
                    data.close <= latest_cost_basis[symbol] and
                    hist.iloc[-1] <= 0
                )
            ):
                print('Submitting sell for {} shares of {} at {}'.format(
                    position, symbol, data.close
                ))
                try:
                    o = api.submit_order(
                        symbol=symbol, qty=str(position), side='sell',
                        type='limit', time_in_force='day',
                        limit_price=str(data.close)
                    )
                    open_orders[symbol] = o
                    latest_cost_basis[symbol] = data.close
                except Exception as e:
                    print(e)
            return
        elif (
            until_market_close.seconds // 60 <= 15
        ):
            # Liquidate remaining positions on watched symbols at market
            try:
                position = api.get_position(symbol)
            except Exception as e:
                # Exception here indicates that we have no position
                return
            print('Trading over, liquidating remaining position in {}'.format(
                symbol)
            )
            api.submit_order(
                symbol=symbol, qty=position.qty, side='sell',
                type='market', time_in_force='day'
            )
            # Further bars for this symbol can arrive before the position is
            # gone; only drop it from the watch list once.
            if symbol in symbols:
                symbols.remove(symbol)
            if len(symbols) <= 0:
                conn.close()
            conn.deregister([
                'A.{}'.format(symbol),
                'AM.{}'.format(symbol)
            ])

    # Replace aggregated 1s bars with incoming 1m bars
    @conn.on(r'AM$')
    async def handle_minute_bar(conn, channel, data):
        ts = data.start
        ts -= timedelta(microseconds=ts.microsecond)
        minute_history[data.symbol].loc[ts] = [
            data.open,
            data.high,
            data.low,
            data.close,
            data.volume
        ]
        volume_today[data.symbol] += data.volume

    channels = ['trade_updates']
    for symbol in symbols:
        symbol_channels = ['A.{}'.format(symbol), 'AM.{}'.format(symbol)]
        channels += symbol_channels
    print('Watching {} symbols.'.format(len(symbols)))
    run_ws(conn, channels)
# Handle failed websocket connections by reconnecting
def run_ws(conn, channels):
    """Run the streaming connection, reconnecting after every failure.

    Blocks in ``conn.run(channels)``. When that raises, the error is printed,
    the connection is closed and the run is retried. Returns once
    ``conn.run`` returns normally.

    Args:
        conn: Connection object providing ``run(channels)`` and ``close()``.
        channels: Channel names passed to ``conn.run``.
    """
    # Retry in a loop rather than by recursion so that a long streak of
    # failures cannot exhaust the interpreter's recursion limit.
    while True:
        try:
            conn.run(channels)
            return
        except Exception as e:
            print(e)
            conn.close()
if __name__ == "__main__":
    # Get when the market opens or opened today
    nyc = timezone('America/New_York')
    today = datetime.today().astimezone(nyc)
    today_str = today.strftime('%Y-%m-%d')
    calendar = api.get_calendar(start=today_str, end=today_str)[0]
    # Zero the sub-minute fields so the session bounds sit exactly on a
    # minute; run() uses them to slice minute-stamped bar data.
    market_open = today.replace(
        hour=calendar.open.hour,
        minute=calendar.open.minute,
        second=0,
        microsecond=0
    )
    market_open = market_open.astimezone(nyc)
    market_close = today.replace(
        hour=calendar.close.hour,
        minute=calendar.close.minute,
        second=0,
        microsecond=0
    )
    market_close = market_close.astimezone(nyc)

    # Wait until just before we might want to trade
    # The clock is re-read on every pass (a stale timestamp would never leave
    # the loop), and total_seconds() keeps the comparison meaningful before
    # the open, when the difference is negative.
    since_market_open = datetime.today().astimezone(nyc) - market_open
    while since_market_open.total_seconds() // 60 <= 14:
        time.sleep(1)
        since_market_open = datetime.today().astimezone(nyc) - market_open

    run(get_tickers(), market_open, market_close)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment