Skip to content

Instantly share code, notes, and snippets.

@quiver
Created December 16, 2012 09:06
Show Gist options
  • Save quiver/4305768 to your computer and use it in GitHub Desktop.
Save quiver/4305768 to your computer and use it in GitHub Desktop.
pub-sub based simple message push system. DB : Redis, program : Python

README

pub-sub based simple message push system.

programs

pub.py

  • pub-sub #admin publisher
  • pub-sub #channel_name publisher

sub_admin.py

  • pub-sub #admin subscriber

sub_chat.py

  • pub-sub #channel_name subscriber

pubsub channel

#admin

used for chat group administration

  • create/delete channel
  • join/leave a channel

#channel_name

used for chat messages for each channel

Usage

redis

start a redis-server

$ redis-server &

server-program

$ python sub_admin.py

sub_chat.py is dynamically forked from sub_admin.py.

client-program

$ python pub.py --create channel
$ python pub.py --remove channel
$ python pub.py --join channel user
$ python pub.py --leave channel user
$ python pub.py --speak channel message

data model

Redis database number 5 (db=5) is used.

channel names

  • type : set
  • key : channel
  • value : {channel_name, channel_name, ...}

chat users

  • type : set
  • key : user:channel_name
  • value : {user_name, user_name, ...}

process

  • type : hash
  • key : process
  • value : {channel_name : pid, channel_name:pid, ...}
# vim: set fileencoding=utf8
import argparse
import json
import redis
# Redis connection shared by every command below (local server, database 5).
client = redis.StrictRedis(host='localhost', port=6379, db=5)
def cmd_create_chat(channel):
    """Request creation of a chat channel.

    Publishes a JSON request of type ``create`` naming *channel* on the
    ``admin`` pub-sub channel.
    """
    request = {'type': 'create', 'channel': channel}
    payload = json.dumps(request)
    client.publish('admin', payload)
def cmd_remove_chat(channel):
    """Request removal of a chat channel.

    Publishes a JSON request of type ``remove`` naming *channel* on the
    ``admin`` pub-sub channel.
    """
    request = {'type': 'remove'}
    request['channel'] = channel
    client.publish('admin', json.dumps(request))
def cmd_join_chat(channel, user):
    """Request that *user* be added to *channel*.

    Publishes a JSON request of type ``join`` carrying the channel and the
    user on the ``admin`` pub-sub channel.
    """
    request = {'type': 'join'}
    request['channel'] = channel
    request['user'] = user
    payload = json.dumps(request)
    client.publish('admin', payload)
def cmd_leave_chat(channel, user):
    """Request that *user* be removed from *channel*.

    Publishes a JSON request of type ``leave`` carrying the channel and the
    user on the ``admin`` pub-sub channel.
    """
    fields = (('type', 'leave'), ('channel', channel), ('user', user))
    client.publish('admin', json.dumps(dict(fields)))
def cmd_speak_chat(channel, message):
    """Publish *message* as-is on the pub-sub channel named *channel*."""
    publish = client.publish
    publish(channel, message)
def main():
    """Parse the command line and run the first command that was supplied.

    Options are checked in the order create, remove, join, leave, speak; only
    the first one present is executed.  With no option nothing is published.
    """
    # (flag, operand names) -- nargs is the number of operand names.
    option_specs = (
        ('--create', ('channel', )),
        ('--remove', ('channel', )),
        ('--join', ('channel', 'user')),
        ('--leave', ('channel', 'user')),
        ('--speak', ('channel', 'message')),
    )
    parser = argparse.ArgumentParser()
    for flag, operands in option_specs:
        parser.add_argument(flag, nargs=len(operands), metavar=operands)
    options = parser.parse_args()

    if options.create is not None:
        channel, = options.create
        cmd_create_chat(channel)
        return
    if options.remove is not None:
        channel, = options.remove
        cmd_remove_chat(channel)
        return
    if options.join is not None:
        channel, user = options.join
        cmd_join_chat(channel, user)
        return
    if options.leave is not None:
        channel, user = options.leave
        cmd_leave_chat(channel, user)
        return
    if options.speak is not None:
        channel, message = options.speak
        cmd_speak_chat(channel, message)
# Run the command-line client only when this file is executed as a script.
if __name__ == '__main__':
    main()
# vim: set fileencoding=utf8
import json
import os
import signal
import syslog
import redis
import sub_chat
# Connection to the local Redis server, database 5, used for all bookkeeping.
client = redis.StrictRedis(host='localhost', port=6379, db=5)
# Pub-sub handle subscribed to the 'admin' channel as soon as the module
# loads; main() iterates over it.
pubsub = client.pubsub()
pubsub.subscribe('admin')
def subscribe_to_channel(channel):
    """Fork a listener process for *channel* unless it is already registered.

    If *channel* is already a member of the ``channel`` set nothing happens.
    Otherwise the process forks: the parent adds the channel to the
    ``channel`` set and stores the child's pid in the ``process`` hash, and
    the child runs ``sub_chat.listen(channel)``.

    The child never returns to the caller: when ``listen`` finishes or fails
    it terminates with ``os._exit``.
    """
    if client.sismember('channel', channel):
        return
    pid = os.fork()
    if pid > 0:
        # parent process
        client.sadd('channel', channel)
        client.hset('process', channel, pid)
    elif pid == 0:
        # child process
        status = 1
        try:
            sub_chat.listen(channel)
            status = 0
        except Exception as exc:
            syslog.syslog('listener for channel %r failed: %r' % (channel, exc))
        finally:
            # Without this the child would return into the caller's admin
            # loop and act as a second admin daemon.  os._exit skips the
            # cleanup handlers inherited from the parent.
            os._exit(status)
def unsubscribe_to_channel(channel):
    """Remove *channel* and stop its listener process.

    Removes the channel from the ``channel`` set, deletes its
    ``user:<channel>`` key and drops its entry from the ``process`` hash.  If
    a numeric pid was recorded, the process is sent SIGTERM and reaped.

    An unknown channel, or a pid that can no longer be signalled or waited
    for, is logged to syslog instead of raising, so a single bad request
    cannot take the caller down.
    """
    client.srem('channel', channel)
    client.delete('user' + ':' + channel)
    pid = client.hget('process', channel)
    if pid is None:
        # hget yields None when no listener was ever recorded for the channel.
        syslog.syslog('no subscribe process recorded for channel %r' % (channel,))
        return
    # Forget the pid so a repeated remove cannot signal a stale process id.
    client.hdel('process', channel)
    if not pid.isdigit():
        return
    syslog.syslog('kill subscribe process[%s]' % pid)
    try:
        os.kill(int(pid), signal.SIGTERM)
        os.waitpid(int(pid), 0)
    except OSError as exc:
        # The process is already gone, or it is not a child of this process.
        syslog.syslog('could not stop subscribe process[%s]: %s' % (pid, exc))
def join_user(channel, user):
    """Add *user* to the Redis set ``user:<channel>``."""
    members_key = ':'.join(('user', channel))
    client.sadd(members_key, user)
def leave_user(channel, user):
    """Remove *user* from the Redis set ``user:<channel>``."""
    members_key = ':'.join(('user', channel))
    client.srem(members_key, user)
def init_subscribe():
    """Start a listener process for every channel already registered in Redis.

    Each member of the ``channel`` set gets a freshly forked child running
    ``sub_chat.listen``; the parent stores the new pid in the ``process``
    hash, replacing whatever pid was recorded before.

    The fork is done here rather than through ``subscribe_to_channel``,
    because that function returns immediately for channels that are already
    in the ``channel`` set -- which is every channel this loop sees.

    NOTE(review): a listener left over from a previous run is not detected;
    confirm whether old listeners can outlive the admin process.
    """
    for channel in client.smembers('channel'):
        pid = os.fork()
        if pid > 0:
            # parent process: remember the new listener and move on
            client.hset('process', channel, pid)
            continue
        # child process
        status = 1
        try:
            sub_chat.listen(channel)
            status = 0
        except Exception as exc:
            syslog.syslog('listener for channel %r failed: %r' % (channel, exc))
        finally:
            # Never fall through into the parent's loop from the child.
            os._exit(status)
def main():
    """Run the admin daemon.

    Calls ``init_subscribe()`` and then processes everything arriving on the
    module-level ``pubsub`` handle.  Events whose type is ``message`` are
    decoded as JSON and dispatched on their ``type`` field:

    * ``create`` -> ``subscribe_to_channel(channel)``
    * ``remove`` -> ``unsubscribe_to_channel(channel)``
    * ``join``   -> ``join_user(channel, user)``
    * ``leave``  -> ``leave_user(channel, user)``

    Any other event is written to stdout prefixed with ``admin``.  A payload
    that is not valid JSON or lacks a required field is logged to syslog and
    skipped instead of terminating the loop.
    """
    import sys  # local import: only used for the diagnostic write below

    init_subscribe()
    for message in pubsub.listen():
        if message['type'] != 'message':
            # Same output as the Python 2 statement `print 'admin', message`,
            # but valid syntax on Python 3 as well.
            sys.stdout.write('admin %s\n' % (message,))
            continue

        # Only parsing and field extraction are guarded; errors raised by the
        # handlers themselves still propagate.
        try:
            data = json.loads(message['data'])
            command = data['type']
            channel = data['channel']
            user = data['user'] if command in ('join', 'leave') else None
        except (ValueError, KeyError, TypeError) as exc:
            syslog.syslog('ignored malformed admin message: %r' % (exc,))
            continue

        if command == 'create':
            subscribe_to_channel(channel)
        elif command == 'remove':
            unsubscribe_to_channel(channel)
        elif command == 'join':
            join_user(channel, user)
        elif command == 'leave':
            leave_user(channel, user)
# Start the admin daemon only when this file is executed as a script.
if __name__ == '__main__':
    main()
# vim: set fileencoding=utf8
import os
import syslog
import redis
def async_push_notify(channel, user, message):
    """Record in syslog that *message* on *channel* is pushed to *user*.

    Only a syslog entry is written here; no delivery takes place.
    """
    details = (message, channel, user)
    entry = 'push message [%s] of channel [#%s] to user [%s]' % details
    syslog.syslog(entry)
def listen(channel):
    """Subscribe to *channel* and fan incoming messages out to its users.

    Opens its own Redis connection (local server, database 5), subscribes to
    *channel* and logs the subscription with the current pid.  For every
    event of type ``message`` the members of the ``user:<channel>`` set are
    read and ``async_push_notify`` is called once per member with the
    message data.  Any other event is written to stdout.
    """
    import sys  # local import: only used for the diagnostic write below

    client = redis.StrictRedis(host='localhost', port=6379, db=5)
    pubsub = client.pubsub()
    pubsub.subscribe(channel)
    syslog.syslog('[%s]subscribe to %s' % (os.getpid(), channel))

    # The key does not depend on the message, so build it once.
    key = 'user' + ':' + channel
    for message in pubsub.listen():
        if message['type'] == 'message':
            syslog.syslog('received a message of channel [#%s]' % channel)
            # Membership is re-read for every message.
            for user in client.smembers(key):
                async_push_notify(channel, user, message['data'])
        else:
            # Same output as the Python 2 statement `print 'admin', message`,
            # but valid syntax on Python 3 as well.
            # NOTE(review): the 'admin' prefix looks copied from the admin
            # daemon; it is kept so the output stays unchanged.
            sys.stdout.write('admin %s\n' % (message,))
def main():
    """Do nothing; this module only provides ``listen`` for other code to call."""
    return None
# Script entry point; main() is currently a no-op.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment