Apache Kafka for Event Sourcing
...

public class MyConsumer {

    private static final int SO_TIMEOUT = 100000;      // socket timeout
    private static final int BUFFER_SIZE = 64 * 1024;  // maximum socket receive buffer in bytes
    private static final int FETCH_SIZE = 100000;       // maximum bytes to fetch from the topic

    public static void main(String[] args) {
        MyConsumer myConsumer = new MyConsumer();

        String topic = "ossandme";    // topic from which to consume messages
        String address = "localhost"; // broker address
        int port = 9092;              // broker port
        int partition = 0;            // partition that holds the messages the application is interested in
        int offset = 0;               // point in the topic from which the consumer starts reading messages;
                                      // 0 means the first published message and all subsequent ones are read

        // keep replaying all messages from the partition until myConsumer.consume(...) returns true
        while (!myConsumer.consume(offset, address, port, topic, partition)) {}
    }

    private boolean consume(int offset, String address, int port, String topic, int partition) {
        try {
            String consumerGroup = "Client_" + topic + "_" + partition;
            SimpleConsumer consumer = new SimpleConsumer(address, port, SO_TIMEOUT, BUFFER_SIZE, consumerGroup);
            FetchRequest req = new FetchRequestBuilder().clientId(consumerGroup)
                    .addFetch(topic, partition, offset, FETCH_SIZE).build();
            FetchResponse fetchResponse = consumer.fetch(req); // fetch messages from the broker

            // iterate over all messages fetched from the topic
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);

                // do something useful
                ...

                // print the message offset and its contents
                System.out.println("Offset = " + messageAndOffset.offset() + ", Time = " + new String(bytes, "UTF-8"));
            }

            consumer.close();
        } catch (Exception e) {
            // roll back state
            ...
            return false;
        }

        return true;
    }
}
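For comparison, here is a minimal sketch of the same replay-from-the-beginning pattern using the newer KafkaConsumer client. This is not part of the original example: it assumes a Kafka 2.x client library on the classpath, and the class name MyNewConsumer is made up for the sketch; the topic, partition, broker address and group id mirror the values used above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyNewConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker address
        props.put("group.id", "Client_ossandme_0");       // consumer group, as in the example above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition partition = new TopicPartition("ossandme", 0);   // same topic and partition as above
        consumer.assign(Collections.singletonList(partition));          // assign the partition explicitly
        consumer.seekToBeginning(Collections.singletonList(partition)); // replay every event from offset 0

        while (true) { // poll until the application decides to stop (and then close the consumer)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Offset = " + record.offset() + ", Time = " + record.value());
            }
        }
    }
}

Here seekToBeginning replaces the manual offset-0 bookkeeping done by the SimpleConsumer version.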
...

public class MyProducer {

    public static void main(String[] args) {
        // producer configuration
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // create the producer
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int cnt = 0; cnt < 100; cnt++) {
            long time = new Date().getTime(); // get the current time
            String msg = String.valueOf(time);
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("ossandme", "0", msg);
            producer.send(data); // dispatch the message to the broker
        }

        System.out.println("Messages published!!");
        producer.close();
    }
}
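The equivalent producer with the newer KafkaProducer client, again only a sketch under the same assumptions (Kafka 2.x client on the classpath, made-up class name MyNewProducer; the topic, key and broker address mirror the values above):

import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyNewProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int cnt = 0; cnt < 100; cnt++) {
            String msg = String.valueOf(new Date().getTime()); // current time, as in the example above
            producer.send(new ProducerRecord<String, String>("ossandme", "0", msg)); // key "0" as above
        }
        producer.close(); // close() flushes any buffered records before the process exits
        System.out.println("Messages published!!");
    }
}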