Created
July 22, 2025 18:51
-
-
Save madjin/400261a4325db204fc21b5c0b330ee9c to your computer and use it in GitHub Desktop.
Listener for ai16z Solana transactions that contain memos, used for collecting votes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| memo_listener.py – Minimal version | |
| ------------------------------------------- | |
| Fetch all ai16z transfers into the prize wallet or its ATA from a given slot onward, | |
| then print to stdout (as table or NDJSON). | |
| Usage: | |
| export HELIUS_API_KEY= | |
| export PRIZE_WALLET_ADDRESS= | |
| python memo_listener.py --since {BLOCK} | |
| python memo_listener.py --since {BLOCK} --json | |
| """ | |
| import os | |
| import asyncio | |
| import aiohttp | |
| import argparse | |
| import json | |
| import base58 | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Set | |
| from tabulate import tabulate | |
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Mint address used to recognise ai16z token transfers and token accounts.
AI16Z_MINT = "HeLp6NuQkmYB4pYWo2zYs22mESHXPQYzXbB8n4V98jwC"

# Helius REST base (address transaction history) and JSON-RPC base.
HELIUS_BASE = "https://api.helius.xyz/v0"
RPC_BASE = "https://mainnet.helius-rpc.com"

# Program id compared against an instruction's "programId" to spot memos.
MEMO_PID = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"

# Both settings come from the environment; the script refuses to start
# unless each of them is set to a non-empty value.
PRIZE_WALLET = os.environ.get("PRIZE_WALLET_ADDRESS")
API_KEY = os.environ.get("HELIUS_API_KEY")

if not PRIZE_WALLET or not API_KEY:
    raise SystemExit("❌ Set PRIZE_WALLET_ADDRESS and HELIUS_API_KEY")
def api_url(path: str) -> str:
    """Return the Helius REST URL for *path* with the API key attached.

    *path* is appended verbatim to ``HELIUS_BASE`` (which has no trailing
    slash) and the key is added as ``?api-key=...``, so *path* must not
    already carry a query string.
    """
    return "{}{}?api-key={}".format(HELIUS_BASE, path, API_KEY)
def rpc_payload(method: str, params: list[Any]) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request body calling *method* with *params*.

    The request id is always ``1``; *params* is stored as-is (not copied).
    """
    body: Dict[str, Any] = dict(jsonrpc="2.0", id=1)
    body["method"] = method
    body["params"] = params
    return body
async def fetch_json(session: aiohttp.ClientSession, url: str):
    """GET *url* through *session* and return the decoded JSON body.

    The whole request is limited to 30 seconds. An HTTP error status is
    turned into an exception by ``raise_for_status()`` instead of being
    parsed as if it were a normal payload.
    """
    # aiohttp deprecates bare numeric timeouts; ClientTimeout is the
    # supported way to express "30 seconds for the entire request".
    request_timeout = aiohttp.ClientTimeout(total=30)
    async with session.get(url, timeout=request_timeout) as response:
        response.raise_for_status()
        return await response.json()
async def find_ata(session: aiohttp.ClientSession, wallet: str) -> str | None:
    """Look up the token account *wallet* holds for the ai16z mint.

    Sends a ``getTokenAccountsByOwner`` JSON-RPC request filtered by
    ``AI16Z_MINT`` and returns the ``pubkey`` of the first account in the
    response, or ``None`` when the wallet has no such account.

    NOTE(review): the first returned account is assumed to be the
    associated token account; the response order is not checked here.

    Raises:
        aiohttp.ClientResponseError: the RPC endpoint answered with an HTTP
            error status (same policy as ``fetch_json``).
        RuntimeError: the response carried a JSON-RPC ``error`` member.
            Treating that as "no account" would silently hide a failed
            lookup.
    """
    payload = rpc_payload(
        "getTokenAccountsByOwner",
        [wallet, {"mint": AI16Z_MINT}, {"encoding": "jsonParsed"}],
    )
    rpc_url = f"{RPC_BASE}/?api-key={API_KEY}"
    request_timeout = aiohttp.ClientTimeout(total=30)
    async with session.post(rpc_url, json=payload, timeout=request_timeout) as r:
        r.raise_for_status()
        data = await r.json()

    if data.get("error"):
        raise RuntimeError(f"getTokenAccountsByOwner failed: {data['error']}")

    # "result" may be absent or null; both mean there is nothing to return.
    accounts = (data.get("result") or {}).get("value") or []
    return accounts[0]["pubkey"] if accounts else None
| def extract_memo(tx: Dict[str, Any]) -> str | None: | |
| # First check top-level memo fields | |
| if (memos := tx.get("memos")): | |
| if isinstance(memos, list) and memos and isinstance(memos[0], str): | |
| return memos[0].strip() | |
| if memo := tx.get("memo"): | |
| return memo.strip() | |
| # Check instructions for memo program | |
| for ix in tx.get("instructions", []): | |
| if ix.get("programId") == MEMO_PID: | |
| data = ix.get("data", "") | |
| if data: | |
| try: | |
| # Try base58 decode first | |
| decoded = base58.b58decode(data).decode('utf-8') | |
| return decoded.strip() | |
| except: | |
| # Fallback to raw data | |
| return data.strip() or None | |
| return None | |
async def collect_from_address(
    session: aiohttp.ClientSession,
    address: str,
    since_slot: int,
    dest_accounts: Set[str],
    page_size: int = 100,
) -> List[Dict[str, Any]]:
    """Collect ai16z transfers into *dest_accounts* from *address*'s history.

    Pages through ``/addresses/{address}/transactions`` using the last
    signature of each page as the ``before`` cursor. A transaction is kept
    when any of its ``tokenTransfers`` has the ai16z mint and a
    ``toUserAccount`` contained in *dest_accounts*.

    Paging stops at the first transaction whose ``slot`` is below
    *since_slot* (a missing slot counts as 0), on an empty page, or on a
    page shorter than *page_size*.
    NOTE(review): stopping at the first older slot assumes the API returns
    transactions newest-first - confirm against the Helius docs.

    Args:
        session: open aiohttp session used for all requests.
        address: account whose transaction history is scanned.
        since_slot: lowest slot (inclusive) to collect.
        dest_accounts: accepted ``toUserAccount`` values.
        page_size: value sent as ``limit`` and used to detect the last page.

    Returns:
        Matching transactions in the order the API returned them.
    """
    base = (
        f"{HELIUS_BASE}/addresses/{address}/transactions"
        f"?api-key={API_KEY}&limit={page_size}"
    )
    collected: List[Dict[str, Any]] = []
    before = None
    while True:
        url = f"{base}&before={before}" if before else base
        page = await fetch_json(session, url)
        if not page:
            break
        for tx in page:
            if tx.get("slot", 0) < since_slot:
                return collected
            # Keep the transaction once, however many transfers match.
            if any(
                t.get("mint") == AI16Z_MINT
                and t.get("toUserAccount") in dest_accounts
                for t in tx.get("tokenTransfers", [])
            ):
                collected.append(tx)
        before = page[-1]["signature"]
        if len(page) < page_size:
            break
    return collected
async def collect(session: aiohttp.ClientSession, since_slot: int, wallet: str, ata: str | None) -> List[Dict[str, Any]]:
    """Gather matching transfers for the wallet and, if known, its token account.

    Each destination address is scanned concurrently with
    ``collect_from_address``. The combined results are ordered by ``slot``,
    highest first, and reduced to one entry per ``signature`` (the first
    occurrence in that ordering wins).
    """
    targets = {wallet}
    targets.update([ata] if ata else [])

    per_address = await asyncio.gather(
        *(collect_from_address(session, target, since_slot, targets) for target in targets)
    )

    merged = []
    for batch in per_address:
        merged.extend(batch)
    merged.sort(key=lambda item: item["slot"], reverse=True)

    # dicts keep insertion order, so setdefault retains the first
    # transaction seen for every signature.
    by_signature = {}
    for item in merged:
        by_signature.setdefault(item["signature"], item)
    return list(by_signature.values())
def enrich(txs: List[Dict[str, Any]]):
    """Reduce raw transactions to the fields that are printed.

    For every transaction a dict with these keys is produced:

    - ``slot``: ``tx["slot"]``
    - ``timestamp``: ``tx["timestamp"]`` rendered as an ISO-8601 string in
      UTC (with ``+00:00`` offset), or ``""`` when the timestamp is missing
      or zero. NOTE(review): the value is assumed to be Unix seconds.
    - ``memo``: result of ``extract_memo(tx)``
    - ``sender``: ``tx["feePayer"]``
    - ``signature``: ``tx["signature"]``

    Missing keys yield ``None``.
    """
    # Local import: the file only imports the ``datetime`` class itself.
    from datetime import timezone

    enriched = []
    for tx in txs:
        ts_val = tx.get("timestamp")
        # Use an explicit UTC zone so the output does not depend on the
        # local time zone of the machine running the script.
        ts_iso = (
            datetime.fromtimestamp(ts_val, tz=timezone.utc).isoformat()
            if ts_val
            else ""
        )
        enriched.append({
            "slot": tx.get("slot"),
            "timestamp": ts_iso,
            "memo": extract_memo(tx),
            "sender": tx.get("feePayer"),
            "signature": tx.get("signature"),
        })
    return enriched
def main():
    """Command-line entry point.

    Parses ``--since`` (start slot, required) and ``--json``, looks up the
    prize wallet's ai16z token account, collects matching transfers and
    prints them to stdout: a table by default, or NDJSON (one JSON object
    per line) with ``--json``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--since", type=int, required=True, help="Start slot")
    ap.add_argument("--json", action="store_true", help="Output NDJSON")
    args = ap.parse_args()

    async def runner():
        async with aiohttp.ClientSession() as session:
            ata = await find_ata(session, PRIZE_WALLET)
            # Keep stdout machine-readable in JSON mode: no banner lines.
            if not args.json:
                print(f"Using wallet: {PRIZE_WALLET}")
                print(f"Detected ATA: {ata}")
            raw = await collect(session, args.since, PRIZE_WALLET, ata)

        rows = enrich(raw)
        if args.json:
            # NDJSON as documented: one compact JSON object per line.
            for row in rows:
                print(json.dumps(row))
            return

        table = [
            [
                r["slot"],
                r["timestamp"],
                r["memo"] or "∅",
                (r["sender"] or "")[:8] + "…",
                (r["signature"] or "")[:16] + "…",
            ]
            for r in rows
        ]
        print(tabulate(table, headers=["slot", "time", "memo", "sender", "sig"]))

    asyncio.run(runner())


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment