Skip to content

Instantly share code, notes, and snippets.

@Ivlyth
Created September 22, 2015 03:53
Show Gist options
  • Save Ivlyth/b083201feb68ea7d6c57 to your computer and use it in GitHub Desktop.
Save Ivlyth/b083201feb68ea7d6c57 to your computer and use it in GitHub Desktop.
Redis pub/sub client
#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Author : myth
Date : 15-9-22
Email : belongmyth at 163.com
'''
import redis
import time
class RedisPublishClient(object):
    """Publish messages to one Redis channel, buffering them while Redis is down.

    The connection is opened lazily on the first ``send``. A failed connection
    attempt is retried with exponential backoff: the wait starts at
    ``retryStart`` seconds, is multiplied by ``retryFactor`` after each further
    failure and is capped at ``retryMax``. Messages that could not be delivered
    are kept in ``failed_msg`` (oldest first) and are published, in submission
    order, as soon as a connection is available again.
    """

    def __init__(self, channel, host=u'localhost', port=6379, db=0, password=None):
        """Store the connection settings; no connection is opened here.

        :param channel: name of the channel every message is published to.
        :param host: Redis server host.
        :param port: Redis server port.
        :param db: Redis database index.
        :param password: Redis password, or ``None``.
        """
        self.channel = channel
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.client = None
        # Backoff state: ``retryTime`` is the earliest time (epoch seconds) at
        # which the next connection attempt is allowed; ``None`` means "now".
        self.retryTime = None
        self.retryPeriod = None
        self.retryStart = 1.0
        self.retryMax = 30.0
        self.retryFactor = 2.0
        # Undelivered messages, oldest first.
        # NOTE(review): this backlog is unbounded; it grows for as long as
        # Redis stays unreachable.
        self.failed_msg = []

    def makeClient(self):
        """Create a ``redis.StrictRedis`` client and verify it with ``ping``.

        :return: the connected client.
        :raises redis.ConnectionError: propagated from ``ping`` when the
            server cannot be reached.
        """
        c = redis.StrictRedis(host=self.host, port=self.port, db=self.db,
                              password=self.password)
        c.ping()
        return c

    def createClient(self):
        """Try to connect, unless the current backoff window has not elapsed.

        On success ``client`` is set and the backoff state is cleared. On
        ``redis.ConnectionError`` ``client`` is left untouched and
        ``retryTime`` is pushed further into the future.
        """
        now = time.time()
        if self.retryTime is not None and now < self.retryTime:
            return  # still inside the backoff window
        try:
            self.client = self.makeClient()
        except redis.ConnectionError:
            if self.retryTime is None:
                self.retryPeriod = self.retryStart
            else:
                self.retryPeriod = self.retryPeriod * self.retryFactor
                if self.retryPeriod > self.retryMax:
                    self.retryPeriod = self.retryMax
            self.retryTime = now + self.retryPeriod
        else:
            self.retryTime = None

    def _flush_backlog(self):
        """Publish the buffered messages in order, stopping at the first failure."""
        delivered = 0
        try:
            for msg in self.failed_msg:
                self.client.publish(self.channel, msg)
                delivered += 1
        except redis.ConnectionError:
            # Keep everything that was not delivered and force a reconnect on
            # the next send. Retrying here would spin for as long as the
            # server stays down.
            self.client = None
        except Exception:
            # Any other error is specific to this message: discard it so it
            # cannot block the rest of the backlog, then let the caller see it.
            delivered += 1
            raise
        finally:
            del self.failed_msg[:delivered]

    def send(self, s):
        """Publish ``s`` on the channel, after any previously buffered messages.

        If no connection is available, or the connection fails part-way, the
        undelivered messages (including ``s``) stay in ``failed_msg`` for a
        later call.

        :param s: payload handed to ``publish``.
        """
        # Queue first so that older buffered messages always go out before s.
        self.failed_msg.append(s)
        if self.client is None:
            self.createClient()
        if self.client:
            self._flush_backlog()

    def makePickle(self, record):
        """Return the payload to publish for ``record``.

        The record is returned unchanged; this is the one place to add
        serialization.
        """
        return record

    def emit(self, record):
        """Convert ``record`` with ``makePickle`` and ``send`` the result.

        Exceptions raised by either step propagate to the caller.
        """
        self.send(self.makePickle(record))
class RedisSubscribeClient(object):
    """Subscribe to one Redis channel and forward message payloads to a queue.

    The connection is opened lazily by ``listen``. A failed connection attempt
    is retried with exponential backoff: the wait starts at ``retryStart``
    seconds, is multiplied by ``retryFactor`` after each further failure and
    is capped at ``retryMax``.
    """

    def __init__(self, channel, msg_queue, host=u'localhost', port=6379, db=0, password=None):
        """Store the connection settings; no connection is opened here.

        :param channel: name of the channel to subscribe to.
        :param msg_queue: object whose ``put`` method receives each payload.
        :param host: Redis server host.
        :param port: Redis server port.
        :param db: Redis database index.
        :param password: Redis password, or ``None``.
        """
        self.channel = channel
        self.queue = msg_queue  # received payloads are handed over via put()
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.client = None
        # Backoff state: ``retryTime`` is the earliest time (epoch seconds) at
        # which the next connection attempt is allowed; ``None`` means "now".
        self.retryTime = None
        self.retryPeriod = None
        self.retryStart = 1.0
        self.retryMax = 30.0
        self.retryFactor = 2.0

    def makeClient(self):
        """Create a ``redis.StrictRedis`` client and verify it with ``ping``.

        :return: the connected client.
        :raises redis.ConnectionError: propagated from ``ping`` when the
            server cannot be reached.
        """
        c = redis.StrictRedis(host=self.host, port=self.port, db=self.db,
                              password=self.password)
        c.ping()
        return c

    def createClient(self):
        """Try to connect, unless the current backoff window has not elapsed.

        On success ``client`` is set and the backoff state is cleared. On
        ``redis.ConnectionError`` ``client`` is left untouched and
        ``retryTime`` is pushed further into the future.
        """
        now = time.time()
        if self.retryTime is not None and now < self.retryTime:
            return  # still inside the backoff window
        try:
            self.client = self.makeClient()
        except redis.ConnectionError:
            if self.retryTime is None:
                self.retryPeriod = self.retryStart
            else:
                self.retryPeriod = self.retryPeriod * self.retryFactor
                if self.retryPeriod > self.retryMax:
                    self.retryPeriod = self.retryMax
            self.retryTime = now + self.retryPeriod
        else:
            self.retryTime = None

    def listen(self):
        """Run forever, putting the data of every ``message`` item on the queue.

        Items of any other type (e.g. subscribe confirmations) are skipped.
        When the connection is lost the client is dropped and a reconnect is
        attempted, subject to the backoff in ``createClient``. This method
        does not return.
        """
        while True:
            if self.client is None:
                self.createClient()
            if self.client:
                ps = self.client.pubsub()
                try:
                    # subscribe is inside the try: the connection can drop
                    # between the ping in makeClient and this call.
                    ps.subscribe(self.channel)
                    for item in ps.listen():
                        if item[u'type'] != u'message':
                            continue
                        self.queue.put(item[u'data'])
                except redis.ConnectionError:
                    self.client = None
                finally:
                    # Release the pub/sub connection before reconnecting.
                    ps.close()
            # Pause once per cycle so a failed or backoff-gated reconnect
            # cannot turn this loop into a busy spin.
            time.sleep(0.5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment