Last active
August 29, 2015 14:16
-
-
Save jayrambhia/a5eb3337b7165096b556 to your computer and use it in GitHub Desktop.
AMQP chat/message server and client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pika | |
| import sys | |
| import logging | |
| import json | |
| import datetime | |
# Install a default root handler so log records emitted by libraries are shown.
logging.basicConfig()

# Demo data: the fixed set of channel names a message can be published to.
# The input loop below selects one of these by its position in the list.
channels = [
    "channel_1",
    "channel_2",
    "channel_3",
    "channel_4",
]
class SimpleMessage:
    """A text message addressed to one channel, stamped when it is created."""

    def __init__(self, message, channel):
        """Store the text and target channel and record the creation time.

        message -- the message text.
        channel -- the name of the channel the message is meant for.
        """
        # Naive local time, exactly as returned by datetime.datetime.now().
        self.timestamp = datetime.datetime.now()
        self.channel = channel
        self.message = message

    def getJSONDump(self):
        """Return the message serialized as a JSON object string.

        The object has the keys "message", "channel" and "timestamp"; the
        timestamp is the str() form of the stored datetime.
        """
        return json.dumps({
            "message": self.message,
            "channel": self.channel,
            "timestamp": str(self.timestamp),
        })
# Create AMQP Connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

exchange = 'group_messages'
channel.exchange_declare(exchange=exchange, type='direct')


def _parse_input(data):
    """Split one ``message:channel_index`` line into (message, channel_id).

    Only the LAST colon separates the index, so the message text itself may
    contain colons.

    Raises ValueError when the colon is missing or the index is not an
    integer, and IndexError when the index is outside the channels list.
    """
    msg, channel_index = data.rsplit(":", 1)
    return msg, channels[int(channel_index)]


# Read lines from the command line and publish each one until the user
# interrupts (Ctrl-C) or stdin reaches end-of-file.
try:
    while True:
        try:
            data = raw_input()
        except EOFError:
            # stdin was closed (Ctrl-D or end of piped input): stop cleanly.
            break

        try:
            msg, channel_id = _parse_input(data)
        except (ValueError, IndexError):
            # A malformed line must not take the publisher down.
            print("[!] expected message:channel_index with index 0-%d, got %r"
                  % (len(channels) - 1, data))
            continue

        # Serialize once so the published body and the echoed text match.
        body = SimpleMessage(msg, channel_id).getJSONDump()

        # Publish to the exchange; the channel name is the routing key.
        channel.basic_publish(exchange=exchange, routing_key=channel_id, body=body)
        print("[x] sent " + body)
except KeyboardInterrupt:
    pass
finally:
    # Close the connection on every exit path, not only on Ctrl-C.
    print("closing queue")
    connection.close()
| # Usage | |
| """ | |
| python emit_group_messages.py | |
| # Input style - message:channel_index (zero-based index into the channels list) | |
| >>> Input prompt | |
| This is a simple message.:1 | |
| >>> Input prompt | |
| This is another simple message.:2 | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pika | |
| import sys | |
| import logging | |
logging.basicConfig()

# Channels/groups to listen to are given as command-line arguments.
channels = sys.argv[1:]
if not channels:
    print("No channels to listen")
    sys.exit(1)

# Create AMQP Connection
connection_parameters = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()

# Declare the same direct exchange the emitter publishes to.
exchange = 'group_messages'
channel.exchange_declare(exchange=exchange, type='direct')

# Declare an exclusive queue without passing a name and read back the name
# reported in the declare result.
declared = channel.queue_declare(exclusive=True)
queue_name = declared.method.queue

# Bind the queue once per requested channel; the channel name is the routing key.
for routing_key in channels:
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

print("[*] waiting for messages")
def callback(ch, method, properties, body):
    """Print the routing key and body of one delivered message.

    Only ``method.routing_key`` and ``body`` are read; ``ch`` and
    ``properties`` are accepted but unused.
    """
    received = (method.routing_key, body)
    print("[x] %r %r" % received)
# start consuming
# no_ack=True: deliveries are not acknowledged by this consumer.
channel.basic_consume(callback, queue=queue_name, no_ack=True)
try:
    # Blocks, dispatching each delivery to callback().
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the receiver: cancel the consumer
    # instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Close the connection on every exit path; skip it when the connection
    # is already closed so an earlier error is not masked.
    if connection.is_open:
        connection.close()
| # Usage | |
| """ | |
| python receive_group_messages.py channel_1 channel_3 | |
| """ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.fenchtose.amqptest; | |
| import android.app.Service; | |
| import android.content.Intent; | |
| import android.os.Bundle; | |
| import android.os.Handler; | |
| import android.os.IBinder; | |
| import android.os.Message; | |
| import android.util.Log; | |
| import com.rabbitmq.client.Channel; | |
| import com.rabbitmq.client.Connection; | |
| import com.rabbitmq.client.ConnectionFactory; | |
| import com.rabbitmq.client.QueueingConsumer; | |
| import java.util.ArrayList; | |
| public class ReceiverService extends Service { | |
| private static final String HOSTNAME = "192.168.1.14"; // Your local/remote server address | |
| private static final String EXCHANGE_NAME = "group_messages"; | |
| private static final String TAG = "ReceiverService"; | |
| private ArrayList<String> subscribedChannels; | |
| private Thread subscribeThread; | |
| private ConnectionFactory factory; | |
| private Handler incomingMessageHandler; | |
| public ReceiverService() { | |
| } | |
| @Override | |
| public IBinder onBind(Intent intent) { | |
| return null; | |
| } | |
| @Override | |
| public void onCreate() { | |
| Log.i(TAG, "Service onCreate called"); | |
| incomingMessageHandler = new Handler() { | |
| @Override | |
| public void handleMessage(Message msg) { | |
| String message = msg.getData().getString("json_message"); | |
| deliverMessage(message); | |
| // do something here | |
| } | |
| }; | |
| subscribedChannels = new ArrayList<String>(); | |
| subscribedChannels.add("channel_2"); | |
| subscribedChannels.add("channel_3"); | |
| } | |
| @Override | |
| public int onStartCommand(Intent intent, int flags, int startId) { | |
| Log.i(TAG, "onStartCommand called"); | |
| if (factory == null) { | |
| setupConnectionFactory(); | |
| setupSubscription(incomingMessageHandler); | |
| } | |
| return START_STICKY; | |
| } | |
| @Override | |
| public void onDestroy() { | |
| Log.i(TAG, "onDestroy called"); | |
| subscribeThread.interrupt(); | |
| super.onDestroy(); | |
| } | |
| private void setupConnectionFactory() { | |
| factory = new ConnectionFactory(); | |
| factory.setHost(HOSTNAME); | |
| factory.setUsername("android"); | |
| factory.setPassword("android"); | |
| } | |
| private void setupSubscription(final Handler handler) { | |
| if (subscribeThread != null) { | |
| subscribeThread.interrupt(); | |
| subscribeThread.start(); | |
| return; | |
| } | |
| subscribeThread = new Thread(new Runnable(){ | |
| @Override | |
| public void run() { | |
| while(true) { | |
| try { | |
| Connection connection = factory.newConnection(); | |
| Channel channel = connection.createChannel(); | |
| channel.exchangeDeclare(EXCHANGE_NAME, "direct"); | |
| String queueName = channel.queueDeclare().getQueue(); | |
| // bind queue to channels | |
| for(int i=0; i<subscribedChannels.size(); i++) { | |
| channel.queueBind(queueName, EXCHANGE_NAME, subscribedChannels.get(i)); | |
| } | |
| QueueingConsumer consumer = new QueueingConsumer(channel); | |
| channel.basicConsume(queueName, true, consumer); | |
| while(true) { | |
| try { | |
| QueueingConsumer.Delivery delivery = consumer.nextDelivery(); | |
| String routingKey = delivery.getEnvelope().getRoutingKey(); | |
| String message = new String(delivery.getBody()); | |
| Log.i(TAG, "[r] " + message); | |
| Message msg = handler.obtainMessage(); | |
| Bundle bundle = new Bundle(); | |
| bundle.putString("json_message", message); | |
| msg.setData(bundle); | |
| handler.sendMessage(msg); | |
| } catch(InterruptedException ie) { | |
| ie.printStackTrace(); | |
| return; | |
| } | |
| } | |
| } catch(Exception e) { | |
| if (e.getClass().equals(InterruptedException.class)) { | |
| Log.e(TAG, "thread interrupted"); | |
| break; | |
| } | |
| Log.e(TAG, "connection broke"); | |
| e.printStackTrace(); | |
| try { | |
| Thread.sleep(4000); | |
| } catch (InterruptedException e1) { | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| }); | |
| subscribeThread.start(); | |
| } | |
| private void deliverMessage(String message) { | |
| // deliver to activity or store in database | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment