/*
 * Simple class to consume from a Kafka topic.
 *
 * Origin: GitHub gist devoncrouse/6081648, created July 25, 2013 16:52.
 *
 * Note: GitHub flagged this file as containing bidirectional Unicode text that may be
 * interpreted or compiled differently than what appears below. To review, open the file
 * in an editor that reveals hidden Unicode characters.
 */
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import org.apache.log4j.Logger;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
public class MessageConsumer implements Runnable { | |
private static final Logger logger = Logger.getLogger(MessageConsumer.class); | |
private static final int allowedKafkaErrors = 10; | |
private int kafkaErrors = 0; | |
private Properties conf; | |
/** | |
* | |
* @param conf | |
* @param loader | |
*/ | |
MessageConsumer(final Properties conf) { | |
this.conf = conf; | |
} | |
@Override | |
public void run() { | |
final List<KafkaStream<Message>> streams = connectToQueue(conf); | |
ExecutorService executor = Executors.newFixedThreadPool(streams.size()); | |
// Execute a consumer thread for each Kafka message stream | |
// (representing a broker/partition) | |
for (final KafkaStream<Message> stream : streams) { | |
executor.submit(new Runnable() { | |
public void run() { | |
try { | |
consumeTopic(stream.iterator()); | |
} catch (Exception e) { | |
shutdown.setShutdown(true); | |
logger.fatal("Caught unexpected exception", e); | |
System.exit(-1); | |
} | |
} | |
}); | |
} | |
} | |
/** | |
* Start consuming from Kafka | |
* | |
* @param ConsumerIterator<Message> from Kafka message stream | |
*/ | |
private void consumeTopic(ConsumerIterator<Message> kafkaIter) throws IOException, InterruptedException { | |
while (!shutdown.isShutdown()) { | |
MessageAndMetadata<Message> rawMsg; | |
try { | |
if (!kafkaIter.hasNext()) { // Generally doesn't happen since hasNext() blocks | |
return; | |
} | |
rawMsg = kafkaIter.next(); | |
kafkaErrors = 0; | |
} catch (RuntimeException e) { | |
logger.error("Unable to pull message from Kafka", e); | |
kafkaErrors++; | |
if (kafkaErrors >= allowedKafkaErrors) { | |
logger | |
.fatal("Too many consecutive kafka errors detected. Shutting down"); | |
shutdown.setShutdown(true); | |
} | |
return; | |
} | |
// Here's the consumed message from Kafka | |
String msg = convertPayloadToString(rawMsg); | |
try { | |
// Here's your result object | |
JSONObject obj = new JSONObject(msg); | |
} catch (JSONException e) { | |
logger.error("Invalid JSON object", e); | |
} | |
} | |
} | |
/** | |
* Convert the consumed message payload to a String | |
* | |
* @param MessageAndMetadata<Message> from Kafka consumer | |
* @return String representation of the message payload | |
*/ | |
private String convertPayloadToString(final MessageAndMetadata<Message> msgAndMetadata) { | |
Message rawMsg = msgAndMetadata.message(); | |
ByteBuffer buf = rawMsg.payload(); | |
byte[] dst = new byte[buf.limit()]; | |
buf.get(dst); | |
String str = new String(dst); | |
return str; | |
} | |
/** | |
* Create our Kafka connection. | |
* | |
* @param Properties object with Kafka/Zookeeper options | |
* @return Collection of Kafka message streams | |
*/ | |
private List<KafkaStream<Message>> connectToQueue(final Properties conf) { | |
String topic = conf.getProperty("kafka.topic", "MessageHeadersBody"); | |
int streamCount = Integer.parseInt(conf.getProperty("kafka.streams", "3")); | |
// Create the connection to the cluster | |
ConsumerConfig consumerConfig = new ConsumerConfig(conf); | |
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); | |
// Create multiple partitions of the stream for multiple consumer threads | |
Map<String, List<KafkaStream<Message>>> topicMessageStreams = | |
consumerConnector.createMessageStreams(ImmutableMap.of(topic, streamCount)); | |
List<KafkaStream<Message>> streams = topicMessageStreams.get(topic); | |
return streams; | |
} | |
} |
// GitHub page footer (not part of the source): Sign up for free to join this conversation
// on GitHub. Already have an account? Sign in to comment.