Skip to content

Instantly share code, notes, and snippets.

@benjiqq
Created December 11, 2024 10:37
Show Gist options
  • Save benjiqq/3fb1f10b0f8bcf42e3dffe6f4be6715b to your computer and use it in GitHub Desktop.
Save benjiqq/3fb1f10b0f8bcf42e3dffe6f4be6715b to your computer and use it in GitHub Desktop.
mcaps stream example
import json
import asyncio
import signal
import os
import sys
from datetime import datetime, timezone
from dotenv import load_dotenv
import nats
from nats.aio.msg import Msg
from log_config import get_logger
from nats_util import (
ensure_consumer_exists,
delete_consumer,
)
load_dotenv()
from solana.rpc.async_api import AsyncClient
import signing
from collections import deque
import traceback
class Monitor:
    """
    Consume parsed transactions from a NATS JetStream subject and keep
    simple throughput metrics.

    The monitor connects to NATS, subscribes to ``parsedtx.pumpfun`` on the
    ``PARSED_PUMPFUN`` stream, dispatches every message by its ``ttype``
    field (``INIT`` -> handle_pool, ``BUY``/``SELL`` -> handle_swap) and logs
    a metrics line every 10 seconds until a shutdown signal arrives.
    """

    def __init__(self):
        """
        Read configuration from the environment and build the RPC clients.

        No network connection to NATS is made here; that happens in
        init_nats().
        """
        self.lock = asyncio.Lock()
        self.logger_name = os.path.splitext(os.path.basename(__file__))[0]
        self.logger = get_logger(self.logger_name)
        self.logger.info("start monitor")
        self.logger.info("setup client")
        self.async_solana_client = AsyncClient(os.getenv("RPC_HOST"))
        # NOTE(review): the fallback URL below embeds what looks like an API
        # key. It is kept for backward compatibility, but it should be moved
        # to the environment and rotated.
        nomi_rpc_url = os.getenv(
            "NOMI_RPC_URL",
            "https://nozomi-preview.temporal.xyz/?c=sul-df6e83d2-fed9-4d3b-8aab-0c13f3b8ce8b",
        )
        self.nomi_client = AsyncClient(nomi_rpc_url)
        self.keypair = signing.load_keypair_from_env()

        # NATS configuration
        self.NATS_USER = os.getenv("NATS_USER")
        self.NATS_PASSWORD = os.getenv("NATS_PASSWORD")
        self.NATS_HOST = os.getenv("NATS_HOST")
        self.NATS_PORT = "4222"
        self.NATS_SUBJECT = "parsedtx.pumpfun"
        self.nats_servers = f"nats://{self.NATS_HOST}:{self.NATS_PORT}"

        # Metrics to track overall activity
        self.metrics = {
            "total_trades": 0,
            "start_time": datetime.now(),
        }

        # NATS client and JetStream context (set by init_nats)
        self.nc = None
        self.js = None

        # Latency tracking is currently disabled in message_handler, so
        # latency_avg stays at its initial value.
        self.latency_tracker = deque(maxlen=100)
        self.latency_avg = 0

        # Set by run(); shutdown() uses it to wake run() up instead of
        # stopping the event loop underneath asyncio.run().
        self._stop_event = None
        # Strong reference to the in-flight shutdown task so it cannot be
        # garbage-collected before it finishes.
        self._shutdown_task = None

    async def init_nats(self):
        """
        Initialize the NATS client and JetStream context.

        Exits the process with status 1 when the connection fails.
        """
        self.logger.info(f"Connecting to NATS at {self.nats_servers}")
        try:
            self.nc = await nats.connect(
                servers=[self.nats_servers],
                user=self.NATS_USER,
                password=self.NATS_PASSWORD,
                connect_timeout=10,
                ping_interval=60,
                reconnect_time_wait=5,
                max_reconnect_attempts=5,
                error_cb=self.error_cb,
            )
            self.js = self.nc.jetstream()
            self.logger.info("Connected to NATS successfully!")
        except Exception as e:
            self.logger.error(f"Error connecting to NATS: {e}")
            sys.exit(1)

    async def subscribe_nats(self):
        """
        Subscribe to the NATS subject with message_handler as callback.

        On a JetStream error the connection is closed and the process exits
        with status 1.
        """
        try:
            await self.js.subscribe(
                subject=self.NATS_SUBJECT,
                deliver_policy=nats.js.api.DeliverPolicy.LAST,
                stream="PARSED_PUMPFUN",
                cb=self.message_handler,
            )
            self.logger.info(
                f"Subscribed using ephemeral consumer on subject {self.NATS_SUBJECT}"
            )
        except nats.js.errors.Error as e:
            self.logger.error(f"Subscription failed: {e}")
            await self._close_nats()
            sys.exit(1)

    async def error_cb(self, e):
        """Log asynchronous errors reported by the NATS client."""
        self.logger.error(f"NATS Error: {e}")

    async def message_handler(self, msg: "Msg"):
        """
        Decode one NATS message and dispatch it by its ``ttype`` field.

        Processing errors are logged, never raised. The message is
        acknowledged in every case, so a failing message is not retried
        by this handler.
        """
        try:
            data = json.loads(msg.data.decode())
            ttype = data.get("ttype")
            if ttype == "INIT":
                await self.handle_pool(data)
            elif ttype in ("BUY", "SELL"):
                await self.handle_swap(data)
            else:
                self.logger.warning(f"Unknown message type: {ttype}")
        except Exception as e:
            self.logger.error(f"Failed to process message: {e}")
        finally:
            await msg.ack()

    async def handle_pool(self, pool_data):
        """
        Handle an INIT message; currently this only logs the pool data.
        """
        try:
            token = pool_data.get("token")
            self.logger.debug(f"Handling pool for token: {token} pool_data {pool_data}")
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() returns
            # None and would put the literal "None" into the log line.
            self.logger.error(f"issue handling pool {str(e)} {traceback.format_exc()}")

    async def handle_swap(self, swap_data):
        """
        Handle a BUY/SELL message by counting it in the trade metrics.
        """
        self.metrics["total_trades"] += 1

    def _format_metrics(self):
        """Build the metrics log line; the rate is 0 until time has elapsed."""
        elapsed_time = (datetime.now() - self.metrics["start_time"]).total_seconds()
        total_trades = self.metrics["total_trades"]
        # Guard against a zero (or clock-skewed negative) interval.
        trades_per_sec = total_trades / elapsed_time if elapsed_time > 0 else 0.0
        return (
            f"Metrics - Total Trades: {total_trades}, "
            f"Uptime: {elapsed_time:.1f}s, Trades/sec: {trades_per_sec:.2f} latency_avg {self.latency_avg}"
        )

    async def report_metrics(self):
        """
        Report metrics every 10 seconds until the task is cancelled.
        """
        while True:
            self.logger.info(self._format_metrics())
            await asyncio.sleep(10)

    async def _close_nats(self):
        """Close the NATS connection once; no-op if absent or already closed."""
        if self.nc is None or self.nc.is_closed:
            return
        await self.nc.close()

    def _request_shutdown(self, sig):
        """Signal-handler callback: schedule shutdown() once and keep the task."""
        if self._shutdown_task is not None and not self._shutdown_task.done():
            return
        self._shutdown_task = asyncio.create_task(self.shutdown(sig))

    async def shutdown(self, sig):
        """
        Handle a shutdown signal gracefully.

        Closes the NATS connection and then wakes up run() so that it can
        finish normally. When run() is not active (no stop event exists)
        the running event loop is stopped instead.
        """
        self.logger.info(f"Received exit signal {sig.name}...")
        self.logger.info("Shutting down...")
        try:
            await self._close_nats()
            self.logger.info("NATS connection closed.")
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")
        finally:
            if self._stop_event is not None:
                self._stop_event.set()
            else:
                asyncio.get_running_loop().stop()

    async def run(self):
        """
        Main coroutine: connect, subscribe, report metrics and wait for a
        shutdown signal, then clean up.
        """
        self._stop_event = asyncio.Event()
        await self.init_nats()
        await self.subscribe_nats()

        # Start the metrics reporting task
        metrics_task = asyncio.create_task(self.report_metrics())

        # Register signal handlers for graceful shutdown
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, self._request_shutdown, sig)
            except NotImplementedError:
                # Not every event loop supports signal handlers; Ctrl+C then
                # surfaces as KeyboardInterrupt in the caller.
                self.logger.warning(f"Signal handler for {sig.name} not supported")

        # Keep the main coroutine alive until shutdown() sets the event
        try:
            await self._stop_event.wait()
        except asyncio.CancelledError:
            pass
        finally:
            metrics_task.cancel()
            await self._close_nats()
            self.logger.info("Monitor has been shut down.")
def _main():
    """Build a Monitor and drive it to completion, reporting how it ended."""
    app = Monitor()
    try:
        asyncio.run(app.run())
    except KeyboardInterrupt:
        # Ctrl+C that was not absorbed by the monitor's own signal handling.
        print("Program interrupted by user. Exiting...")
    except Exception as exc:
        print(f"Program terminated with exception: {exc}")


if __name__ == "__main__":
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment