Daniel Schierbeck (dasch)

dasch / Kafka.elm
Last active August 7, 2021 06:09
Mock stream processing API in Elm
module Kafka exposing (..)


type alias KafkaRecord =
    { topic : String
    , partition : Int
    , offset : Int
    , key : Maybe String
    , value : Maybe String
    }
dasch / Kafka.elm
Last active January 29, 2017 03:30
module Kafka exposing (..)

import Stream exposing (Source, Sink)


source : String -> Source String String


sink : String -> Sink String String


import Stream exposing (Stream, Table)
import Json.Decode


type alias Username = String
type alias Clicks = Int
type alias Region = String


userClicks : Stream Username Clicks
userClicks =
    Stream.stream "user-clicks"
        |> Stream.mapValue String.toInt


module EventSourcing exposing (..)

import Time exposing (Time)


type alias EventRecord a =
    { timestamp : Time
    , eventNumber : Int
    , event : a
    }


type alias EventLog a =
    { aggregateId : String
    , events : List (EventRecord a) -- assumed completion; the gist is cut off here
    }
consumer = kafka.consumer(group_id: "worker")
consumer.subscribe("jobs")

# This could all be done in a high-level library.
loop do
  # `poll` fetches a set of message batches.
  batches = consumer.poll

  # ... process `batches` ...
end
consumer = kafka.consumer(group_id: "worker")

# If the user block raises an exception, the consumer will move on to the
# other partitions it's handling. Once finished, it will retry, up to x times.
# Once all attempts have been made, the partition is put on a pause list and
# no longer fetched from, until some timeout.
consumer.subscribe("jobs", retries: 3, failure_policy: :pause_partition)
# In this API, it's assumed that we'll pause a partition, so the configuration
# is in terms of the timeout. I guess if 0 is passed we would skip pausing...
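# A sketch of how the pause-based failure policy above might work inside a
# high-level library. Everything here (`handle_batch`, `process`, `pause`) is
# hypothetical -- the gist only specifies the configuration, not the mechanism.
def handle_batch(batch, retries:, pause_timeout:)
  attempts = 0
  begin
    attempts += 1
    process(batch)
  rescue
    # Retry the batch up to `retries` times...
    retry if attempts <= retries

    # ...then pause the partition so it is skipped by subsequent fetches
    # until `pause_timeout` has elapsed.
    pause(batch.topic, batch.partition, timeout: pause_timeout)
  end
end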
# When no group id is passed, there's no coordination.
consumer = kafka.consumer
consumer.subscribe("topic1")

# You can seek separately for each partition.
consumer.seek("topic1", partition: 0, offset: 42)
consumer.seek("topic1", partition: 1, offset: 13)
# `poll` returns a list of batches -- useful for e.g. checking against
# the highwater mark.
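# For example, per-partition lag could be estimated with a sketch like this;
# `highwater_mark`, `last_offset`, and `partition` are assumed batch
# accessors, not confirmed by the gist.
batches = consumer.poll

batches.each do |batch|
  # The highwater mark is the offset of the last message written to the
  # partition, so the gap to the batch's last offset approximates lag.
  lag = batch.highwater_mark - batch.last_offset
  puts "partition #{batch.partition} is #{lag} messages behind"
end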
require "json"

class ParseEntries
  def process(message)
    entry = JSON.parse(message.value)
    [entry]
  end
end

class CombineEntries
  def initialize
    @open_transactions = {}
  end

  # ... (the rest of the class is cut off in the source)
end
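# The two classes above look like stages in a processing pipeline; a
# hypothetical wiring (the `Pipeline` API and topic names are assumptions,
# not part of the gist) might read:
pipeline = Pipeline.new(kafka)
pipeline.from("entries")           # consume raw entry messages
pipeline.via(ParseEntries.new)     # parse JSON values into entry hashes
pipeline.via(CombineEntries.new)   # pair entries up into transactions
pipeline.to("transactions")        # produce the combined results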
dasch / kafka-api.rb
Last active February 8, 2016 14:32
require "kafka"
# Current:
cluster = Kafka.new(seed_brokers: [...], client_id: "test", socket_timeout: 1)
producer = kafka.get_producer(required_acks: 2, ack_timeout: 2)
# Suggested:
cluster = Kafka::Cluster.new(seed_brokers: [...], client_id: "test", socket_timeout: 1)
producer = Kafka::Producer.new(cluster: cluster, required_acks: 2, ack_timeout: 2)
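# The suggested shape makes the cluster an explicit, shareable dependency
# rather than something the Kafka object hands out. Presumably a consumer
# would be built the same way -- `Kafka::Consumer` is an assumption here,
# not part of the gist:
consumer = Kafka::Consumer.new(cluster: cluster, group_id: "test")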
dasch / kafka_consumer.rb
Last active January 18, 2016 09:58
Example Ruby Kafka client API
require "kafka"
require "logger"

logger = Logger.new($stdout)
kafka_brokers = ENV.fetch("KAFKA_BROKERS").split(",")
kafka_topic = "application.events"

kafka = Kafka.new(brokers: kafka_brokers, logger: logger)
consumer = kafka.consumer(group_id: "my-consumer", autocommit: true)

consumer.subscribe(kafka_topic) do |messages|
  # ... handle each message in `messages` ...
end
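# Presumably `autocommit: false` would shift offset management to the caller;
# a hypothetical manual variant (`commit_offsets` is an assumed method, not
# shown in the gist) might look like:
consumer = kafka.consumer(group_id: "my-consumer", autocommit: false)

consumer.subscribe(kafka_topic) do |messages|
  messages.each { |message| process(message) } # `process` stands in for app logic
  consumer.commit_offsets                      # hypothetical explicit commit
end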