Skip to content

Instantly share code, notes, and snippets.

@madjin
Created July 22, 2025 18:51
Show Gist options
  • Select an option

  • Save madjin/400261a4325db204fc21b5c0b330ee9c to your computer and use it in GitHub Desktop.

Select an option

Save madjin/400261a4325db204fc21b5c0b330ee9c to your computer and use it in GitHub Desktop.
Listener for ai16z solana transactions containing memos, for collecting votes
#!/usr/bin/env python3
"""
memo_listener.py – Minimal version
-------------------------------------------
Fetch all ai16z transfers into the prize wallet or its ATA from a given slot onward,
then print to stdout (as table or NDJSON).
Usage:
export HELIUS_API_KEY=
export PRIZE_WALLET_ADDRESS=
python memo_listener.py --since {BLOCK)
python memo_listener.py --since {BLOCK} --json
"""
import os
import asyncio
import aiohttp
import argparse
import json
import base58
from datetime import datetime
from typing import Dict, Any, List, Set
from tabulate import tabulate
# Constants
AI16Z_MINT = "HeLp6NuQkmYB4pYWo2zYs22mESHXPQYzXbB8n4V98jwC"
HELIUS_BASE = "https://api.helius.xyz/v0"
RPC_BASE = "https://mainnet.helius-rpc.com"
MEMO_PID = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"
PRIZE_WALLET = os.getenv("PRIZE_WALLET_ADDRESS")
API_KEY = os.getenv("HELIUS_API_KEY")
if not (PRIZE_WALLET and API_KEY):
raise SystemExit("❌ Set PRIZE_WALLET_ADDRESS and HELIUS_API_KEY")
def api_url(path: str) -> str:
return f"{HELIUS_BASE}{path}?api-key={API_KEY}"
def rpc_payload(method: str, params: list[Any]) -> Dict[str, Any]:
return {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
async def fetch_json(session: aiohttp.ClientSession, url: str):
async with session.get(url, timeout=30) as r:
r.raise_for_status()
return await r.json()
async def find_ata(session: aiohttp.ClientSession, wallet: str) -> str | None:
payload = rpc_payload("getTokenAccountsByOwner", [wallet, {"mint": AI16Z_MINT}, {"encoding": "jsonParsed"}])
async with session.post(f"{RPC_BASE}/?api-key={API_KEY}", json=payload) as r:
data = await r.json()
accounts = data.get("result", {}).get("value", [])
return accounts[0]["pubkey"] if accounts else None
def extract_memo(tx: Dict[str, Any]) -> str | None:
# First check top-level memo fields
if (memos := tx.get("memos")):
if isinstance(memos, list) and memos and isinstance(memos[0], str):
return memos[0].strip()
if memo := tx.get("memo"):
return memo.strip()
# Check instructions for memo program
for ix in tx.get("instructions", []):
if ix.get("programId") == MEMO_PID:
data = ix.get("data", "")
if data:
try:
# Try base58 decode first
decoded = base58.b58decode(data).decode('utf-8')
return decoded.strip()
except:
# Fallback to raw data
return data.strip() or None
return None
async def collect_from_address(session: aiohttp.ClientSession, address: str, since_slot: int, dest_accounts: Set[str]) -> List[Dict[str, Any]]:
base = f"{HELIUS_BASE}/addresses/{address}/transactions?api-key={API_KEY}&limit=100"
collected = []
before = None
while True:
url = base + (f"&before={before}" if before else "")
page = await fetch_json(session, url)
if not page:
break
for tx in page:
if tx.get("slot", 0) < since_slot:
return collected
# Check for ai16z token transfers to our destination accounts
for t in tx.get("tokenTransfers", []):
if t.get("mint") == AI16Z_MINT and t.get("toUserAccount") in dest_accounts:
collected.append(tx)
break
if not page:
break
before = page[-1]["signature"]
if len(page) < 100:
break
return collected
async def collect(session: aiohttp.ClientSession, since_slot: int, wallet: str, ata: str | None) -> List[Dict[str, Any]]:
dests = {wallet}
if ata:
dests.add(ata)
addresses = list(dests)
tasks = [collect_from_address(session, addr, since_slot, dests) for addr in addresses]
all_txs = [tx for sub in await asyncio.gather(*tasks) for tx in sub]
seen = set()
unique = []
for tx in sorted(all_txs, key=lambda x: x["slot"], reverse=True):
sig = tx["signature"]
if sig not in seen:
seen.add(sig)
unique.append(tx)
return unique
def enrich(txs: List[Dict[str, Any]]):
enriched = []
for tx in txs:
memo = extract_memo(tx)
ts_val = tx.get("timestamp")
ts_iso = datetime.fromtimestamp(ts_val).isoformat() if ts_val else ""
enriched.append({
"slot": tx.get("slot"),
"timestamp": ts_iso,
"memo": memo,
"sender": tx.get("feePayer"),
"signature": tx.get("signature"),
})
return enriched
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--since", type=int, required=True, help="Start slot")
ap.add_argument("--json", action="store_true", help="Output NDJSON")
args = ap.parse_args()
async def runner():
async with aiohttp.ClientSession() as session:
ata = await find_ata(session, PRIZE_WALLET)
if not args.json:
print(f"Using wallet: {PRIZE_WALLET}")
print(f"Detected ATA: {ata}")
raw = await collect(session, args.since, PRIZE_WALLET, ata)
rows = enrich(raw)
if args.json:
print(json.dumps(rows, indent=2))
else:
table = [
[r["slot"], r["timestamp"], r["memo"] or "∅", (r["sender"] or "")[:8] + "…", r["signature"][:16] + "…"]
for r in rows
]
print(tabulate(table, headers=["slot", "time", "memo", "sender", "sig"]))
asyncio.run(runner())
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment