Skip to content

Instantly share code, notes, and snippets.

@rmetzger
Created April 16, 2015 07:54
Show Gist options
  • Save rmetzger/674dd277c66f5c1486a6 to your computer and use it in GitHub Desktop.
Save rmetzger/674dd277c66f5c1486a6 to your computer and use it in GitHub Desktop.
Test for Apache Kafka. I'm planning to merge this into Flink at a later point.
/**
 * Round-trips messages through Kafka without involving Flink: a high-level consumer
 * is attached to "testtopic", ten string messages are produced to the brokers in
 * {@code brokers}, and the test waits (bounded) until the consumer has seen them.
 *
 * <p>Stream setup and its assertions run on the test thread so that a failure actually
 * fails the test; only the blocking iteration runs on a background thread. Any error
 * raised on that thread is captured and rethrown here. The producer and the consumer
 * connector are always released in the {@code finally} block.
 */
@Test
public void testKafkaWithoutFlink() {
    final String topic = "testtopic";
    final int numMessages = 10;
    final long receiveTimeoutSeconds = 30;

    // consume from "testtopic"
    Properties cprops = new Properties();
    cprops.put("zookeeper.connect", zookeeperConnectionString);
    cprops.put("group.id", "mygroupid");
    cprops.put("zookeeper.session.timeout.ms", "400");
    cprops.put("zookeeper.sync.time.ms", "200");
    cprops.put("auto.commit.interval.ms", "1000");
    // Start from the earliest offset when the group has no committed offset, so the
    // messages are still delivered if the producer wins the race against the consumer's
    // partition assignment (e.g. when the topic is only created by the first send).
    cprops.put("auto.offset.reset", "smallest");

    final ConsumerConnector jcc = Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));
    Producer<String, String> producer = null;
    Thread consumerThread = null;
    try {
        Map<String, Integer> tcm = new HashMap<String, Integer>(1);
        tcm.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> ms = jcc.createMessageStreams(tcm);
        LOG.info("message streams " + ms);
        Assert.assertEquals(1, ms.size());
        List<KafkaStream<byte[], byte[]>> strList = ms.get(topic);
        Assert.assertEquals(1, strList.size());
        final KafkaStream<byte[], byte[]> str = strList.get(0);
        LOG.info("str = " + str);

        // Counts down once per consumed message; the test thread waits on it below.
        final java.util.concurrent.CountDownLatch received =
                new java.util.concurrent.CountDownLatch(numMessages);
        // Carries an error from the consumer thread back to the test thread.
        final java.util.concurrent.atomic.AtomicReference<Throwable> consumerFailure =
                new java.util.concurrent.atomic.AtomicReference<Throwable>();

        // start consumer:
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                LOG.info("Starting consumer");
                try {
                    ConsumerIterator<byte[], byte[]> it = str.iterator();
                    // hasNext() blocks until a message arrives; the loop is expected to
                    // end once the connector is shut down by the test thread.
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> msg = it.next();
                        LOG.info("msg = " + msg);
                        received.countDown();
                    }
                } catch (Throwable t) {
                    // Thread boundary: hand the failure to the test thread instead of losing it.
                    consumerFailure.set(t);
                }
                LOG.info("Finished consumer");
            }
        }, "kafka-test-consumer");
        // Daemon so a consumer that fails to stop cannot keep the JVM alive.
        consumerThread.setDaemon(true);
        consumerThread.start();

        // start producer:
        StringBuilder brks = new StringBuilder();
        for (KafkaServer broker : brokers) {
            SocketServer srv = broker.socketServer();
            String host = "localhost";
            if (srv.host() != null) {
                host = srv.host();
            }
            if (brks.length() > 0) {
                brks.append(',');
            }
            brks.append(host).append(':').append(srv.port());
        }
        LOG.info("Using broker list " + brks);

        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brks.toString());
        props.put("serializer.class", StringEncoder.class.getCanonicalName());
        producer = new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < numMessages; i++) {
            producer.send(new KeyedMessage<String, String>(topic, "What a message"));
        }
        LOG.info("+++++++++++ FINISHED PRODUCING ++++++++++++++++ ");

        boolean allReceived;
        try {
            allReceived = received.await(receiveTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the test runner and fail the test.
            Thread.currentThread().interrupt();
            throw (AssertionError) new AssertionError(
                    "Interrupted while waiting for messages from " + topic).initCause(e);
        }

        Throwable failure = consumerFailure.get();
        if (failure != null) {
            throw (AssertionError) new AssertionError(
                    "Consumer thread failed: " + failure).initCause(failure);
        }
        Assert.assertTrue("Consumer received only " + (numMessages - received.getCount())
                + " of " + numMessages + " messages within " + receiveTimeoutSeconds + " s", allReceived);
    } finally {
        if (producer != null) {
            producer.close();
        }
        // Shutting the connector down releases its resources and unblocks the consumer thread.
        jcc.shutdown();
        if (consumerThread != null) {
            try {
                consumerThread.join(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment