Created
December 11, 2024 10:37
-
-
Save benjiqq/3fb1f10b0f8bcf42e3dffe6f4be6715b to your computer and use it in GitHub Desktop.
mcaps stream example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import asyncio | |
import signal | |
import os | |
import sys | |
from datetime import datetime, timezone | |
from dotenv import load_dotenv | |
import nats | |
from nats.aio.msg import Msg | |
from log_config import get_logger | |
from nats_util import ( | |
ensure_consumer_exists, | |
delete_consumer, | |
) | |
load_dotenv() | |
from solana.rpc.async_api import AsyncClient | |
import signing | |
from collections import deque | |
import traceback | |
class Monitor:
    """
    Connect to NATS JetStream, consume messages published on the
    ``parsedtx.pumpfun`` subject, and keep simple throughput metrics.

    Messages whose ``ttype`` is ``INIT`` are routed to ``handle_pool``;
    ``BUY``/``SELL`` messages are routed to ``handle_swap``. A background
    task logs the accumulated trade count every 10 seconds.
    """

    def __init__(self):
        # NOTE(review): this lock is not used anywhere in the visible code.
        self.lock = asyncio.Lock()
        self.logger_name = os.path.splitext(os.path.basename(__file__))[0]
        self.logger = get_logger(self.logger_name)
        self.logger.info("start monitor")
        self.logger.info("setup client")

        self.async_solana_client = AsyncClient(os.getenv("RPC_HOST"))
        # NOTE(review): the fallback URL below embeds what looks like an
        # access token. Prefer supplying NOMI_RPC_URL via the environment and
        # removing the hard-coded default once deployments are confirmed.
        NOMI_RPC_URL = os.getenv(
            "NOMI_RPC_URL",
            "https://nozomi-preview.temporal.xyz/?c=sul-df6e83d2-fed9-4d3b-8aab-0c13f3b8ce8b",
        )
        self.nomi_client = AsyncClient(NOMI_RPC_URL)
        self.keypair = signing.load_keypair_from_env()

        # NATS Configuration
        self.NATS_USER = os.getenv("NATS_USER")
        self.NATS_PASSWORD = os.getenv("NATS_PASSWORD")
        self.NATS_HOST = os.getenv("NATS_HOST")
        self.NATS_PORT = "4222"
        self.NATS_SUBJECT = "parsedtx.pumpfun"
        self.nats_servers = f"nats://{self.NATS_HOST}:{self.NATS_PORT}"

        # Metrics to track overall activity
        self.metrics = {
            "total_trades": 0,
            "start_time": datetime.now(),
        }

        # NATS client and JetStream context (populated by init_nats)
        self.nc = None
        self.js = None

        # Latency bookkeeping; the code that fills these is currently
        # commented out in message_handler, so latency_avg stays at 0.
        self.latency_tracker = deque(maxlen=100)
        self.latency_avg = 0

        # Shutdown coordination: run() waits on the event, shutdown() sets it.
        self._stop_event = None
        # Strong reference to the in-flight shutdown task so that it cannot be
        # garbage-collected before it finishes.
        self._shutdown_task = None

    async def init_nats(self):
        """
        Initializes the NATS client and JetStream context.

        Exits the process with status 1 if the connection cannot be made.
        """
        self.logger.info(f"Connecting to NATS at {self.nats_servers}")
        try:
            self.nc = await nats.connect(
                servers=[self.nats_servers],
                user=self.NATS_USER,
                password=self.NATS_PASSWORD,
                connect_timeout=10,
                ping_interval=60,
                reconnect_time_wait=5,
                max_reconnect_attempts=5,
                error_cb=self.error_cb,
            )
            self.js = self.nc.jetstream()
            self.logger.info("Connected to NATS successfully!")
        except Exception as e:
            self.logger.error(f"Error connecting to NATS: {e}")
            sys.exit(1)

    async def subscribe_nats(self):
        """
        Subscribe to the NATS subject with ``message_handler`` as callback.

        On a JetStream error the connection is closed and the process exits
        with status 1.
        """
        try:
            await self.js.subscribe(
                subject=self.NATS_SUBJECT,
                deliver_policy=nats.js.api.DeliverPolicy.LAST,
                stream="PARSED_PUMPFUN",
                cb=self.message_handler,
            )
            self.logger.info(
                f"Subscribed using ephemeral consumer on subject {self.NATS_SUBJECT}"
            )
        except nats.js.errors.Error as e:
            self.logger.error(f"Subscription failed: {e}")
            await self.nc.close()
            sys.exit(1)

    async def error_cb(self, e):
        """Log an error reported by the NATS client."""
        self.logger.error(f"NATS Error: {e}")

    async def message_handler(self, msg: "Msg"):
        """
        Handle incoming messages from NATS.

        The payload is decoded as JSON and dispatched on its ``ttype`` field.
        Processing errors are logged, never raised, and the message is
        acknowledged in every case.
        """
        try:
            data = json.loads(msg.data.decode())
            # Latency tracking is currently disabled:
            # ts = data['timestamp_service']
            # timestamp = datetime.fromtimestamp(ts, tz=timezone.utc)
            # now = datetime.now(timezone.utc)
            # time_difference_ms = int((now - timestamp).total_seconds() * 1000)
            # self.latency_tracker.append(time_difference_ms)
            # self.latency_avg = sum(self.latency_tracker) / len(self.latency_tracker)
            ttype = data.get("ttype")
            if ttype == "INIT":
                await self.handle_pool(data)
            elif ttype in ["BUY", "SELL"]:
                await self.handle_swap(data)
            else:
                self.logger.warning(f"Unknown message type: {ttype}")
        except Exception as e:
            self.logger.error(f"Failed to process message: {e}")
        finally:
            # Ack even when processing failed; a failing ack is logged rather
            # than allowed to escape the handler.
            try:
                await msg.ack()
            except Exception as e:
                self.logger.error(f"Failed to ack message: {e}")

    async def handle_pool(self, pool_data):
        """
        Handle an INIT message.

        Currently this only logs the payload at debug level; any exception
        raised while doing so is logged together with its traceback.
        """
        try:
            token = pool_data.get("token")
            self.logger.debug(f"Handling pool for token: {token} pool_data {pool_data}")
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() would print
            # it to stderr and interpolate "None" into the log line.
            self.logger.error(
                f"issue with setting socials {e}\n{traceback.format_exc()}"
            )

    async def handle_swap(self, swap_data):
        """
        Handle a BUY/SELL message by incrementing the trade counter.
        """
        self.metrics["total_trades"] += 1

    async def report_metrics(self):
        """
        Report metrics every 10 seconds until the task is cancelled.
        """
        while True:
            elapsed_time = (datetime.now() - self.metrics["start_time"]).total_seconds()
            total_trades = self.metrics["total_trades"]
            # Guard the rate against a zero (or negative, after a clock
            # adjustment) elapsed time.
            trades_per_sec = total_trades / elapsed_time if elapsed_time > 0 else 0.0
            self.logger.info(
                f"Metrics - Total Trades: {total_trades}, "
                f"Uptime: {elapsed_time:.1f}s, Trades/sec: {trades_per_sec:.2f} latency_avg {self.latency_avg}"
            )
            await asyncio.sleep(10)

    def _request_shutdown(self, sig):
        """Schedule ``shutdown`` once; later signals are ignored."""
        if self._shutdown_task is None:
            self._shutdown_task = asyncio.create_task(self.shutdown(sig))

    async def shutdown(self, sig):
        """
        Handle shutdown signals gracefully.

        Closes the NATS connection if it is open and then wakes ``run`` so it
        can finish. Stopping the loop directly while ``asyncio.run`` is still
        awaiting ``run`` raises "Event loop stopped before Future completed",
        so that is only done as a fallback when ``run`` is not active.
        """
        self.logger.info(f"Received exit signal {sig.name}...")
        try:
            self.logger.info("Shutting down...")
            if self.nc is not None and not self.nc.is_closed:
                await self.nc.close()
            self.logger.info("NATS connection closed.")
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")
        finally:
            if self._stop_event is not None:
                self._stop_event.set()
            else:
                asyncio.get_running_loop().stop()

    async def run(self):
        """
        Main coroutine to initialize NATS, subscribe to the subject,
        start reporting metrics, and handle graceful shutdown.
        """
        await self.init_nats()
        await self.subscribe_nats()

        # Created here so the event belongs to the running loop.
        self._stop_event = asyncio.Event()

        # Start the metrics reporting task
        metrics_task = asyncio.create_task(self.report_metrics())

        # Register signal handlers for graceful shutdown
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, self._request_shutdown, sig)
            except NotImplementedError:
                # Some event loops (e.g. on Windows) do not support signal
                # handlers; Ctrl+C then surfaces as KeyboardInterrupt instead.
                self.logger.warning(f"Signal handler for {sig.name} not supported")

        # Keep the main coroutine alive until shutdown is requested
        try:
            await self._stop_event.wait()
        except asyncio.CancelledError:
            pass
        finally:
            metrics_task.cancel()
            # Let the cancellation settle before tearing down the connection.
            await asyncio.gather(metrics_task, return_exceptions=True)
            if self.nc is not None and not self.nc.is_closed:
                await self.nc.close()
            self.logger.info("Monitor has been shut down.")
def _main():
    """Build a Monitor and drive its ``run`` coroutine to completion.

    The Monitor is constructed outside the ``try`` so that construction
    errors propagate; errors raised while running are printed instead.
    """
    app = Monitor()
    try:
        asyncio.run(app.run())
    except KeyboardInterrupt:
        print("Program interrupted by user. Exiting...")
    except Exception as err:
        print(f"Program terminated with exception: {err}")


if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment