Skip to content

Instantly share code, notes, and snippets.

@elja
Created January 15, 2022 10:50
Show Gist options
  • Save elja/7aa5087a900ba6bc751b0c6b85d252e2 to your computer and use it in GitHub Desktop.
Save elja/7aa5087a900ba6bc751b0c6b85d252e2 to your computer and use it in GitHub Desktop.
import asyncio
import random
import faust
import sentry_sdk
import logging
import time
from scheduler.schemas import trade_schema, strategy_schema, StrategyScheduler
from scheduler.db_loader import load_strategies
from faust.sensors.prometheus import setup_prometheus_sensors
from config_loader import config
# Root logging is configured from the loaded config (level and format).
logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT)

# Sentry reporting is opt-in: it is initialised only when a URL is configured.
if config.SENTRY_URL:
    sentry_sdk.init(
        dsn=config.SENTRY_URL,
        traces_sample_rate=config.SENTRY_SAMPLES_RATE,
    )
# The Faust application; its id is qualified with the configured environment
# name and it connects to the configured Kafka bootstrap servers.
app = faust.App(
    id=f"scheduler_{config.ENV}",
    broker=config.KAFKA_BOOTSTRAP_SERVERS,
)

# Register Faust's Prometheus sensors on this app.
setup_prometheus_sensors(app)
# --- Topics -----------------------------------------------------------------
# Every topic and table below is declared with 8 partitions.
# Trades feed, consumed by the `trades_stream` agent.
trades_topic = app.topic(config.TRADES_TOPIC, schema=trade_schema, partitions=8)
# Strategy definitions / updates, consumed by the `strategies_updates` agent.
strategy_topic = app.topic(config.STRATEGY_TOPIC, schema=strategy_schema, partitions=8)
# Strategies that `scheduler_loop` found stale, consumed by `task_scheduler`.
task_scheduler_topic = app.topic(config.TASK_SCHEDULER_TOPIC, schema=strategy_schema, partitions=8)
# Work queue keyed by strategy id (string keys); `task_scheduler` sends to it.
# NOTE(review): an agent defined later in this module reuses this exact name,
# so after that definition the module-level name no longer refers to this topic.
task_queue_topic = app.topic(config.TASK_QUEUE_TOPIC, key_type=str, partitions=8)
# --- Tables -----------------------------------------------------------------
# trade key (`trade.s`) -> `trade.t` of the last trade processed; keys that
# were never written fall back to the `int` default factory.
trades_updated_at = app.GlobalTable('trades_updates_0', default=int, partitions=8)
# strategy id -> bookkeeping dict with "job_sent_at", "job_processed_at" and
# "is_processing" entries, written by `task_scheduler` and `task_results`.
schedulers_meta = app.Table('schedulers_meta_0', default=dict, partitions=8)
# strategy id -> scheduler record for currently active strategies.
strategies_table = app.Table('strategies_0', partitions=8)
@app.agent(trades_topic)
async def trades_stream(stream: faust.Stream):
    """Track the most recently seen ``t`` value for every trade key ``s``.

    Each incoming trade overwrites the entry stored under ``trade.s`` in the
    ``trades_updated_at`` table with that trade's ``t`` attribute.
    """
    async for tick in stream:
        latest = tick.t
        trades_updated_at[tick.s] = latest
@app.agent(strategy_topic)
async def strategies_updates(stream: faust.Stream):
    """Mirror strategy updates into ``strategies_table``.

    Active strategies are stored (or overwritten) under their
    ``strategy_id``; inactive ones are removed from the table.
    """
    async for scheduler in stream:
        strategy_id = scheduler.strategy_id
        if scheduler.active:
            strategies_table[strategy_id] = scheduler
        elif strategy_id in strategies_table:
            # Only delete entries that exist: an inactive strategy that was
            # never stored (e.g. one published by the initial preload) would
            # otherwise raise KeyError and crash the agent.
            del strategies_table[strategy_id]
@app.agent(task_scheduler_topic)
async def task_scheduler(stream: faust.Stream):
    """Mark a strategy as in-flight and enqueue a job for it.

    For every record received, the strategy's entry in ``schedulers_meta``
    gets a ``job_sent_at`` timestamp (milliseconds since the epoch) and
    ``is_processing`` set to ``True``; then a message keyed by the strategy
    id (with no value) is sent via ``task_queue_topic``.
    """
    async for job in stream:
        state = schedulers_meta[job.strategy_id]
        sent_at_ms = int(time.time() * 1000)
        state["job_sent_at"] = sent_at_ms
        state["is_processing"] = True
        # Write the dict back so the table registers the change.
        schedulers_meta[job.strategy_id] = state
        # NOTE(review): `task_queue_topic` is resolved when this line runs;
        # an agent defined later in this module reuses that name, so this is
        # likely the agent object rather than the topic -- confirm intended.
        await task_queue_topic.send(key=job.strategy_id)
def task_results(strategy_id):
    """Record that the job for ``strategy_id`` has finished.

    Stamps ``job_processed_at`` (milliseconds since the epoch) and clears
    ``is_processing`` on the strategy's ``schedulers_meta`` entry, then
    writes the entry back to the table. Used as the sink of the
    ``task_queue_topic`` agent, which yields strategy ids.
    """
    state = schedulers_meta[strategy_id]
    finished_at_ms = int(time.time() * 1000)
    state["job_processed_at"] = finished_at_ms
    state["is_processing"] = False
    schedulers_meta[strategy_id] = state
# NOTE(review): this agent reuses the name of the topic it subscribes to.
# The decorator argument is evaluated before the function name is bound, so
# the subscription uses the topic, but afterwards the module-level name
# `task_queue_topic` refers to this agent. Renaming would change the module's
# public names, so it is only flagged here.
@app.agent(task_queue_topic, sink=[task_results], concurrency=10)
async def task_queue_topic(stream: faust.Stream):
    """Process queued strategy jobs and forward each key to ``task_results``.

    Iterates over raw events, waits 50 ms per event, prints the event key and
    yields it; yielded keys are delivered to the ``task_results`` sink.

    NOTE(review): the agent runs with ``concurrency=10`` while its sink
    writes to ``schedulers_meta`` -- confirm this is acceptable under Faust's
    guidance on table writes from concurrent agents.
    """
    async for queued in stream.events():
        async with queued:
            await asyncio.sleep(0.05)
            print(f"Processed Strategy ID: {queued.key}")
            yield queued.key
@app.task
async def scheduler_loop(app):
    """Continuously scan known strategies and schedule the stale ones.

    Until the app is asked to stop, every strategy in ``strategies_table``
    is checked with ``scheduler.stale(meta, trades_updated_at)``; stale ones
    are published to ``task_scheduler_topic`` keyed by strategy id. The loop
    then sleeps 1 s when the table is empty and 1 ms otherwise.

    Args:
        app: The Faust application running this task; only ``should_stop``
            is read from it.
    """
    while not app.should_stop:
        # Iterate over a snapshot: the body awaits, so another agent can
        # add or remove table entries mid-scan, which would otherwise
        # invalidate iteration over the live view.
        for scheduler in list(strategies_table.values()):
            meta = schedulers_meta[scheduler.strategy_id]
            if scheduler.stale(meta, trades_updated_at):
                await task_scheduler_topic.send(
                    key=scheduler.strategy_id,
                    value=scheduler,
                )

        # Back off when there is nothing to scan; otherwise just yield
        # briefly to the event loop before the next pass.
        if len(strategies_table) == 0:
            await asyncio.sleep(1)
        else:
            await asyncio.sleep(0.001)
@app.task(on_leader=True)
async def preload_strategies():
    """Publish every strategy returned by ``load_strategies`` to ``strategy_topic``.

    Declared with ``on_leader=True``. Each loaded strategy mapping is wrapped
    in a ``StrategyScheduler`` and sent keyed by its strategy id, whether or
    not it is active.
    """
    async for row in load_strategies():
        record = StrategyScheduler(
            strategy_id=row["id"],
            active=row["active"],
            symbols=row["data"]["symbols"],
            strategy=row["data"],
        )
        await strategy_topic.send(key=record.strategy_id, value=record)
# Script entry point: hand control to the Faust command-line runner.
if __name__ == '__main__':
    app.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment