Skip to content

Instantly share code, notes, and snippets.

@jayrambhia
Last active August 29, 2015 14:16
Show Gist options
  • Select an option

  • Save jayrambhia/a5eb3337b7165096b556 to your computer and use it in GitHub Desktop.

Select an option

Save jayrambhia/a5eb3337b7165096b556 to your computer and use it in GitHub Desktop.
AMQP chat/message server and client
import pika
import sys
import logging
import json
import datetime
# Install the default handler on the root logger (presumably so that pika's
# own log records have somewhere to go -- confirm against the pika version used).
logging.basicConfig()

# Demo data: the routing keys a message can be published to. The emitter
# selects one of them by its position in this list.
channels = "channel_1 channel_2 channel_3 channel_4".split()
class SimpleMessage:
    """A text message bound to a channel and stamped with its creation time."""

    def __init__(self, message, channel):
        # The timestamp is taken once, at construction, from the local clock.
        self.timestamp = datetime.datetime.now()
        self.channel = channel
        self.message = message

    def getJSONDump(self):
        """Return the message as a JSON object string.

        The object has the keys "message", "channel" and "timestamp"; the
        timestamp is rendered with str() because datetime objects are not
        JSON serializable.
        """
        payload = {}
        payload["message"] = self.message
        payload["channel"] = self.channel
        payload["timestamp"] = str(self.timestamp)
        return json.dumps(payload)
# Create AMQP Connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
exchange = 'group_messages'
channel.exchange_declare(exchange=exchange, type='direct')

# Read "message:channel_index" lines from stdin and publish each message to
# the exchange, using the selected channel name as the routing key. The loop
# ends on Ctrl-C or when stdin is exhausted; the connection is closed on every
# exit path, including unexpected errors.
try:
    while True:
        try:
            data = raw_input()
        except EOFError:
            # stdin was closed (for example piped input ran out): stop cleanly.
            break

        try:
            # Split on the LAST colon so the message text may itself contain ':'.
            msg, channel_index = data.rsplit(":", 1)
            channel_id = channels[int(channel_index)]
        except (ValueError, IndexError):
            # No colon, a non-numeric index, or an index outside the list:
            # report it and keep reading instead of crashing the emitter.
            print("[!] ignored input, expected message:channel_index")
            continue

        simpleMessage = SimpleMessage(msg, channel_id)
        # Serialize once so the published body and the echoed text are the same string.
        payload = simpleMessage.getJSONDump()

        # Publish to the queue
        channel.basic_publish(exchange=exchange, routing_key=channel_id, body=payload)
        print("[x] sent %s" % payload)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the emitter.
    pass
finally:
    print("closing queue")
    connection.close()
# Usage
"""
python emit_group_messages.py
# Input format: <message>:<zero-based index into the channels list>
>>> Input prompt
This is a simple message.:1
>>> Input prompt
This is another simple message.:2
"""
import pika
import sys
import logging
logging.basicConfig()

# Every command-line argument is a channel/group name to listen to; with
# none given there is nothing to subscribe to, so exit with status 1.
channels = sys.argv[1:]
if len(channels) == 0:
    print("No channels to listen")
    sys.exit(1)

# Create AMQP Connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
exchange = 'group_messages'
channel.exchange_declare(exchange=exchange, type='direct')

# Declare an exclusive queue without naming it and read back the name that
# was assigned to it.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the queue once per requested channel, using the channel name as the
# routing key.
for routing_key in channels:
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

print("[*] waiting for messages")
def callback(ch, method, properties, body):
    """Print the routing key and body of one delivered message.

    Only ``method.routing_key`` and ``body`` are read; ``ch`` and
    ``properties`` are accepted but unused. Both values are printed with
    repr().
    """
    line = "[x] {0!r} {1!r}".format(method.routing_key, body)
    print(line)
# start consuming
# Register the callback on the queue with no_ack=True (deliveries are not
# acknowledged by this client), then block while messages are dispatched.
channel.basic_consume(callback, queue=queue_name, no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the receiver: cancel the consumer
    # instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Close the connection on every exit path; skip it when the connection
    # is already closed so the original error is not masked.
    if connection.is_open:
        connection.close()
# Usage
"""
python receive_group_messages.py channel_1 channel_3
"""
package com.fenchtose.amqptest;
import android.app.Service;
import android.content.Intent;
import android.os.Bundle;
import android.os.Handler;
import android.os.IBinder;
import android.os.Message;
import android.util.Log;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.util.ArrayList;
public class ReceiverService extends Service {
private static final String HOSTNAME = "192.168.1.14"; // Your local/remote server address
private static final String EXCHANGE_NAME = "group_messages";
private static final String TAG = "ReceiverService";
private ArrayList<String> subscribedChannels;
private Thread subscribeThread;
private ConnectionFactory factory;
private Handler incomingMessageHandler;
public ReceiverService() {
}
@Override
public IBinder onBind(Intent intent) {
return null;
}
@Override
public void onCreate() {
Log.i(TAG, "Service onCreate called");
incomingMessageHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
String message = msg.getData().getString("json_message");
deliverMessage(message);
// do something here
}
};
subscribedChannels = new ArrayList<String>();
subscribedChannels.add("channel_2");
subscribedChannels.add("channel_3");
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
Log.i(TAG, "onStartCommand called");
if (factory == null) {
setupConnectionFactory();
setupSubscription(incomingMessageHandler);
}
return START_STICKY;
}
@Override
public void onDestroy() {
Log.i(TAG, "onDestroy called");
subscribeThread.interrupt();
super.onDestroy();
}
private void setupConnectionFactory() {
factory = new ConnectionFactory();
factory.setHost(HOSTNAME);
factory.setUsername("android");
factory.setPassword("android");
}
private void setupSubscription(final Handler handler) {
if (subscribeThread != null) {
subscribeThread.interrupt();
subscribeThread.start();
return;
}
subscribeThread = new Thread(new Runnable(){
@Override
public void run() {
while(true) {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
// bind queue to channels
for(int i=0; i<subscribedChannels.size(); i++) {
channel.queueBind(queueName, EXCHANGE_NAME, subscribedChannels.get(i));
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
try {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String routingKey = delivery.getEnvelope().getRoutingKey();
String message = new String(delivery.getBody());
Log.i(TAG, "[r] " + message);
Message msg = handler.obtainMessage();
Bundle bundle = new Bundle();
bundle.putString("json_message", message);
msg.setData(bundle);
handler.sendMessage(msg);
} catch(InterruptedException ie) {
ie.printStackTrace();
return;
}
}
} catch(Exception e) {
if (e.getClass().equals(InterruptedException.class)) {
Log.e(TAG, "thread interrupted");
break;
}
Log.e(TAG, "connection broke");
e.printStackTrace();
try {
Thread.sleep(4000);
} catch (InterruptedException e1) {
break;
}
}
}
}
});
subscribeThread.start();
}
private void deliverMessage(String message) {
// deliver to activity or store in database
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment