Last active
February 5, 2020 05:24
-
-
Save brycesch/920eeca8aac93a8b7f71fccbf5289734 to your computer and use it in GitHub Desktop.
Stream wal2json output to Kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Usage: psql -At -f example.sql postgres
--
-- Sample workload: creates two tables and runs a handful of changes against
-- them. The header above is written as a "--" comment because "#" does not
-- start a comment in SQL; left as "#", it would be parsed as part of the
-- first CREATE TABLE statement and make it fail.

CREATE TABLE table_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);

-- One explicit transaction that mixes inserts, a delete and an update.
BEGIN;
INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table_with_pk (b, c) VALUES('Replication', now());
DELETE FROM table_with_pk WHERE a < 3;

INSERT INTO table_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;

-- Three further inserts issued outside the explicit BEGIN/COMMIT block.
INSERT INTO table_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table_with_pk (b, c) VALUES('Replication', now());
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Gems required by streamer.rb.
source "https://rubygems.org"

# File tailing on top of the EventMachine reactor.
gem "eventmachine"
gem "eventmachine-tail"

# Kafka client.
gem "ruby-kafka"

# Streaming JSON parser.
# NOTE(review): streamer.rb uses Yajl::Parser, which is normally shipped by
# the "yajl-ruby" gem -- confirm that "yajl" is the intended gem name.
gem "yajl"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 1. Create the logical replication slot "test_slot" using the wal2json output plugin.
pg_recvlogical --dbname=postgres --slot=test_slot --create-slot --plugin=wal2json

# 2. Start streaming changes from the slot into out.json.
pg_recvlogical --dbname=postgres --slot=test_slot --start --file=out.json

# 3. Drop the slot once it is no longer needed.
pg_recvlogical --dbname=postgres --slot=test_slot --drop-slot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ruby streamer.rb out.json | |
require "rubygems" | |
require "eventmachine" | |
require "eventmachine-tail" | |
require "yajl" | |
require "ruby-kafka" | |
require "json" | |
# Process-wide Kafka client used by Streamer#object_parsed to publish messages.
# It is given the two local broker addresses and identifies itself as "streamer".
$kafka = Kafka.new(
  %w[kafka://localhost:9092 kafka://localhost:9093],
  client_id: "streamer"
)
# File-tail handler that pushes the data it receives through a streaming Yajl
# parser and publishes every completed JSON document to Kafka.
class Streamer < EventMachine::FileTail
  # @param path [String] file to follow
  # @param startpos [Integer] start position, forwarded unchanged to
  #   EventMachine::FileTail (defaults to -1)
  def initialize(path, startpos = -1)
    super(path, startpos)
    @parser = Yajl::Parser.new(symbolize_keys: true)
    # Yajl invokes this callback whenever a full JSON document has been parsed.
    @parser.on_parse_complete = ->(document) { object_parsed(document) }
  end

  # Feeds a chunk of file data to the incremental parser; complete documents
  # are reported through #object_parsed.
  # NOTE(review): presumably called by EventMachine::FileTail with each newly
  # read chunk -- confirm against the eventmachine-tail documentation.
  #
  # @param data [String] raw data read from the tailed file
  def receive_data(data)
    @parser << data
  end

  # Logs the parsed document to stdout, then re-serialises it as JSON and
  # delivers it to the topic held in the global $topic via the global $kafka
  # client.
  #
  # @param obj [Object] the document produced by the Yajl parser
  def object_parsed(obj)
    puts "data: #{obj} \n\n"
    payload = JSON.dump(obj)
    $kafka.deliver_message(payload, topic: $topic)
  end
end
# Entry point: tails every file named in +args+ with a Streamer handler inside
# an EventMachine reactor.
#
# Side effect: sets the global $topic, which Streamer reads when publishing.
#
# @param args [Array<String>] paths of the files to tail
# @return [Integer] exit status -- 1 when no path was given (usage is printed),
#   0 once EventMachine.run has returned
def main(args)
  if args.empty?
    puts "Usage: #{$0} <path> [path2] [...]"
    return 1
  end

  # The topic is named after the script. Only the basename is used because $0
  # may carry a directory prefix ("./streamer.rb") and "/" is not a legal
  # character in a Kafka topic name.
  $topic = File.basename($0)

  EventMachine.run do
    args.each { |path| EventMachine.file_tail(path, Streamer) }
  end

  # Do not rely on whatever EventMachine.run returns: the caller hands our
  # result straight to Kernel#exit, which raises TypeError for nil.
  0
end # def main
# Run main with the command-line arguments and use its result as the process
# exit status.
status = main(ARGV)
exit(status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment