Skip to content

Instantly share code, notes, and snippets.

@Gabriella439
Last active December 4, 2020 18:30
Show Gist options
  • Save Gabriella439/3cc9e44f445923ac79d23b6ff8d02c1c to your computer and use it in GitHub Desktop.
Save Gabriella439/3cc9e44f445923ac79d23b6ff8d02c1c to your computer and use it in GitHub Desktop.
Code used for Kafka livestream example
# Evaluates to a NixOS virtual machine (`nixos.vm`) that runs Apache Kafka and
# ZooKeeper and has the compiled `test.hs` program installed as `script`.
let
  # nixpkgs pinned to a fixed revision; the sha256 guards the fetched contents.
  # Fetched from the canonical github.com archive endpoint rather than a
  # third-party mirror host.
  nixpkgs = builtins.fetchTarball {
    url = "https://github.com/NixOS/nixpkgs/archive/f3fc2f3326a23797b2c95297e68ed8e8de2a95e6.tar.gz";
    sha256 = "0wsplccl8bv522zh3y8affacw9pmzsxm18i5hdgz78gxh8m7k933";
  };

  nixos = import "${nixpkgs}/nixos" {
    system = "x86_64-linux";

    configuration = { pkgs, ... }:
      let
        # GHC bundled with the one Haskell library the script imports.
        ghc = pkgs.haskellPackages.ghcWithPackages (haskell: [
          haskell.hw-kafka-client
        ]);

        # Compiles ./test.hs into $out/bin/script; intermediate object files
        # are written to the build's working directory (-outputdir .).
        script = pkgs.runCommand "haskell-script" {} ''
          mkdir -p $out/bin
          ${ghc}/bin/ghc -threaded -O -outputdir . -Wall ${./test.hs} -o $out/bin/script
        '';
      in
        { environment.systemPackages = [ script ];

          services = {
            apache-kafka = {
              enable = true;

              extraProperties = ''
                offsets.topic.replication.factor=1
              '';
            };

            zookeeper.enable = true;
          };

          # Root is given an empty password and users are declared immutably.
          # NOTE(review): presumably acceptable only because this is a
          # throwaway demo VM - confirm before reusing elsewhere.
          users = {
            mutableUsers = false;

            users.root.password = "";
          };

          virtualisation = {
            graphics = false;

            memorySize = 2048;
          };
        };
  };
in
  nixos.vm
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Main where
import Kafka.Consumer
( BrokerAddress(..)
, ConsumerGroupId(..)
, ConsumerProperties
, ConsumerRecord(..)
, OffsetReset(..)
, Subscription
, Timeout(..)
, TopicName(..)
)
import Control.Exception (Exception)
import Kafka.Producer
( KafkaError(..)
, RdKafkaRespErrT(..)
, ProducePartition(..)
, ProducerProperties
, ProducerRecord(..)
)
import qualified Control.Concurrent as Concurrent
import qualified Control.Monad as Monad
import qualified Control.Exception as Exception
import qualified Kafka.Consumer as Consumer
import qualified Kafka.Producer as Producer
-- | Consumer configuration: the broker at @localhost:9092@, combined with
-- 'Consumer.noAutoCommit' and the consumer group id @script@.
consumerProperties :: ConsumerProperties
consumerProperties = brokers <> Consumer.noAutoCommit <> group
  where
    brokers = Consumer.brokersList [BrokerAddress "localhost:9092"]
    group   = Consumer.groupId (ConsumerGroupId "script")
-- | Producer configuration: only the broker list, pointing at
-- @localhost:9092@.
producerProperties :: ProducerProperties
producerProperties = Producer.brokersList brokers
  where
    brokers = [BrokerAddress "localhost:9092"]
-- | Subscription to the @foo@ topic, combined with an offset-reset policy of
-- 'Earliest'.
subscription :: Subscription
subscription = subscribedTopics <> resetPolicy
  where
    subscribedTopics = Consumer.topics [TopicName "foo"]
    resetPolicy      = Consumer.offsetReset Earliest
-- | Run an action that reports failure as a 'Left' and turn that failure
-- into a thrown exception.
--
-- A 'Right' result is returned unchanged; a 'Left' value is raised with
-- 'Exception.throwIO'.
throws :: Exception e => IO (Either e a) -> IO a
throws io = io >>= either Exception.throwIO return
-- | Publish one @"hello"@ message to the @foo@ topic, then poll that topic
-- forever, printing the value of every record received.
--
-- Both the producer and the consumer are managed with 'Exception.bracket',
-- so each is closed even if an exception escapes. Any 'KafkaError' other
-- than a poll timeout is rethrown as an exception.
main :: IO ()
main = do
    Exception.bracket openProducer Producer.closeProducer \producer -> do
        outcome <- Producer.produceMessage producer ProducerRecord
            { prTopic     = TopicName "foo"
            , prPartition = UnassignedPartition
            , prKey       = Nothing
            , prValue     = Just "hello"
            }
        throwOnError outcome

    -- NOTE(review): 'Concurrent.threadDelay' takes microseconds, so this
    -- pauses for 1 ms - confirm whether a longer pause was intended.
    Concurrent.threadDelay 1000

    Exception.bracket openConsumer shutdownConsumer \consumer -> Monad.forever do
        polled <- Consumer.pollMessage consumer (Timeout 1000)
        case polled of
            -- A timed-out poll just means nothing arrived; poll again.
            Left (KafkaResponseError RdKafkaRespErrTimedOut) ->
                return ()
            Left kafkaError ->
                Exception.throwIO kafkaError
            Right record ->
                print (crValue record)
  where
    -- Distinct names for the two acquire actions; previously both were
    -- called @open@ and the second shadowed the first.
    openProducer = throws (Producer.newProducer producerProperties)

    openConsumer = throws (Consumer.newConsumer consumerProperties subscription)

    -- Close the consumer, raising any error the close reports.
    shutdownConsumer consumer = Consumer.closeConsumer consumer >>= throwOnError

    -- Raise the error if one is present; otherwise do nothing.
    throwOnError :: Maybe KafkaError -> IO ()
    throwOnError = mapM_ Exception.throwIO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment