Last active
July 12, 2022 05:22
-
-
Save mjpowersjr/f8225fba3ba0ff37f11ecb1e2d19381e to your computer and use it in GitHub Desktop.
This is a basic web3 / ETL demo. Note: 'eth-event-stream' package + refs would need to be replaced with your own abstraction layer.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require("dotenv").config(); | |
const axios = require('axios'); | |
const _ = require('lodash'); | |
const util = require('util'); | |
const sqlite3 = require('sqlite3').verbose(); | |
const Web3 = require('web3'); | |
// REPLACE WITH YOUR OWN web3 polling / ws ABSTRACTION! | |
const { | |
EventStream | |
} = require('eth-event-stream'); | |
const RPC_ENDPOINT = process.env.RPC_ENDPOINT; | |
const BASE_ABI_URL = "https://api.etherscan.io/api?module=contract&action=getabi&address="; | |
const CONTRACT = process.env.CONTRACT; | |
const asyncify = (target) => { | |
target.getAsync = util.promisify(target.get); | |
target.allAsync = util.promisify(target.all); | |
target.runAsync = util.promisify(target.run); | |
} | |
const fetchABI = async (address) => { | |
let abiUrl = BASE_ABI_URL + CONTRACT; | |
let r = await axios.get(abiUrl); | |
let res = _.get(r, "data.result"); | |
if (!res) { | |
throw new Error(`unable to fetch ABI from ${abiUrl}`); | |
} | |
let abi = res; | |
if (typeof abi === 'string') { | |
abi = JSON.parse(res); | |
} | |
if (!abi.length) { | |
throw new Error(`unable to parse ABI: ${res}`); | |
} | |
return abi; | |
} | |
const db = new sqlite3.Database('test.db'); | |
asyncify(db); | |
const main = async () => { | |
console.log(`setting up database...`); | |
await db.runAsync(` | |
CREATE TABLE IF NOT EXISTS settings ( | |
name TEXT, | |
value BLOB | |
)`); | |
await db.runAsync(` | |
CREATE UNIQUE INDEX IF NOT EXISTS idx_settings_name ON settings (name); | |
`); | |
await db.runAsync(` | |
CREATE TABLE IF NOT EXISTS transfer ( | |
from_address TEXT, | |
to_address TEXT, | |
tokens UNSIGNED BIG INT, | |
block_number UNSIGNED BIG INT, | |
block_timestamp DATE, | |
block_hash TEXT, | |
txn_index UNSIGNED INT, | |
txn_hash TEXT | |
)`); | |
await db.runAsync(` | |
CREATE TABLE IF NOT EXISTS approval ( | |
token_owner TEXT, | |
spender TEXT, | |
tokens UNSIGNED BIG INT, | |
block_number UNSIGNED BIG INT, | |
block_timestamp DATE, | |
block_hash TEXT, | |
txn_index UNSIGNED INT, | |
txn_hash TEXT | |
)`); | |
const recordTransfer = db.prepare(` | |
INSERT INTO transfer VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
`); | |
asyncify(recordTransfer); | |
const recordApproval = db.prepare(` | |
INSERT INTO approval VALUES (?, ?, ?, ?, ?, ?, ?, ?) | |
`); | |
asyncify(recordApproval); | |
const recordSetting = db.prepare(` | |
INSERT OR REPLACE INTO settings(name, value) VALUES(?, ?); | |
`); | |
asyncify(recordSetting); | |
const getSetting = db.prepare(` | |
SELECT value FROM settings WHERE name = ?; | |
`); | |
asyncify(getSetting); | |
console.log(`fetching abi...`); | |
const abi = await fetchABI(CONTRACT); | |
console.log(`setting up web3`); | |
const web3 = new Web3(RPC_ENDPOINT); | |
console.log(`setting up stream`); | |
// REPLACE WITH YOUR OWN web3 polling / ws ABSTRACTION! | |
const stream = new EventStream({ | |
abi, | |
address: CONTRACT, | |
web3Factory: () => new Web3(RPC_ENDPOINT) | |
}); | |
const eventLogger = async (ctx) => { | |
let txn = ctx.transaction; | |
const { blockNumber } = txn; | |
console.log({ blockNumber }); | |
}; | |
const eventRecorder = async (ctx) => { | |
let txn = ctx.transaction; | |
const { blockNumber } = txn; | |
var name = 'Block ' + blockNumber | |
var type = 'block' | |
var trans = apm.startTransaction(name, type) | |
const transferEvents = txn.logEvents['Transfer']; | |
if (transferEvents) { | |
let transferEvent = transferEvents[0]; | |
await recordTransfer.runAsync( | |
transferEvent.returnValues.from, | |
transferEvent.returnValues.to, | |
transferEvent.returnValues.tokens, | |
transferEvent.blockNumber, | |
(transferEvent.blockTimestamp || Date.now()), /* FIXME */ | |
transferEvent.blockHash, | |
transferEvent.transactionIndex, | |
transferEvent.transactionHash | |
); | |
} | |
const approvalEvents = txn.logEvents['Approval']; | |
if (approvalEvents) { | |
let approvalEvent = approvalEvents[0]; | |
await recordApproval.runAsync( | |
approvalEvent.returnValues.tokenOwner, | |
approvalEvent.returnValues.spender, | |
approvalEvent.returnValues.tokens, | |
approvalEvent.blockNumber, | |
(approvalEvent.blockTimestamp || Date.now()), /* FIXME */ | |
approvalEvent.blockHash, | |
approvalEvent.transactionIndex, | |
approvalEvent.transactionHash | |
); | |
} | |
await recordSetting.runAsync("last_block", txn.blockNumber); | |
trans.result = 'success'; | |
trans.end(); | |
}; | |
stream.use(eventLogger); | |
stream.use(eventRecorder); | |
let start = await getSetting.getAsync("last_block"); | |
if (start) { | |
start = start['value']; | |
} else { | |
const latest = await web3.eth.getBlockNumber(); | |
start = latest - 10; | |
} | |
await stream.start({ | |
fromBlock: start | |
}); | |
} | |
function exitHandler(options, exitCode) { | |
// cleanup db connection... | |
db.close(); | |
if (options.exit) { | |
process.exit(); | |
} | |
} | |
process.on('exit', exitHandler.bind(null, {cleanup:true})); | |
main().catch(console.error); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks in advance.