"""Listens to the database and creates pipelines when a flow is created. and passes the flow to the redis.
Copyright (c) 2023 Stroma Vision Inc.
"""

import json
import queue
from typing import Any

import redis
import requests
import uvicorn
from fastapi import FastAPI, status
from pydantic import BaseModel

from krait.runner.nodes.base import NodeReprFactory

app = FastAPI()
node_queue = queue.Queue()
edge_queue = queue.Queue()

# In-memory flow store, pre-seeded with two placeholder flows.
flows = [
    {"id": 3, "name": "Flow 1", "nodes": [], "edges": []},
    {"id": 2, "name": "Flow 2", "nodes": [], "edges": []},
]
RUN_URL = "http://localhost:3333/run"
STOP_URL = "http://localhost:3333/stop"
redis_conn = redis.Redis(host="localhost", port=6379, db=0)
pipelines = {}


class PipelineConfig(BaseModel):
    """Pipeline config model."""

    name: str = ""
    specie: str = "default"
    props: dict[str, Any] = {}
    nodes: list[dict[str, Any]] = []
    links: list[dict[str, Any]] = []
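
# For reference, the config produced by restructure_flow below matches this model;
# the node names and hook names in this sketch are made-up, illustrative values:
#   {
#       "name": "Flow 1",
#       "specie": "default",
#       "props": {"dai": {"device_id": "..."}, "gst": {}, "kra": {}},
#       "nodes": [{"name": "daiCamera", "specie": "dai.Camera"},
#                 {"name": "kraDetector", "specie": "kra.Detector"}],
#       "links": [{"src": {"name": "daiCamera", "specie": "out_hook"},
#                  "dst": {"name": "kraDetector", "specie": "in_hook"}}],
#   }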


def post_krait(data):
    # Stop any running pipeline first, then start it with the new config.
    requests.post(STOP_URL, json=data)
    response = requests.post(RUN_URL, json=data)
    print(response.status_code)
    print(response.text)


def find_flow(flow_id):
    """Return the flow with the given id, creating an empty one if it does not exist yet."""
    flow = next((flow for flow in flows if flow["id"] == flow_id), None)
    if not flow:
        flows.append(
            {"id": flow_id, "name": f"Flow {flow_id}", "nodes": [], "edges": [], "total_nodes": 0, "total_links": 0}
        )
        return find_flow(flow_id)
    return flow


def filter_node_record(record):
    return {k: v for k, v in record.items() if k in ["name", "id", "data"]}


def filter_edge_record(record):
    return {k: v for k, v in record.items() if k in ["id", "sourceId", "targetId"]}


def find_node_by_id(node_id):
    for flow in flows:
        for node in flow["nodes"]:
            if node["id"] == node_id:
                return node
    return None


def restructure_node(node: dict):
    new_node = {}
    # Remove "." from the node name; keep the original dotted name as the specie.
    new_node["name"] = node["name"].replace(".", "")
    new_node["specie"] = node["name"]
    return new_node


def restructure_edge(edge: dict):
    new_edge = {}
    # Source side: prefer the node's second hook, falling back to the first.
    src_node = find_node_by_id(edge["sourceId"])["name"]
    node_type = src_node.split(".")[1].lower()
    node_repr = NodeReprFactory.create(node_type)
    try:
        src_hook = node_repr.hook_names[1]
    except IndexError:
        src_hook = node_repr.hook_names[0]
    # Destination side: always attach to the node's first hook.
    dst_node = find_node_by_id(edge["targetId"])["name"]
    node_type = dst_node.split(".")[1].lower()
    node_repr = NodeReprFactory.create(node_type)
    dst_hook = node_repr.hook_names[0]
    src = {"name": src_node.replace(".", ""), "specie": src_hook}
    dst = {"name": dst_node.replace(".", ""), "specie": dst_hook}
    new_edge["src"] = src
    new_edge["dst"] = dst
    return new_edge


def restructure_flow(flow):
    """Convert a DB flow record into the pipeline config format described by PipelineConfig."""
    new_flow = {}
    new_flow["name"] = flow["name"]
    new_flow["specie"] = "default"
    new_flow["props"] = {"dai": {"device_id": "1844301031B0381300"}, "gst": {}, "kra": {}}
    new_flow["nodes"] = [restructure_node(node) for node in flow["nodes"]]
    new_flow["links"] = [restructure_edge(edge) for edge in flow["edges"]]
    return new_flow


def check_flow(flow_id):
    """Assemble and dispatch the flow once all of its nodes and edges have arrived."""
    flow = find_flow(flow_id)
    if flow:
        total_nodes = flow.get("total_nodes")
        total_links = flow.get("total_links")
        if node_queue.qsize() == total_nodes and edge_queue.qsize() == total_links:
            # Drain the queues into the flow record.
            while not node_queue.empty():
                node = node_queue.get()
                flow["nodes"].append(node)
            while not edge_queue.empty():
                edge = edge_queue.get()
                flow["edges"].append(edge)
            final_flow = restructure_flow(flow)
            print("[FLOW] Complete")
            post_krait(final_flow)
            with open("flows.json", "w") as f:
                json.dump(final_flow, f, indent=2)
            flows.remove(flow)
            return True
    return None


def payload_handler(payload):
    """Route a DB change event to the right handler based on its table."""
    print(payload)
    table = payload.get("table").lower()
    record = payload.get("record")
    if table == "flow":
        flow_id = record.get("id")
        total_nodes = record.get("total_nodes")
        total_links = record.get("total_links")
        flow = find_flow(flow_id)
        if not flow.get("total_nodes"):
            flow["total_nodes"] = total_nodes
            flow["total_links"] = total_links
    elif table == "node":
        flow_id = record.get("flowId")
        flow = find_flow(flow_id)
        record = filter_node_record(record)
        node_queue.put(record)
    elif table == "edge":
        flow_id = record.get("flowId")
        flow = find_flow(flow_id)
        record = filter_edge_record(record)
        edge_queue.put(record)
    else:
        # Ignore tables we do not handle.
        return
    check_flow(flow_id)
    # print("[FLOWS]", flows)
@app.post("/")
def message(payload: dict):
payload_handler(payload)
print("[PAYLOAD]", payload)
return HTTP_200_OK
@app.post("/flow")
def flow(payload: dict):
"""Update pipeline config."""
print("Received flow")
print("[PAYLOAD]", payload)
redis_conn.lpush("flow", json.dumps(payload)) # testing
# Implement the payload handler and put the payload in the queue with name of the flow
# redis_conn.put("flow_name", final_flow))")
payload = redis_conn.brpop("flow")
print("-----------------")
print(payload)
return HTTP_200_OK
@app.post("/start")
def start(payload: dict):
"""Run Krait pipeline with json config."""
print("[PAYLOAD]", payload)
pipeline_name = payload.get("name")
redis_conn.lpush("task_queue", f"start_{pipeline_name}")
# Receiver side---------------------
while True:
task = redis_conn.brpop("task_queue")
if task:
_, command = task
command_str = command.decode("utf-8")
action, pipeline_name = command_str.split("_")
if action == "start":
print(f"Starting pipeline {pipeline_name}")
# Call pipeline manager here with full pipeline config
# pipeline_config = redis_conn.get(pipeline_name)
if action == "stop":
print(f"Stopping pipeline {pipeline_name}")
return HTTP_200_OK
@app.post("/stop")
def stop(payload: dict):
"""Run Krait pipeline with json config."""
print("[PAYLOAD]", payload)
pipeline_name = payload.get("name")
redis_conn.lpush("task_queue", f"stop_{pipeline_name}")
return HTTP_200_OK


class DBListener:
    @classmethod
    def run_api_server(cls):
        """Run the Krait API server."""
        uvicorn.run("serv:app", host="0.0.0.0", port=3232, log_level="info")
if __name__ == "__main__":
DBListener.run_api_server()
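

# Example usage (assumed setup: this file saved as serv.py, Redis on localhost:6379,
# and the Krait runner API listening on localhost:3333):
#
#   python serv.py
#
#   curl -X POST http://localhost:3232/ -H "Content-Type: application/json" \
#        -d '{"table": "flow", "record": {"id": 1, "total_nodes": 2, "total_links": 1}}'
#
#   curl -X POST http://localhost:3232/start -H "Content-Type: application/json" \
#        -d '{"name": "Flow 1"}'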