Created
September 22, 2015 03:53
-
-
Save Ivlyth/b083201feb68ea7d6c57 to your computer and use it in GitHub Desktop.
redis subpub client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding:utf-8 -*- | |
''' | |
Author : myth | |
Date : 15-9-22 | |
Email : belongmyth at 163.com | |
''' | |
import redis | |
import time | |
class RedisPublishClient(): | |
def __init__(self, channel, host=u'localhost', port=6379, db=0, password=None): | |
self.channel = channel | |
self.host = host | |
self.port = port | |
self.db = db | |
self.password = password | |
self.client = None | |
self.retryTime = None | |
self.retryStart = 1.0 | |
self.retryMax = 30.0 | |
self.retryFactor = 2.0 | |
self.failed_msg = [] | |
def makeClient(self): | |
c = redis.StrictRedis(host=self.host, port=self.port, db=self.db, password=self.password) | |
c.ping() | |
return c | |
def createClient(self): | |
now = time.time() | |
if self.retryTime is None: | |
attempt = 1 | |
else: | |
attempt = (now >= self.retryTime) | |
if attempt: | |
try: | |
self.client = self.makeClient() | |
self.retryTime = None | |
except redis.ConnectionError: | |
if self.retryTime is None: | |
self.retryPeriod = self.retryStart | |
else: | |
self.retryPeriod = self.retryPeriod * self.retryFactor | |
if self.retryPeriod > self.retryMax: | |
self.retryPeriod = self.retryMax | |
self.retryTime = now + self.retryPeriod | |
def send(self, s): | |
if self.client is None: | |
self.createClient() | |
if self.client: | |
try: | |
self.client.publish(self.channel, s) | |
while self.failed_msg: | |
msg = self.failed_msg.pop(0) | |
try: | |
self.client.publish(self.channel, msg) | |
except redis.ConnectionError: | |
self.failed_msg.append(msg) | |
except redis.ConnectionError: | |
self.failed_msg.append(s) | |
self.client = None | |
else: | |
self.failed_msg.append(s) | |
def makePickle(self, record): | |
return record | |
def emit(self, record): | |
try: | |
s = self.makePickle(record) | |
self.send(s) | |
except Exception as e: | |
raise | |
class RedisSubscribeClient(): | |
def __init__(self, channel, msg_queue, host=u'localhost', port=6379, db=0, password=None): | |
self.channel = channel | |
self.queue = msg_queue # use queue to trans msg | |
self.host = host | |
self.port = port | |
self.db = db | |
self.password = password | |
self.client = None | |
self.retryTime = None | |
self.retryStart = 1.0 | |
self.retryMax = 30.0 | |
self.retryFactor = 2.0 | |
def makeClient(self): | |
c = redis.StrictRedis(host=self.host, port=self.port, db=self.db, password=self.password) | |
c.ping() | |
return c | |
def createClient(self): | |
now = time.time() | |
if self.retryTime is None: | |
attempt = 1 | |
else: | |
attempt = (now >= self.retryTime) | |
if attempt: | |
try: | |
self.client = self.makeClient() | |
self.retryTime = None | |
except redis.ConnectionError: | |
if self.retryTime is None: | |
self.retryPeriod = self.retryStart | |
else: | |
self.retryPeriod = self.retryPeriod * self.retryFactor | |
if self.retryPeriod > self.retryMax: | |
self.retryPeriod = self.retryMax | |
self.retryTime = now + self.retryPeriod | |
def listen(self): | |
while True: | |
if self.client is None: | |
self.createClient() | |
if self.client: | |
ps = self.client.pubsub() | |
ps.subscribe(self.channel) | |
try: | |
for item in ps.listen(): | |
if item[u'type'] != u'message': | |
continue | |
chunk = item[u'data'] | |
self.queue.put(chunk) | |
except redis.ConnectionError: | |
self.client = None | |
time.sleep(0.5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment