Created
July 17, 2024 01:00
-
-
Save rubpy/6b474669a4ddb82277edaaf3f1748b59 to your computer and use it in GitHub Desktop.
Crude example of a gRPC/WebSocket-based Solana token price monitor.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Big from "big.js"; | |
import * as web3 from "@solana/web3.js"; | |
import raydium from "@raydium-io/raydium-sdk"; | |
import GeyserClient from "@triton-one/yellowstone-grpc"; | |
import { TokenAccountMonitor, TokenAccountMultiState } from "./TokenAccountMonitor"; | |
////////////////////////////////////////////////// | |
(async (rpcUrl: string, grpcAuth?: [string, string]) => { | |
const GeyserClientConstructor: typeof GeyserClient = ((GeyserClient as any).default || GeyserClient); | |
const grpcConn = !grpcAuth ? null : new GeyserClientConstructor(grpcAuth[0], grpcAuth[1], {}); | |
const conn = new web3.Connection(rpcUrl, "processed"); | |
const poolAddress = new web3.PublicKey("6noCDgnA6CnXq5j3J9fiRdRAEVCNtFQAroDPwuf2jAm7"); | |
const poolRawState = await conn.getAccountInfo(poolAddress); | |
if (!poolRawState) return; | |
const poolState = raydium.LIQUIDITY_STATE_LAYOUT_V4.decode(poolRawState.data); | |
// NOTE: assuming it's a token/SOL or SOL/token pair. | |
const poolBaseAndQuoteSwapped = poolState.baseMint.toBase58() === raydium.WSOL.mint; | |
const tokenVault = poolBaseAndQuoteSwapped ? poolState.quoteVault : poolState.baseVault; | |
const tokenDecimals = Number(poolBaseAndQuoteSwapped ? poolState.quoteDecimal : poolState.baseDecimal); | |
const refVault = poolBaseAndQuoteSwapped ? poolState.baseVault : poolState.quoteVault; | |
const refDecimals = Number(poolBaseAndQuoteSwapped ? poolState.baseDecimal : poolState.quoteDecimal); | |
{ | |
const vaults = { | |
token: { | |
address: tokenVault.toBase58(), | |
decimals: tokenDecimals, | |
balance: BigInt((await conn.getTokenAccountBalance(tokenVault)).value.amount), | |
}, | |
ref: { | |
address: refVault.toBase58(), | |
decimals: refDecimals, | |
balance: BigInt((await conn.getTokenAccountBalance(refVault)).value.amount), | |
}, | |
}; | |
function amountToBig(amount: bigint, decimals: number) { | |
return new Big(String(amount)).div(new Big(10).pow(decimals)); | |
} | |
function logPrice() { | |
const price = | |
amountToBig(vaults.ref.balance, vaults.ref.decimals) | |
.div(amountToBig(vaults.token.balance, vaults.token.decimals)); | |
console.log(`[${new Date().toISOString()}] price: ${price.toFixed(10)} SOL`); | |
} | |
logPrice(); | |
const monitor = new TokenAccountMonitor(grpcConn ? { grpc: grpcConn } : { rpc: conn }); | |
await monitor.monitorSlotAssociatedAccounts( | |
[vaults.token.address, vaults.ref.address], | |
(states: TokenAccountMultiState) => { | |
vaults.token.balance = BigInt(states[vaults.token.address].data?.amount || "0"); | |
vaults.ref.balance = BigInt(states[vaults.ref.address].data?.amount || "0"); | |
logPrice(); | |
}, | |
); | |
} | |
})( | |
process.env.SOL_RPC_URL || "https://mainnet.helius-rpc.com/?api-key=00000000-0000-0000-0000-000000000000", | |
[ | |
process.env.SOL_GRPC_URL || "https://abcde-dedicated-lb.helius-rpc.com:2053/", | |
process.env.SOL_GRPC_TOKEN || "00000000-0000-0000-0000-000000000000", | |
], | |
); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as web3 from "@solana/web3.js"; | |
import * as spl from "@solana/spl-token"; | |
import bs58 from "bs58"; | |
import GeyserClient, { | |
SubscribeRequest as GeyserSubscribeRequest, | |
SubscribeUpdate as GeyserSubscribeUpdate, | |
CommitmentLevel as GeyserCommitmentLevel, | |
} from "@triton-one/yellowstone-grpc"; | |
import { ClientDuplexStream } from "@grpc/grpc-js"; | |
////////////////////////////////////////////////// | |
export const MissingContext = new Error("missing connection context (did you provide an RPC/gRPC client?)"); | |
export type ProviderClients = KeyedProviderClients<ProviderContexts>; | |
type KeyedProviderClients<T extends Record<keyof T, ProviderContext>> = { [P in keyof T]: T[P]["client"] }; | |
interface ProviderContext<C extends any = {}, S extends any = {}> { | |
client: C; | |
state: S; | |
} | |
interface ProviderContexts { | |
rpc: ProviderContext<web3.Connection, {}>; | |
grpc: ProviderContext<GeyserClient, { | |
stream?: ClientDuplexStream<GeyserSubscribeRequest, GeyserSubscribeUpdate>, | |
streamKey?: string, | |
subscribed?: Map<string, TokenAccountStateCallback | undefined>, | |
}>; | |
} | |
export interface SubscriptionContext { | |
rpc: number | string; | |
grpc: boolean; | |
}; | |
export type Commitment = "processed" | "confirmed" | "finalized"; | |
////////////////////////////////////////////////// | |
export interface TokenAccountState { | |
slot: number; | |
data: spl.RawAccount | null; | |
}; | |
export type TokenAccountMultiState = Record<string, TokenAccountState>; | |
export type TokenAccountStateCallback = (tokenAccount: string, state: TokenAccountState) => void; | |
export type TokenAccountMultiStateCallback = (tokenAccountsState: TokenAccountMultiState) => void; | |
export class TokenAccountMonitor { | |
protected _providers: Partial<ProviderContexts>; | |
protected _subscriptions: Map<string, Partial<SubscriptionContext>> = new Map(); | |
protected _state: Map<string, TokenAccountState> = new Map(); | |
constructor(clients: Partial<ProviderClients>) { | |
const p: Partial<ProviderContexts> = {}; | |
Object.keys(clients).forEach(k => { | |
p[k as keyof typeof this._providers] = { | |
client: clients[k as keyof typeof clients], | |
state: {}, | |
} as any; | |
}); | |
this._providers = p; | |
} | |
public async monitorSlotAssociatedAccounts(tokenAccounts: string[], callback?: TokenAccountMultiStateCallback, commitment?: Commitment): Promise<Record<string, Partial<SubscriptionContext> | null>> { | |
if (!Array.isArray(tokenAccounts) || !tokenAccounts.length) { | |
return {}; | |
} | |
const assoc = { | |
slot: 0, | |
count: 0, | |
tracked: <string[]>[], | |
stateCache: <Record<string, TokenAccountState>>{}, | |
}; | |
const converge: TokenAccountStateCallback | undefined = !callback ? undefined : | |
(tokenAccount: string, state: TokenAccountState) => { | |
if (!assoc.tracked.length || state.slot < assoc.slot) { | |
return; | |
} | |
assoc.stateCache[tokenAccount] = state; | |
if (state.slot > assoc.slot) { | |
assoc.slot = state.slot; | |
assoc.count = 1; | |
return; | |
} | |
if (++assoc.count >= assoc.tracked.length) { | |
const states: TokenAccountMultiState = {}; | |
for (const tokenAccount of assoc.tracked) { | |
if (!(tokenAccount in assoc.stateCache)) { | |
return; | |
} | |
states[tokenAccount] = assoc.stateCache[tokenAccount]; | |
} | |
callback(states); | |
assoc.slot = 0; | |
} | |
}; | |
const subs = await this.monitor(tokenAccounts, converge, commitment); | |
for (const sub in subs) { | |
if (!subs.hasOwnProperty(sub)) continue; | |
assoc.tracked.push(sub); | |
} | |
return subs; | |
} | |
public async monitor(tokenAccounts: string[], callback?: TokenAccountStateCallback, commitment?: Commitment): Promise<Record<string, Partial<SubscriptionContext> | null>> { | |
if (!Array.isArray(tokenAccounts)) { | |
throw new TypeError("tokenAccounts must be an array"); | |
} | |
tokenAccounts = tokenAccounts.filter(s => !this._subscriptions.has(s)); | |
if (!tokenAccounts.length) { | |
return {}; | |
} | |
let subs: Record<string, Partial<SubscriptionContext> | null> | null = null; | |
if (this._providers.grpc) { | |
let grpcCommitment = | |
commitment === "processed" ? GeyserCommitmentLevel.PROCESSED : | |
commitment === "confirmed" ? GeyserCommitmentLevel.CONFIRMED : | |
commitment === "finalized" ? GeyserCommitmentLevel.FINALIZED : | |
undefined; | |
subs = await subscribeGrpcAccounts(this, this._providers.grpc, tokenAccounts, callback, grpcCommitment || GeyserCommitmentLevel.PROCESSED); | |
} else if (this._providers.rpc) { | |
subs = await subscribeRpcAccounts(this, this._providers.rpc, tokenAccounts, callback, commitment); | |
} else { | |
throw MissingContext; | |
} | |
for (const sub in subs) { | |
if (!subs.hasOwnProperty(sub) || !subs[sub]) continue; | |
this._subscriptions.set(sub, subs[sub]!); | |
} | |
return subs; | |
} | |
public getAccountState(tokenAccount: string): TokenAccountState | null { | |
return this._state.get(tokenAccount) || null; | |
} | |
public updateAccountState(tokenAccount: string, slot: number, data: spl.RawAccount): TokenAccountState | null { | |
let state = this._state.get(tokenAccount) || { slot: 0, data: null }; | |
if (state && state.slot >= slot) { | |
return null; | |
} | |
state.slot = slot; | |
state.data = data; | |
this._state.set(tokenAccount, state); | |
return state; | |
} | |
}; | |
////////////////////////////////////////////////// | |
async function subscribeRpcAccounts( | |
monitor: TokenAccountMonitor, | |
rpc: ProviderContexts["rpc"], | |
tokenAccounts: string[], | |
callback?: TokenAccountStateCallback, | |
commitment?: web3.Commitment, | |
): Promise<Record<string, Partial<SubscriptionContext> | null>> { | |
const subs: Record<string, Partial<SubscriptionContext> | null> = {}; | |
for (const tokenAccount of tokenAccounts) { | |
const sub = ((monitor, tokenAccount, callback, commitment) => { | |
return rpc.client.onAccountChange(new web3.PublicKey(tokenAccount), (info: web3.AccountInfo<Buffer>, ctx: web3.Context) => { | |
if (!info || !info.data || info.data.byteLength !== spl.ACCOUNT_SIZE) { | |
return; | |
} | |
let data: spl.RawAccount | null = null; | |
try { | |
data = spl.AccountLayout.decode(info.data); | |
} catch (e) {} | |
if (!data) { | |
return; | |
} | |
const state = monitor.updateAccountState(tokenAccount, ctx.slot, data); | |
if (state && callback) { | |
callback(tokenAccount, state); | |
} | |
}, commitment); | |
})(monitor, tokenAccount, callback, commitment); | |
subs[tokenAccount] = sub ? { rpc: sub } : null; | |
} | |
return subs; | |
} | |
async function subscribeGrpcAccounts( | |
monitor: TokenAccountMonitor, | |
grpc: ProviderContexts["grpc"], | |
tokenAccounts: string[], | |
callback?: TokenAccountStateCallback, | |
commitment?: GeyserCommitmentLevel, | |
): Promise<Record<string, Partial<SubscriptionContext> | null>> { | |
const stream = (grpc.state.stream || await initializeGrpcSubscription(monitor, grpc)); | |
if (!stream || !grpc.state.subscribed) { | |
throw new Error("failed to subscribe to GeyserClient"); | |
} | |
const accounts = [...grpc.state.subscribed.keys()]; | |
const subs: Record<string, Partial<SubscriptionContext> | null> = {}; | |
for (const tokenAccount of tokenAccounts) { | |
if (!accounts.includes(tokenAccount)) { | |
accounts.push(tokenAccount); | |
} | |
grpc.state.subscribed.set(tokenAccount, callback); | |
subs[tokenAccount] = { grpc: true }; | |
} | |
const request: GeyserSubscribeRequest = { | |
accounts: { | |
[grpc.state.streamKey || ""]: { | |
account: accounts, | |
owner: [], | |
filters: [], | |
}, | |
}, | |
slots: {}, | |
transactions: {}, | |
blocks: {}, | |
blocksMeta: {}, | |
entry: {}, | |
accountsDataSlice: [], | |
ping: undefined, | |
}; | |
if (commitment) { | |
request.commitment = commitment; | |
} | |
await new Promise<void>((resolve, reject) => { | |
stream.write(request, (err: any) => { | |
if (err === null || err === undefined) { | |
resolve(); | |
} else { | |
reject(err); | |
} | |
}); | |
}).catch((reason: any) => { | |
throw reason; | |
}); | |
return subs; | |
} | |
async function initializeGrpcSubscription(monitor: TokenAccountMonitor, grpc: ProviderContexts["grpc"]): Promise<ClientDuplexStream<GeyserSubscribeRequest, GeyserSubscribeUpdate> | null> { | |
grpc.state.subscribed = new Map(); | |
grpc.state.streamKey = Date.now().toString(16); | |
const stream = await grpc.client.subscribe(); | |
if (!stream) { | |
return null; | |
} | |
stream.on("error", e => { | |
stream.end(); | |
}); | |
stream.on("data", (update: GeyserSubscribeUpdate) => { | |
if (!(update !== null && typeof update === "object") | |
|| !(Array.isArray(update.filters) && (update.filters as string[]).includes(grpc.state.streamKey || "")) | |
|| !(update.account !== null && typeof update.account === "object") | |
|| !(update.account.account && update.account.account.pubkey && update.account.slot) | |
|| !(update.account.account.data && update.account.account.data.byteLength === spl.ACCOUNT_SIZE)) { | |
return; | |
} | |
const tokenAccount = bs58.encode(update.account.account.pubkey); | |
const callback = grpc.state.subscribed ? grpc.state.subscribed.get(tokenAccount) : undefined; | |
if (typeof callback !== "function") { | |
return; | |
} | |
let data: spl.RawAccount | null = null; | |
try { | |
data = spl.AccountLayout.decode(update.account.account.data); | |
} catch (e) {} | |
if (!data) { | |
return; | |
} | |
const state = monitor.updateAccountState(tokenAccount, parseInt(update.account.slot), data); | |
if (state && callback) { | |
callback(tokenAccount, state); | |
} | |
}); | |
return stream; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment