Last active
October 7, 2018 18:20
-
-
Save RocketPuppy/018fc5f60b05e557de0c37e0749232a5 to your computer and use it in GitHub Desktop.
Kafka Connect Setup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Walkthrough for the NixOps deployment named "kafka-test", built from cluster.nix.
# NOTE(review): the trailing "| |" on each line looks like table residue from
# page extraction rather than intended pipes - confirm before running anything.
# NOTE(review): lines starting with ">" appear to be commands typed at the
# remote prompt after `nixops ssh`, not local output redirections - confirm.
# Create and deploy the test cluster, may take a while | |
nixops create -d kafka-test cluster.nix | |
nixops deploy -d kafka-test --force-reboot | |
# Check up on kafka-connect | |
# Connect to the machine called kafka-0 in the deployment.
nixops ssh -d kafka-test kafka-0 | |
# Show the journal for the apache-kafka-connect unit, then start the unit.
> journalctl -u apache-kafka-connect.service | |
> systemctl start apache-kafka-connect.service | |
# Tear it down | |
# Stop, destroy and delete are issued in that order against the same deployment.
nixops stop -d kafka-test | |
nixops destroy -d kafka-test | |
nixops delete -d kafka-test |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NixOps network expression: one log collector plus generated ZooKeeper and
# Kafka machines, all using the per-role settings from infrastructure-vbox.nix.
let
  pkgs = import ./nixpkgs.nix { };
  vbox = import ./infrastructure-vbox.nix;

  # Each list holds { name, value } pairs produced by the role's `generate`
  # function; the count passed to genList is the number of machines per role.
  zookeeperNodes = builtins.genList zookeeperModule.generate 1;
  kafkaNodes = builtins.genList kafkaModule.generate 1;

  zookeeperModule = import ./zookeeper.nix {
    infra = vbox.zookeeper;
    zookeeperServers = zookeeperNodes;
  };

  # Kafka Connect is handed every broker by machine name on port 9092.
  connectServices = import ./kafka-connect.nix {
    kafka = {
      package = pkgs.apacheKafka;
      servers = map (node: { ip = node.name; port = 9092; }) kafkaNodes;
    };
  };

  kafkaModule = import ./kafka.nix {
    infra = vbox.kafka;
    kafka = pkgs.apacheKafka;
    zookeeperServers = zookeeperNodes;
    extraServices = connectServices;
  };

  fixedMachines = {
    network.description = "End-to-end setup";
    log-collector = import ./log-collector.nix { infra = vbox.log-collector; };
  };
in
fixedMachines
// builtins.listToAttrs zookeeperNodes
// builtins.listToAttrs kafkaNodes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Per-role deployment settings, all targeting the "virtualbox" environment.
let
  # Deployment attributes for one guest; only the memory size varies by role.
  vboxGuest = memorySize: {
    deployment = {
      targetEnv = "virtualbox";
      virtualbox = { inherit memorySize; };
    };
  };
in
{
  kafka = vboxGuest 1024;
  zookeeper = vboxGuest 256;
  log-collector = vboxGuest 128;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# systemd service definition for a Kafka Connect worker in distributed mode.
#
# Argument:
#   kafka.servers - list of { ip, port } attrsets; each is rendered as
#                   "ip:port" and the results are comma-joined into
#                   bootstrap.servers
#   kafka.package - package whose bin/connect-distributed.sh is launched
#
# Result: an attrset with a single `apache-kafka-connect` service entry.
{ kafka }:
let
  kafkaUnit = "apache-kafka.service";

  toEndpoint = { ip, port }: "${ip}:${builtins.toString port}";
  bootstrapServers =
    builtins.concatStringsSep "," (builtins.map toEndpoint kafka.servers);

  # log4j setup: root logger at DEBUG, written to System.err.
  log4jFile = builtins.toFile "log4j.properties" ''
    log4j.rootLogger=DEBUG, stderr
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[KAFKA-CONNECT] [%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
  '';

  # Worker properties passed to connect-distributed.sh.
  workerFile = builtins.toFile "distributed-config.properties" ''
    bootstrap.servers = ${bootstrapServers}
    key.converter = org.apache.kafka.connect.json.JsonConverter
    value.converter = org.apache.kafka.connect.json.JsonConverter
    group.id = connect-cluster
    rest.port = 8080
    config.storage.topic = connect-config
    config.storage.replication.factor = 1
    offset.storage.topic = connect-offsets
    offset.storage.replication.factor = 1
    offset.storage.partitions=1
    status.storage.topic = connect-status
    status.storage.replication.factor = 1
    status.storage.partitions=1
  '';
in
{
  apache-kafka-connect = {
    enable = true;
    description = "Apache Connect Distributed";
    wantedBy = [ "default.target" ];
    # Tie the worker to the broker unit: ordered after it, required by it,
    # and part of it.
    requires = [ kafkaUnit ];
    partOf = [ kafkaUnit ];
    after = [ kafkaUnit ];
    script = ''
      export KAFKA_HEAP_OPTS="-Xms128M -Xmx512M"
      export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${log4jFile}"
      ${kafka.package}/bin/connect-distributed.sh ${workerFile}
    '';
  };
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Generator for Kafka broker machines.
#
# Arguments:
#   infra            - attrset merged over each machine module with `//`
#   kafka            - value assigned to services.apache-kafka.package
#   zookeeperServers - list of attrsets with a `name`; the names are joined
#                      with "," to form the broker's ZooKeeper setting
#   extraServices    - attrset assigned to systemd.services; optional,
#                      defaults to { } so the file can be used on its own
#
# Result: { generate, server }. `generate id` returns a { name, value } pair
# where name is "kafka-<id>" and value is the machine module; `server` is
# `generate 0`.
{ infra, kafka, zookeeperServers, extraServices ? { } }:
let
  generate = id: {
    name = "kafka-${toString id}";
    value = { config, pkgs, lib, ... }:
      let
        ip = config.networking.privateIPv4;
      in
      {
        services.apache-kafka = {
          enable = true;
          brokerId = id;
          zookeeper = lib.concatStringsSep "," (map (x: x.name) zookeeperServers);
          hostname = ip;
          extraProperties = ''
            listeners=PLAINTEXT://${ip}:9092
            zookeeper.connection.timeout.ms=600000
            zookeeper.session.timeout.ms=12000
          '';
          package = kafka;
          # Keep the broker JVM small: heap starts at 128 MB (-Xms) and is
          # capped at 512 MB (-Xmx).
          jvmOptions = [
            "-server"
            "-Xmx512M"
            "-Xms128M"
            "-XX:+UseCompressedOops"
            "-XX:+UseParNewGC"
            "-XX:+UseConcMarkSweepGC"
            "-XX:+CMSClassUnloadingEnabled"
            "-XX:+CMSScavengeBeforeRemark"
            "-XX:+DisableExplicitGC"
            "-Djava.awt.headless=true"
            "-Djava.net.preferIPv4Stack=true"
          ];
        };

        systemd.services = extraServices;

        # Both journald rate-limit settings are set to zero.
        services.journald = {
          rateLimitBurst = 0;
          rateLimitInterval = "0";
        };

        # NOTE(review): 9092 is the listener port set above and 514 the syslog
        # port used below; the remaining ports presumably mirror the ZooKeeper
        # and Kafka Connect settings - confirm they are needed on this host.
        networking.firewall = {
          enable = true;
          allowedTCPPorts = [ 2181 9092 2888 3888 514 8080 ];
          allowedUDPPorts = [ 2181 9092 2888 3888 514 8080 ];
        };

        # Forward local and internal syslog-ng messages to the host named
        # "log-collector" over TCP port 514.
        services.syslog-ng = {
          enable = true;
          extraConfig = ''
            source s_local {
            system();
            internal();
            };
            destination d_syslog_tcp {
            syslog("log-collector" transport("tcp") port(514));
            };
            log {
            source(s_local);
            destination(d_syslog_tcp);
            };
          '';
        };
      } // infra;
  };
in
{
  inherit generate;
  server = generate 0;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Machine module for the central log collector.
#
# Arguments:
#   infra - attrset merged over the module with `//`
#
# The module runs syslog-ng listening for syslog over TCP port 514, writes
# both local and network messages to /var/log/syslog-ng/logs.txt, and opens
# port 514 (TCP and UDP) in the firewall.
{ infra }:
{ config, pkgs, lib, ... }:
{
  services.syslog-ng = {
    enable = true;
    # In the d_local file name, ''${HOST} is the indented-string escape that
    # puts a literal ${HOST} into the generated config so syslog-ng sees its
    # HOST macro. Writing $${HOST} would pass "$${HOST}" through unchanged.
    # NOTE(review): d_local is defined but no log statement references it.
    # NOTE(review): logs.txt is created with perm(0777) - confirm intentional.
    extraConfig = ''
      options {
      time-reap(30);
      mark-freq(10);
      keep-hostname(yes);
      };
      source s_local {
      system();
      internal();
      };
      source s_network {
      syslog(transport(tcp) port(514));
      };
      destination d_local {
      file("/var/log/syslog-ng/messages_''${HOST}");
      };
      destination d_logs {
      file(
      "/var/log/syslog-ng/logs.txt"
      owner("root")
      group("root")
      perm(0777)
      );
      };
      log {
      source(s_local);
      source(s_network);
      destination(d_logs);
      };
    '';
  };

  networking.firewall = {
    enable = true;
    allowedTCPPorts = [ 514 ];
    allowedUDPPorts = [ 514 ];
  };
} // infra
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Imports nixpkgs from a fixed revision of the NixOS/nixpkgs-channels
# repository; the ambient <nixpkgs> is used only to obtain fetchFromGitHub.
let
  rev = "0a7e258012b60cbe530a756f09a4f2516786d370";

  pinned = (import <nixpkgs> { }).fetchFromGitHub {
    inherit rev;
    owner = "NixOS";
    repo = "nixpkgs-channels";
    sha256 = "1qcnxkqkw7bffyc17mqifcwjfqwbvn0vs0xgxnjvh9w0ssl2s036";
    # The fetched source is named after the revision it was taken from.
    name = "nixpkgs${rev}";
  };
in
import pinned
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Generator for ZooKeeper machines.
#
# Arguments:
#   infra            - attrset merged over each machine module with `//`
#   zookeeperServers - list of attrsets with a `name`; one "server.N" line is
#                      emitted per entry
#
# Result: { generate, server }. `generate id` returns a { name, value } pair
# where name is "zookeeper-<id>" and value is the machine module; `server` is
# `generate 0`.
{ infra, zookeeperServers }:
let
  # Same port list is opened for TCP and UDP.
  openPorts = [ 2181 9092 2888 3888 514 ];

  # One ensemble line per peer; N is the position supplied by
  # lib.concatImapStrings minus one.
  ensembleLine = pos: peer:
    "server.${toString (pos - 1)}=${peer.name}:2888:3888\n";

  generate = id: {
    name = "zookeeper-${toString id}";
    value = { config, pkgs, lib, ... }:
      {
        # NOTE(review): nginx is enabled here but nothing else in this module
        # refers to it - confirm it is needed on ZooKeeper machines.
        services.nginx.enable = true;

        services.zookeeper = {
          enable = true;
          inherit id;
          dataDir = "/data";
          logging = ''
            zookeeper.root.logger=INFO, CONSOLE
            log4j.rootLogger=INFO, CONSOLE
            log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
            log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
            log4j.appender.CONSOLE.layout.ConversionPattern=[myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
          '';
          servers = lib.concatImapStrings ensembleLine zookeeperServers;
        };

        networking.firewall = {
          enable = true;
          allowedTCPPorts = openPorts;
          allowedUDPPorts = openPorts;
        };

        # Forward local and internal syslog-ng messages to the host named
        # "log-collector" over TCP port 514.
        services.syslog-ng = {
          enable = true;
          extraConfig = ''
            source s_local {
            system();
            internal();
            };
            destination d_syslog_tcp {
            syslog("log-collector" transport("tcp") port(514));
            };
            log {
            source(s_local);
            destination(d_syslog_tcp);
            };
          '';
        };
      } // infra;
  };
in
{
  inherit generate;
  server = generate 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment