Skip to content

Instantly share code, notes, and snippets.

@zhangchunlin
Created April 1, 2014 03:26
Show Gist options
  • Select an option

  • Save zhangchunlin/9907155 to your computer and use it in GitHub Desktop.

Select an option

Save zhangchunlin/9907155 to your computer and use it in GitHub Desktop.
gevent master slave example
#! /usr/bin/env python
#coding=utf-8
import gevent
from gevent.server import StreamServer
from gevent.queue import Queue
import json
def sendmsg2fobj(fobj, msg):
    """Write msg to fobj as a single JSON-encoded line and flush it.

    fobj must provide write() and flush(); msg must be JSON-serializable.
    """
    payload = json.dumps(msg)
    fobj.write(payload + "\n")
    # Flush right away so the line is not left sitting in the buffer.
    fobj.flush()
class slave_controller(object):
    """Master-side bookkeeping for one named slave.

    Holds the slave's online flag, the file object used to reach it, and a
    counter of requests in progress together with the limit on that counter.
    """

    def __init__(self, name):
        self.name = name
        self.fobj = None        # file object of the slave connection, if any
        self.online = False
        self.nhandling = 0      # requests currently being handled
        self.nrequest1time = 1  # upper bound for nhandling

    def handle1request(self, r):
        """Take one slot for request r and put this controller on r["invokerq"].

        Does nothing when the controller is already at its limit.
        """
        if self.nhandling >= self.nrequest1time:
            return
        self.nhandling += 1
        r["invokerq"].put(self)

    def invoke(self, iparam):
        """Send iparam to the slave; silently skipped when no fobj is set."""
        if not self.fobj:
            return
        print("send to %s: %s" % (self.name, iparam))
        sendmsg2fobj(self.fobj, iparam)

    def end1request(self):
        """Release one slot; the counter never drops below zero."""
        if self.nhandling <= 0:
            return
        self.nhandling -= 1
class master(object):
    """Accepts slave connections and hands slave controllers to local callers.

    State kept on the instance:
      * invokerq: invoker id -> reply queue; incoming messages are routed to
        the queue registered under their "iid".
      * scontrollers: slave name -> slave_controller.
      * slave_request_queue: pending get_slave() requests, each a dict with
        "request_slave_name" and "invokerq" keys.
    """

    def __init__(self):
        self.next_invoker_id = 1
        self.invokerq = {}
        self.scontrollers = {}
        # Only "slave1" is known to this master.
        self.scontrollers["slave1"] = slave_controller("slave1")
        self.slave_request_queue = []

    def new_invoker(self):
        """Allocate the next invoker id and its reply queue.

        Returns:
            (iid, queue) where queue receives messages dispatched for iid.
        """
        iid = self.next_invoker_id
        self.next_invoker_id += 1
        self.invokerq[iid] = Queue()
        return iid, self.invokerq[iid]

    def del_invoker(self, iid):
        """Drop the reply queue registered for iid (KeyError if unknown)."""
        del self.invokerq[iid]

    def update_slave_status(self, name, online, fobj=None):
        """Record an online/offline transition of slave `name`.

        Nothing happens when the online flag is unchanged. On a change the
        controller's flag and fobj are replaced, and when the slave came
        online the pending requests are re-examined.

        Raises:
            KeyError: if `name` is not a registered slave.
        """
        sc = self.scontrollers[name]
        if sc.online == online:
            return
        print("slave %s online: %s->%s" % (name, sc.online, online))
        sc.online = online
        sc.fobj = fobj
        if online:
            self.handle_slave_request()

    def _pop_request_for(self, name):
        """Remove and return the most recently queued request for `name`.

        Returns None when no pending request names that slave.
        """
        pending = self.slave_request_queue
        # Scan from the end of the list, i.e. newest request first.
        # NOTE(review): this serves waiters last-in-first-out; confirm that
        # is intended rather than first-come-first-served.
        for idx in range(len(pending) - 1, -1, -1):
            if pending[idx]["request_slave_name"] == name:
                return pending.pop(idx)
        return None

    def handle_slave_request(self):
        """Give every online controller with spare capacity its pending requests."""
        for sc in list(self.scontrollers.values()):
            while sc.online and sc.nhandling < sc.nrequest1time:
                print("while %s %s" % (sc.nhandling, sc.nrequest1time))
                request = self._pop_request_for(sc.name)
                if request is None:
                    break
                sc.handle1request(request)
                print("after %s %s" % (sc.nhandling, sc.nrequest1time))

    def get_slave(self, slavename):
        """Block until the controller of slave `slavename` is available.

        A request is queued and the call waits on a private queue until
        handle_slave_request() hands over the controller.

        Returns:
            The slave_controller that was assigned to this request.
        """
        print("want to get_slave %s" % (slavename,))
        myq = Queue()
        # Request the slave the caller asked for (this used to be hard-coded
        # to "slave1" regardless of the argument).
        sreq = {"request_slave_name": slavename, "invokerq": myq}
        self.slave_request_queue.append(sreq)
        self.handle_slave_request()
        slave = myq.get()
        print("got_slave %s" % (slavename,))
        return slave

    def put_slave(self, sc):
        """Return controller `sc` after use and serve the next waiter, if any."""
        print("put_slave %s" % (sc.name,))
        sc.end1request()
        self.handle_slave_request()

    def dispatch(self, msg):
        """Route a decoded message to the reply queue registered for its "iid".

        Messages that are not dicts, carry an unusable iid, or name an iid
        with no registered queue are printed and discarded.
        """
        q = None
        if isinstance(msg, dict):
            try:
                q = self.invokerq.get(msg.get("iid", 0))
            except TypeError:
                # Unhashable iid (e.g. a JSON list): no queue can match.
                q = None
        if q is None:
            print("discard %s" % (msg,))
            return
        q.put_nowait(msg)
        # Yield so the greenlet waiting on the queue can run.
        gevent.sleep(0)

    def _relay_runcmd(self, fileobj, msg2handle):
        """Ask the peer to run msg2handle locally and print the output it streams.

        The request is a copy of msg2handle with "runlocal" set and a fresh
        iid. Replies are read until one carries a falsy "stdio"; the
        connection's file object is then closed.
        """
        import sys  # local import: only this relay writes raw output

        iid, q = self.new_invoker()
        try:
            request = msg2handle.copy()
            request["runlocal"] = True
            request["iid"] = iid
            sendmsg2fobj(fileobj, request)
            print("----------------")
            while True:
                stdio = q.get()["stdio"]
                if not stdio:
                    break
                # stdio is a (stream name, text) pair.
                if stdio[0] == "stderr":
                    sys.stdout.write("err: ")
                sys.stdout.write(stdio[1])
        finally:
            # Always release the reply queue and end the connection, even
            # when the relay fails part-way.
            self.del_invoker(iid)
            fileobj.close()

    def _init_slave(self, fileobj, info):
        """Query a new connection for its role and act on the answer.

        A "session" peer is marked online under its reported name, which is
        also stored in info["sname"]. A "handle" peer carrying a "runcmd"
        message gets a relay greenlet. Any other answer is ignored.
        """
        print("init slave")
        iid, q = self.new_invoker()
        try:
            sendmsg2fobj(fileobj, {'iid': iid, 'itype': 'slave_info_query'})
            rmsg = q.get()
        finally:
            self.del_invoker(iid)
        ctype = rmsg.get("ctype")
        if ctype == "session":
            sname = rmsg["slave_name"]
            self.update_slave_status(sname, online=True, fobj=fileobj)
            info["sname"] = sname
        elif ctype == "handle":
            msg2handle = rmsg["msg2handle"]
            print("handle msg: %s" % (msg2handle,))
            if msg2handle.get("itype") == "runcmd":
                gevent.spawn(self._relay_runcmd, fileobj, msg2handle)
        print("finish init slave")

    def sconnect_handler(self, sock, address):
        """Serve one slave connection until it ends.

        Spawns the role handshake, then reads JSON lines and dispatches each
        one. When the loop ends for any reason, a slave registered through
        this connection is marked offline.
        """
        print('New connection from %s:%s' % tuple(address[:2]))
        fileobj = sock.makefile()
        # Mutable holder so the handshake greenlet can report the slave name.
        info = {"sname": None}
        gevent.spawn(self._init_slave, fileobj, info)
        try:
            while True:
                try:
                    line = fileobj.readline()
                except AttributeError:
                    # Presumably raised once the relay greenlet has closed
                    # fileobj underneath this reader - TODO confirm.
                    break
                if not line:
                    print("slave disconnected")
                    break
                try:
                    msg = json.loads(line)
                    self.dispatch(msg)
                except ValueError as e:
                    print(e)
        finally:
            if info["sname"]:
                self.update_slave_status(info["sname"], online=False, fobj=None)
def main():
    """Run the master: listen on port 6000 and drive a demo greenlet.

    The demo greenlet acquires "slave1" three times, sends it a ping command
    each time, prints the reply and releases the slave again.
    """
    m = master()

    def run_cmd_slave1():
        # One invoker id is shared by all three rounds.
        iid, replies = m.new_invoker()
        for attempt in range(3):
            sc = m.get_slave("slave1")
            print("%d got slave :%s" % (attempt, sc.name))
            sc.invoke({
                "iid": iid,
                "itype": "runcmd",
                "cmd": "ping localhost -c 3",
            })
            print(replies.get())
            m.put_slave(sc)
        m.del_invoker(iid)

    # The bound method already has the (sock, address) handler signature.
    server = StreamServer(('0.0.0.0', 6000), m.sconnect_handler)
    gevent.spawn(run_cmd_slave1)
    print('Starting master server on port 6000')
    server.serve_forever()
    print('Stop master server')
# Start the master only when this file is executed as a script.
if __name__ == "__main__":
    main()
#! /usr/bin/env python
#coding=utf-8
import gevent
from gevent.socket import create_connection
from gevent.subprocess import Popen, PIPE
from gevent.queue import Queue
import json
def sendmsg2fobj(fobj, msg):
    """Send msg over fobj as one newline-terminated JSON document.

    The file object is flushed after the write.
    """
    line = json.dumps(msg) + "\n"
    fobj.write(line)
    fobj.flush()
class slave(object):
    """Client side of the master/slave protocol.

    A slave connects to the master on localhost:6000, answers its info
    query and executes "runcmd" messages. Created without msg2handle it
    reports itself as a "session"; created with msg2handle it reports
    "handle" and passes that message back to the master.
    """

    def __init__(self, msg2handle=None):
        self.sock = None
        self.fileobj = None            # None while not connected
        self.msg2handle = msg2handle
        self.sp_stdioq = Queue()       # (stream name, line) pairs to forward
        self.sp_returncode = None      # exit status of the last local command

    def _send(self, rmsg):
        """Send rmsg to the master, or print it as unsent when disconnected."""
        if self.fileobj:
            sendmsg2fobj(self.fileobj, rmsg)
        else:
            print("%s not sent" % (rmsg,))

    def _answer_info_query(self, msg):
        """Reply to a "slave_info_query" with this slave's name and role."""
        self._send({
            "slave_name": "slave1",
            "mtype": "info",
            "ctype": "handle" if self.msg2handle else "session",
            "iid": msg["iid"],
            "msg2handle": self.msg2handle,
        })

    def _run_local(self, msg):
        """Run msg["cmd"] in a subprocess and stream its output to the master.

        Every stdout/stderr line is sent as an "stdio" message. Once the
        process has exited and all queued lines have been forwarded, a final
        "stdio" message with a None payload marks the end of the output.

        NOTE(review): sp_stdioq is shared by the whole instance, so this
        assumes one local command at a time per slave - confirm.
        """
        cmd = msg["cmd"]
        print("run cmd:'%s'" % (cmd,))
        # NOTE(security): cmd arrives from the master over the socket and is
        # executed by the shell unchanged.
        sp = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)

        def pump(pipe, label):
            # Queue each line of the pipe until end of file.
            line = pipe.readline()
            while line:
                self.sp_stdioq.put_nowait((label, line))
                line = pipe.readline()

        def forward():
            # Send queued lines in order; a None item ends the greenlet.
            while True:
                stdio = self.sp_stdioq.get()
                if stdio is None:
                    break
                self._send({"mtype": "stdio", "iid": msg["iid"], "stdio": stdio})

        readers = [
            gevent.spawn(pump, sp.stdout, "stdout"),
            gevent.spawn(pump, sp.stderr, "stderr"),
        ]
        forwarder = gevent.spawn(forward)
        sp.wait()
        self.sp_returncode = sp.returncode
        print("cmd run return:%d" % (self.sp_returncode,))
        # Drain both pipes and the queue before announcing the end, so the
        # terminator cannot overtake output that is still in flight.
        gevent.joinall(readers)
        self.sp_stdioq.put(None)
        forwarder.join()
        self._send({"mtype": "stdio", "iid": msg["iid"], "stdio": None})

    def _run_via_child(self, msg):
        """Run msg through a second connection, then report the exit status.

        A child slave carrying msg is created and served until its
        connection ends; its sp_returncode is sent back under msg's iid.
        """
        cslave = slave(msg2handle=msg)
        cslave.main()
        self._send({
            "iid": msg["iid"],
            "itype": "runcmd",
            "cmd": msg["cmd"],
            "sp_returncode": cslave.sp_returncode,
        })

    def handle_msg(self, msg):
        """Act on one decoded message from the master.

        "slave_info_query" is answered directly; "runcmd" is executed in a
        new greenlet, locally when "runlocal" is set and through a child
        connection otherwise. Anything else is printed and discarded.
        """
        if not isinstance(msg, dict):
            # Valid JSON that is not an object cannot be a protocol message.
            print("discard msg:%s" % (msg,))
            return
        itype = msg.get("itype")
        if itype == "slave_info_query":
            self._answer_info_query(msg)
        elif itype == "runcmd":
            if msg.get("runlocal") == True:
                gevent.spawn(self._run_local, msg)
            else:
                gevent.spawn(self._run_via_child, msg)
        else:
            print("discard msg:%s" % (msg,))

    def main(self):
        """Connect to the master and handle its messages until it disconnects.

        The connection is closed and fileobj reset to None on the way out,
        including when an unexpected error ends the loop.
        """
        self.sock = create_connection(("localhost", 6000))
        self.fileobj = self.sock.makefile()
        try:
            while True:
                line = self.fileobj.readline()
                if not line:
                    print("server disconnected")
                    break
                try:
                    msg = json.loads(line)
                    print("receive %s" % (msg,))
                    self.handle_msg(msg)
                except ValueError as e:
                    print(e)
        finally:
            # Clear fileobj first so greenlets still running see "disconnected".
            fileobj, self.fileobj = self.fileobj, None
            fileobj.close()
            self.sock.close()
# Script entry point: create a session slave and serve the master.
if __name__ == "__main__":
    slave().main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment