Skip to content

Instantly share code, notes, and snippets.

@RocketPuppy
Last active October 7, 2018 18:20
Show Gist options
  • Save RocketPuppy/018fc5f60b05e557de0c37e0749232a5 to your computer and use it in GitHub Desktop.
Save RocketPuppy/018fc5f60b05e557de0c37e0749232a5 to your computer and use it in GitHub Desktop.
Kafka Connect Setup
# Usage notes for the "kafka-test" NixOps deployment described by cluster.nix.
#
# Create the deployment from cluster.nix and deploy it; this builds and boots
# the VirtualBox machines, so it may take a while. --force-reboot reboots the
# machines as part of the deploy.
nixops create -d kafka-test cluster.nix
nixops deploy -d kafka-test --force-reboot
# Check up on kafka-connect: open a shell on the first Kafka broker (kafka-0).
# The two lines prefixed with '>' are typed inside that remote shell, not run
# locally: read the Kafka Connect unit's journal, then start the unit by hand
# (presumably needed when it did not come up on its own at boot — verify).
nixops ssh -d kafka-test kafka-0
> journalctl -u apache-kafka-connect.service
> systemctl start apache-kafka-connect.service
# Tear it down: stop the machines, destroy them, then delete the deployment.
nixops stop -d kafka-test
nixops destroy -d kafka-test
nixops delete -d kafka-test
# NixOps network for the end-to-end test: ZooKeeper node(s), Kafka broker(s)
# that also run a Kafka Connect worker, and one syslog-ng log collector.
# Per-role deployment settings come from infrastructure-vbox.nix.
let
  infra = import ./infrastructure-vbox.nix;
  nixpkgs = import ./nixpkgs.nix {};
  kafkaPackage = nixpkgs.apacheKafka;

  # How many machines of each kind to generate.
  zookeeperCount = 1;
  kafkaCount = 1;

  # Each server file exposes `generate : id -> { name; value; }`.
  zookeeperModule = import ./zookeeper.nix {
    infra = infra.zookeeper;
    inherit zookeeperServers;
  };

  # Kafka Connect reaches every broker by machine name; 9092 is the
  # PLAINTEXT listener port the brokers are configured with.
  connectServices = import ./kafka-connect.nix {
    kafka = {
      servers = map (server: { ip = server.name; port = 9092; }) kafkaServers;
      package = kafkaPackage;
    };
  };

  kafkaModule = import ./kafka.nix {
    infra = infra.kafka;
    kafka = kafkaPackage;
    inherit zookeeperServers;
    extraServices = connectServices;
  };

  # `let` bindings are mutually recursive and lazy, so the lists below can
  # already be referenced by the module arguments above.
  zookeeperServers = builtins.genList zookeeperModule.generate zookeeperCount;
  kafkaServers = builtins.genList kafkaModule.generate kafkaCount;
in
{
  network.description = "End-to-end setup";
  log-collector = import ./log-collector.nix { infra = infra.log-collector; };
}
// builtins.listToAttrs zookeeperServers
// builtins.listToAttrs kafkaServers
# NixOps deployment settings per machine role. Every role runs as a
# VirtualBox VM; the roles differ only in the VM's memory size.
let
  virtualbox = memorySize: {
    deployment.targetEnv = "virtualbox";
    deployment.virtualbox.memorySize = memorySize;
  };
in
{
  kafka = virtualbox 1024;
  zookeeper = virtualbox 256;
  log-collector = virtualbox 128;
}
# Kafka Connect worker (distributed mode) as a systemd service.
#
# Arguments:
#   kafka.servers — list of { ip, port } brokers, joined into the
#                   comma-separated `bootstrap.servers` string.
#   kafka.package — Kafka package providing bin/connect-distributed.sh.
#
# Returns an attribute set meant for `systemd.services`, holding a single
# unit named `apache-kafka-connect`.
{ kafka }:
let
  bootstrapServers = builtins.concatStringsSep "," (builtins.map ({ip, port }: "${ip}:${builtins.toString port}") kafka.servers);

  # log4j configuration: root logger at DEBUG, written to stderr.
  logConfig = builtins.toFile "log4j.properties" ''
    log4j.rootLogger=DEBUG, stderr
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[KAFKA-CONNECT] [%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
  '';

  # Distributed-worker configuration. The internal config/offset/status
  # topics use replication factor 1 (and a single partition where set),
  # which presumably suits a single-broker test cluster; raise these values
  # before running against more brokers.
  config = builtins.toFile "distributed-config.properties" ''
    bootstrap.servers = ${bootstrapServers}
    key.converter = org.apache.kafka.connect.json.JsonConverter
    value.converter = org.apache.kafka.connect.json.JsonConverter
    group.id = connect-cluster
    rest.port = 8080
    config.storage.topic = connect-config
    config.storage.replication.factor = 1
    offset.storage.topic = connect-offsets
    offset.storage.replication.factor = 1
    offset.storage.partitions=1
    status.storage.topic = connect-status
    status.storage.replication.factor = 1
    status.storage.partitions=1
  '';
in
{
  apache-kafka-connect = {
    description = "Apache Connect Distributed";
    # `exec` replaces the wrapper shell with the launcher, so the worker is
    # the unit's main process and receives systemd's stop signal directly.
    script = ''
      export KAFKA_HEAP_OPTS="-Xms128M -Xmx512M"
      export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${logConfig}"
      exec ${kafka.package}/bin/connect-distributed.sh ${config}
    '';
    # `after` only orders startup; it does not wait for the broker to accept
    # connections. NOTE(review): assumes apache-kafka.service is reported as
    # started before the broker is reachable. If the worker exits early,
    # retry it instead of leaving the unit failed until started by hand.
    serviceConfig = {
      Restart = "on-failure";
      RestartSec = "10s";
    };
    after = [ "apache-kafka.service" ];
    partOf = [ "apache-kafka.service" ];
    requires = [ "apache-kafka.service" ];
    wantedBy = [ "default.target" ];
    enable = true;
  };
}
# Kafka broker machine generator for NixOps.
#
# Arguments:
#   infra            — deployment attrs merged into every broker's config
#                      (e.g. the `kafka` entry of infrastructure-vbox.nix).
#   kafka            — Kafka package to run.
#   zookeeperServers — list of { name, ... } ZooKeeper machines; their names
#                      form the broker's ZooKeeper connect string.
#   extraServices    — extra `systemd.services` units for each broker
#                      (defaults to none).
#   port             — PLAINTEXT listener port (defaults to 9092).
#
# Returns { generate, server }: `generate id` yields a { name, value } pair
# (machine "kafka-<id>" and its NixOS module) for builtins.listToAttrs;
# `server` is `generate 0`.
{ infra, kafka, zookeeperServers, extraServices ? {}, port ? 9092 }:
let generate = id:
  let name = "kafka-${toString id}";
  in
  { inherit name;
    value = { config, pkgs, lib, ... }:
      let ip = config.networking.privateIPv4;
      in
      { services.apache-kafka.enable = true;
        # The broker id is the machine index, so ids are unique per broker.
        services.apache-kafka.brokerId = id;
        # Host names only. NOTE(review): no port is given, so this relies on
        # the ZooKeeper client's default port (2181) — confirm.
        services.apache-kafka.zookeeper = lib.concatStringsSep "," (map (x: x.name) zookeeperServers);
        services.apache-kafka.hostname = ip;
        # Listen on this machine's private IPv4. The ZooKeeper connection
        # timeout is 600000 ms (10 minutes).
        services.apache-kafka.extraProperties = ''
          listeners=PLAINTEXT://${ip}:${toString port}
          zookeeper.connection.timeout.ms=600000
          zookeeper.session.timeout.ms=12000
        '';
        services.apache-kafka.package = kafka;
        # Heap starts at 128 MB and may grow to 512 MB.
        # NOTE(review): -XX:+UseParNewGC and the CMS flags are only accepted by
        # older JDKs (ParNew was removed in JDK 10); confirm the JDK in the
        # pinned nixpkgs still supports them.
        services.apache-kafka.jvmOptions =
          [ "-server"
            "-Xmx512M"
            "-Xms128M"
            "-XX:+UseCompressedOops"
            "-XX:+UseParNewGC"
            "-XX:+UseConcMarkSweepGC"
            "-XX:+CMSClassUnloadingEnabled"
            "-XX:+CMSScavengeBeforeRemark"
            "-XX:+DisableExplicitGC"
            "-Djava.awt.headless=true"
            "-Djava.net.preferIPv4Stack=true"
          ];
        # Caller-supplied units (e.g. the Kafka Connect worker).
        systemd.services = extraServices;
        # A burst of 0 turns off journald rate limiting, so bursts of
        # service output are not dropped.
        services.journald = {
          rateLimitBurst = 0;
          rateLimitInterval = "0";
        };
        # 2181/2888/3888: ZooKeeper client/peer/election; `port`: Kafka
        # listener; 514: syslog; 8080: Kafka Connect REST port.
        # NOTE(review): these services speak TCP; the UDP rules look
        # unnecessary — confirm before removing.
        networking.firewall = {
          enable = true;
          allowedTCPPorts = [2181 port 2888 3888 514 8080];
          allowedUDPPorts = [2181 port 2888 3888 514 8080];
        };
        # Forward local system logs and syslog-ng's own messages to the
        # "log-collector" machine over TCP port 514.
        services.syslog-ng = {
          enable = true;
          extraConfig = ''
            source s_local {
              system();
              internal();
            };
            destination d_syslog_tcp {
              syslog("log-collector" transport("tcp") port(514));
            };
            log {
              source(s_local);
              destination(d_syslog_tcp);
            };
          '';
        };
      } // infra;
  };
in
{
  inherit generate;
  server = generate 0;
}
# Central log collector: a syslog-ng server that accepts syslog over TCP
# port 514 and writes it, together with this machine's own logs, to
# /var/log/syslog-ng/logs.txt.
#
# Arguments:
#   infra — deployment attrs merged into the machine config
#           (e.g. the `log-collector` entry of infrastructure-vbox.nix).
{ infra }:
{ config, pkgs, lib, ... }:
{
  services.syslog-ng = {
    enable = true;
    # Notes on the syslog-ng config below:
    # - d_local is a per-host file that no log path currently references.
    #   `''${` is the Nix escape that emits a literal `${`, so syslog-ng
    #   receives `${HOST}` and expands it. `$${` is not an escape in Nix and
    #   would reach syslog-ng unexpanded.
    # - logs.txt is 0644 (root-writable, world-readable); it previously
    #   allowed any local user to write or rewrite the collected logs.
    extraConfig = ''
      options {
        time-reap(30);
        mark-freq(10);
        keep-hostname(yes);
      };
      source s_local {
        system();
        internal();
      };
      source s_network {
        syslog(transport(tcp) port(514));
      };
      destination d_local {
        file("/var/log/syslog-ng/messages_''${HOST}");
      };
      destination d_logs {
        file(
          "/var/log/syslog-ng/logs.txt"
          owner("root")
          group("root")
          perm(0644)
        );
      };
      log {
        source(s_local);
        source(s_network);
        destination(d_logs);
      };
    '';
  };
  # Only the TCP syslog source listens on the network, so only TCP 514 is
  # opened.
  networking.firewall = {
    enable = true;
    allowedTCPPorts = [514];
  };
} // infra
# Pinned nixpkgs. Evaluates to the nixpkgs entry point at a fixed
# nixpkgs-channels revision; callers apply it, e.g. `import ./nixpkgs.nix {}`.
# The host's <nixpkgs> is used only for its fetchFromGitHub fetcher.
let
  hostPkgs = import <nixpkgs> {};
  pinned = rec {
    owner = "NixOS";
    repo = "nixpkgs-channels";
    rev = "0a7e258012b60cbe530a756f09a4f2516786d370";
    sha256 = "1qcnxkqkw7bffyc17mqifcwjfqwbvn0vs0xgxnjvh9w0ssl2s036";
    name = "nixpkgs${rev}";
  };
  pinnedSrc = hostPkgs.fetchFromGitHub pinned;
in
import pinnedSrc
# ZooKeeper ensemble machine generator for NixOps.
#
# Arguments:
#   infra            — deployment attrs merged into every node's config
#                      (e.g. the `zookeeper` entry of infrastructure-vbox.nix).
#   zookeeperServers — the full list of generated { name, ... } ZooKeeper
#                      machines, in id order; used to write the ensemble's
#                      `server.N=host:2888:3888` lines.
#
# Returns { generate, server }: `generate id` yields a { name, value } pair
# (machine "zookeeper-<id>" and its NixOS module) for builtins.listToAttrs;
# `server` is `generate 0`.
{ infra, zookeeperServers }:
let generate = id:
  { name = "zookeeper-${toString id}";
    value = { config, pkgs, lib, ... }:
      {
        services.zookeeper.dataDir = "/data";
        services.zookeeper.enable = true;
        # Must equal the N of this node's `server.N` line below.
        services.zookeeper.id = id;
        services.zookeeper.logging =
          ''
            zookeeper.root.logger=INFO, CONSOLE
            log4j.rootLogger=INFO, CONSOLE
            log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
            log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
            log4j.appender.CONSOLE.layout.ConversionPattern=[myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
          '';
        # concatImapStrings counts from 1, so subtract 1 to get server.0,
        # server.1, ... This assumes ids are 0-based list positions (as
        # builtins.genList produces) and the list is in id order.
        services.zookeeper.servers =
          let toLine = n: x: "server.${toString (n - 1)}=${x.name}:2888:3888\n";
          in lib.concatImapStrings toLine zookeeperServers;
        # 2181/2888/3888: ZooKeeper client/peer/election; 514: syslog.
        # NOTE(review): 9092 (Kafka) and the UDP rules look unnecessary on a
        # ZooKeeper-only node — confirm before removing.
        networking.firewall = {
          enable = true;
          allowedTCPPorts = [2181 9092 2888 3888 514];
          allowedUDPPorts = [2181 9092 2888 3888 514];
        };
        # Forward local system logs and syslog-ng's own messages to the
        # "log-collector" machine over TCP port 514.
        services.syslog-ng = {
          enable = true;
          extraConfig = ''
            source s_local {
              system();
              internal();
            };
            destination d_syslog_tcp {
              syslog("log-collector" transport("tcp") port(514));
            };
            log {
              source(s_local);
              destination(d_syslog_tcp);
            };
          '';
        };
      } // infra;
  };
in {
  inherit generate;
  server = generate 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment