-
-
Save fjavieralba/7930018 to your computer and use it in GitHub Desktop.
import java.io.IOException; | |
import java.util.Properties; | |
import kafka.server.KafkaConfig; | |
import kafka.server.KafkaServerStartable; | |
public class KafkaLocal { | |
public KafkaServerStartable kafka; | |
public ZooKeeperLocal zookeeper; | |
public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{ | |
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); | |
//start local zookeeper | |
System.out.println("starting local zookeeper..."); | |
zookeeper = new ZooKeeperLocal(zkProperties); | |
System.out.println("done"); | |
//start local kafka broker | |
kafka = new KafkaServerStartable(kafkaConfig); | |
System.out.println("starting local kafka broker..."); | |
kafka.startup(); | |
System.out.println("done"); | |
} | |
public void stop(){ | |
//stop kafka broker | |
System.out.println("stopping kafka..."); | |
kafka.shutdown(); | |
System.out.println("done"); | |
} | |
} |
public class MyTest { | |
static KafkaLocal kafka; | |
@BeforeClass | |
public static void startKafka(){ | |
Properties kafkaProperties = new Properties(); | |
Properties zkProperties = new Properties(); | |
try { | |
//load properties | |
kafkaProperties.load(Class.class.getResourceAsStream("/kafkalocal.properties")); | |
zkProperties.load(Class.class.getResourceAsStream("/zklocal.properties")); | |
//start kafka | |
kafka = new KafkaLocal(kafkaProperties, zkProperties); | |
Thread.sleep(5000); | |
} catch (Exception e){ | |
e.printStackTrace(System.out); | |
fail("Error running local Kafka broker"); | |
e.printStackTrace(System.out); | |
} | |
//do other things | |
} | |
@Test | |
public void testSomething() { | |
} | |
} |
import java.io.FileNotFoundException; | |
import java.io.IOException; | |
import java.util.Properties; | |
import org.apache.zookeeper.server.ServerConfig; | |
import org.apache.zookeeper.server.ZooKeeperServerMain; | |
import org.apache.zookeeper.server.quorum.QuorumPeerConfig; | |
public class ZooKeeperLocal { | |
ZooKeeperServerMain zooKeeperServer; | |
public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException{ | |
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); | |
try { | |
quorumConfiguration.parseProperties(zkProperties); | |
} catch(Exception e) { | |
throw new RuntimeException(e); | |
} | |
zooKeeperServer = new ZooKeeperServerMain(); | |
final ServerConfig configuration = new ServerConfig(); | |
configuration.readFrom(quorumConfiguration); | |
new Thread() { | |
public void run() { | |
try { | |
zooKeeperServer.runFromConfig(configuration); | |
} catch (IOException e) { | |
System.out.println("ZooKeeper Failed"); | |
e.printStackTrace(System.err); | |
} | |
} | |
}.start(); | |
} | |
} |
Hey, this works like a charm. Thank you for sharing!
Just a quick question, do you know why in the test it needs to wait for 5 seconds after kafka was created? I tried to run the code without the 5 second wait, and my program just ended straight away after kafka instance is created. But with the wait, I could then actually use the connection. It just seems strange to me.
EDIT:
Please discard my question. The Kafka server crashed due to some conflict in the existing zookeeper data folder. For my purpose, I removed the entire zookeeper folder everytime I run the program, and the problem no longer appears.
- has anyone tested w/ kafka 0.11.x? any reason not to? trying it out now...
- what version/artifact of zookeeper are folks using?
thx!
tagging on the properties files would be super helpful too if someone has 'em
created sample project based on this sample gist
https://github.com/raarunmozhi/BigDataTemplate/blob/master/localcluster/src/test/java/com/bigdata/localcluster/TestKafkaLocalServer.java#L75
If anybody tried to run kafka localy (embedded mode), please share the properties file.
If you want to see a complete implementation of this, apache-flume uses it in some of their tests.
e..g flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java and TestUtil wrapping it.
How can it use the Kafka scala library in java code ?
I believe by default kafka auto creates topics...
http://kafka.apache.org/documentation.html#brokerconfigs