Created
June 25, 2014 10:03
-
-
Save adyliu/5674c683ba2737e136d8 to your computer and use it in GitHub Desktop.
Simple Producers for jafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.Closeable; | |
import java.io.IOException; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import com.shijiebang.xpower.configcenter.ConfigCenter; | |
import com.sohu.jafka.consumer.Consumer; | |
import com.sohu.jafka.consumer.ConsumerConfig; | |
import com.sohu.jafka.consumer.ConsumerConnector; | |
import com.sohu.jafka.consumer.MessageStream; | |
import com.sohu.jafka.producer.serializer.StringDecoder; | |
import com.sohu.jafka.utils.Closer; | |
import com.sohu.jafka.utils.ImmutableMap; | |
/** | |
* @author adyliu ([email protected]) | |
* @since 2012-11-27 | |
*/ | |
public class Consumers implements Closeable{ | |
private ExecutorService executor; | |
private final ConsumerConnector connector; | |
private Consumers(Properties props, final String topic, final int threads, final IMessageListener<String> listener) { | |
ConsumerConfig consumerConfig = new ConsumerConfig(props); | |
connector = Consumer.create(consumerConfig); | |
Map<String, List<MessageStream<String>>> topicMessageStreams = connector.createMessageStreams(// | |
ImmutableMap.of(topic, threads), new StringDecoder()); | |
List<MessageStream<String>> streams = topicMessageStreams.get(topic); | |
// | |
executor = Executors.newFixedThreadPool(threads); | |
for (final MessageStream<String> stream : streams) { | |
executor.submit(new Runnable() { | |
public void run() { | |
for (String message : stream) { | |
listener.onMessage(message); | |
} | |
} | |
}); | |
} | |
// | |
Runtime.getRuntime().addShutdownHook(new Thread() { | |
@Override | |
public void run() { | |
close(); | |
} | |
}); | |
} | |
@Override | |
public void close() { | |
if(executor != null) { | |
executor.shutdown(); | |
Closer.closeQuietly(connector); | |
executor = null; | |
} | |
} | |
public static Consumers buildConsumer(final String topic,// | |
final String groupId,// | |
final IMessageListener<String> listener,// | |
final int threads, // | |
Properties props) { | |
if (props == null || props.isEmpty()) { | |
props = new Properties(); | |
} | |
if (!props.containsKey("zk.connect")) { | |
props.put("zk.connect", ConfigCenter.getFullConnection()+ "/xpower/jafka"); | |
} | |
props.put("groupid", groupId); | |
return new Consumers(props, topic, threads, listener); | |
} | |
public static Consumers buildConsumer(final String topic,// | |
final String groupId,// | |
final IMessageListener<String> listener, // | |
final int threads) { | |
return buildConsumer(topic, groupId, listener, threads, null); | |
} | |
/** | |
* create a consumer | |
* | |
* @param topic the topic to be watched | |
* @param groupId grouping the consumer clients | |
* @param listener message listener | |
* @return the real consumer | |
*/ | |
public static Consumers buildConsumer(final String topic,// | |
final String groupId, // | |
final IMessageListener<String> listener) { | |
return buildConsumer(topic, groupId, listener, 2); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Send a message through the default producer.
StringMessage message = ...
Producers.buildProducer().send(message);

// Consume messages, given
// class UserMessageListener implements IMessageListener<String>
Consumers.buildConsumer("users", "auditor", new UserMessageListener());
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Callback that receives consumed messages.
 *
 * @param <T> type of the message payload
 * @author adyliu ([email protected])
 * @since 2012-11-27
 */
public interface IMessageListener<T> {

    /**
     * Handles one message.
     * <p>
     * Implementations should not throw: an exception interrupts message reception, after
     * which no further messages can be received.
     * </p>
     *
     * @param message the message content
     */
    void onMessage(T message);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Properties; | |
import com.sohu.jafka.producer.Producer; | |
import com.sohu.jafka.producer.ProducerConfig; | |
import com.sohu.jafka.producer.serializer.StringEncoder; | |
public class Producers { | |
private final String ZK_HOSTS = "192.168.6.22:2181"; | |
private final Producer<String, String> producer; | |
private Producers(Properties props) { | |
if (!props.containsKey("zk.connect") && !props.containsKey("broker.list")) { | |
props.put("zk.connect", ZK_HOSTS+"/xpower/jafka"); | |
} | |
if (!props.containsKey("serializer.class")) { | |
props.put("serializer.class", StringEncoder.class.getName()); | |
} | |
ProducerConfig config = new ProducerConfig(props); | |
producer = new com.sohu.jafka.producer.Producer<String, String>(config); | |
Runtime.getRuntime().addShutdownHook(new Thread() { | |
@Override | |
public void run() { | |
producer.close(); | |
} | |
}); | |
} | |
public Producer<String, String> getProducer(){ | |
return this.producer; | |
} | |
/** | |
* send the message to message-queue | |
* @param message message | |
*/ | |
public void send(StringMessage message) { | |
producer.send(message); | |
} | |
private static volatile Producers instance = null; | |
/** | |
* 创建一个生产者 | |
* <p> | |
* 由于每一个生产者都需要和服务器端建立连接,因此,默认情况下应该维持此实例为单实例,减少连接数 | |
* </p> | |
* @param props 扩展参数 | |
* @return 带有特定参数的生产者 | |
*/ | |
public static Producers buildProducer(Properties props) { | |
if(props == null || props.isEmpty()) { | |
if(instance == null) { | |
synchronized (Producers.class) { | |
if(instance == null) { | |
instance = new Producers(new Properties()); | |
} | |
} | |
} | |
return instance; | |
} | |
return new Producers(props); | |
} | |
/** | |
* 创建一个生产者 | |
* <p> | |
* 由于每一个生产者都需要和服务器端建立连接,因此,默认情况下应该维持此实例为单实例,减少连接数 | |
* </p> | |
* @param props 扩展参数 | |
* @return 带有特定参数的生产者 | |
*/ | |
public static Producers buildProducer() { | |
return buildProducer(null); | |
} | |
// | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment