Skip to content

Instantly share code, notes, and snippets.

@poppingtonic
Created March 7, 2016 06:12
Show Gist options
  • Save poppingtonic/a4959e830c49eae055ce to your computer and use it in GitHub Desktop.
Save poppingtonic/a4959e830c49eae055ce to your computer and use it in GitHub Desktop.
Parsel: A Simple Function for Parallel Query in Postgres using Dblink
-- DROP FUNCTION IF EXISTS public.parsel(db text, table_to_chunk text, pkey text, query text, output_table text, table_to_chunk_alias text, num_chunks integer);
-- parsel: run one query in parallel over key-range chunks of a table using dblink.
--
-- The key range [min(pkey), max(pkey)] of table_to_chunk is split into at most
-- num_chunks half-open ranges. For each range a dblink connection named conn_<k>
-- is opened to database db, every occurrence of the text table_to_chunk inside
-- query is replaced by a range-restricted subquery, and
-- "INSERT INTO output_table <rewritten query>" is dispatched asynchronously on
-- that connection. The function then polls until no connection is busy and
-- disconnects them all.
--
-- Parameters:
--   db                   database name used in the dblink connection string
--   table_to_chunk       table whose key range is split; also the literal text
--                        that is replaced inside query
--   pkey                 integer-valued column used to build the ranges
--   query                SELECT statement whose rows are inserted into output_table
--   output_table         target of the generated INSERT statements
--   table_to_chunk_alias pass '' (default) when query does not alias the table, so a
--                        generated alias is appended to the subquery; any other
--                        value means query already places an alias right after the
--                        table name and nothing is appended
--   num_chunks           maximum number of chunks / connections (must be >= 1)
--
-- Returns 'Success' when every chunk finished without a reported error (also when
-- the table has no key values, in which case nothing is dispatched).
-- Raises the original error after closing the connections it opened.
--
-- NOTE(review): table_to_chunk, pkey, output_table and query are concatenated
-- into SQL text unquoted, exactly as callers pass them. Only call this with
-- trusted arguments.
-- NOTE(review): each chunk runs in its own dblink session, so chunks that already
-- finished are presumably not undone when a later chunk fails -- confirm before
-- relying on all-or-nothing behaviour.
CREATE OR REPLACE FUNCTION public.parsel(db text, table_to_chunk text, pkey text, query text, output_table text, table_to_chunk_alias text default '', num_chunks integer default 2)
RETURNS text AS
$BODY$
DECLARE
    min_id         bigint;
    max_id         bigint;
    step_size      bigint;
    lbnd           bigint;
    ubnd           bigint;
    alias_suffix   text := '';
    part           text;
    subquery       text;
    insert_query   text;
    conn           text;
    opened         integer := 0;   -- connections conn_1 .. conn_<opened> are currently open
    num_done       integer;
    dispatch_error text;
BEGIN
    -- Reject arguments that would otherwise turn into NULL SQL strings or a
    -- division by zero further down.
    IF db IS NULL OR table_to_chunk IS NULL OR pkey IS NULL
       OR query IS NULL OR output_table IS NULL THEN
        RAISE EXCEPTION 'parsel: db, table_to_chunk, pkey, query and output_table must not be NULL';
    END IF;
    IF num_chunks IS NULL OR num_chunks < 1 THEN
        RAISE EXCEPTION 'parsel: num_chunks must be a positive integer, got %', num_chunks;
    END IF;

    -- Find the key range in a single pass. The identifiers are dynamic, so this
    -- has to be dynamic SQL.
    EXECUTE 'SELECT min(' || pkey || '), max(' || pkey || ') FROM ' || table_to_chunk
        INTO min_id, max_id;

    -- Empty table (or only NULL keys): there is nothing to dispatch.
    IF min_id IS NULL THEN
        RAISE NOTICE 'parsel: % has no non-NULL % values; nothing to do', table_to_chunk, pkey;
        RETURN 'Success';
    END IF;

    -- Ceiling division so that at most num_chunks ranges cover [min_id, max_id];
    -- never smaller than 1 so the loop below always advances.
    step_size := greatest(1, ceil((max_id::numeric - min_id::numeric + 1) / num_chunks)::bigint);

    -- Decide once whether the subquery needs a generated alias. Doing this per
    -- chunk would leave every chunk after the first without an alias.
    IF table_to_chunk_alias = '' THEN
        alias_suffix := 'squery' || ((10000 * random())::integer::text);
    END IF;

    -- Open one connection per chunk and dispatch its INSERT asynchronously.
    lbnd := min_id;
    WHILE lbnd <= max_id LOOP
        ubnd := lbnd + step_size;
        conn := 'conn_' || (opened + 1);

        -- for debugging
        RAISE NOTICE 'Chunk %: % >= % and % < %', opened + 1, pkey, lbnd, pkey, ubnd;

        PERFORM dblink_connect(conn, 'dbname=' || db);
        -- Count the connection only after it really exists, so cleanup never
        -- tries to close a connection that was not opened.
        opened := opened + 1;

        -- Range-restricted subquery that stands in for the table name in query.
        part := '(SELECT * FROM ' || table_to_chunk
             || ' WHERE ' || pkey || ' >= ' || lbnd
             || ' AND ' || pkey || ' < ' || ubnd || ') ' || alias_suffix;
        subquery := replace(query, table_to_chunk, part);
        insert_query := 'INSERT INTO ' || output_table || ' ' || subquery || ';';
        RAISE NOTICE '%', insert_query;

        -- dblink_send_query returns 0 when the query could not be dispatched.
        IF dblink_send_query(conn, insert_query) = 0 THEN
            RAISE EXCEPTION '%', dblink_error_message(conn);
        END IF;

        lbnd := ubnd;
    END LOOP;

    -- Wait until no connection is busy any more.
    LOOP
        num_done := 0;
        FOR n IN 1..opened LOOP
            conn := 'conn_' || n;
            IF dblink_is_busy(conn) = 0 THEN
                -- check for error messages
                dispatch_error := dblink_error_message(conn);
                IF dispatch_error <> 'OK' THEN
                    RAISE EXCEPTION '%', dispatch_error;
                END IF;
                num_done := num_done + 1;
            END IF;
        END LOOP;
        EXIT WHEN num_done >= opened;
        -- Short pause between polling passes instead of spinning on the CPU.
        PERFORM pg_sleep(0.05);
    END LOOP;

    -- Disconnect the dblinks.
    WHILE opened > 0 LOOP
        PERFORM dblink_disconnect('conn_' || opened);
        opened := opened - 1;
    END LOOP;

    RETURN 'Success';

-- On any error: close whatever is still open, then re-raise the original error
-- so the caller sees the real cause.
EXCEPTION WHEN OTHERS THEN
    WHILE opened > 0 LOOP
        -- Guard each disconnect separately so one failure does not leave the
        -- remaining connections open.
        BEGIN
            PERFORM dblink_disconnect('conn_' || opened);
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE '% %', SQLERRM, SQLSTATE;
        END;
        opened := opened - 1;
    END LOOP;
    RAISE;
END
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment