Last active
June 17, 2020 04:06
-
-
Save kaka19ace/c5f20613414888de2dc4 to your computer and use it in GitHub Desktop.
gevent with zmq and redis tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# @file gevent_zmq_redis_demo.py | |
# @author kaka_ace <[email protected]> | |
# @date Tue Oct 14 2014 | |
""" | |
fetures: there are two concurrent tasks | |
1. From Redis server, Getting the notify msg with BLPOP operation, | |
then using zmq dealer send the msg | |
2. zmq dealer recv msg from zmq router, then set the value(from recv's msg) to Redis | |
base on gevent, redis-py, pyzmq | |
Reference: | |
[ gevent with redis-py ](http://gehrcke.de/2013/01/highly-concurrent-connections-to-redis-with-gevent-and-redis-py/) | |
[ gevent with zmq ](https://github.com/zeromq/pyzmq/blob/master/examples/gevent/poll.py) | |
Python 2.7.8 | |
""" | |
import sys | |
reload(sys) | |
sys.setdefaultencoding("utf8") | |
import os | |
import signal | |
# test ip and port | |
REDIS_IP = "127.0.0.1" | |
REDIS_PORT = 6379 | |
ROUTER_IP = "127.0.0.1" | |
ROUTER_PORT = 59144 # also we could use 44944, just interesting number in chinese :) | |
# gevent, we use gevent to manipulate io tasks (here all socket classes) | |
import gevent | |
# after gevent 1.0.1 in __init__.py, the __all__ has no 'socket' attribute | |
# another way: from ... import ... | |
from gevent import socket as gevent_socket | |
# redis | |
import redis.connection | |
# redis's socket resign to gevent.socket | |
redis.connection.socket = gevent_socket | |
# Instantiate two redis client | |
# redis_pop_client pop list | |
# redis_setvalue_client set value | |
from redis import RedisError | |
from redis import ConnectionPool | |
from redis import Redis | |
REDIS_CONNECTION_POOL = ConnectionPool(max_connections = 8, host = REDIS_IP, port = REDIS_PORT) | |
redis_pop_client = Redis(connection_pool=REDIS_CONNECTION_POOL) | |
redis_setvalue_client = Redis(connection_pool=REDIS_CONNECTION_POOL) | |
# Redis key | |
TEST_REDIS_LIST_KEY = "kaka:test:list" | |
TEST_REDIS_SETVALUE_KEY = "kaka:test:key" | |
# zmq, based on gevent | |
from zmq import green as zmq | |
context = zmq.Context() | |
dealer = context.socket(zmq.DEALER) | |
dealer.set_hwm(1000) # zmq default high water mark is 1000, explicitly here | |
router_addr = "tcp://" + ROUTER_IP + ":" + str(ROUTER_PORT) | |
dealer.connect(router_addr) | |
# regiser dealer in zmq poller, | |
# in each poll task, poller will check dealer ZMQ_POLLIN event | |
poller = zmq.Poller() | |
poller.register(dealer, zmq.POLLIN) | |
# process functions | |
def process_redis_pop_task(): | |
""" | |
brpop msg from Redis list, then dealer send msg | |
""" | |
while True: | |
try: | |
_, content = redis_pop_client.brpop(TEST_REDIS_LIST_KEY ) | |
msgs = [content] | |
dealer.send_multipart(msgs, copy=False) | |
except RedisError, e: | |
continue | |
except TypeError, e: | |
# if redis_pop_client.brpop add timeout param, | |
# brpop will raise TypeError, when the list is empty | |
continue | |
except Exception, e: | |
print "Exception: ", str(e) | |
continue | |
def process_dealer_zmq_pollin(): | |
""" | |
dealer recv msg from router, then set value to Redis | |
""" | |
while True: | |
socks = dict(poller.poll()) | |
if dealer in socks and socks[dealer] == zmq.POLLIN: | |
msgs = dealer.recv_multipart(copy=True) | |
if len(msgs) >= 1: | |
# the demo just use the first msg frame | |
redis_setvalue_client.set(TEST_REDIS_SETVALUE_KEY, msgs[0]) | |
def process_shutdown(signum, greenlets): | |
""" | |
when the process recv signal, before the process exit, gevent will kill all tasks | |
""" | |
gevent.killall(greenlets) | |
def register_sys_exit_handler(greenlets): | |
""" | |
register signal | |
""" | |
gevent.signal(signal.SIGQUIT, process_shutdown, signal.SIGQUIT, greenlets) | |
gevent.signal(signal.SIGINT, process_shutdown, signal.SIGQUIT, greenlets) | |
gevent.signal(signal.SIGTERM, process_shutdown, signal.SIGQUIT, greenlets) | |
gevent.signal(signal.SIGKILL, process_shutdown, signal.SIGQUIT, greenlets) | |
tasks = [gevent.spawn(process_redis_pop_task), gevent.spawn(process_dealer_zmq_pollin)] | |
register_sys_exit_handler(tasks) | |
# gevent loop | |
gevent.joinall(tasks) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment