Skip to content

Instantly share code, notes, and snippets.

@brycesch
Last active February 5, 2020 05:24
Show Gist options
  • Save brycesch/920eeca8aac93a8b7f71fccbf5289734 to your computer and use it in GitHub Desktop.
Save brycesch/920eeca8aac93a8b7f71fccbf5289734 to your computer and use it in GitHub Desktop.
Stream wal2json output to Kafka
-- psql -At -f example.sql postgres
--
-- Example statements whose changes are meant to show up in the wal2json stream.
-- (The usage line above is written as a `--` comment: PostgreSQL does not treat
-- `#` as a comment marker, so a leading `#` line would be parsed as part of the
-- first statement and make it fail.)

-- One table with a composite primary key (a, c) and one table with no key at all.
CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

-- A single explicit transaction mixing inserts, a delete and an update.
BEGIN;
INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table_with_pk (b, c) VALUES('Replication', now());
DELETE FROM table_with_pk WHERE a < 3;
INSERT INTO table_without_pk (b, c) VALUES(2.34, 'Tapir');
-- The following UPDATE is not added to the stream because table_without_pk
-- has neither a primary key nor a replica identity.
UPDATE table_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

-- Three more inserts, each issued outside the explicit transaction above.
INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table_with_pk (b, c) VALUES('Replication', now());
# Gemfile: gems required by streamer.rb, resolved from rubygems.org.
source 'https://rubygems.org'
gem 'eventmachine'
gem 'eventmachine-tail'
# NOTE(review): streamer.rb does `require "yajl"`; the gem that usually provides
# that file is published as `yajl-ruby` — confirm that `yajl` is the intended gem name.
gem 'yajl'
gem 'ruby-kafka'
# Create the logical replication slot "test_slot" in database "postgres",
# using the wal2json output plugin.
pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
# Start streaming changes from the slot and write them to out.json
# (the file that streamer.rb is pointed at).
pg_recvlogical -d postgres --slot test_slot --start -f out.json
# Drop the slot once it is no longer needed.
pg_recvlogical -d postgres --slot test_slot --drop-slot
# ruby streamer.rb out.json
require "rubygems"
require "eventmachine"
require "eventmachine-tail"
require "yajl"
require "ruby-kafka"
require "json"
# Shared Kafka client, seeded with two brokers on localhost (ports 9092 and 9093);
# Streamer#object_parsed uses it to deliver messages.
$kafka = Kafka.new(["kafka://localhost:9092","kafka://localhost:9093"], client_id: "streamer")
# File-tail handler that feeds everything appended to a file into a streaming
# JSON parser and publishes each completed JSON document to Kafka.
class Streamer < EventMachine::FileTail
  # @param path [String] file to tail
  # @param startpos [Integer] start position handed through to EventMachine::FileTail
  def initialize(path, startpos = -1)
    # Bare `super` forwards path and startpos unchanged.
    super
    @parser = Yajl::Parser.new(symbolize_keys: true)
    # Invoked by the parser every time a full JSON document has been read.
    @parser.on_parse_complete = method(:object_parsed)
  end

  # Logs a parsed document and delivers it, re-serialized as JSON, to the
  # Kafka topic held in $topic via the shared $kafka client.
  #
  # @param document [Object] the parsed JSON value (hash keys are symbols)
  def object_parsed(document)
    puts "data: #{document} \n\n"
    payload = JSON.dump(document)
    $kafka.deliver_message(payload, topic: $topic)
  end

  # Receives a chunk of newly read file content and hands it to the parser;
  # a document may span several chunks.
  #
  # @param chunk [String] raw data read from the tailed file
  def receive_data(chunk)
    @parser << chunk
  end
end
# Entry point: tails every file named in +args+ with a Streamer so that the
# JSON documents written to those files are forwarded to Kafka.
#
# @param args [Array<String>] paths of the files to tail (normally ARGV)
# @return [Integer] exit status: 1 when no path was given, 0 once the
#   EventMachine reactor has returned
def main(args)
  if args.empty?
    puts "Usage: #{$0} <path> [path2] [...]"
    return 1
  end

  # Streamer#object_parsed reads $topic. $0 is the script name, so the
  # "default" fallback only applies if $0 is ever nil.
  $topic = $0 || "default"

  EventMachine.run do
    args.each { |path| EventMachine::file_tail(path, Streamer) }
  end

  # Return an explicit status: the caller passes this value to Kernel#exit,
  # which accepts only true, false or an Integer and raises TypeError on nil.
  0
end # def main
# Run main with the command-line arguments and exit with the status it returns.
exit(main(ARGV))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment