Created
July 30, 2021 09:34
-
-
Save molind/ab52c5540cd33b54b1249b3484b1ca38 to your computer and use it in GitHub Desktop.
Parallel select and Parallel query
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- These functions use dblink, so the extension must be enabled in the database:
-- CREATE EXTENSION dblink;
-- You may also need to grant your user permission to use it:
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text) TO your_user;
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO your_user;
-- Usage example:
-- select g_parsel('insert into osm_polygon_extra select osm_id, st_pointonsurface( st_collect( geom ) ) from osm_polygons group by osm_id;', 'osm_polygons', 12);
CREATE OR REPLACE FUNCTION public.g_parsel(query text, table_to_chunk text, num_chunks integer DEFAULT 2)
RETURNS text
LANGUAGE plpgsql
-- VOLATILE: the function opens connections and dispatches arbitrary statements,
-- so the planner must never cache or skip a call.
VOLATILE
AS $function$
-- Parallel select: runs `query` as `num_chunks` concurrent statements, each on
-- its own dblink connection, and waits until all of them have finished.
--
-- Parameters:
--   query           SQL text to run. Every occurrence of `table_to_chunk` in it
--                   is textually replaced by
--                   "<table_to_chunk> WHERE abs(osm_id % <num_chunks>) = <k>",
--                   so the table must expose an `osm_id` column and the query
--                   must tolerate a WHERE clause right after the table name.
--   table_to_chunk  Table name (exact text as it appears in `query`) to split.
--   num_chunks      Number of chunks / connections (default 2).
--
-- Returns: 'Success' once every chunk has completed.
-- Raises:  the original error after closing the connections this call touched.
DECLARE
    -- Connection string used for every worker connection.
    connstr CONSTANT text := 'dbname=osm';
    conn text;
    part text;
    subquery text;
    -- Highest connection index this call has tried to open; bounds the cleanup.
    last_conn integer := 0;
    num_done integer;
    dispatch_error text;
BEGIN
    -- Open one connection per chunk and dispatch its slice asynchronously.
    FOR i IN 1..num_chunks LOOP
        -- for debugging
        RAISE NOTICE 'Chunk %', i;

        conn := 'conn_' || i;
        last_conn := i;
        PERFORM dblink_connect_u(conn, connstr);

        -- Restrict the chunked table to the rows belonging to this chunk.
        part := table_to_chunk || ' WHERE abs(osm_id % ' || num_chunks || ') = ' || (i - 1) || ' ';
        subquery := replace(query, table_to_chunk, part);
        RAISE NOTICE '%', subquery;

        -- dblink_send_query returns 1 on successful dispatch. IS DISTINCT FROM
        -- also catches the NULL produced by a NULL query or table name.
        IF dblink_send_query(conn, subquery) IS DISTINCT FROM 1 THEN
            RAISE EXCEPTION 'could not dispatch chunk % on connection %: %',
                i, conn, dblink_error_message(conn);
        END IF;
    END LOOP;

    -- Poll once per second until no connection is busy any more.
    LOOP
        num_done := 0;
        FOR i IN 1..num_chunks LOOP
            conn := 'conn_' || i;
            IF dblink_is_busy(conn) = 0 THEN
                -- NOTE(review): errors of the remote statement are normally
                -- surfaced by dblink_get_result; confirm that
                -- dblink_error_message reports them for async queries.
                dispatch_error := dblink_error_message(conn);
                IF dispatch_error <> 'OK' THEN
                    RAISE EXCEPTION '%', dispatch_error;
                END IF;
                num_done := num_done + 1;
            END IF;
        END LOOP;

        -- Check before sleeping so a finished run does not wait an extra second.
        EXIT WHEN num_done >= num_chunks;
        PERFORM pg_sleep(1);
    END LOOP;

    -- Disconnect the dblinks.
    FOR i IN 1..num_chunks LOOP
        PERFORM dblink_disconnect('conn_' || i);
    END LOOP;

    RETURN 'Success';

EXCEPTION WHEN OTHERS THEN
    RAISE NOTICE '% %', SQLERRM, SQLSTATE;

    -- Best-effort cleanup: each disconnect is guarded separately so that one
    -- failure (e.g. a connection that never opened) does not leave the
    -- remaining connections behind.
    FOR k IN 1..last_conn LOOP
        BEGIN
            PERFORM dblink_disconnect('conn_' || k);
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
    END LOOP;

    -- Re-raise so the caller sees the real failure.
    RAISE;
END
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Parallel query, a.k.a. pquery.
-- It runs multiple queries in parallel and returns when all of them are done.
-- Usage example:
-- select public.g_pquery(ARRAY[
-- 'CREATE INDEX "osm_polygons_extra_polyZoom" ON osm_polygons_extra USING brin(polyZoom);',
-- 'CREATE INDEX "osm_polygons_extra_centerPoint" ON osm_polygons_extra USING gist(centerPoint);'
-- ]);
CREATE OR REPLACE FUNCTION public.g_pquery(queries text[])
RETURNS void
LANGUAGE plpgsql
-- VOLATILE: the function opens connections and dispatches arbitrary statements,
-- so the planner must never cache or skip a call.
VOLATILE
AS $function$
-- Parallel query: dispatches every statement in `queries` on its own dblink
-- connection (named pq_1, pq_2, ...) and returns once all have finished.
--
-- Parameters:
--   queries  Array of SQL statements to run concurrently.
--
-- Raises: the original error after closing the connections this call touched.
DECLARE
    -- Connection string used for every worker connection.
    connstr CONSTANT text := 'dbname=osm';
    conn text;
    subquery text;
    -- Number of queries dispatched so far; also the highest connection index
    -- this call has tried to open, which bounds polling and cleanup.
    n integer := 0;
    num_done integer;
    dispatch_error text;
BEGIN
    -- Open one connection per statement and dispatch it asynchronously.
    FOREACH subquery IN ARRAY queries LOOP
        n := n + 1;
        -- for debugging
        RAISE NOTICE 'Query %', n;

        conn := 'pq_' || n;
        PERFORM dblink_connect_u(conn, connstr);
        RAISE NOTICE '%', subquery;

        -- dblink_send_query returns 1 on successful dispatch. IS DISTINCT FROM
        -- also catches the NULL produced by a NULL array element.
        IF dblink_send_query(conn, subquery) IS DISTINCT FROM 1 THEN
            RAISE EXCEPTION 'could not dispatch query % on connection %: %',
                n, conn, dblink_error_message(conn);
        END IF;
    END LOOP;

    -- Poll once per second until no connection is busy any more.
    LOOP
        num_done := 0;
        FOR i IN 1..n LOOP
            conn := 'pq_' || i;
            IF dblink_is_busy(conn) = 0 THEN
                -- NOTE(review): errors of the remote statement are normally
                -- surfaced by dblink_get_result; confirm that
                -- dblink_error_message reports them for async queries.
                dispatch_error := dblink_error_message(conn);
                IF dispatch_error <> 'OK' THEN
                    RAISE EXCEPTION '%', dispatch_error;
                END IF;
                num_done := num_done + 1;
            END IF;
        END LOOP;

        -- Check before sleeping so a finished run does not wait an extra second.
        EXIT WHEN num_done >= n;
        PERFORM pg_sleep(1);
    END LOOP;

    -- Disconnect the dblinks.
    FOR i IN 1..n LOOP
        PERFORM dblink_disconnect('pq_' || i);
    END LOOP;

EXCEPTION WHEN OTHERS THEN
    RAISE NOTICE '% %', SQLERRM, SQLSTATE;

    -- Best-effort cleanup: each disconnect is guarded separately so that one
    -- failure does not leave the remaining connections behind. Without this
    -- the named connections would outlive the failed call.
    FOR k IN 1..n LOOP
        BEGIN
            PERFORM dblink_disconnect('pq_' || k);
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
    END LOOP;

    -- Re-raise so the caller sees the real failure.
    RAISE;
END
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment