Created
April 1, 2014 03:26
-
-
Save zhangchunlin/9907155 to your computer and use it in GitHub Desktop.
gevent master slave example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python | |
| #coding=utf-8 | |
| import gevent | |
| from gevent.server import StreamServer | |
| from gevent.queue import Queue | |
| import json | |
def sendmsg2fobj(fobj, msg):
    """Write *msg* to *fobj* as a single JSON line and flush right away.

    The protocol used by both peers is one JSON document per line, so the
    trailing newline is the message delimiter.
    """
    line = json.dumps(msg) + "\n"
    fobj.write(line)
    fobj.flush()
class slave_controller(object):
    """Master-side record for one named slave.

    Tracks whether the slave is connected, the file object used to talk to
    it, and how many requests it is handling against its limit.
    """

    def __init__(self, name):
        self.name = name
        # File object of the live connection; None while disconnected.
        self.fobj = None
        self.online = False
        # Requests currently being handled / maximum handled at one time.
        self.nhandling = 0
        self.nrequest1time = 1

    def handle1request(self, r):
        """Claim one handling slot and hand this controller to the requester.

        *r* is a request dict whose "invokerq" entry is a queue; the
        controller itself is put on that queue. Nothing happens when the
        controller is already at its limit.
        """
        if self.nhandling >= self.nrequest1time:
            return
        self.nhandling += 1
        r["invokerq"].put(self)

    def invoke(self, iparam):
        """Send *iparam* to the slave; a no-op when no connection is set."""
        if not self.fobj:
            return
        print("send to %s: %s" % (self.name, iparam))
        sendmsg2fobj(self.fobj, iparam)

    def end1request(self):
        """Release one handling slot, never dropping below zero."""
        if self.nhandling <= 0:
            return
        self.nhandling -= 1
class master(object):
    """Server-side coordinator for slave connections.

    Keeps one ``slave_controller`` per known slave name, a list of pending
    requests for a slave, and a table of "invoker" queues keyed by an
    integer id (``iid``). Incoming messages are routed to the queue whose
    id matches their "iid" field.
    """

    def __init__(self):
        self.next_invoker_id = 1
        # iid -> Queue receiving every incoming message tagged with that iid.
        self.invokerq = {}
        self.scontrollers = {}
        self.scontrollers["slave1"] = slave_controller("slave1")
        # Pending get_slave() requests, in arrival order.
        self.slave_request_queue = []

    def new_invoker(self):
        """Allocate a fresh invoker id and queue; return ``(iid, queue)``."""
        iid = self.next_invoker_id
        self.next_invoker_id += 1
        self.invokerq[iid] = Queue()
        return iid, self.invokerq[iid]

    def del_invoker(self, iid):
        """Forget the queue registered for *iid* (KeyError if unknown)."""
        del self.invokerq[iid]

    def update_slave_status(self, name, online, fobj=None):
        """Record a connect/disconnect of slave *name*.

        Only acts when the online flag actually changes; on a transition
        to online the pending requests are re-examined. Raises KeyError
        for a name that has no controller.
        """
        sc = self.scontrollers[name]
        if sc.online != online:
            print("slave %s online: %s->%s" % (name, sc.online, online))
            sc.online = online
            sc.fobj = fobj
            if online:
                self.handle_slave_request()

    def handle_slave_request(self):
        """Give every online slave with spare capacity to a waiting request.

        The pending list is scanned from its end, so the most recently
        queued request for a slave is served first (unchanged from the
        original behaviour).
        """
        pending = self.slave_request_queue
        for scname in self.scontrollers:
            sc = self.scontrollers[scname]
            while sc.online and sc.nhandling < sc.nrequest1time:
                print("while %s %s" % (sc.nhandling, sc.nrequest1time))
                for ri in range(len(pending) - 1, -1, -1):
                    r = pending[ri]
                    if r["request_slave_name"] == sc.name:
                        sc.handle1request(r)
                        print("after %s %s" % (sc.nhandling, sc.nrequest1time))
                        del pending[ri]
                        break
                else:
                    # No pending request targets this slave.
                    break

    def get_slave(self, slavename):
        """Block until slave *slavename* is available and return its controller.

        Raises KeyError for a name without a controller, since such a
        request could never be satisfied and would block forever.
        """
        print("want to get_slave %s" % (slavename,))
        if slavename not in self.scontrollers:
            raise KeyError(slavename)
        myq = Queue()
        # The request must name the slave the caller asked for; the
        # original always queued a request for "slave1".
        sreq = {"request_slave_name": slavename, "invokerq": myq}
        self.slave_request_queue.append(sreq)
        self.handle_slave_request()
        slave = myq.get()
        print("got_slave %s" % (slavename,))
        return slave

    def put_slave(self, sc):
        """Release controller *sc* and serve the next waiting request."""
        print("put_slave %s" % (sc.name,))
        sc.end1request()
        self.handle_slave_request()

    def dispatch(self, msg):
        """Route a decoded message to the invoker queue named by its "iid".

        Messages that are not JSON objects, or whose iid has no registered
        queue, are printed and discarded.
        """
        q = None
        if isinstance(msg, dict):
            try:
                q = self.invokerq.get(msg.get("iid", 0))
            except TypeError:
                # Unhashable iid (e.g. a JSON list): treat as unknown.
                q = None
        if q is None:
            print("discard %s" % (msg,))
            return
        q.put_nowait(msg)
        # Yield so the greenlet waiting on the queue can run.
        gevent.sleep(0)

    def sconnect_handler(self, sock, address):
        """Serve one slave connection until it closes.

        A helper greenlet first queries the peer. A "session" peer is
        marked online under its slave name. A "handle" peer carrying a
        "runcmd" message is told to run it locally; its output is echoed
        to the console and the connection is closed when the output ends.
        Meanwhile this function reads JSON lines and dispatches them.
        """
        import sys  # local import: needed only for newline-free console echo

        print('New connection from %s:%s' % address)
        fileobj = sock.makefile()
        info = {"sname": None}

        def runcmd(msg2handle):
            run_iid, run_q = self.new_invoker()
            try:
                request = msg2handle.copy()
                request["runlocal"] = True
                request["iid"] = run_iid
                sendmsg2fobj(fileobj, request)
                print("----------------")
                while True:
                    stdio = run_q.get()["stdio"]
                    if not stdio:
                        # A falsy "stdio" marks the end of the output.
                        break
                    if stdio[0] == "stderr":
                        sys.stdout.write("err: ")
                    sys.stdout.write(stdio[1])
            finally:
                # Drop the queue (the original leaked it) and end the
                # connection, which also stops the read loop below.
                self.del_invoker(run_iid)
                fileobj.close()

        def init_slave():
            print("init slave")
            iid, q = self.new_invoker()
            try:
                sendmsg2fobj(fileobj, {'iid': iid, 'itype': 'slave_info_query'})
                rmsg = q.get()
                ctype = rmsg.get("ctype")
                if ctype == "session":
                    sname = rmsg["slave_name"]
                    self.update_slave_status(sname, online=True, fobj=fileobj)
                    info["sname"] = sname
                elif ctype == "handle":
                    msg2handle = rmsg["msg2handle"]
                    print("handle msg: %s" % (msg2handle,))
                    if msg2handle.get("itype") == "runcmd":
                        gevent.spawn(runcmd, msg2handle)
            finally:
                # Always release the query's invoker, even when the reply
                # is malformed or names an unknown slave.
                self.del_invoker(iid)
            print("finish init slave")

        gevent.spawn(init_slave)
        try:
            while True:
                try:
                    line = fileobj.readline()
                except (AttributeError, ValueError, IOError):
                    # The original relied on AttributeError from reading
                    # the file object after runcmd closed it; ValueError
                    # and I/O errors are treated the same way so a broken
                    # connection also ends the loop cleanly.
                    break
                if not line:
                    print("slave disconnected")
                    break
                try:
                    msg = json.loads(line)
                except ValueError as e:
                    print(e)
                    continue
                self.dispatch(msg)
        finally:
            # Mark the slave offline however the loop ended.
            if info["sname"]:
                self.update_slave_status(info["sname"], online=False, fobj=None)
def main():
    """Start the master server on port 6000 and run a three-round demo.

    A background greenlet acquires "slave1" three times, asks it to run a
    ping command each time, prints the reply and releases the slave.
    """
    m = master()
    # StreamServer calls the handler as handler(sock, address), which is
    # exactly the bound method's signature.
    server = StreamServer(('0.0.0.0', 6000), m.sconnect_handler)

    def run_cmd_slave1():
        iid, replies = m.new_invoker()
        for round_no in range(3):
            sc = m.get_slave("slave1")
            print("%d got slave :%s" % (round_no, sc.name))
            sc.invoke({
                "iid": iid,
                "itype": "runcmd",
                "cmd": "ping localhost -c 3",
            })
            print(replies.get())
            m.put_slave(sc)
        m.del_invoker(iid)

    gevent.spawn(run_cmd_slave1)
    print('Starting master server on port 6000')
    server.serve_forever()
    print('Stop master server')


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python | |
| #coding=utf-8 | |
| import gevent | |
| from gevent.socket import create_connection | |
| from gevent.subprocess import Popen, PIPE | |
| from gevent.queue import Queue | |
| import json | |
def sendmsg2fobj(fobj, msg):
    """Send *msg* over *fobj* as one newline-terminated JSON document.

    The stream is flushed after every message so the peer sees it
    immediately.
    """
    fobj.write("%s\n" % json.dumps(msg))
    fobj.flush()
class slave(object):
    """Client that connects to the master and executes its requests.

    Created without arguments it is a long-lived "session" slave. Created
    with ``msg2handle`` it is a one-shot helper that reports the message
    back to the master as work to handle, runs it locally when told to,
    and exposes the exit status as ``sp_returncode``.
    """

    def __init__(self, msg2handle=None):
        # File object of the master connection; None while not connected.
        self.fileobj = None
        self.msg2handle = msg2handle
        # (stream name, line) tuples waiting to be forwarded to the master.
        self.sp_stdioq = Queue()
        self.sp_returncode = None

    def _send(self, rmsg):
        """Send *rmsg* to the master, or report it when disconnected."""
        if self.fileobj:
            sendmsg2fobj(self.fileobj, rmsg)
        else:
            print("%s not sent" % (rmsg,))

    def _reply_info(self, msg):
        """Answer a slave_info_query with this slave's identity and role."""
        rmsg = {
            "slave_name": "slave1",
            "mtype": "info",
            "ctype": "handle" if self.msg2handle else "session",
            "iid": msg["iid"],
            "msg2handle": self.msg2handle,
        }
        sendmsg2fobj(self.fileobj, rmsg)

    def _runlocal(self, msg):
        """Run msg["cmd"] in a subprocess and stream its output to the master."""
        cmd = msg["cmd"]
        print("run cmd:'%s'" % (cmd,))
        # SECURITY NOTE(review): cmd comes from the network and is executed
        # through the shell. That is this example's purpose, but it is
        # remote code execution for anyone who can reach the master.
        sp = Popen([cmd], stdout=PIPE, stderr=PIPE, shell=True)

        def pump(stream, label):
            line = stream.readline()
            while line:
                self.sp_stdioq.put_nowait((label, line))
                line = stream.readline()

        def q2master():
            while True:
                stdio = self.sp_stdioq.get()
                if stdio is None:
                    # Sentinel: all output has been queued and forwarded.
                    break
                self._send({"mtype": "stdio", "iid": msg["iid"], "stdio": stdio})

        readers = [
            gevent.spawn(pump, sp.stdout, "stdout"),
            gevent.spawn(pump, sp.stderr, "stderr"),
        ]
        forwarder = gevent.spawn(q2master)
        sp.wait()
        self.sp_returncode = sp.returncode
        print("cmd run return:%d" % (self.sp_returncode,))
        # Drain both pipes and the queue before announcing the end.
        # Otherwise the end marker can overtake trailing output, and the
        # forwarder greenlet would never finish.
        gevent.joinall(readers)
        self.sp_stdioq.put(None)
        forwarder.join()
        self._send({"mtype": "stdio", "iid": msg["iid"], "stdio": None})

    def _delegate(self, msg):
        """Run *msg* through a one-shot helper slave and report its result."""
        cslave = slave(msg2handle=msg)
        cslave.main()
        self._send({
            "iid": msg["iid"],
            "itype": "runcmd",
            "cmd": msg["cmd"],
            "sp_returncode": cslave.sp_returncode,
        })

    def handle_msg(self, msg):
        """React to one decoded message from the master.

        "slave_info_query" is answered directly. "runcmd" is run locally
        when flagged "runlocal", otherwise delegated to a helper slave;
        both run in their own greenlet. Anything else is printed and
        discarded.
        """
        itype = msg.get("itype") if isinstance(msg, dict) else None
        if itype == "slave_info_query":
            self._reply_info(msg)
        elif itype == "runcmd":
            if msg.get("runlocal") is True:
                gevent.spawn(self._runlocal, msg)
            else:
                gevent.spawn(self._delegate, msg)
        else:
            print("discard msg:%s" % (msg,))

    def main(self):
        """Connect to the master on localhost:6000 and serve until it disconnects."""
        self.sock = create_connection(("localhost", 6000))
        fileobj = self.sock.makefile()
        self.fileobj = fileobj
        try:
            while True:
                line = fileobj.readline()
                if not line:
                    print("server disconnected")
                    break
                try:
                    msg = json.loads(line)
                except ValueError as e:
                    print(e)
                    continue
                print("receive %s" % (msg,))
                self.handle_msg(msg)
        finally:
            # Clear the attribute first so greenlets still running see the
            # connection as gone, then release the socket resources.
            self.fileobj = None
            try:
                fileobj.close()
            finally:
                self.sock.close()
# Script entry point: run a long-lived session slave.
if __name__ == '__main__':
    slave().main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment