Last active
April 1, 2022 19:24
-
-
Save JoshCheek/e0d435113f7ac8c07284880be6c41d0a to your computer and use it in GitHub Desktop.
Example of using postgres's notification system to alert processes to available work
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'pg' | |
| require 'json' | |
| $db = PG.connect | |
| $db.exec <<~SQL | |
| create table if not exists outbox ( | |
| id serial primary key, | |
| processed bool default false not null, | |
| payload jsonb not null | |
| ); | |
| create or replace function notify_outbox_updated() returns trigger as $$ begin | |
| perform pg_notify('outbox_updated', new.id::text); | |
| -- raise notice 'sql notifying outbox_updated for record %', new; | |
| return new; | |
| end $$ language plpgsql; | |
| create or replace trigger outbox_updated_trigger | |
| after insert on outbox | |
| for each statement execute procedure notify_outbox_updated(); | |
| SQL |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require_relative 'common' | |
| $db.exec 'listen outbox_updated;' | |
| wait_seconds = 1 | |
| batch_size = (ARGV[0] || 100).to_i | |
| done = false | |
| trap(:INT) { done = true } | |
| puts "Processing in batches of #{batch_size}" | |
| process = lambda do | |
| $db.exec 'begin' | |
| records = $db.exec_params(<<~SQL, [batch_size]).to_a | |
| select id, payload | |
| from outbox | |
| where not processed | |
| order by id | |
| for update | |
| skip locked | |
| limit $1 | |
| SQL | |
| puts "Processing batch of #{records.size} records:" unless records.empty? | |
| records.each.with_index 1 do |rec, index| | |
| $db.exec_params 'update outbox set processed = true where id = $1', [rec['id']] | |
| printf( | |
| " %#{records.size.digits.size}d/%d outbox processed %s: payload=%p\n", | |
| index, | |
| records.size, | |
| rec['id'], | |
| JSON.parse(rec["payload"]), | |
| ) | |
| end | |
| $db.exec 'commit' | |
| process.call if !done && records.size == batch_size | |
| end | |
| process.call | |
| $db.wait_for_notify(wait_seconds) && process.call until done |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require_relative 'common' | |
| words = File.readlines "/usr/share/dict/words", chomp: true | |
| if ARGV[0] =~ /\A\d+\z/ | |
| offset = $db.exec('select count(1) from outbox').first['count'].to_i | |
| words = words.drop(offset).take ARGV[0].to_i | |
| else | |
| words = words.cycle | |
| end | |
| words.each_slice 1000 do |words| | |
| payloads = words.map { JSON.dump message: _1 } | |
| values = (1..payloads.size).map { "($#{_1})" }.join(", ") | |
| records = $db.exec_params <<~SQL, payloads | |
| insert into outbox (payload) | |
| values #{values} | |
| returning * | |
| SQL | |
| puts records.map { |rec| | |
| "app inserted: #{rec['id']} #{JSON.parse(rec['payload']).inspect}" | |
| } | |
| end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment