Created
June 5, 2020 00:17
-
-
Save adamstrickland/bd28c7c6c3ce374547a38ca5dcdd0378 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require "delivery_boy" | |
# Entry point for publishing events to Kafka through a pluggable "deliverer".
#
# The deliverer is looked up on every call from the KAFKA_EVENT_DELIVERER
# environment variable (falling back to DEFAULT_EVENT_DELIVERER) and resolved
# to one of the classes nested under this namespace. The four deliverer
# operations (+add_event+, +transaction_opened!+, +transaction_committed!+,
# +transaction_rolled_back!+) are available both on the class and on instances.
class EventPublisher
  # Prefix prepended to the table name to build the Kafka topic. Read once when
  # the class is loaded; ENV.fetch raises KeyError if the variable is unset.
  KAFKA_TOPIC_PREFIX = ENV.fetch("KAFKA_TOPIC_PREFIX")
  # Key under which ThreadLocalBackedDeliverer keeps its pending-event queue.
  THREAD_LOCAL_KEY = :event_publisher_transaction_queue
  # Name (relative to EventPublisher) of the deliverer used when
  # KAFKA_EVENT_DELIVERER is not set.
  DEFAULT_EVENT_DELIVERER = "ThreadLocalBackedDeliverer".freeze

  class << self
    # Class-level API: EventPublisher.add_event(event) etc. forward to the
    # currently configured deliverer class.
    delegate :add_event, :transaction_opened!, :transaction_committed!, :transaction_rolled_back!, to: :deliverer

    # Resolves the configured deliverer class.
    #
    # @return [Class] the constant EventPublisher::<KAFKA_EVENT_DELIVERER>
    # @raise [NameError] if the configured name does not resolve to a constant
    def deliverer
      deliverer_name = ENV.fetch("KAFKA_EVENT_DELIVERER", DEFAULT_EVENT_DELIVERER)
      "EventPublisher::#{deliverer_name}".constantize
    end
  end

  # Instance-level API kept for backward compatibility: instances expose the
  # same operations (and +deliverer+) by forwarding to the class.
  delegate :deliverer, :add_event, :transaction_opened!, :transaction_committed!, :transaction_rolled_back!, to: :class

  # NOTE(review): this deliverer is an unfinished stub. Only
  # +transaction_opened!+ has a body; every other operation is a no-op, so
  # events handed to it are dropped.
  class SimpleArrayDeliverer
    # TODO: not implemented; the event is currently discarded.
    def self.add_event(event)
    end

    # Lazily initialises the class-level @messages array and returns it.
    def self.transaction_opened!
      @messages ||= []
    end

    # TODO: not implemented.
    def self.transaction_committed!
    end

    # TODO: not implemented.
    def self.transaction_rolled_back!
    end
  end

  # NOTE(review): this deliverer is an unfinished stub; every operation is a
  # no-op, so events handed to it are dropped.
  class DeliveryBoyDeliverer
    # TODO: not implemented; the event is currently discarded.
    def self.add_event(event)
    end

    # TODO: not implemented. The original body was an incomplete expression
    # (a lone "@") that prevented the file from parsing.
    def self.transaction_opened!
    end

    # TODO: not implemented.
    def self.transaction_committed!
    end

    # TODO: not implemented.
    def self.transaction_rolled_back!
    end
  end

  # Buffers events in Thread.current[THREAD_LOCAL_KEY] while a transaction is
  # open and publishes them on commit; outside a transaction events are
  # published immediately.
  class ThreadLocalBackedDeliverer
    # Marks a transaction as open by ensuring the queue exists. An already
    # existing queue (and its buffered events) is kept.
    #
    # @return [Array] the pending-event queue
    def self.transaction_opened!
      Thread.current[THREAD_LOCAL_KEY] ||= []
    end

    # Publishes every buffered event in insertion order, then closes the
    # transaction.
    #
    # @return [nil]
    def self.transaction_committed!
      # Array() makes a commit without a prior transaction_opened! a no-op
      # instead of a NoMethodError on nil.
      Array(transaction_events).each { |event| publish!(event) }
      nil
    ensure
      # Always drop the queue, even if publishing raised; otherwise the stale
      # queue would make later add_event calls on this thread buffer forever.
      clear_transaction_events!
    end

    # Discards all buffered events and closes the transaction.
    #
    # @return [nil]
    def self.transaction_rolled_back!
      clear_transaction_events!
    end

    # @return [Boolean] true when a queue is currently installed for this thread
    def self.transaction_open?
      Thread.current[THREAD_LOCAL_KEY].is_a?(Array)
    end

    # Buffers the event when a transaction is open, otherwise publishes it
    # right away.
    #
    # @param event [#to_json, #[]] event payload; +event[:data]+ is read when
    #   the event is published
    def self.add_event(event)
      if transaction_open?
        transaction_events << event
      else
        publish!(event)
      end
    end

    # Publishes a single event, subject to the :event_publishing Flipper flag.
    # When KafkaClient is nil the event is only logged.
    #
    # @param event [#to_json, #[]] event payload
    def self.publish!(event)
      return unless Flipper.enabled?(:event_publishing)

      if KafkaClient.nil?
        log_event!(event, message: "Kafka client not initialized")
      else
        publish_event!(event)
      end
    end

    # Bumps the NewRelic counter for successfully delivered events.
    private_class_method def self.increment_metric!
      NewRelic::Agent.increment_metric("Custom/EventPublisher/publish_event!", 1)
    end

    # Delivers the event to the topic "<KAFKA_TOPIC_PREFIX><first key of
    # event[:data]>" and records a metric on success.
    private_class_method def self.publish_event!(event)
      table_name = event[:data].keys.first
      KafkaClient.deliver_message(event.to_json, topic: "#{KAFKA_TOPIC_PREFIX}#{table_name}")
      increment_metric!
    rescue StandardError => e
      # Broad rescue so that event publishing does not interfere with user
      # experience/business logic. It covers the topic-name lookup as well, so
      # a malformed event is reported rather than raised to the caller.
      Rollbar.error(e)
      log_event!(event, severity: :error, message: "Failed to send event to Kafka")
    end

    # Writes "<message>: <event as JSON>" to the Rails logger at the given
    # severity.
    private_class_method def self.log_event!(event, severity: :debug, message: "Logging event")
      Rails.logger.public_send(severity, "#{message}: #{event.to_json}")
    end

    # Removes the queue, which also marks the transaction as closed.
    private_class_method def self.clear_transaction_events!
      Thread.current[THREAD_LOCAL_KEY] = nil
    end

    # @return [Array, nil] the pending-event queue, or nil when no transaction
    #   is open
    private_class_method def self.transaction_events
      Thread.current[THREAD_LOCAL_KEY]
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment