Last active
January 29, 2020 08:58
-
-
Save vadv/294fb98b76c44aba800f4065bb00e79b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Orders, one row per order.
 * The table name is a reserved word, so it must always be double-quoted.
 * processed_at and processed_state are nullable and have no default.
 */
create table "order" (
    id              text,
    created_at      timestamptz default now(),
    processed_at    timestamptz,
    processed_state text,
    constraint order_pkey primary key (id)
);
/*
 * Event log of the transitions applied to an order: one row per operation,
 * recording the state the order was moved to.
 *
 * processed_at defaults to current_timestamp, which is the transaction start
 * time, so several events written in one transaction share the same value;
 * id (bigserial) is the tie-breaker when ordering events.
 */
create table order_events (
    id           bigserial,
    order_id     text references "order" (id),
    operation    text not null,
    state        text not null,
    processed_at timestamp with time zone not null default current_timestamp
);

-- Events are looked up per order, and a foreign key does not create an index
-- on the referencing column by itself.
create index order_events_order_id_idx on order_events (order_id);

/*
 * Optional range partitioning by processed_at (not enabled). To use it, end
 * the CREATE TABLE above with
 *     ) partition by range (processed_at);
 * and add one partition per month, e.g.
 *     create table order_events_2019_04 partition of order_events
 *         for values from ('2019-04-01') to ('2019-05-01');
 * The upper bound of a range partition is exclusive, so the partition for
 * April must end at the first day of May.
 */
/*
 * Allowed finite-state-machine transitions: applying `operation` to an order
 * that is in `from_state` moves it to `to_state`.
 * The primary key allows at most one target state per (operation, from_state).
 */
create table order_event_transition (
    operation  text not null,
    from_state text not null,
    to_state   text not null,
    constraint order_event_transition_pkey primary key (operation, from_state)
);
/*
 * Hot table of unresolved orders, used as the processing queue.
 * An order appears at most once (unique order_id); locked_at is nullable and
 * has no default.
 */
create table order_pending (
    id         bigserial,
    order_id   text,
    created_at timestamptz default now(),
    locked_at  timestamptz,
    constraint order_pending_pkey primary key (id),
    constraint order_pending_order_id_key unique (order_id)
);
/*
 * Takes a transaction-scoped advisory lock for one order, without waiting.
 *
 * Parameters:
 *   id - the order id; the lock key is the first 64 bits of its md5 hash.
 * Returns: nothing. The lock is released automatically at commit or rollback.
 * Raises:  lock_not_available when another session already holds the lock.
 *
 * The outcome of pg_try_advisory_xact_lock must be checked: it returns false
 * instead of waiting, so discarding the result would let the caller carry on
 * without the lock. Re-acquiring a lock this session already holds succeeds.
 */
create or replace function try_obtain_lock_order_id(id text) returns void as $$
begin
    /* try to lock order_id; $1 is used because "id" is also a column name */
    if not pg_try_advisory_xact_lock(('x' || substr(md5($1), 1, 16))::bit(64)::bigint) then
        raise exception 'order "%" is locked by another transaction', $1
            using errcode = 'lock_not_available';
    end if;
end
$$ language plpgsql;
/*
 * Returns the operations allowed for the specified order_id, one row per
 * operation: every transition whose from_state equals the order's current
 * state. Returns no rows when the order has no events or its current state
 * has no outgoing transition.
 *
 * The current state is the state of the most recent event: newest
 * processed_at first, highest id as tie-breaker for events that share a
 * timestamp.
 *
 * The result is a set because a state may allow several operations.
 * NOTE: "create or replace" cannot change the return type of an existing
 * function; if an older "returns text" version is installed, drop it first.
 */
create or replace function order_operation_for(order_id text) returns setof text as $$
    with current_state as (
        select e.state
        from order_events e
        -- $1, not the parameter name: inside a SQL function a bare "order_id"
        -- would resolve to the column.
        where e.order_id = $1
        order by e.processed_at desc, e.id desc
        limit 1
    )
    select o.operation
    from order_event_transition o
    inner join current_state c on c.state = o.from_state
$$ language sql;
/*
 * Trigger function: validates a newly inserted order_events row against the
 * transition table and rejects the insert when the transition is not allowed.
 *
 * Checks, in order:
 *   1. new.operation can produce new.state at all;
 *   2. new.operation is allowed from the order's previous state and leads to
 *      new.state.
 * Raises: errcode unique_violation for either failed check.
 *
 * The previous state is looked up here, excluding the row being validated:
 * when this runs after the insert, the new row is already visible, so "the
 * latest event of the order" would be the very event under validation.
 *
 * NOTE(review): for the first event of an order there is no previous state;
 * the event's own state is then used as the starting state, so only a
 * self-transition (such as operation "pending" from "create" to "create")
 * is accepted. Confirm this is the intended bootstrap rule.
 */
create or replace function order_events_check_valid_insert() returns trigger as $$
declare
    previous_state text;
begin
    -- serialise concurrent writers of the same order
    perform try_obtain_lock_order_id(new.order_id);

    -- check new state
    if not exists (
        select 1
        from order_event_transition t
        where t.operation = new.operation
          and t.to_state = new.state
    ) then
        raise 'cant perform operation "%" to state "%"', new.operation, new.state using errcode = 'unique_violation';
    end if;

    -- state the order was in before this event: newest other event of the
    -- order, id breaking ties between events that share a timestamp
    select e.state
      into previous_state
      from order_events e
     where e.order_id = new.order_id
       and e.id <> new.id
     order by e.processed_at desc, e.id desc
     limit 1;

    -- check current state
    if not exists (
        select 1
        from order_event_transition t
        where t.operation = new.operation
          and t.from_state = coalesce(previous_state, new.state)
          and t.to_state = new.state
    ) then
        raise 'cant perform operation "%" to current order state', new.operation using errcode = 'unique_violation';
    end if;

    return new;
end
$$ language plpgsql;
-- Runs order_events_check_valid_insert() once for every row inserted into
-- order_events, after the insert. "not deferrable" is the default, spelled out.
create constraint trigger fsm_order_events_check_valid_insert_trigger
    after insert
    on order_events
    not deferrable
    for each row
    execute procedure order_events_check_valid_insert();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Seed the transition table: (operation, from_state, to_state).
insert into order_event_transition (operation, from_state, to_state)
values
    ('pending', 'create',            'create'),
    ('start',   'create',            'awaiting_payment'),
    ('pay',     'awaiting_payment',  'awaiting_shipment'),
    ('ship',    'awaiting_shipment', 'shipped'),
    ('cancel',  'awaiting_shipment', 'awaiting_refund'),
    ('refund',  'awaiting_refund',   'canceled');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* pending: create the order, log its first event and put it on the queue */
begin;

select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');

-- the order row must exist before the event that references it
insert into "order" (id)
values ('a929b8ce50234aa3ec428ed8db640622');

insert into order_events (order_id, operation, state)
values ('a929b8ce50234aa3ec428ed8db640622', 'pending', 'create');

insert into order_pending (order_id)
values ('a929b8ce50234aa3ec428ed8db640622');

commit;
/* get unprocessed from queue: claim up to 10 entries nobody has taken yet */
begin;

-- rows currently row-locked by another transaction are skipped, not waited for
select
    order_id,
    try_obtain_lock_order_id(order_id)
from order_pending
where locked_at is null
for update skip locked
limit 10;

-- load order_id to application

-- (...) stands for the order ids returned by the select above
update order_pending
set locked_at = current_timestamp
where order_id in (...);

commit;
/* start: record the "start" operation, moving the order to awaiting_payment */
begin;

select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');

insert into order_events (order_id, operation, state)
values ('a929b8ce50234aa3ec428ed8db640622', 'start', 'awaiting_payment');

commit;
/* pay: record the "pay" operation, moving the order to awaiting_shipment */
begin;

select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');

insert into order_events (order_id, operation, state)
values ('a929b8ce50234aa3ec428ed8db640622', 'pay', 'awaiting_shipment');

commit;
/* ship: record the "ship" operation, stamp the order and drop it from the queue */
begin;

select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');

insert into order_events (order_id, operation, state)
values ('a929b8ce50234aa3ec428ed8db640622', 'ship', 'shipped');

-- store the outcome on the order row itself
update "order"
set processed_state = 'shipped',
    processed_at = current_timestamp
where id = 'a929b8ce50234aa3ec428ed8db640622';

-- the order is resolved, so it leaves the pending queue
delete from order_pending
where order_id = 'a929b8ce50234aa3ec428ed8db640622';

commit;
/*
 * get history: every event of one order, oldest first.
 *
 * This is a read-only query, so it takes no advisory lock: uncommitted events
 * of other transactions are not visible to it anyway, and holding the order's
 * lock here would only get in the way of writers.
 * e.id breaks ties between events that share a processed_at value (events
 * written in the same transaction do), keeping the output order deterministic.
 */
select
    o.id,
    o.created_at,
    e.operation,
    e.state,
    e.processed_at
from "order" o
inner join order_events e on e.order_id = o.id
where o.id = 'a929b8ce50234aa3ec428ed8db640622'
order by e.processed_at, e.id;
/*
id | created_at | operation | state | processed_at
----------------------------------+-------------------------------+-----------+-------------------+-------------------------------
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pending | create | 2019-04-09 16:46:13.730044+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | start | awaiting_payment | 2019-04-09 16:46:13.737338+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pay | awaiting_shipment | 2019-04-09 16:46:13.738623+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | ship | shipped | 2019-04-09 16:46:13.739792+03
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This doesn't work? I guess there's an issue with the secondary `SELECT` due to aliases.