Last active
February 14, 2023 15:35
-
-
Save dasch/af38c28e62117e19f6ba to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pipeline stage that turns a raw consumer message into a parsed JSON entry.
class ParseEntries
  # Decodes the JSON document carried in the message payload.
  #
  # @param message [#value] object whose +value+ is a JSON document string
  # @return [Array] one-element array holding the parsed entry, so the result
  #   can be flat-mapped into the next stage
  # @raise [JSON::ParserError] if the payload is not valid JSON
  def process(message)
    [JSON.parse(message.value)]
  end
end
# Pipeline stage that buffers entries per transaction id and emits the whole
# transaction once its commit entry arrives.
class CombineEntries
  def initialize
    # txnid => entries seen so far for a transaction that has not committed.
    @open_transactions = {}
  end

  # Buffers +entry+, or flushes its transaction when the entry is a commit.
  #
  # @param entry [Hash] parsed entry with "txnid" and "commit" keys
  # @return [Array] +[]+ while the transaction is still open; a one-element
  #   array containing the list of the transaction's entries (commit entry
  #   last) once it commits
  # @raise [KeyError] if "txnid" or "commit" is missing from +entry+
  def process(entry)
    txnid = entry.fetch("txnid")

    if entry.fetch("commit")
      # A transaction may consist solely of its commit entry, in which case
      # nothing was buffered and delete returns nil; start from an empty list.
      entries = @open_transactions.delete(txnid) || []
      entries << entry
      [entries]
    else
      (@open_transactions[txnid] ||= []) << entry
      []
    end
  end
end
# Terminal pipeline stage that publishes each combined transaction as a JSON
# message to the "maxwell-transactions" topic.
class PublishTransactions
  # Builds the producer from the comma-separated KAFKA_BROKERS environment
  # variable, the same configuration Pipeline uses for its consumer.
  #
  # @raise [KeyError] if KAFKA_BROKERS is not set
  def initialize
    brokers = ENV.fetch("KAFKA_BROKERS").split(",")
    kafka = Kafka.new(seed_brokers: brokers)
    @producer = kafka.producer
  end

  # Serializes the transaction and delivers it right away.
  #
  # @param transaction_entries [Array] entries belonging to one transaction
  # @return [Array] always empty; this stage passes nothing downstream
  def process(transaction_entries)
    data = JSON.dump(transaction_entries)

    @producer.produce(data, topic: "maxwell-transactions")
    # Deliver per transaction rather than leaving the message buffered.
    @producer.deliver

    []
  end
end
# Consumes messages from a Kafka topic and pushes each one through an ordered
# list of stages. A stage is any object responding to +process(record)+ and
# returning an array of zero or more records for the next stage.
class Pipeline
  # @param topic [String] topic to subscribe the consumer to
  # @param group_id [String] consumer group id
  # @raise [KeyError] if the KAFKA_BROKERS environment variable is not set
  def initialize(topic:, group_id:)
    # KAFKA_BROKERS is a comma-separated broker list.
    brokers = ENV.fetch("KAFKA_BROKERS").split(",")
    kafka = Kafka.new(seed_brokers: brokers)

    @consumer = kafka.consumer(group_id: group_id)
    @consumer.subscribe(topic)
    @stages = []
  end

  # Appends +stage+ to the end of the pipeline.
  #
  # @param stage [#process] stage to run after all previously added stages
  # @return [Array] the current list of stages
  def add_stage(stage)
    @stages << stage
  end

  # Feeds every consumed message through the stages in insertion order.
  def run
    @consumer.each_message do |message|
      # Each stage may emit zero, one or many records per input record;
      # flat_map splices those outputs into the input of the following stage.
      @stages.inject([message]) do |records, stage|
        records.flat_map { |record| stage.process(record) }
      end
    end
  end
end
# Wire up the pipeline: parse raw "maxwell" messages, group the entries by
# transaction, then publish each completed transaction.
pipeline = Pipeline.new(topic: "maxwell", group_id: "maxwell-transaction-combiner")

# Stages run in the order they are added.
pipeline.add_stage(ParseEntries.new)
pipeline.add_stage(CombineEntries.new)
pipeline.add_stage(PublishTransactions.new)

pipeline.run
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I believe line 61 should read `@stages.inject([message]) do |records, stage|` (with a closing parenthesis on the method call, before the `do`).