This document will not disclose the topic why you need fsm.
But if you implement fsm, then you need to do it as nearly as possible in the data.
This document will explain how you can implement it with PostgreSQL and gives developers maximum freedom.
We create two normalized tables, in one the information about the order is stored
in the other movement for each order.id
.
/* list of order */
create table "order" (
id text primary key,
created_at timestamp with time zone default current_timestamp,
processed_at timestamp with time zone,
processed_state text
);
/* processed transition on order.id */
create table order_events (
id bigserial,
order_id text references "order" (id),
operation text not null,
state text not null,
processed_at timestamp with time zone not null default current_timestamp
);
In the table order_event_transition
we will store the allowed transition from one state to another:
/* list of allowed fsm transition */
create table order_event_transition (
operation text not null,
from_state text not null,
to_state text not null,
PRIMARY KEY (operation, from_state)
);
In the table order_pending
we will store the transactions that need to be processed.
The use of such a table is necessary to avoid bloating on the main tables.
/* hot table for unresolved order's and queue processing */
create table order_pending (
id bigserial primary key,
order_id text unique,
created_at timestamp with time zone default current_timestamp,
locked_at timestamp with time zone
);
Helper function to lock specific order.id
to avoid a race condition.
create or replace function obtain_lock_order_id(id text) returns void as $$
begin
/* lock order_id */
perform pg_advisory_xact_lock(('x'||substr(md5($1),1,16))::bit(64)::bigint);
end
$$ language 'plpgsql';
Helper function that tries to lock specific order.id
. It is used to get unprocessed orders.
The only difference is that it doesn't wait for lock to be released and returns false
if the lock has already been taken.
create or replace function try_obtain_lock_order_id(id text) returns bool as $$
begin
return (select pg_try_advisory_xact_lock(('x'||substr(md5($1),1,16))::bit(64)::bigint));
end
$$ language 'plpgsql';
This is a minimal example to check if a transaction change is allowed.
It's works automatically as a constraint trigger
:
begin;
create or replace function allowed_transition(order_id text) returns setof order_event_transition as $$
select o.* from order_event_transition o where
coalesce(o.from_state, '<null>') in (
select coalesce(state, '<null>')
from order_events e where e.order_id = $1 order by processed_at desc limit 2);
$$ language 'sql';
create or replace function order_events_check_valid_insert() returns trigger as $$
begin
perform obtain_lock_order_id(new.order_id);
-- check new state
if new.state not in (select to_state from order_event_transition where operation = new.operation) then
raise 'cant perform operation "%" to state "%"', new.operation, new.state using errcode = 'unique_violation';
end if;
-- check current state
if not exists (select 1 from allowed_transition(new.order_id) where operation = new.operation and to_state = new.state) then
raise 'cant perform operation "%" to current order state', new.operation using errcode = 'unique_violation';
end if;
return new;
end
$$ language 'plpgsql';
create constraint trigger fsm_order_events_check_valid_insert_trigger
after insert on order_events for each row
execute procedure order_events_check_valid_insert();
Create allowed transitions:
insert into order_event_transition (operation, from_state, to_state) values ('pending', null, 'create');
insert into order_event_transition (operation, from_state, to_state) values ('start', 'create', 'awaiting_payment');
insert into order_event_transition (operation, from_state, to_state) values ('pay', 'awaiting_payment', 'awaiting_shipment');
insert into order_event_transition (operation, from_state, to_state) values ('ship', 'awaiting_shipment', 'shipped');
insert into order_event_transition (operation, from_state, to_state) values ('cancel', 'awaiting_shipment', 'awaiting_refund');
insert into order_event_transition (operation, from_state, to_state) values ('refund', 'awaiting_refund', 'canceled');
begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into "order" (id) values ('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'pending', 'create');
insert into order_pending (order_id) values ('a929b8ce50234aa3ec428ed8db640622');
commit;
begin;
select
order_id
from order_pending where locked_at is null and try_obtain_lock_order_id(order_id) limit 10;
-- load order_id to application
update order_pending set locked_at = current_timestamp where order_id in (...);
commit;
begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'start', 'awaiting_payment');
commit;
begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'pay', 'awaiting_shipment');
commit;
begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'ship', 'shipped');
update "order" set processed_state = 'shipped', processed_at = current_timestamp where id = 'a929b8ce50234aa3ec428ed8db640622';
delete from order_pending where order_id = 'a929b8ce50234aa3ec428ed8db640622';
commit;
/* get history */
begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
select
o.id,
o.created_at,
e.operation,
e.state,
e.processed_at
from
"order" o
inner join "order_events" e on e.order_id = o.id
where o.id = 'a929b8ce50234aa3ec428ed8db640622'
order by e.processed_at;
commit;
/*
id | created_at | operation | state | processed_at
----------------------------------+-------------------------------+-----------+-------------------+-------------------------------
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pending | create | 2019-04-09 16:46:13.730044+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | start | awaiting_payment | 2019-04-09 16:46:13.737338+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pay | awaiting_shipment | 2019-04-09 16:46:13.738623+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | ship | shipped | 2019-04-09 16:46:13.739792+03
*/
Your first seed state doesn't work,
I guess null should be
<null>
or maybe thefrom_state
type is wrong FYI.