Created
November 28, 2018 18:10
-
-
Save molind/fffb9fc2096bd7234f65335c78379c4f to your computer and use it in GitHub Desktop.
Parallel select function for PostgreSQL.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- Before using it, enable the dblink extension in the database and allow the user to run dblink_connect_u.
-- You may need to change 'dbname=osm' in the function below to your own db connection options.
-- CREATE EXTENSION dblink; | |
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text) TO user; | |
-- GRANT EXECUTE ON FUNCTION dblink_connect_u(text, text) TO user; | |
-- | |
DROP FUNCTION IF EXISTS public.g_parsel(query text, table_to_chunk text, num_chunks integer);

-- g_parsel: run one statement in parallel over num_chunks dblink connections.
--
-- For every chunk i in 1..num_chunks the function opens a dblink connection
-- named 'conn_<i>', replaces every occurrence of the text table_to_chunk in
-- query with
--     <table_to_chunk> WHERE abs(osm_id % <num_chunks>) = <i - 1>
-- and dispatches the rewritten statement asynchronously on that connection.
-- It then polls until no connection is busy and disconnects them all.
--
-- Parameters:
--   query          statement to parallelise; it must mention table_to_chunk at
--                  a position where a WHERE clause may directly follow it.
--                  Results are never fetched, so only the side effects of the
--                  statement (e.g. INSERT ... SELECT) are observable.
--   table_to_chunk table name as it is spelled inside query; the table must
--                  expose an integer-like osm_id column (hard-coded below).
--   num_chunks     number of parallel connections / chunks (default 2, >= 1).
--
-- Returns: the text 'Success' when every chunk finished without an error.
-- Raises:  on invalid arguments, a failed connect/dispatch, or an error
--          reported by any chunk. All connections opened by this call are
--          closed before the original error is re-raised.
CREATE OR REPLACE FUNCTION public.g_parsel(query text, table_to_chunk text, num_chunks integer DEFAULT 2)
RETURNS text AS
$BODY$
DECLARE
    -- Connection options handed to dblink_connect_u; adjust to your database.
    conn_str      CONSTANT text := 'dbname=osm';
    -- Seconds to sleep between two polling rounds.
    poll_interval CONSTANT double precision := 1;

    conn           text;          -- name of the dblink connection in use
    part           text;          -- table reference restricted to one chunk
    subquery       text;          -- query rewritten for one chunk
    opened         integer := 0;  -- highest chunk number with an open connection
    num_done       integer;       -- connections found idle in the current round
    dispatch_error text;          -- last error message of a connection
BEGIN
    IF query IS NULL OR table_to_chunk IS NULL OR table_to_chunk = '' THEN
        RAISE EXCEPTION 'g_parsel: query and table_to_chunk must be non-empty';
    END IF;
    IF num_chunks IS NULL OR num_chunks < 1 THEN
        RAISE EXCEPTION 'g_parsel: num_chunks must be >= 1, got %', num_chunks;
    END IF;

    -- Open one connection per chunk and dispatch its share of the work.
    FOR i IN 1..num_chunks LOOP
        RAISE NOTICE 'Chunk %', i;

        conn := 'conn_' || i;
        PERFORM dblink_connect_u(conn, conn_str);
        opened := i;  -- remembered so the error handler knows what to close

        -- Restrict the table to the rows whose osm_id falls into this chunk.
        part := table_to_chunk || ' WHERE abs(osm_id % ' || num_chunks || ') = ' || (i - 1) || ' ';
        subquery := replace(query, table_to_chunk, part);
        RAISE NOTICE '%', subquery;

        -- dblink_send_query returns 1 when the query was dispatched, 0 otherwise.
        IF dblink_send_query(conn, subquery) <> 1 THEN
            RAISE EXCEPTION 'g_parsel: could not dispatch chunk % on %: %',
                i, conn, dblink_error_message(conn);
        END IF;
    END LOOP;

    -- Poll until every connection has finished its query.
    LOOP
        num_done := 0;
        FOR i IN 1..num_chunks LOOP
            conn := 'conn_' || i;
            IF dblink_is_busy(conn) = 0 THEN
                -- An idle connection may have finished with an error.
                dispatch_error := dblink_error_message(conn);
                IF dispatch_error <> 'OK' THEN
                    RAISE EXCEPTION '%', dispatch_error;
                END IF;
                num_done := num_done + 1;
            END IF;
        END LOOP;

        -- Check before sleeping so a finished run returns immediately.
        EXIT WHEN num_done >= num_chunks;
        PERFORM pg_sleep(poll_interval);
    END LOOP;

    -- Everything finished cleanly: release the connections.
    FOR i IN 1..num_chunks LOOP
        PERFORM dblink_disconnect('conn_' || i);
    END LOOP;

    RETURN 'Success';

EXCEPTION WHEN OTHERS THEN
    RAISE NOTICE '% %', SQLERRM, SQLSTATE;

    -- Close whatever this call opened. Each disconnect is guarded on its own
    -- so that one failure does not leak the remaining connections.
    FOR n IN 1..opened LOOP
        conn := 'conn_' || n;
        BEGIN
            IF conn = ANY (dblink_get_connections()) THEN
                PERFORM dblink_disconnect(conn);
            END IF;
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
    END LOOP;

    -- Propagate the original error instead of falling off the end of the
    -- function without a RETURN.
    RAISE;
END
$BODY$
LANGUAGE plpgsql VOLATILE  -- opens connections and runs arbitrary statements
COST 100;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment