Skip to content

Instantly share code, notes, and snippets.

@fjavieralba
Last active March 23, 2021 09:57
Show Gist options
  • Save fjavieralba/7930018 to your computer and use it in GitHub Desktop.
Save fjavieralba/7930018 to your computer and use it in GitHub Desktop.
Embedding Kafka+Zookeeper for testing purposes. Tested with Apache Kafka 0.8
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
public class KafkaLocal {
public KafkaServerStartable kafka;
public ZooKeeperLocal zookeeper;
public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
//start local zookeeper
System.out.println("starting local zookeeper...");
zookeeper = new ZooKeeperLocal(zkProperties);
System.out.println("done");
//start local kafka broker
kafka = new KafkaServerStartable(kafkaConfig);
System.out.println("starting local kafka broker...");
kafka.startup();
System.out.println("done");
}
public void stop(){
//stop kafka broker
System.out.println("stopping kafka...");
kafka.shutdown();
System.out.println("done");
}
}
public class MyTest {
static KafkaLocal kafka;
@BeforeClass
public static void startKafka(){
Properties kafkaProperties = new Properties();
Properties zkProperties = new Properties();
try {
//load properties
kafkaProperties.load(Class.class.getResourceAsStream("/kafkalocal.properties"));
zkProperties.load(Class.class.getResourceAsStream("/zklocal.properties"));
//start kafka
kafka = new KafkaLocal(kafkaProperties, zkProperties);
Thread.sleep(5000);
} catch (Exception e){
e.printStackTrace(System.out);
fail("Error running local Kafka broker");
e.printStackTrace(System.out);
}
//do other things
}
@Test
public void testSomething() {
}
}
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
public class ZooKeeperLocal {
ZooKeeperServerMain zooKeeperServer;
public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException{
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
quorumConfiguration.parseProperties(zkProperties);
} catch(Exception e) {
throw new RuntimeException(e);
}
zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);
new Thread() {
public void run() {
try {
zooKeeperServer.runFromConfig(configuration);
} catch (IOException e) {
System.out.println("ZooKeeper Failed");
e.printStackTrace(System.err);
}
}
}.start();
}
}
@digizeph
Copy link

digizeph commented Jun 12, 2017

Hey, this works like a charm. Thank you for sharing!

Just a quick question, do you know why in the test it needs to wait for 5 seconds after kafka was created? I tried to run the code without the 5 second wait, and my program just ended straight away after kafka instance is created. But with the wait, I could then actually use the connection. It just seems strange to me.

EDIT:
Please discard my question. The Kafka server crashed due to some conflict in the existing zookeeper data folder. For my purpose, I removed the entire zookeeper folder everytime I run the program, and the problem no longer appears.

@cdaringe
Copy link

cdaringe commented Jul 6, 2017

  • has anyone tested w/ kafka 0.11.x? any reason not to? trying it out now...
  • what version/artifact of zookeeper are folks using?

thx!

@cdaringe
Copy link

cdaringe commented Jul 6, 2017

tagging on the properties files would be super helpful too if someone has 'em

@PrashantSabnekar
Copy link

If anybody tried to run kafka localy (embedded mode), please share the properties file.

@jamesrgrinter
Copy link

If you want to see a complete implementation of this, apache-flume uses it in some of their tests.

e..g flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java and TestUtil wrapping it.

@sahlone
Copy link

sahlone commented Aug 15, 2018

How can it use the Kafka scala library in java code ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment