Created
March 24, 2023 21:59
-
-
Save oguzhanmeteozturk/c0ec6bdb327ba4fbd357be3dabd66e35 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Listens to the database and creates pipelines when a flow is created. and passes the flow to the redis. | |
Copyright (c) 2023 Stroma Vision Inc. | |
""" | |
import json | |
import queue | |
from typing import Any | |
import redis | |
import requests | |
import uvicorn | |
from fastapi import FastAPI, status | |
from pydantic import BaseModel | |
from starlette.status import HTTP_200_OK | |
from krait.runner import KraitLoader, KraitRunner | |
from krait.runner.nodes.base import NodeReprFactory | |
# URLs that post_krait() sends the assembled flow to.
RUN_URL = "http://localhost:3333/run"
STOP_URL = "http://localhost:3333/stop"

# FastAPI application that the route decorators below register on.
app = FastAPI()

# Node and edge records are buffered here until check_flow() drains them.
node_queue = queue.Queue()
edge_queue = queue.Queue()

# In-memory flow registry, pre-populated with two empty flows.
flows = [
    {"id": 3, "name": "Flow 1", "nodes": [], "edges": []},
    {"id": 2, "name": "Flow 2", "nodes": [], "edges": []},
]

# Redis client used by the /flow, /start and /stop endpoints.
redis_conn = redis.Redis(host="localhost", port=6379, db=0)

pipelines = {}
class PipelineConfig(BaseModel):
    """Pipeline config model.

    The field names match the keys of the dict built by ``restructure_flow``
    (``name``, ``specie``, ``props``, ``nodes``, ``links``). Every field has a
    default value.
    """

    name: str = ""
    specie: str = "default"
    # Free-form properties keyed by string.
    props: dict[str, Any] = {}
    # One dict per node.
    nodes: list[dict[str, Any]] = []
    # One dict per link.
    links: list[dict[str, Any]] = []
def post_krait(data, timeout=30):
    """Post a flow config to the stop endpoint and then to the run endpoint.

    ``data`` is sent as JSON to ``STOP_URL`` first and to ``RUN_URL`` second.
    The status code and body of the run response are printed; the stop
    response is not inspected.

    Args:
        data: JSON-serializable payload to send.
        timeout: Seconds to wait for each request. Without a timeout,
            ``requests`` may wait indefinitely on an unresponsive server.

    Raises:
        requests.exceptions.RequestException: If either request fails or
            times out.
    """
    # The stop call only has to be issued before the run call; its response
    # is intentionally discarded instead of being bound to an unused name.
    requests.post(STOP_URL, json=data, timeout=timeout)
    response = requests.post(RUN_URL, json=data, timeout=timeout)
    print(response.status_code)
    print(response.text)
def find_flow(flow_id):
    """Return the flow whose ``id`` equals ``flow_id``, registering it if absent.

    A flow that is not yet in ``flows`` is appended with empty ``nodes`` and
    ``edges`` lists and zeroed ``total_nodes`` / ``total_links`` counters.
    """
    for candidate in flows:
        if candidate["id"] == flow_id:
            return candidate

    created = {
        "id": flow_id,
        "name": f"Flow {flow_id}",
        "nodes": [],
        "edges": [],
        "total_nodes": 0,
        "total_links": 0,
    }
    flows.append(created)
    return created
def filter_node_record(record):
    """Return a new dict holding only the name, id and data entries of ``record``."""
    wanted = ("name", "id", "data")
    kept = {}
    for key, value in record.items():
        if key in wanted:
            kept[key] = value
    return kept
def filter_edge_record(record):
    """Return a new dict holding only the id, sourceId and targetId entries of ``record``."""
    wanted = ("id", "sourceId", "targetId")
    kept = {}
    for key, value in record.items():
        if key in wanted:
            kept[key] = value
    return kept
def find_node_by_id(node_id):
    """Return the first node in any registered flow whose ``id`` is ``node_id``.

    Flows are scanned in registry order; ``None`` is returned when no node
    matches.
    """
    every_node = (node for flow in flows for node in flow["nodes"])
    return next((node for node in every_node if node["id"] == node_id), None)
def restructure_node(node: dict):
    """Build the node entry for a restructured flow from a node record.

    The original node name becomes ``specie``; the same name with every
    ``.`` removed becomes ``name``.
    """
    original_name = node["name"]
    return {
        "name": original_name.replace(".", ""),
        "specie": original_name,
    }
def _edge_endpoint(node_id, prefer_second_hook):
    """Describe one end of an edge as a ``{"name", "specie"}`` dict.

    Args:
        node_id: Id of the node the edge end is attached to.
        prefer_second_hook: When true, use the node's second hook name if it
            exists and fall back to the first one otherwise; when false,
            always use the first hook name.

    Raises:
        ValueError: If no registered node has ``node_id``.
        IndexError: If the node name has no second dot-separated segment.
    """
    node = find_node_by_id(node_id)
    if node is None:
        # Fail with a clear message instead of "NoneType is not subscriptable".
        raise ValueError(f"Edge references unknown node id {node_id!r}")

    node_name = node["name"]
    # The second dot-separated segment of the name, lower-cased, is the type
    # handed to the node factory.
    node_type = node_name.split(".")[1].lower()
    node_repr = NodeReprFactory.create(node_type)

    if prefer_second_hook:
        try:
            hook = node_repr.hook_names[1]
        except IndexError:
            hook = node_repr.hook_names[0]
    else:
        hook = node_repr.hook_names[0]

    return {"name": node_name.replace(".", ""), "specie": hook}


def restructure_edge(edge: dict):
    """Build the link entry for a restructured flow from an edge record.

    Args:
        edge: Edge record with ``sourceId`` and ``targetId`` keys.

    Returns:
        A dict with ``src`` and ``dst`` entries, each holding the dot-free
        node name under ``name`` and the selected hook name under ``specie``.
        The source side prefers the node's second hook name; the target side
        uses the first.

    Raises:
        ValueError: If either end refers to a node that is not registered.
    """
    return {
        "src": _edge_endpoint(edge["sourceId"], prefer_second_hook=True),
        "dst": _edge_endpoint(edge["targetId"], prefer_second_hook=False),
    }
def restructure_flow(flow):
    """Convert a registered flow into the pipeline config dict that gets posted.

    The result carries the flow's name, a fixed ``"default"`` specie, a fixed
    ``props`` mapping, the restructured nodes under ``nodes`` and the
    restructured edges under ``links``.
    """
    return {
        "name": flow["name"],
        "specie": "default",
        "props": {"dai": {"device_id": "1844301031B0381300"}, "gst": {}, "kra": {}},
        "nodes": [restructure_node(node) for node in flow["nodes"]],
        "links": [restructure_edge(edge) for edge in flow["edges"]],
    }
def check_flow(flow_id):
    """Finalize the flow once the queued node and edge counts match its totals.

    When the node queue size equals the flow's ``total_nodes`` and the edge
    queue size equals its ``total_links``, both queues are drained into the
    flow, the flow is restructured, posted via ``post_krait``, written to
    ``flows.json`` and removed from the registry.

    Returns:
        ``True`` when the flow was finalized, otherwise ``None``.
    """
    flow = find_flow(flow_id)
    if not flow:
        return None

    expected_nodes = flow.get("total_nodes")
    expected_links = flow.get("total_links")
    if node_queue.qsize() != expected_nodes or edge_queue.qsize() != expected_links:
        return None

    # Move everything that was buffered into the flow itself.
    while not node_queue.empty():
        flow["nodes"].append(node_queue.get())
    while not edge_queue.empty():
        flow["edges"].append(edge_queue.get())

    final_flow = restructure_flow(flow)
    print("[FLOW] Complete")
    post_krait(final_flow)
    with open("flows.json", "w") as f:
        json.dump(final_flow, f, indent=2)

    flows.remove(flow)
    return True
def payload_handler(payload):
    """Route an incoming payload to the flow registry or the node/edge queues.

    The payload's ``table`` value (case-insensitive) selects the handling:

    * ``flow``: store ``total_nodes`` / ``total_links`` on the flow unless the
      flow already has a truthy ``total_nodes``.
    * ``node``: queue the filtered record on ``node_queue``.
    * ``edge``: queue the filtered record on ``edge_queue``.

    Afterwards ``check_flow`` is called for the affected flow. Payloads with
    any other table, or without a dict ``record``, are reported and ignored.

    Args:
        payload: Dict with ``table`` and ``record`` entries.
    """
    print(payload)
    table = str(payload.get("table") or "").lower()
    record = payload.get("record")

    # Previously an unknown table left ``flow_id`` unbound and a missing
    # table/record raised AttributeError; such payloads are now skipped.
    if table not in ("flow", "node", "edge") or not isinstance(record, dict):
        print(f"[PAYLOAD] Ignoring payload for table {table!r}")
        return None

    if table == "flow":
        flow_id = record.get("id")
        flow = find_flow(flow_id)
        if not flow.get("total_nodes"):
            flow["total_nodes"] = record.get("total_nodes")
            flow["total_links"] = record.get("total_links")
    elif table == "node":
        flow_id = record.get("flowId")
        # Called for its side effect: registers the flow if it is new.
        find_flow(flow_id)
        node_queue.put(filter_node_record(record))
    else:  # table == "edge"
        flow_id = record.get("flowId")
        # Called for its side effect: registers the flow if it is new.
        find_flow(flow_id)
        edge_queue.put(filter_edge_record(record))

    check_flow(flow_id)
    return None
@app.post("/")
def message(payload: dict):
    """Handle a payload posted to the root endpoint.

    The payload is passed to ``payload_handler`` and then printed.
    """
    payload_handler(payload)
    print("[PAYLOAD]", payload)
    # NOTE(review): this returns the status constant as the response body;
    # confirm that is intended rather than setting the response status code.
    return HTTP_200_OK
@app.post("/flow")
def flow(payload: dict):
    """Update pipeline config.

    The payload is pushed onto the Redis list ``flow`` as JSON and an item is
    immediately popped from the same list and printed.
    """
    print("Received flow")
    print("[PAYLOAD]", payload)
    serialized = json.dumps(payload)
    redis_conn.lpush("flow", serialized)  # testing
    # TODO: implement the payload handler and put the payload in the queue
    # under the name of the flow.
    popped = redis_conn.brpop("flow")
    print("-----------------")
    print(popped)
    return HTTP_200_OK
@app.post("/start")
def start(payload: dict):
    """Queue a start command for the named pipeline, then consume the task queue.

    A ``start_<name>`` command is pushed onto the Redis list ``task_queue``.
    The receiver loop below then pops commands from that list and prints the
    requested action.

    Args:
        payload: Dict whose ``name`` entry is the pipeline name.
    """
    print("[PAYLOAD]", payload)
    pipeline_name = payload.get("name")
    redis_conn.lpush("task_queue", f"start_{pipeline_name}")

    # Receiver side---------------------
    # NOTE(review): this loop has no exit, so the request never completes and
    # the return below is unreachable as written. It looks like prototype
    # consumer code that belongs in a separate worker; confirm and move it.
    while True:
        task = redis_conn.brpop("task_queue")
        if task:
            _queue_name, command = task
            command_str = command.decode("utf-8")
            # Split on the first underscore only, so pipeline names that
            # contain underscores no longer raise ValueError on unpacking.
            action, _separator, pipeline_name = command_str.partition("_")
            if action == "start":
                print(f"Starting pipeline {pipeline_name}")
                # TODO: call the pipeline manager here with the full
                # pipeline config.
            elif action == "stop":
                print(f"Stopping pipeline {pipeline_name}")
    return HTTP_200_OK
@app.post("/stop")
def stop(payload: dict):
    """Queue a stop command for the named pipeline.

    Pushes ``stop_<name>`` onto the Redis list ``task_queue``, where
    ``<name>`` is the payload's ``name`` entry.
    """
    print("[PAYLOAD]", payload)
    pipeline_name = payload.get("name")
    redis_conn.lpush("task_queue", f"stop_{pipeline_name}")
    return HTTP_200_OK
class DBListener:
    """Namespace for launching the API server."""

    @classmethod
    def run_api_server(cls):
        """Run the Krait API server."""
        # The app is given as the import string "serv:app" and served on all
        # interfaces at port 3232.
        # NOTE(review): presumably this file is saved as serv.py — confirm.
        uvicorn.run("serv:app", host="0.0.0.0", port=3232, log_level="info")
# Start the API server only when this file is executed as a script.
if __name__ == "__main__":
    DBListener.run_api_server()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment