Created
June 24, 2023 21:00
-
-
Save kind3r/74a2b2eeae46e943560670b19b6c7ae3 to your computer and use it in GitHub Desktop.
Streamable version of Solana's getProgramAccounts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Program } from "@project-serum/anchor"; | |
import { Connection } from "@solana/web3.js"; | |
import PQueue from "p-queue"; | |
import delay from "delay"; | |
import fetch from "node-fetch"; | |
import { parser } from 'stream-json'; | |
import { pick } from "stream-json/filters/Pick"; | |
import { ignore } from "stream-json/filters/Ignore"; | |
import { streamArray } from "stream-json/streamers/StreamArray"; | |
type ParsedAccount = { | |
pubKey: string; | |
parsed: any; | |
} | |
export async function getProgramAccountsChunks( | |
connection: Connection, | |
program: Program<any>, | |
accountIndex: number, // index of the account in the IDL | |
process: (parsedAccounts: ParsedAccount[]) => Promise<void>, // function to process the parsed accounts in chunks of 10k | |
accountProcessLimit: number = 20000, // number of chunks to split the data | |
) { | |
const endpoint = connection.rpcEndpoint; | |
const account = program.idl.accounts[accountIndex]; | |
const discriminatorFilter = program.coder.accounts.memcmp(account.name); | |
const params = { | |
jsonrpc: "2.0", | |
id: 1, | |
method: "getProgramAccounts", | |
params: [ | |
program.programId.toBase58(), | |
{ | |
encoding: "base64", | |
filters: [ | |
{ | |
"memcmp": discriminatorFilter | |
} | |
] | |
} | |
] | |
} | |
const paramsStr = JSON.stringify(params); | |
try { | |
let startTime = Date.now(); | |
const res = await fetch(endpoint, { | |
method: "POST", | |
body: paramsStr | |
}); | |
if (res.ok) { | |
console.log(`[getProgramAccountsChunks:${program.programId.toBase58()}] Response is OK and took ${(Date.now() - startTime) / 1000} seconds`); | |
const processPipeline = () => { | |
return new Promise<void>((resolve) => { | |
const queue = new PQueue({ | |
autoStart: true, | |
concurrency: 1 | |
}); | |
let accounts = 0; | |
const pipeline = res.body | |
.pipe(parser()) | |
.pipe(pick({ filter: 'result' })) | |
.pipe(ignore({ filter: /(owner|executable|lamports|rentEpoch)/i }),) | |
.pipe(streamArray()); | |
let allAccounts: any[] = []; | |
pipeline.on("data", (data) => { | |
if (data.value) { | |
allAccounts.push(data.value); | |
accounts++; | |
if (allAccounts.length >= accountProcessLimit) { | |
const tmpAccounts = allAccounts.splice(Math.max(allAccounts.length - accountProcessLimit, 0), accountProcessLimit); | |
queue.add(async () => { | |
await processAccounts(program, account, tmpAccounts, process); | |
}); | |
} | |
} | |
}); | |
pipeline.on("end", async () => { | |
console.log(`[getProgramAccountsChunks:${program.programId.toBase58()}] Read ${accounts} accounts`); | |
while (allAccounts.length > 0) { | |
const tmpAccounts = allAccounts.splice(Math.max(allAccounts.length - accountProcessLimit, 0), accountProcessLimit); | |
queue.add(async () => { | |
await processAccounts(program, account, tmpAccounts, process); | |
}); | |
} | |
do { | |
await delay(1000); | |
} while (queue.size + queue.pending > 0); | |
resolve(); | |
}); | |
}); | |
} | |
await processPipeline(); | |
} else { | |
console.warn(`Response is NOT OK and took ${(Date.now() - startTime) / 1000} seconds`); | |
} | |
} catch (error) { | |
console.error(error); | |
} | |
} | |
async function processAccounts( | |
program: Program<any>, | |
account: any, | |
accounts: any[], | |
process: (parsedAccounts: ParsedAccount[]) => Promise<void> | |
): Promise<void> { | |
let parsedAccounts: ParsedAccount[] = []; | |
for (const tmp of accounts) { | |
try { | |
parsedAccounts.push({ | |
pubKey: tmp.pubkey, | |
parsed: program.coder.accounts.decode(account.name, Buffer.from(tmp.account.data[0], "base64")) | |
}); | |
} catch (error) { | |
console.error(error); | |
} | |
} | |
await process(parsedAccounts); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment