Skip to content

Instantly share code, notes, and snippets.

@JoshCheek
Last active April 1, 2022 19:24
Show Gist options
  • Save JoshCheek/e0d435113f7ac8c07284880be6c41d0a to your computer and use it in GitHub Desktop.
Save JoshCheek/e0d435113f7ac8c07284880be6c41d0a to your computer and use it in GitHub Desktop.
Example of using postgres's notification system to alert processes to available work
require 'pg'
require 'json'
# Shared setup for the outbox example: opens the database connection used by
# the scripts that load this file and (re)creates the schema they rely on.
#
# PG.connect is called without arguments, so the connection target is whatever
# the pg gem resolves by default (e.g. the PG* environment variables).
$db = PG.connect

# The trigger fires once per INSERT statement rather than once per row, so a
# bulk insert wakes listeners a single time. A statement-level trigger has no
# NEW row (it is NULL there), so the notification carries an explicit empty
# payload instead of pretending to send a row id; listeners only use the
# notification as a wake-up signal and query the table for pending rows.
$db.exec <<~SQL
  create table if not exists outbox (
    id        serial primary key,
    processed bool   default false not null,
    payload   jsonb  not null
  );

  create or replace function notify_outbox_updated() returns trigger as $$ begin
    perform pg_notify('outbox_updated', '');
    -- raise notice 'sql notifying outbox_updated';
    return null;
  end $$ language plpgsql;

  create or replace trigger outbox_updated_trigger
    after insert on outbox
    for each statement execute procedure notify_outbox_updated();
SQL
require_relative 'common'
# Outbox consumer: subscribes to the `outbox_updated` channel and marks pending
# outbox rows as processed in batches, printing each one.
#
# Usage: pass the batch size as the first argument (defaults to 100).
# Ctrl-C (SIGINT) requests a graceful stop after the current batch.
$db.exec 'listen outbox_updated;'

wait_seconds = 1

# Parse strictly: a zero, negative or non-numeric batch size would otherwise
# turn into `limit 0` (which looks like a permanently "full" batch and never
# stops draining) or a SQL error.
batch_size = Integer(ARGV.fetch(0, '100'), 10, exception: false)
unless batch_size&.positive?
  abort "Batch size must be a positive integer, got #{ARGV[0].inspect}"
end

done = false
trap(:INT) { done = true }

puts "Processing in batches of #{batch_size}"

# Claims up to `batch_size` unprocessed rows inside one transaction, marks each
# as processed and reports it. Returns the number of rows handled.
# `for update skip locked` lets several consumers run side by side without
# blocking on, or double-processing, rows another consumer has already claimed.
# PG::Connection#transaction issues the BEGIN/COMMIT and rolls back if the
# block raises, so a failure never leaves the connection inside a transaction.
process_batch = lambda do
  claimed = 0
  $db.transaction do |conn|
    records = conn.exec_params(<<~SQL, [batch_size]).to_a
      select id, payload
      from outbox
      where not processed
      order by id
      for update
      skip locked
      limit $1
    SQL
    claimed = records.size
    puts "Processing batch of #{claimed} records:" unless records.empty?

    # Pad the running index to the width of the batch total so lines align.
    width = claimed.digits.size
    records.each.with_index(1) do |rec, index|
      conn.exec_params 'update outbox set processed = true where id = $1', [rec['id']]
      printf(
        " %#{width}d/%d outbox processed %s: payload=%p\n",
        index,
        claimed,
        rec['id'],
        JSON.parse(rec['payload'])
      )
    end
  end
  claimed
end

# Drains the outbox: keeps taking batches while each one comes back full (a
# full batch suggests more rows are waiting) and no stop has been requested.
# Iterative on purpose, so a large backlog cannot exhaust the call stack.
process = lambda do
  loop do
    handled = process_batch.call
    break if done || handled < batch_size
  end
end

# Pick up anything inserted before we started listening, then wait for
# notifications; the timeout lets the loop notice `done` about once a second.
process.call
until done
  process.call if $db.wait_for_notify(wait_seconds)
end
require_relative 'common'
# Outbox producer: inserts dictionary words into the outbox table as JSON
# payloads of the form {"message": word}, 1000 rows per INSERT statement.
#
# Usage: with a numeric first argument N, insert exactly N words and exit;
# otherwise keep inserting words forever (cycling through the dictionary).
dictionary = File.readlines '/usr/share/dict/words', chomp: true

words =
  if /\A\d+\z/.match?(ARGV[0])
    requested = ARGV[0].to_i
    # Start after the words earlier runs have (presumably) already inserted,
    # using the current row count as the offset. Wrap around the dictionary so
    # N words are still produced once the count exceeds the dictionary length,
    # instead of silently inserting fewer rows or none at all.
    offset = $db.exec('select count(1) from outbox').first['count'].to_i
    start = dictionary.empty? ? 0 : offset % dictionary.size
    dictionary.rotate(start).cycle.take(requested)
  else
    dictionary.cycle
  end

words.each_slice 1000 do |batch|
  payloads = batch.map { JSON.dump(message: _1) }
  # One positional placeholder per payload: ($1), ($2), ...
  placeholders = (1..payloads.size).map { "($#{_1})" }.join(', ')
  records = $db.exec_params <<~SQL, payloads
    insert into outbox (payload)
    values #{placeholders}
    returning *
  SQL
  puts records.map { |rec|
    "app inserted: #{rec['id']} #{JSON.parse(rec['payload']).inspect}"
  }
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment