Created
April 16, 2015 07:54
-
-
Save rmetzger/674dd277c66f5c1486a6 to your computer and use it in GitHub Desktop.
Test for Apache Kafka. I'm planning to merge this into Flink at a later point.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testKafkaWithoutFlink() { | |
// start consumer: | |
new Thread(new Runnable() { | |
@Override | |
public void run() { | |
LOG.info("Starting consumer"); | |
// consume from "testtopic" | |
Properties cprops = new Properties(); | |
cprops.put("zookeeper.connect", zookeeperConnectionString); | |
cprops.put("group.id", "mygroupid"); | |
cprops.put("zookeeper.session.timeout.ms", "400"); | |
cprops.put("zookeeper.sync.time.ms", "200"); | |
cprops.put("auto.commit.interval.ms", "1000"); | |
ConsumerConnector jcc = Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops)); | |
Map<String,Integer> tcm = new HashMap<String, Integer>(1); | |
tcm.put("testtopic", 1); | |
Map<String, List<KafkaStream<byte[], byte[]>>> ms = jcc.createMessageStreams(tcm); | |
LOG.info("message streams "+ms); | |
Assert.assertEquals(1, ms.size()); | |
List<KafkaStream<byte[], byte[]>> strList = ms.get("testtopic"); | |
Assert.assertEquals(1, strList.size()); | |
KafkaStream<byte[], byte[]> str = strList.get(0); | |
LOG.info("str = "+str); | |
ConsumerIterator<byte[], byte[]> it = str.iterator(); | |
while(it.hasNext()) { | |
MessageAndMetadata<byte[], byte[]> msg = it.next(); | |
LOG.info("msg = "+msg); | |
} | |
LOG.info("Finished consumer"); | |
} | |
}).start(); | |
// start producer: | |
Properties props = new Properties(); | |
String brks = ""; | |
for(KafkaServer broker : brokers) { | |
SocketServer srv = broker.socketServer(); | |
String host = "localhost"; | |
if(srv.host() != null) { | |
host = srv.host(); | |
} | |
brks += host+":"+broker.socketServer().port()+","; | |
} | |
LOG.info("Using broker list "+brks); | |
props.setProperty("metadata.broker.list", brks); | |
props.put("serializer.class", StringEncoder.class.getCanonicalName()); | |
ProducerConfig config = new ProducerConfig(props); | |
Producer p = new Producer<String, String>(config); | |
for(int i = 0; i < 10; i++) { | |
p.send(new KeyedMessage<String, String>("testtopic", "What a message")); | |
} | |
LOG.info("+++++++++++ FINISHED PRODUCING ++++++++++++++++ "); | |
try { | |
Thread.sleep(1000000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment