-
-
Save oguzhanmeteozturk/a78504ebd498a08370803c8896733172 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- check_flow_complete(flow_id)
--
-- Tells whether the flow identified by flow_id is fully stored, i.e. whether
-- the number of "Node" rows and "Edge" rows that reference the flow equals the
-- total_nodes / total_links values recorded on its "Flow" row.
--
-- Parameters:
--   flow_id  id of the "Flow" row to inspect.
-- Returns:
--   TRUE  when both stored counts match the totals declared on the flow;
--   FALSE otherwise, including when no such flow exists or a declared total
--         is NULL (the comparison is then NULL, which is mapped to FALSE).
CREATE OR REPLACE FUNCTION check_flow_complete(flow_id TEXT)
RETURNS BOOLEAN AS $$
DECLARE
    v_expected_nodes INTEGER;
    v_expected_links INTEGER;
    v_stored_nodes   INTEGER;
    v_stored_edges   INTEGER;
BEGIN
    -- Totals the flow claims to consist of; both stay NULL if the flow is absent.
    SELECT f.total_nodes, f.total_links
      INTO v_expected_nodes, v_expected_links
      FROM "Flow" AS f
     WHERE f.id = flow_id;

    -- What is actually stored for this flow right now.
    SELECT COUNT(*)
      INTO v_stored_nodes
      FROM "Node" AS n
     WHERE n."flowId" = flow_id;

    SELECT COUNT(*)
      INTO v_stored_edges
      FROM "Edge" AS e
     WHERE e."flowId" = flow_id;

    -- A NULL comparison result (missing flow / NULL totals) counts as "not complete".
    RETURN COALESCE(
        v_expected_nodes = v_stored_nodes
        AND v_expected_links = v_stored_edges,
        FALSE
    );
END;
$$ LANGUAGE plpgsql;
-- post_flow_update()
--
-- Row-level trigger function intended for the "Flow", "Node" and "Edge" tables.
-- It resolves the id of the affected flow from the new row and, if
-- check_flow_complete() reports that flow as complete, publishes it through
-- two HTTP GET requests:
--   1. LPUSH of the flow's JSON description onto the list  flow/<flow_id>
--   2. LPUSH of the marker  start_<flow_id>  onto the list  task_queue
--
-- JSON payload shape:
--   { "flow_id": ..., "name": ...,
--     "nodes": [ { "id", "name" }, ... ],
--     "edges": [ { "id", "sourceId", "targetId" }, ... ] }
-- "nodes" and "edges" are always JSON arrays; an empty set is emitted as []
-- rather than null.
--
-- Returns NEW unchanged.
--
-- NOTE(review): the endpoint looks like an HTTP front end for Redis commands
-- (URL form /LPUSH/<key>/<value>) reachable from a Docker container -- confirm
-- against the deployment.
-- NOTE(review): the requests are issued from inside the trigger, so they are
-- sent before the surrounding transaction commits and are not undone by a
-- later rollback. The HTTP responses are discarded without checking status.
CREATE OR REPLACE FUNCTION post_flow_update()
RETURNS TRIGGER
LANGUAGE plpgsql
SECURITY definer SET search_path = public
AS $$
DECLARE
    payload TEXT;
    flow_id TEXT;
    flow_url TEXT;
    task_queue_url TEXT;
BEGIN
    -- Resolve the flow the changed row belongs to. For any other table
    -- flow_id stays NULL and check_flow_complete() returns FALSE below.
    IF TG_TABLE_NAME = 'Flow' THEN
        flow_id := NEW.id;
    ELSIF TG_TABLE_NAME IN ('Node', 'Edge') THEN
        flow_id := NEW."flowId";
    END IF;

    IF check_flow_complete(flow_id) THEN
        -- json_agg() yields NULL when there are no input rows; COALESCE keeps
        -- "nodes" / "edges" as empty arrays so consumers can always iterate.
        SELECT json_build_object(
            'flow_id', flow_id,
            'name', (SELECT f.name FROM "Flow" AS f WHERE f.id = flow_id),
            'nodes', (
                SELECT COALESCE(
                           json_agg(json_build_object('id', nodes.id, 'name', nodes.name)),
                           '[]'::json
                       )
                FROM "Node" AS nodes
                WHERE nodes."flowId" = flow_id
            ),
            'edges', (
                SELECT COALESCE(
                           json_agg(json_build_object('id', edges.id, 'sourceId', edges."sourceId", 'targetId', edges."targetId")),
                           '[]'::json
                       )
                FROM "Edge" AS edges
                WHERE edges."flowId" = flow_id
            )
        )::text INTO payload;

        -- Construct the URLs; flow id and payload are URL-encoded because they
        -- travel as path segments.
        flow_url := 'http://host.docker.internal:7379/LPUSH/flow/' || http.urlencode(flow_id) || '/' || http.urlencode(payload);
        task_queue_url := 'http://host.docker.internal:7379/LPUSH/task_queue/' || 'start_' || http.urlencode(flow_id);

        -- Perform the GET requests: payload first, then the start marker.
        PERFORM http.http(('GET', flow_url, NULL, NULL, NULL)::http.http_request);
        PERFORM http.http(('GET', task_queue_url, NULL, NULL, NULL)::http.http_request);
    END IF;

    RETURN NEW;
END;
$$;
-- (Re)install the row-level triggers that call post_flow_update().
-- Each trigger is dropped first so the script can be re-run safely.
-- Despite the *_all_events names, only INSERT and UPDATE are covered.

-- "Flow": fires after a flow row is inserted or updated.
DROP TRIGGER IF EXISTS flow_all_events ON "Flow";
CREATE TRIGGER flow_all_events
    AFTER INSERT OR UPDATE
    ON "Flow"
    FOR EACH ROW
    EXECUTE FUNCTION post_flow_update();

-- "Node": fires after a node row is inserted or updated.
DROP TRIGGER IF EXISTS node_all_events ON "Node";
CREATE TRIGGER node_all_events
    AFTER INSERT OR UPDATE
    ON "Node"
    FOR EACH ROW
    EXECUTE FUNCTION post_flow_update();

-- "Edge": fires after an edge row is inserted or updated.
DROP TRIGGER IF EXISTS edge_all_events ON "Edge";
CREATE TRIGGER edge_all_events
    AFTER INSERT OR UPDATE
    ON "Edge"
    FOR EACH ROW
    EXECUTE FUNCTION post_flow_update();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment