Skip to content

Instantly share code, notes, and snippets.

@HaVonTe1
Created April 25, 2016 11:04
Show Gist options
  • Save HaVonTe1/80774930f179ace9b5fc545578243c33 to your computer and use it in GitHub Desktop.
Save HaVonTe1/80774930f179ace9b5fc545578243c33 to your computer and use it in GitHub Desktop.
Kafka
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.KafkaServerStartable;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.springframework.util.SocketUtils;
import scala.Option;
import java.io.IOException;
import java.util.Properties;
public class KafkaLocal {
private final KafkaServer kafkaServer;
private final TestingServer zkServer;
public KafkaLocal() throws Exception {
zkServer = new TestingServer(SocketUtils.findAvailableTcpPort());
Properties brokerProps = new Properties();
brokerProps.put("enable.zookeeper", "true");
brokerProps.put("broker.id", "1");
final String zkconn = "localhost:" + zkServer.getPort();
brokerProps.put("zk.connect", zkconn);
brokerProps.put("zookeeper.connect", zkconn);
brokerProps.put("port", SocketUtils.findAvailableTcpPort());
brokerProps.setProperty("num.partitions", "10");
KafkaConfig config = new KafkaConfig(brokerProps);
kafkaServer = new kafka.server.KafkaServer(config, new MockTime(), Option.apply("test"));
kafkaServer.startup();
}
public int getKafkaPort() {
return kafkaServer.config().port();
}
public void stop() throws IOException {
kafkaServer.shutdown();
zkServer.stop();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment