Skip to content

Instantly share code, notes, and snippets.

@chrispeterson3
Last active April 23, 2025 00:01
Show Gist options
  • Save chrispeterson3/ace50527037ad7307650fd7909e531cb to your computer and use it in GitHub Desktop.
Save chrispeterson3/ace50527037ad7307650fd7909e531cb to your computer and use it in GitHub Desktop.
import { polygonClient } from "@/clients/polygon";
import { inngest } from "./client";
import { getAggsPeriod } from "@/utils/get-aggs-period";
import { getAtr } from "@/ta/get-atr";
import { prisma } from "@/clients/prisma";
import { getAverageVolume } from "@/ta/get-average-volume";
import { getConsecutiveGreenDays } from "@/ta/get-consecutive-green-days";
import { getConsecutiveRedDays } from "@/ta/get-consecutive-red-days";
import { getAverageRoc } from "@/ta/get-average-roc";
import { getPreMarketSession } from "@/utils/get-pre-market-session";
import { getRegularSession } from "@/utils/get-regular-session";
/**
 * Payload carried by a "snapshots/save" event for a single ticker.
 *
 * The unprefixed fields are copied from the snapshot's `day` bar and the
 * `previous*` fields from its `prevDay` bar; any value the snapshot does not
 * provide is sent as `null`.
 */
export type SnapshotEventData = {
  /** Ticker symbol the snapshot belongs to. */
  ticker: string;

  // Current day bar.
  open: number | null;
  high: number | null;
  low: number | null;
  close: number | null;
  volume: number | null;
  vwap: number | null;

  // Previous day bar.
  previousOpen: number | null;
  previousHigh: number | null;
  previousLow: number | null;
  previousClose: number | null;
  previousVolume: number | null;
};
/**
 * Event sent once per ticker by `dispatchSnapshots`; its name matches the
 * "snapshots/save" trigger that `saveSnapshot` listens on.
 */
type SaveSnapshotEvent = {
name: "snapshots/save";
data: SnapshotEventData;
};
/**
 * Shape of a "stats/save" event: a ticker symbol plus a date string.
 *
 * NOTE(review): nothing in the visible part of this file sends or handles this
 * event, and the expected format of `date` is not shown here — confirm against
 * the consumer before relying on it.
 */
type SaveStatsEvent = {
name: "stats/save";
data: {
ticker: string;
date: string;
};
};
/**
 * Fetches the all-tickers snapshot from Polygon and fans out one
 * "snapshots/save" event per qualifying ticker.
 *
 * Trigger: the "snapshots/dispatch" event when NODE_ENV is "development",
 * otherwise a weekday cron.
 *
 * A ticker qualifies when it has a symbol, a day close of at least 1 and a day
 * volume of at least 1,000,000.
 *
 * @returns "No snapshots found" when the response has no `tickers`, otherwise
 * a summary of how many events were dispatched.
 */
export const dispatchSnapshots = inngest.createFunction(
  { id: "dispatch-snapshots" },
  process.env.NODE_ENV === "development"
    ? { event: "snapshots/dispatch" }
    : // 16:01 New York time, Mon-Fri. Pinning the timezone keeps the run at
      // 20:01 UTC during daylight saving and 21:01 UTC otherwise, so the
      // schedule no longer has to be edited by hand twice a year.
      { cron: "TZ=America/New_York 1 16 * * 1-5" },
  async () => {
    const snapshots = await polygonClient.stocks.snapshotAllTickers();
    if (!snapshots.tickers) return "No snapshots found";

    const events: Array<SaveSnapshotEvent> = [];
    for (const { ticker, day, prevDay } of snapshots.tickers) {
      // Same three conditions as before, checked in a single pass; the guards
      // also narrow `ticker` and `day`, so no non-null assertion is needed.
      if (!ticker) continue;
      if (!day?.c || day.c < 1) continue;
      if (!day.v || day.v < 1000000) continue;

      events.push({
        name: "snapshots/save",
        data: {
          ticker,
          open: day.o ?? null,
          high: day.h ?? null,
          low: day.l ?? null,
          close: day.c ?? null,
          volume: day.v ?? null,
          vwap: day.vw ?? null,
          previousOpen: prevDay?.o ?? null,
          previousHigh: prevDay?.h ?? null,
          previousLow: prevDay?.l ?? null,
          previousClose: prevDay?.c ?? null,
          previousVolume: prevDay?.v ?? null,
        },
      });
    }

    // Nothing qualified: skip the send instead of posting an empty batch.
    if (events.length > 0) {
      await inngest.send(events);
    }

    return `${events.length} snapshots dispatched`;
  }
);
/*
Rate-limit guidance received from Polygon.io: "While we do not have a specific rate limit,
we do monitor usage to ensure that no single user affects the quality of service for others.
To avoid any throttling issues, we recommend staying under 100 requests per second."
*/
/** Keeps only the finite numbers in `values` (drops null, undefined, NaN and ±Infinity). */
const finiteNumbers = (
  values: ReadonlyArray<number | null | undefined>
): number[] =>
  values.filter(
    (value): value is number =>
      typeof value === "number" && Number.isFinite(value)
  );

/** Largest finite number in `values`, or `null` when there is none. */
const highestOrNull = (
  values: ReadonlyArray<number | null | undefined>
): number | null => {
  const usable = finiteNumbers(values);
  return usable.length > 0 ? Math.max(...usable) : null;
};

/** Smallest finite number in `values`, or `null` when there is none. */
const lowestOrNull = (
  values: ReadonlyArray<number | null | undefined>
): number | null => {
  const usable = finiteNumbers(values);
  return usable.length > 0 ? Math.min(...usable) : null;
};

/**
 * Handles a "snapshots/save" event: enriches the ticker's snapshot with
 * derived statistics and upserts the result into the `ticker` table.
 *
 * Derived values include ATR, average volume, change/spike/pull/gap ratios,
 * consecutive green/red days, relative volume, market cap, rate of change,
 * pre-market volume and opening-range figures taken from the minute bars of
 * the regular session.
 *
 * Runs with retries disabled and the throttle settings declared below.
 *
 * @throws Error("Error saving snapshot") when any fetch, calculation or the
 * database write fails; the underlying error is logged with the ticker first.
 */
export const saveSnapshot = inngest.createFunction(
  {
    id: "save-snapshot",
    retries: 0,
    throttle: {
      limit: 1,
      period: "2s",
      burst: 75,
    },
  },
  { event: "snapshots/save" },
  async ({ event }) => {
    try {
      const { ticker, ...eventData } = event.data;
      const { open, high, low, close, previousClose, volume } = eventData;

      // The three remote calls are intentionally left sequential so that a
      // single run never has more than one request in flight.
      // get daily bars from polygon
      const aggregates = await getAggsPeriod(ticker, 500);
      const intradayAggregates = await getAggsPeriod(ticker, 0, "minute");
      const tickerDetails = await polygonClient.reference.tickerDetails(ticker);

      const preMarketSession = getPreMarketSession(intradayAggregates);
      const regularSession = getRegularSession(intradayAggregates);

      const averageVolume = getAverageVolume(aggregates);
      const fiveDaysAgoClose = aggregates.results?.[5]?.c ?? null;

      // Reverse a copy: Array.prototype.reverse works in place and would
      // otherwise reorder the array owned by `regularSession`.
      // NOTE(review): presumably the bars arrive newest-first, so the reversed
      // list is chronological — confirm against getAggsPeriod.
      const sessionBars = [...(regularSession?.results ?? [])].reverse();
      const openBar = sessionBars[0];
      const twoMinuteBar = sessionBars[1];
      const tenMinuteBar = sessionBars[9];
      const fifteenMinuteBar = sessionBars[14];
      // NOTE(review): both ranges start at index 1, so the opening bar is not
      // part of the opening-range high/low. Kept as in the original — confirm
      // this is intended.
      const tenMinuteRange = sessionBars.slice(1, 10);
      const fifteenMinuteRange = sessionBars.slice(1, 15);

      const data = {
        ...eventData,
        atr: getAtr(aggregates),
        averageVolume,
        change:
          close && previousClose
            ? (close - previousClose) / previousClose
            : null,
        spike: high && open ? (high - open) / open : null,
        pull: open && low ? (low - open) / open : null,
        gap:
          open && previousClose ? (open - previousClose) / previousClose : null,
        consecutiveGreenDays: getConsecutiveGreenDays(aggregates),
        consecutiveRedDays: getConsecutiveRedDays(aggregates),
        rvol: volume && averageVolume ? volume / averageVolume : null,
        marketCap: tickerDetails.results?.market_cap ?? null,
        rateOfChange:
          close && fiveDaysAgoClose
            ? (close - fiveDaysAgoClose) / fiveDaysAgoClose
            : null,
        averageRateOfChange: getAverageRoc(aggregates),
        // Fall back to 0 (the same value an empty bar list sums to) rather
        // than undefined: Prisma ignores undefined fields on update, which
        // would leave the previously stored volume in place.
        preMarketVolume:
          preMarketSession.results?.reduce(
            (total, bar) => total + (bar?.v ?? 0),
            0
          ) ?? 0,
        oneMinuteVolume: openBar?.v ?? null,
        oneMinuteVwap: openBar?.vw ?? null,
        oneMinuteClose: openBar?.c ?? null,
        oneMinuteHigh: openBar?.h ?? null,
        oneMinuteLow: openBar?.l ?? null,
        twoMinuteHigh: twoMinuteBar?.h ?? null,
        twoMinuteLow: twoMinuteBar?.l ?? null,
        // Math.max()/Math.min() with no arguments return -Infinity/Infinity,
        // and a `?? 0` fallback would force the low to 0 whenever a bar lacks
        // `l`. Missing values are skipped and an empty range is stored as null.
        // NOTE(review): assumes these columns are nullable like the sibling
        // *Close columns — confirm against the Prisma schema.
        openingRange10MinuteHigh: highestOrNull(
          tenMinuteRange.map((bar) => bar.h)
        ),
        openingRange10MinuteLow: lowestOrNull(
          tenMinuteRange.map((bar) => bar.l)
        ),
        openingRange10MinuteClose: tenMinuteBar?.c ?? null,
        openingRange15MinuteHigh: highestOrNull(
          fifteenMinuteRange.map((bar) => bar.h)
        ),
        openingRange15MinuteLow: lowestOrNull(
          fifteenMinuteRange.map((bar) => bar.l)
        ),
        openingRange15MinuteClose: fifteenMinuteBar?.c ?? null,
      };

      await prisma.ticker.upsert({
        where: {
          ticker,
        },
        update: data,
        create: {
          ticker,
          ...data,
        },
      });
    } catch (error) {
      // Retries are disabled, so this log line is the only record of which
      // ticker failed and why.
      console.error(`Error saving snapshot for ${event.data.ticker}`, error);
      throw new Error("Error saving snapshot");
    }
  }
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment