Skip to content

Instantly share code, notes, and snippets.

@oguzhanmeteozturk
Last active March 28, 2023 00:17
Show Gist options
  • Save oguzhanmeteozturk/a78504ebd498a08370803c8896733172 to your computer and use it in GitHub Desktop.
Save oguzhanmeteozturk/a78504ebd498a08370803c8896733172 to your computer and use it in GitHub Desktop.
-- Reports whether a flow currently has exactly as many nodes and edges as its
-- "Flow" row says it should have.
--
-- Parameters:
--   flow_id  value matched against "Flow".id, "Node"."flowId" and "Edge"."flowId".
-- Returns:
--   TRUE only when "Flow".total_nodes equals the number of matching "Node" rows
--   AND "Flow".total_links equals the number of matching "Edge" rows.
--   FALSE in every other case, including when the comparison evaluates to NULL
--   (for example when no "Flow" row matches and the expected counts stay NULL).
CREATE OR REPLACE FUNCTION check_flow_complete(flow_id TEXT)
RETURNS BOOLEAN AS $$
DECLARE
    expected_nodes INTEGER;
    expected_links INTEGER;
    actual_nodes   INTEGER;
    actual_links   INTEGER;
BEGIN
    -- Expected totals as recorded on the flow itself.
    SELECT total_nodes, total_links
    INTO expected_nodes, expected_links
    FROM "Flow"
    WHERE id = flow_id;

    -- Rows actually present for this flow.
    SELECT COUNT(*)
    INTO actual_nodes
    FROM "Node"
    WHERE "flowId" = flow_id;

    SELECT COUNT(*)
    INTO actual_links
    FROM "Edge"
    WHERE "flowId" = flow_id;

    -- A NULL comparison result is reported as FALSE, never as NULL.
    RETURN COALESCE(
        expected_nodes = actual_nodes AND expected_links = actual_links,
        FALSE
    );
END;
$$ LANGUAGE plpgsql;
-- Trigger function intended for row-level INSERT/UPDATE triggers on "Flow",
-- "Node" and "Edge".
--
-- It resolves the affected flow id from the new row (NEW.id for "Flow",
-- NEW."flowId" for "Node" and "Edge"). When check_flow_complete() reports the
-- flow as complete, it serialises the flow (id, name, nodes, edges) to JSON and
-- issues two HTTP GET requests through the http extension:
--   1. <base>/LPUSH/flow/<flow_id>/<payload>
--   2. <base>/LPUSH/task_queue/start_<flow_id>
-- NOTE(review): the URL shape looks like a Redis-over-HTTP gateway (LPUSH as a
-- path segment) -- confirm against the service listening on port 7379.
--
-- Returns NEW unchanged in every case.
--
-- Security: the function is SECURITY DEFINER, so search_path is pinned and
-- pg_temp is listed explicitly and last. If pg_temp is omitted it is searched
-- first implicitly, which would let a session-local temporary table named
-- "Flow", "Node" or "Edge" shadow the real tables.
CREATE OR REPLACE FUNCTION post_flow_update()
RETURNS TRIGGER
LANGUAGE plpgsql
SECURITY definer SET search_path = public, pg_temp
AS $$
DECLARE
    -- Common prefix of both request URLs.
    base_url CONSTANT TEXT := 'http://host.docker.internal:7379/LPUSH/';
    payload TEXT;
    flow_id TEXT;
    flow_url TEXT;
    task_queue_url TEXT;
BEGIN
    -- Work out which flow the changed row belongs to. For any other table
    -- flow_id stays NULL and the completeness check below does not pass.
    IF TG_TABLE_NAME = 'Flow' THEN
        flow_id := NEW.id;
    ELSIF TG_TABLE_NAME IN ('Node', 'Edge') THEN
        flow_id := NEW."flowId";
    END IF;

    IF check_flow_complete(flow_id) THEN
        SELECT json_build_object(
            'flow_id', flow_id,
            'name', (SELECT name FROM "Flow" WHERE id = flow_id),
            -- json_agg yields NULL over zero rows; emit an empty array instead
            -- so 'nodes' and 'edges' are always JSON arrays.
            'nodes', (
                SELECT COALESCE(
                    json_agg(json_build_object('id', nodes.id, 'name', nodes.name)),
                    '[]'::json
                )
                FROM "Node" AS nodes
                WHERE nodes."flowId" = flow_id
            ),
            'edges', (
                SELECT COALESCE(
                    json_agg(json_build_object('id', edges.id, 'sourceId', edges."sourceId", 'targetId', edges."targetId")),
                    '[]'::json
                )
                FROM "Edge" AS edges
                WHERE edges."flowId" = flow_id
            )
        )::text INTO payload;

        -- Construct the URLs; flow id and payload are URL-encoded path segments.
        flow_url := base_url || 'flow/' || http.urlencode(flow_id) || '/' || http.urlencode(payload);
        task_queue_url := base_url || 'task_queue/' || 'start_' || http.urlencode(flow_id);

        -- Perform the GET requests (no headers, no content type, no body).
        PERFORM http.http(('GET', flow_url, NULL, NULL, NULL)::http.http_request);
        PERFORM http.http(('GET', task_queue_url, NULL, NULL, NULL)::http.http_request);
    END IF;

    RETURN NEW;
END;
$$;
-- Row-level triggers that run post_flow_update() after every INSERT or UPDATE
-- on "Flow", "Node" and "Edge". Each trigger is dropped (if present) before it
-- is created, so this section can be executed repeatedly.

-- "Flow" rows
DROP TRIGGER IF EXISTS flow_all_events ON "Flow";
CREATE TRIGGER flow_all_events
    AFTER INSERT OR UPDATE
    ON "Flow"
    FOR EACH ROW
    EXECUTE FUNCTION post_flow_update();

-- "Node" rows
DROP TRIGGER IF EXISTS node_all_events ON "Node";
CREATE TRIGGER node_all_events
    AFTER INSERT OR UPDATE
    ON "Node"
    FOR EACH ROW
    EXECUTE FUNCTION post_flow_update();

-- "Edge" rows
DROP TRIGGER IF EXISTS edge_all_events ON "Edge";
CREATE TRIGGER edge_all_events
    AFTER INSERT OR UPDATE
    ON "Edge"
    FOR EACH ROW
    EXECUTE FUNCTION post_flow_update();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment