Created
June 24, 2011 22:07
-
-
Save josiahcarlson/1045789 to your computer and use it in GitHub Desktop.
A way of implementing a poll-based chat inside Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
redis_simple_chat.py
Written June 24, 2011 by Josiah Carlson
Released under the GNU GPL v2
available: http://www.gnu.org/licenses/gpl-2.0.html
Other licenses may be available upon request.
This module intends to offer a simple way for Redis to hold state for chat
channels over time. Clients send messages, check for messages since they last
checked, and some cron daemon calls a cleanup function on occasion.
Requires a recent version of https://github.com/andymccurdy/redis-py/ or a
work-alike.
'''
import time | |
import redis | |
def send_message(conn, channel, message):
    '''
    Send a message to the provided channel.

    conn    -- a redis-py client (or work-alike)
    channel -- channel name; used as a suffix of the per-channel Redis keys
    message -- the payload to store in the channel's message hash

    While holding the per-channel lock, this allocates the next message id
    for the channel from the 'ids:' zset and then, in a single transaction,
    stores the message in 'messages:<channel>', adds the id to
    'timeline:<channel>' (scored by the id itself) and records the current
    time for the channel in 'updated:'.

    NOTE(review): the zincrby/zadd argument order below is kept exactly as
    written; it presumably targets the redis-py release current in 2011 --
    confirm against the installed client version.
    '''
    # conn.lock() is the client's lock factory; the client object has no
    # ``Lock`` attribute.
    with conn.lock('lock:channel:' + channel, timeout=5):
        # get the id for this message
        message_id = conn.zincrby('ids:', channel, 1)
        pipeline = conn.pipeline(transaction=True)
        # store the data
        pipeline.hset('messages:' + channel, message_id, message)
        # update the timeline
        pipeline.zadd('timeline:' + channel, message_id, message_id)
        # and keep a record about when this channel last got a message
        pipeline.zadd('updated:', channel, time.time())
        pipeline.execute()
def check_messages(conn, client, channel, limit=10):
    '''
    Check messages for a client on a given channel.

    conn    -- a redis-py client (or work-alike)
    client  -- client identifier; combined with the channel to form the
               '<client>:<channel>' progress key
    channel -- channel name
    limit   -- maximum number of ids kept in the temporary result set, or
               None to leave it untrimmed

    Returns the string 'new connection' when the client/channel pair was not
    already present in the 'clients:' zset (first check, or the pair was
    removed). Otherwise returns a list of the non-empty message payloads
    read from 'messages:<channel>' (an empty list when there are no ids).

    NOTE(review): the zadd argument order is kept exactly as written; it
    presumably targets the redis-py release current in 2011 -- confirm
    against the installed client version.
    '''
    # we must ensure that only one instance of this client is fetching data
    # at a time
    with conn.lock('lock:client:' + client, timeout=5):
        with conn.lock('lock:channel:' + channel, timeout=5):
            pipeline = conn.pipeline(True)
            timeline = 'timeline:' + channel
            cl_ch = client + ':' + channel
            progress = 'clients:' + cl_ch
            scratch = progress + ':tmp'

            # find out which messages we already know about
            pipeline.zinterstore(scratch, [timeline, progress], aggregate='MAX')
            # set the last time this client checked for messages
            pipeline.zadd('clients:', cl_ch, time.time())
            results = pipeline.execute()

            if results[-1]:
                # Client timed out, or client is new: remember the current
                # timeline as already seen and report the fresh connection.
                pipeline.zunionstore(progress, [timeline])
                pipeline.delete(scratch)
                pipeline.execute()
                return 'new connection'
            elif not results[0]:
                # First messages in a new channel, or a channel that had been
                # deleted due to timeout. This must seed the same ':tmp' key
                # that is trimmed, read and deleted below.
                conn.zunionstore(scratch, [timeline])

            if limit is not None:
                pipeline.zremrangebyrank(scratch, limit, -1)
            # get the ids, update the known ids
            pipeline.zrange(scratch, 0, -1)
            pipeline.zunionstore(progress, [progress, scratch], aggregate='MAX')
            # discard memory of timed-out messages
            pipeline.zinterstore(progress, [progress, timeline], aggregate='MAX')
            pipeline.delete(scratch)
            # the zrange reply is always fourth from the end, whether or not
            # the optional zremrangebyrank was queued
            ids = pipeline.execute()[-4]

            if not ids:
                # HMGET requires at least one field; nothing to fetch.
                return []
            # return the known messages
            return [msg for msg in conn.hmget('messages:' + channel, ids) if msg]
def clean_out_channel_backlog(conn, backlog=100, channel_timeout=900, client_timeout=300):
    '''
    Clean out old messages from channels, old channels, and information
    about old client/channel pairs.

    conn            -- a redis-py client (or work-alike)
    backlog         -- number of most recent messages to keep per channel
    channel_timeout -- seconds without a new message after which a channel's
                       messages, timeline and bookkeeping are deleted
    client_timeout  -- seconds without a check after which a client/channel
                       pair's progress information is deleted

    The function runs three passes under a global cleanup lock:
      1. delete channels whose 'updated:' score is older than the timeout;
      2. trim each channel's timeline/messages to the last `backlog` entries;
      3. delete 'clients:' entries (and their progress zsets) whose score is
         older than the client timeout.
    '''
    # only one backlog cleanup function call can run at a time
    with conn.lock('lock:cleanup:'):
        pipeline = conn.pipeline(True)

        # find those channels that haven't been updated for a while
        ch_timeout = time.time() - channel_timeout
        stale_channels = conn.zrangebyscore(
            'updated:', 0, ch_timeout, withscores=True)
        for channel, score in stale_channels:
            with conn.lock('lock:channel:' + channel, timeout=5):
                # skip channels that received a message since we listed them
                if conn.zscore('updated:', channel) == score:
                    pipeline.delete(
                        'messages:' + channel,
                        'timeline:' + channel,
                    )
                    pipeline.zrem('updated:', channel)
                    pipeline.zrem('ids:', channel)
                    pipeline.execute()

        # get a prioritized list of those channels that have the most volume
        pipeline.zinterstore('ids:tmp', {'ids:cleanup': 1, 'ids:': -1})
        pipeline.zunionstore('ids:cleanup', ['ids:'])
        known = pipeline.execute()[0]
        if not known:
            # if we've never cleaned up before, clean them all
            known = conn.zunionstore('ids:tmp', ['ids:cleanup'])

        # iterate over chunks of channels to clean out old messages
        for start in range(0, known, 100):
            for channel in conn.zrange('ids:tmp', start, start + 99):
                with conn.lock('lock:channel:' + channel, timeout=5):
                    # remove old messages from the timeline: everything
                    # except the last `backlog` entries
                    message_ids = conn.zrange(
                        'timeline:' + channel, 0, -backlog - 1)
                    for message_id in message_ids:
                        pipeline.hdel('messages:' + channel, message_id)
                        pipeline.zrem('timeline:' + channel, message_id)
                    pipeline.execute()
        conn.delete('ids:tmp')

        # clean out old clients that haven't checked recently; the bound is
        # a timestamp, so this must be a score query rather than a rank query
        cl_timeout = time.time() - client_timeout
        stale_clients = conn.zrangebyscore(
            'clients:', 0, cl_timeout, withscores=True)
        for client, score in stale_clients:
            # NOTE(review): members of 'clients:' are '<client>:<channel>'
            # pairs (as written by check_messages), so this lock name is not
            # the same key as check_messages' 'lock:client:<client>' lock;
            # the score re-check below is what detects a concurrent check-in.
            with conn.lock('lock:client:' + client, timeout=5):
                if conn.zscore('clients:', client) == score:
                    # remove the notice of when the item was last called
                    pipeline.zrem('clients:', client)
                    # clean out the progress zset
                    pipeline.delete('clients:' + client)
                    pipeline.execute()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment