Created
January 15, 2022 10:50
-
-
Save elja/7aa5087a900ba6bc751b0c6b85d252e2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import random | |
import faust | |
import sentry_sdk | |
import logging | |
import time | |
from scheduler.schemas import trade_schema, strategy_schema, StrategyScheduler | |
from scheduler.db_loader import load_strategies | |
from faust.sensors.prometheus import setup_prometheus_sensors | |
from config_loader import config | |
logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT) | |
if config.SENTRY_URL: | |
sentry_sdk.init( | |
config.SENTRY_URL, | |
traces_sample_rate=config.SENTRY_SAMPLES_RATE, | |
) | |
app = faust.App( | |
id=f"scheduler_{config.ENV}", | |
broker=config.KAFKA_BOOTSTRAP_SERVERS | |
) | |
setup_prometheus_sensors(app) | |
trades_topic = app.topic(config.TRADES_TOPIC, schema=trade_schema, partitions=8) | |
strategy_topic = app.topic(config.STRATEGY_TOPIC, schema=strategy_schema, partitions=8) | |
task_scheduler_topic = app.topic(config.TASK_SCHEDULER_TOPIC, schema=strategy_schema, partitions=8) | |
task_queue_topic = app.topic(config.TASK_QUEUE_TOPIC, key_type=str, partitions=8) | |
trades_updated_at = app.GlobalTable('trades_updates_0', default=int, partitions=8) | |
schedulers_meta = app.Table('schedulers_meta_0', default=dict, partitions=8) | |
strategies_table = app.Table('strategies_0', partitions=8) | |
@app.agent(trades_topic) | |
async def trades_stream(stream: faust.Stream): | |
async for trade in stream: | |
trades_updated_at[trade.s] = trade.t | |
@app.agent(strategy_topic) | |
async def strategies_updates(stream: faust.Stream): | |
async for scheduler in stream: | |
if scheduler.active: | |
strategies_table[scheduler.strategy_id] = scheduler | |
else: | |
del strategies_table[scheduler.strategy_id] | |
@app.agent(task_scheduler_topic) | |
async def task_scheduler(stream: faust.Stream): | |
async for scheduler in stream: | |
meta = schedulers_meta[scheduler.strategy_id] | |
meta["job_sent_at"] = int(time.time() * 1000) | |
meta["is_processing"] = True | |
schedulers_meta[scheduler.strategy_id] = meta | |
await task_queue_topic.send(key=scheduler.strategy_id) | |
def task_results(strategy_id): | |
meta = schedulers_meta[strategy_id] | |
meta["job_processed_at"] = int(time.time() * 1000) | |
meta["is_processing"] = False | |
schedulers_meta[strategy_id] = meta | |
@app.agent(task_queue_topic, sink=[task_results], concurrency=10) | |
async def task_queue_topic(stream: faust.Stream): | |
async for event in stream.events(): | |
async with event: | |
await asyncio.sleep(0.05) | |
print(f"Processed Strategy ID: {event.key}") | |
yield event.key | |
@app.task | |
async def scheduler_loop(app): | |
while not app.should_stop: | |
for scheduler in strategies_table.values(): | |
meta = schedulers_meta[scheduler.strategy_id] | |
if scheduler.stale(meta, trades_updated_at): | |
await task_scheduler_topic.send(key=scheduler.strategy_id, value=scheduler) | |
if len(strategies_table) == 0: | |
await asyncio.sleep(1) | |
else: | |
await asyncio.sleep(0.001) | |
@app.task(on_leader=True) | |
async def preload_strategies(): | |
async for strategy in load_strategies(): | |
scheduler = StrategyScheduler( | |
strategy_id=strategy["id"], | |
active=strategy["active"], | |
symbols=strategy["data"]["symbols"], | |
strategy=strategy["data"] | |
) | |
await strategy_topic.send( | |
key=scheduler.strategy_id, | |
value=scheduler | |
) | |
if __name__ == '__main__': | |
app.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment