Created
March 7, 2016 06:12
-
-
Save poppingtonic/a4959e830c49eae055ce to your computer and use it in GitHub Desktop.
Parsel: A Simple Function for Parallel Query in Postgres using Dblink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- DROP FUNCTION IF EXISTS public.parsel(db text, table_to_chunk text, pkey text, query text, output_table text, table_to_chunk_alias text, num_chunks integer); | |
-- parsel: run `query` in parallel over key-range chunks of `table_to_chunk`
-- and INSERT every chunk's result rows into `output_table`.
--
-- How it works:
--   * The integer-valued column `pkey` of `table_to_chunk` is split into at
--     most `num_chunks` contiguous, non-empty half-open ranges [lbnd, ubnd).
--   * For each range, every occurrence of the text `table_to_chunk` inside
--     `query` is replaced (plain string replace) by a subquery restricted to
--     that range. "INSERT INTO <output_table> <rewritten query>" is then sent
--     asynchronously on its own dblink connection named 'conn_<n>', opened
--     with the connection string 'dbname=' || db.
--   * The function waits for every chunk, raises if any remote INSERT failed,
--     disconnects, and returns 'Success'.
--
-- Parameters:
--   db                   database name passed to dblink_connect as dbname=<db>
--   table_to_chunk       table name exactly as it appears in `query`
--                        (may be schema-qualified)
--   pkey                 integer-valued column of table_to_chunk used to chunk
--   query                SELECT whose rows are inserted into output_table
--   output_table         destination table of the generated INSERTs
--   table_to_chunk_alias '' (default) when `query` does not alias the table:
--                        a generated alias is appended to every chunk
--                        subquery. Pass a non-empty value when `query` already
--                        writes an alias right after the table name.
--   num_chunks           maximum number of parallel connections; must be >= 1
--
-- Caveats:
--   * Requires the dblink extension.
--   * table_to_chunk, pkey, query and output_table are spliced into SQL text
--     unquoted; never pass untrusted input.
--   * The replace is textual: any other occurrence of the table name in
--     `query` (e.g. "tbl.col", or a longer identifier containing it) is
--     rewritten too.
--   * Each chunk runs in its own remote session, so a failure in one chunk
--     can leave rows from already-finished chunks in output_table.
--   * Rows whose pkey is NULL fall in no range and are not processed.
CREATE OR REPLACE FUNCTION public.parsel(db text, table_to_chunk text, pkey text, query text, output_table text, table_to_chunk_alias text default '', num_chunks integer default 2)
RETURNS text AS
$BODY$
DECLARE
    min_id          bigint;
    max_id          bigint;
    key_span        bigint;
    step_size       bigint;
    chunk_count     integer;
    lbnd            bigint;
    ubnd            bigint;
    opened          integer := 0;  -- connections conn_1 .. conn_<opened> were opened by this call
    alias_text      text;
    part            text;
    insert_query    text;
    conn            text;
    dispatch_result integer;
    dispatch_error  text;
BEGIN
    IF db IS NULL OR table_to_chunk IS NULL OR pkey IS NULL
       OR query IS NULL OR output_table IS NULL THEN
        RAISE EXCEPTION 'parsel: db, table_to_chunk, pkey, query and output_table must not be NULL';
    END IF;
    IF num_chunks IS NULL OR num_chunks < 1 THEN
        RAISE EXCEPTION 'parsel: num_chunks must be >= 1, got %', num_chunks;
    END IF;

    -- Key range of the table, fetched in a single statement.
    EXECUTE 'SELECT min(' || pkey || '), max(' || pkey || ') FROM ' || table_to_chunk
        INTO min_id, max_id;

    IF min_id IS NULL THEN
        -- Empty table (or all keys NULL): there is no range to dispatch.
        RAISE NOTICE 'parsel: % has no non-NULL % values; nothing to do', table_to_chunk, pkey;
        RETURN 'Success';
    END IF;

    -- Split [min_id, max_id] into equal ranges. The step is rounded up, so it
    -- is always >= 1 and the ranges always reach max_id. The count is then
    -- recomputed so that no range is empty and at most num_chunks
    -- connections are opened.
    key_span    := max_id - min_id + 1;
    chunk_count := least(num_chunks, key_span);
    step_size   := (key_span + chunk_count - 1) / chunk_count;
    chunk_count := (key_span + step_size - 1) / step_size;

    -- Decide the alias once, before the loop, so every chunk subquery gets
    -- the same treatment.
    IF coalesce(table_to_chunk_alias, '') = '' THEN
        alias_text := 'squery' || ((10000 * random())::integer::text);
    ELSE
        alias_text := '';  -- `query` already supplies its own alias
    END IF;

    -- Dispatch one asynchronous INSERT per key range.
    FOR i IN 1..chunk_count LOOP
        lbnd := min_id + (i - 1) * step_size;
        ubnd := lbnd + step_size;
        RAISE NOTICE 'Chunk %: % >= % and % < %', i, pkey, lbnd, pkey, ubnd;

        conn := 'conn_' || i;
        PERFORM dblink_connect(conn, 'dbname=' || db);
        opened := i;  -- record only after a successful connect, for cleanup

        part := '(SELECT * FROM ' || table_to_chunk
             || ' WHERE ' || pkey || ' >= ' || lbnd
             || ' AND ' || pkey || ' < ' || ubnd || ') ' || alias_text;
        insert_query := 'INSERT INTO ' || output_table || ' '
                     || replace(query, table_to_chunk, part) || ';';
        RAISE NOTICE '%', insert_query;

        -- dblink_send_query returns 0 when the query could not be dispatched.
        dispatch_result := dblink_send_query(conn, insert_query);
        IF dispatch_result = 0 THEN
            dispatch_error := dblink_error_message(conn);
            RAISE EXCEPTION '%', dispatch_error;
        END IF;
    END LOOP;

    -- Collect the results. dblink_get_result blocks until that connection's
    -- query has finished and raises if the remote INSERT failed. All chunks
    -- keep running concurrently while we wait on them one by one, so the
    -- total wait is roughly that of the slowest chunk.
    FOR i IN 1..chunk_count LOOP
        PERFORM * FROM dblink_get_result('conn_' || i) AS t(status text);
        -- A second call consumes the end-of-results marker, as dblink requires.
        PERFORM * FROM dblink_get_result('conn_' || i) AS t(status text);
    END LOOP;

    FOR i IN 1..chunk_count LOOP
        PERFORM dblink_disconnect('conn_' || i);
    END LOOP;

    RETURN 'Success';

EXCEPTION WHEN OTHERS THEN
    -- Best-effort cleanup of every connection this call opened: cancel any
    -- remote INSERT still running, then disconnect. Each step has its own
    -- handler, so one failure (e.g. an already-closed connection) does not
    -- stop the cleanup of the remaining connections.
    FOR n IN 1..opened LOOP
        BEGIN
            PERFORM dblink_cancel_query('conn_' || n);
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
        BEGIN
            PERFORM dblink_disconnect('conn_' || n);
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
    END LOOP;
    -- Re-raise the original error so the caller sees the real cause.
    RAISE;
END
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment