Skip to content

Instantly share code, notes, and snippets.

@adyliu
Created June 25, 2014 10:03
Show Gist options
  • Save adyliu/5674c683ba2737e136d8 to your computer and use it in GitHub Desktop.
Save adyliu/5674c683ba2737e136d8 to your computer and use it in GitHub Desktop.
Simple Producers for jafka
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.shijiebang.xpower.configcenter.ConfigCenter;
import com.sohu.jafka.consumer.Consumer;
import com.sohu.jafka.consumer.ConsumerConfig;
import com.sohu.jafka.consumer.ConsumerConnector;
import com.sohu.jafka.consumer.MessageStream;
import com.sohu.jafka.producer.serializer.StringDecoder;
import com.sohu.jafka.utils.Closer;
import com.sohu.jafka.utils.ImmutableMap;
/**
 * Consumes String messages of a single Jafka topic and hands each one to an
 * {@link IMessageListener}.
 * <p>
 * Every message stream returned by the connector is drained by its own task on a
 * fixed-size thread pool. The consumer is closed either explicitly through
 * {@link #close()} or by a JVM shutdown hook registered at construction time.
 * </p>
 *
 * @author adyliu ([email protected])
 * @since 2012-11-27
 */
public class Consumers implements Closeable {

    /** Worker pool; set to {@code null} once the consumer has been closed. */
    private ExecutorService executor;

    private final ConsumerConnector connector;

    /** Hook that closes this consumer on JVM exit; deregistered again by {@link #close()}. */
    private final Thread shutdownHook;

    /**
     * Connects to the message queue and starts one worker task per message stream.
     *
     * @param props    consumer configuration passed to {@link ConsumerConfig}
     * @param topic    the topic to consume
     * @param threads  number of streams requested and size of the worker pool; must be positive
     * @param listener callback invoked for every message; must not be {@code null}
     * @throws IllegalArgumentException if {@code threads} is not positive
     * @throws NullPointerException     if {@code listener} is {@code null}
     */
    private Consumers(Properties props, final String topic, final int threads, final IMessageListener<String> listener) {
        // Validate before the connector exists so that bad arguments cannot leak a connection.
        if (threads <= 0) {
            throw new IllegalArgumentException("threads must be positive: " + threads);
        }
        if (listener == null) {
            throw new NullPointerException("listener must not be null");
        }

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        connector = Consumer.create(consumerConfig);
        Map<String, List<MessageStream<String>>> topicMessageStreams = connector.createMessageStreams(
                ImmutableMap.of(topic, threads), new StringDecoder());
        List<MessageStream<String>> streams = topicMessageStreams.get(topic);

        executor = Executors.newFixedThreadPool(threads);
        for (final MessageStream<String> stream : streams) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    for (String message : stream) {
                        dispatch(listener, message);
                    }
                }
            });
        }

        shutdownHook = new Thread() {
            @Override
            public void run() {
                Consumers.this.close();
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    /**
     * Passes one message to the listener and reports a listener failure.
     * <p>
     * The Future returned by {@code executor.submit} is never inspected, so without the
     * log entry a throwing listener would end its worker without leaving any trace.
     * The exception is rethrown afterwards, which still ends consumption of that stream.
     * </p>
     */
    private static void dispatch(IMessageListener<String> listener, String message) {
        try {
            listener.onMessage(message);
        } catch (RuntimeException e) {
            java.util.logging.Logger.getLogger(Consumers.class.getName()).log(
                    java.util.logging.Level.SEVERE,
                    "Message listener threw an exception; consumption of this stream stops", e);
            throw e;
        }
    }

    /**
     * Shuts down the worker pool and closes the connector.
     * <p>
     * Calling this method more than once has no further effect. The method is
     * synchronized because it can be invoked by the caller and by the shutdown hook.
     * </p>
     */
    @Override
    public synchronized void close() {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        Closer.closeQuietly(connector);
        executor = null;
        try {
            // Drop the hook so a closed consumer is not kept reachable until JVM exit.
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down (close() may be running inside the hook itself).
        }
    }

    /**
     * Creates a consumer.
     * <p>
     * If {@code props} has no {@code zk.connect} entry, one is derived from
     * {@link ConfigCenter#getFullConnection()}. The {@code groupid} entry is always set to
     * {@code groupId}. A non-empty {@code props} object is modified in place.
     * </p>
     *
     * @param topic    the topic to be watched
     * @param groupId  grouping the consumer clients
     * @param listener message listener
     * @param threads  number of consuming threads
     * @param props    additional consumer settings; may be {@code null} or empty
     * @return the real consumer
     */
    public static Consumers buildConsumer(final String topic,
            final String groupId,
            final IMessageListener<String> listener,
            final int threads,
            Properties props) {
        if (props == null || props.isEmpty()) {
            props = new Properties();
        }
        if (!props.containsKey("zk.connect")) {
            props.put("zk.connect", ConfigCenter.getFullConnection() + "/xpower/jafka");
        }
        props.put("groupid", groupId);
        return new Consumers(props, topic, threads, listener);
    }

    /**
     * Creates a consumer without additional settings.
     *
     * @param topic    the topic to be watched
     * @param groupId  grouping the consumer clients
     * @param listener message listener
     * @param threads  number of consuming threads
     * @return the real consumer
     */
    public static Consumers buildConsumer(final String topic,
            final String groupId,
            final IMessageListener<String> listener,
            final int threads) {
        return buildConsumer(topic, groupId, listener, threads, null);
    }

    /**
     * Creates a consumer with two consuming threads and no additional settings.
     *
     * @param topic    the topic to be watched
     * @param groupId  grouping the consumer clients
     * @param listener message listener
     * @return the real consumer
     */
    public static Consumers buildConsumer(final String topic,
            final String groupId,
            final IMessageListener<String> listener) {
        return buildConsumer(topic, groupId, listener, 2);
    }
}
// Send a message: obtain the shared default producer and hand it a StringMessage.
StringMessage message = ...
Producers.buildProducer().send(message);
// Consume messages of topic "users" as consumer group "auditor".
// UserMessageListener is the application's own handler, declared as:
// class UserMessageListener implements IMessageListener<String>
Consumers.buildConsumer("users", "auditor", new UserMessageListener())
/**
 * Callback that receives messages of type {@code T}.
 *
* @author adyliu ([email protected])
* @since 2012-11-27
*/
public interface IMessageListener<T> {
/**
* Handles a message.
* <p>
* This method should not throw exceptions; doing so interrupts message reception,
* so that no further messages can be received.
* </p>
* @param message the message content
*/
void onMessage(T message);
}
import java.util.Properties;
import com.sohu.jafka.producer.Producer;
import com.sohu.jafka.producer.ProducerConfig;
import com.sohu.jafka.producer.serializer.StringEncoder;
/**
 * Thin wrapper around a Jafka {@link Producer} for String messages.
 * <p>
 * Instances are obtained through {@link #buildProducer()} or
 * {@link #buildProducer(Properties)}. Every instance registers a JVM shutdown hook
 * that closes its underlying producer.
 * </p>
 */
public class Producers {

    /** ZooKeeper address used when neither "zk.connect" nor "broker.list" is configured. */
    private static final String ZK_HOSTS = "192.168.6.22:2181";

    /** Lazily created shared instance returned when no custom settings are supplied. */
    private static volatile Producers instance = null;

    private final Producer<String, String> producer;

    /**
     * Creates the underlying producer, filling in defaults for missing settings.
     * <p>
     * Missing defaults are written into {@code props} itself.
     * </p>
     *
     * @param props producer settings; must not be {@code null}
     */
    private Producers(Properties props) {
        if (!props.containsKey("zk.connect") && !props.containsKey("broker.list")) {
            props.put("zk.connect", ZK_HOSTS + "/xpower/jafka");
        }
        if (!props.containsKey("serializer.class")) {
            props.put("serializer.class", StringEncoder.class.getName());
        }
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                producer.close();
            }
        });
    }

    /**
     * Returns the underlying Jafka producer.
     *
     * @return the wrapped producer
     */
    public Producer<String, String> getProducer() {
        return this.producer;
    }

    /**
     * send the message to message-queue
     *
     * @param message message
     */
    public void send(StringMessage message) {
        producer.send(message);
    }

    /**
     * Creates a producer.
     * <p>
     * Every producer needs its own connection to the server, so by default a single
     * shared instance should be kept to reduce the number of connections. When
     * {@code props} is {@code null} or empty, that shared instance is returned (and
     * created on first use); otherwise a new producer is created on every call.
     * </p>
     *
     * @param props additional settings; may be {@code null} or empty
     * @return a producer using the given settings
     */
    public static Producers buildProducer(Properties props) {
        if (props == null || props.isEmpty()) {
            if (instance == null) {
                synchronized (Producers.class) {
                    if (instance == null) {
                        instance = new Producers(new Properties());
                    }
                }
            }
            return instance;
        }
        return new Producers(props);
    }

    /**
     * Returns the shared producer that uses the default settings.
     * <p>
     * Every producer needs its own connection to the server, so by default a single
     * shared instance should be kept to reduce the number of connections.
     * </p>
     *
     * @return the shared default producer
     */
    public static Producers buildProducer() {
        return buildProducer(null);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment