This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Kafka exposing (..) | |
type alias KafkaRecord = | |
{ topic : String | |
, partition : Int | |
, offset : Int | |
, key : Maybe String | |
, value : Maybe String | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Kafka exposing (..) | |
import Stream exposing (Source, Sink) | |
source : String -> Source String String | |
sink : String -> Sink String String |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Stream exposing (Stream, Table) | |
import Json.Decode | |
type alias Username = String | |
type alias Clicks = Integer | |
type alias Region = String | |
userClicks : Stream Username Clicks | |
userClicks = Stream.stream "user-clicks" | |
|> Stream.mapValue String.toInt |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module EventSourcing exposing | |
type alias EventRecord a = | |
{ timestamp : Time | |
, eventNumber : Int | |
, event : a | |
} | |
type alias EventLog a = | |
{ aggregateId : String |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
consumer = kafka.consumer(group_id: "worker") | |
consumer.subscribe("jobs") | |
# This could all be done in a high-level library. | |
loop do | |
# `poll` fetches a set of message batches. | |
batches = consumer.poll |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
consumer = kafka.consumer(group_id: "worker") | |
# If the user block raises an exception, the consumer will move on to the | |
# other partitions it's handling. Once finished, it will retry, up to x times. | |
# Once all attempts have been made, the partition is put on a pause list and | |
# not longer fetched from, until some timeout. | |
consumer.subscribe("jobs", retries: 3, failure_policy: :pause_partition) | |
# In this API, it's assumed that we'll pause a partition, so the configuration | |
# is in terms of the timeout. I guess if 0 is passed we would skip pausing... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# When no group id is passed there's no coordination. | |
consumer = kafka.consumer | |
consumer.subscribe("topic1") | |
# You can seek separately for each partition. | |
consumer.seek("topic1", partition: 0, offset: 42) | |
consumer.seek("topic1", partition: 1, offset: 13) | |
# `poll` returns a list of batches -- useful for e.g. checking against | |
# the highwater mark. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ParseEntries | |
def process(message) | |
entry = JSON.parse(message.value) | |
[entry] | |
end | |
end | |
class CombineEntries | |
def initialize | |
@open_transactions = {} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "kafka" | |
# Current: | |
cluster = Kafka.new(seed_brokers: [...], client_id: "test", socket_timeout: 1) | |
producer = kafka.get_producer(required_acks: 2, ack_timeout: 2) | |
# Suggested: | |
cluster = Kafka::Cluster.new(seed_brokers: [...], client_id: "test", socket_timeout: 1) | |
producer = Kafka::Producer.new(cluster: cluster, required_acks: 2, ack_timeout: 2) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'kafka' | |
kafka_brokers = ENV.fetch("KAFKA_BROKERS").split(",") | |
kafka_topic = "application.events" | |
kafka = Kafka.new(brokers: kafka_brokers, logger: logger) | |
consumer = kafka.consumer(group_id: "my-consumer", autocommit: true) | |
consumer.subscribe(kafka_topic) do |messages| |