Last active
June 4, 2020 13:10
-
-
Save rponte/0c5b0e3c1b84c0c2e49c863215c2c0f4 to your computer and use it in GitHub Desktop.
Oracle: Playing a little bit with SELECT...FOR UPDATE SKIP LOCKED
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- tabela JOB_QUEUE | |
-- | |
CREATE SEQUENCE SEQ_JOB_QUEUE; | |
CREATE TABLE JOB_QUEUE ( | |
ID NUMBER NOT NULL ENABLE, | |
CONTENT VARCHAR2(4000 CHAR) NOT NULL ENABLE, | |
STATUS VARCHAR2(20 CHAR) NOT NULL ENABLE, | |
THREAD_NAME VARCHAR2(60 CHAR), | |
CONSTRAINT JOB_QUEUE_PK PRIMARY KEY (ID)) | |
; | |
-- | |
-- insere dados na JOB_QUEUE | |
-- | |
delete from job_queue; | |
insert all | |
into job_queue values (1, 'c1', 'NEW', null) | |
into job_queue values (2, 'c2', 'NEW', null) | |
into job_queue values (3, 'c3', 'NEW', null) | |
into job_queue values (4, 'c4', 'NEW', null) | |
into job_queue values (5, 'c5', 'NEW', null) | |
select * from dual | |
; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- package | |
-- | |
create or replace package job_queue_processor as | |
type Job_t is record ( | |
id job_queue.id%type | |
,content job_queue.content%type | |
,status job_queue.status%type | |
,thread_name job_queue.thread_name%type | |
); | |
type Job_List_t is table of Job_t; | |
/** | |
* Processa jobs pendentes | |
*/ | |
procedure process_new_jobs(p_thread_name varchar2 | |
,p_row_limit integer := 1); | |
end job_queue_processor; | |
/ | |
create or replace package body job_queue_processor as | |
procedure process_job(p_job Job_t | |
, p_thread_name varchar2) as | |
l_seconds number; | |
begin | |
dbms_output.put_line('Row: ' || p_job.content); | |
update job_queue j | |
set j.status = 'FINISHED' | |
,j.thread_name = p_thread_name | |
where j.id = p_job.id | |
; | |
l_seconds := floor(dbms_random.value(1, 3)); | |
dbms_lock.sleep(seconds => l_seconds); | |
end; | |
procedure process_new_jobs(p_thread_name varchar2 | |
,p_row_limit integer := 1) as | |
cursor new_jobs_cursor is | |
select jq.* | |
from job_queue jq | |
where jq.status = 'NEW' | |
for update skip locked | |
; | |
l_job_list Job_List_t; | |
l_job Job_t; | |
begin | |
open new_jobs_cursor; | |
loop | |
fetch new_jobs_cursor bulk collect into l_job_list limit p_row_limit; | |
dbms_output.put_line('Fetched ' || l_job_list.COUNT || ' rows.'); | |
for i in 1..l_job_list.count loop | |
l_job := l_job_list(i); | |
-- processa job | |
process_job(p_job => l_job | |
,p_thread_name => p_thread_name); | |
end loop; | |
exit when new_jobs_cursor%NOTFOUND; | |
end loop; | |
commit; | |
close new_jobs_cursor; | |
end process_new_jobs; | |
end job_queue_processor; | |
/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- configuring number of processes | |
-- | |
-- select value from v$parameter where name='job_queue_processes'; | |
-- alter system set job_queue_processes = 20; | |
-- | |
set timing on | |
set serveroutput on | |
declare | |
number_of_jobs_in_the_queue constant integer := 100000; | |
number_of_threads constant integer := 10; | |
batch_size constant integer := 100; | |
job_name varchar2(200); | |
plsql_block varchar2(1000); | |
begin | |
-- clean and populate database | |
delete from job_queue; | |
insert into job_queue(id, content, status, thread_name) | |
select level as id | |
,('c' || level) as content | |
,'NEW' as status | |
,null as thread_name | |
from dual | |
connect by level <= number_of_jobs_in_the_queue | |
; | |
commit; | |
-- // | |
-- execute workers (threads) in background | |
for i in 1..number_of_threads loop | |
job_name := 'thread_' || i; | |
plsql_block := 'BEGIN | |
job_queue_processor.process_new_jobs(p_thread_name => ''%s'' | |
,p_row_limit => %s); | |
END;'; | |
plsql_block := utl_lms.format_message(plsql_block, job_name, to_char(batch_size)); | |
DBMS_SCHEDULER.create_job ( | |
job_name => job_name, | |
job_type => 'PLSQL_BLOCK', | |
job_action => plsql_block, | |
enabled => TRUE | |
); | |
end loop; | |
end; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- REUSING CURSOR + BULK COLLECT + LIMIT | |
-- | |
create or replace package body job_queue_processor as | |
procedure process_job(p_job Job_t | |
, p_thread_name varchar2) as | |
l_seconds number; | |
begin | |
dbms_output.put_line('Row: ' || p_job.content); | |
update job_queue j | |
set j.status = 'FINISHED' | |
,j.thread_name = p_thread_name | |
where j.id = p_job.id | |
; | |
l_seconds := round(dbms_random.value(), 2); | |
dbms_lock.sleep(seconds => l_seconds); | |
end; | |
procedure process_new_jobs(p_thread_name varchar2 | |
,p_row_limit integer := 1) as | |
cursor new_jobs_cursor is | |
select jq.* | |
from job_queue jq | |
where jq.status = 'NEW' | |
for update skip locked | |
; | |
l_job_list Job_List_t; | |
l_job Job_t; | |
begin | |
loop | |
-- reusando cursor | |
open new_jobs_cursor; | |
fetch new_jobs_cursor bulk collect into l_job_list limit p_row_limit; | |
dbms_output.put_line('Fetched ' || l_job_list.COUNT || ' rows.'); | |
for i in 1..l_job_list.count loop | |
l_job := l_job_list(i); | |
-- processa job | |
process_job(p_job => l_job | |
,p_thread_name => p_thread_name); | |
end loop; | |
commit; | |
close new_jobs_cursor; | |
exit when l_job_list.count = 0; | |
end loop; | |
end process_new_jobs; | |
end job_queue_processor; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- REUSING CURSOR + SIMPLE FETCH ROW-BY-ROW | |
-- | |
create or replace package body job_queue_processor as | |
procedure process_job(p_job Job_t | |
, p_thread_name varchar2) as | |
l_seconds number; | |
begin | |
dbms_output.put_line('Row: ' || p_job.content); | |
update job_queue j | |
set j.status = 'FINISHED' | |
,j.thread_name = p_thread_name | |
where j.id = p_job.id | |
; | |
l_seconds := round(dbms_random.value(), 2); | |
dbms_lock.sleep(seconds => l_seconds); | |
end; | |
procedure process_new_jobs(p_thread_name varchar2 | |
,p_row_limit integer := 1) as | |
cursor new_jobs_cursor is | |
select jq.* | |
from job_queue jq | |
where jq.status = 'NEW' | |
for update skip locked | |
; | |
l_job_list Job_List_t; | |
l_job Job_t; | |
begin | |
loop | |
open new_jobs_cursor; | |
fetch new_jobs_cursor into l_job; | |
exit when new_jobs_cursor%NOTFOUND; | |
-- processa job | |
process_job(p_job => l_job | |
,p_thread_name => p_thread_name); | |
commit; | |
close new_jobs_cursor; | |
end loop; | |
end process_new_jobs; | |
end job_queue_processor; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- Using FOR-Loop | |
-- | |
create or replace package body job_queue_processor as | |
procedure process_job(p_job Job_t | |
, p_thread_name varchar2) as | |
l_seconds number; | |
begin | |
dbms_output.put_line('Row: ' || p_job.content); | |
update job_queue j | |
set j.status = 'FINISHED' | |
,j.thread_name = p_thread_name | |
where j.id = p_job.id | |
; | |
l_seconds := round(dbms_random.value(), 2); | |
dbms_lock.sleep(seconds => l_seconds); | |
end; | |
procedure process_new_jobs(p_thread_name varchar2 | |
,p_row_limit integer := 1) as | |
cursor new_jobs_cursor is | |
select jq.* | |
from job_queue jq | |
where jq.status = 'NEW' | |
for update skip locked | |
; | |
l_job_list Job_List_t; | |
l_job Job_t; | |
begin | |
for l_job in new_jobs_cursor | |
loop | |
-- processa job | |
process_job(p_job => l_job | |
,p_thread_name => p_thread_name); | |
end loop; | |
commit; -- commit somente no final quando NAO sabemos a qtd de linhas eh PERIGOSO! | |
end process_new_jobs; | |
end job_queue_processor; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
set timing on | |
set serveroutput on | |
begin | |
job_queue_processor.process_new_jobs(p_thread_name => 'thread_1' | |
,p_row_limit => 1); | |
end; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
select jq.thread_name as thread_name | |
,count(1) as total | |
,sum(count(1)) over() as sum_up | |
from job_queue jq | |
where jq.status = 'FINISHED' | |
group by jq.thread_name | |
order by total desc | |
; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Gist: Listing Oracle Jobs