Skip to content

Instantly share code, notes, and snippets.

@allenyang79
Last active October 12, 2017 06:02
Show Gist options
  • Save allenyang79/b03040f1d779516b2650bcc431859e17 to your computer and use it in GitHub Desktop.
Save allenyang79/b03040f1d779516b2650bcc431859e17 to your computer and use it in GitHub Desktop.
Message queue built on a multiprocessing pipe.
import time
import logging
from multiprocessing import Process, Pipe, Queue
# Log lines carry the process name so parent and worker output can be told apart.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s|%(levelname)-5.5s|%(processName)-10s| %(message)s')
logger = logging.getLogger()

# Sentinel sent through the pipe to ask the worker to stop.
_STOP = None


def master():
    """Start the ``ping`` worker, send it three messages over a pipe, then stop it.

    The messages ``0``, ``3`` and ``5`` are sent one second after the worker
    is started.  A stop sentinel is always sent afterwards and the worker is
    joined, so the function returns only once the worker has exited.
    """
    logger.info('master start')
    p, s = Pipe()
    ping_process = Process(name='ping', target=ping, args=(p,))
    ping_process.start()
    try:
        time.sleep(1)
        logger.info('master send start')
        for message in (0, 3, 5):
            s.send(message)
        logger.info('master send done')
    finally:
        # Without an explicit stop request the worker blocks in recv()
        # forever and the interpreter hangs at exit waiting for the
        # non-daemon child process.
        s.send(_STOP)
        ping_process.join()
        s.close()
        p.close()


def ping(p):
    """Handle messages received on connection ``p`` until asked to stop.

    Every message is logged, followed by a one second pause, then logged
    again as done.  The loop ends when the stop sentinel (``None``) is
    received or when the other end of the pipe has been closed.

    Args:
        p: connection object whose ``recv()`` yields the messages.
    """
    logger.info('ping init')
    while True:
        try:
            args = p.recv()
        except EOFError:
            # Nothing left to read and the sending end is closed.
            break
        if args is _STOP:
            break
        logger.info('ping %s start', args)
        time.sleep(1)
        logger.info('ping %s done', args)


# Guard the entry point so importing this module (as child processes do
# under the "spawn" start method) does not start the demo again.
if __name__ == '__main__':
    master()
import time
import logging
from multiprocessing import Process, Pipe, Queue
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s|%(levelname)-5.5s|%(processName)-10s| %(message)s')
logger = logging.getLogger()
def master():
    """Start the ``ping`` and ``pong`` workers and kick off their exchange.

    Two pipes connect the workers: ``ping`` receives on ``p2`` and sends on
    ``s1``; ``pong`` receives on ``p1`` and sends on ``s2``.  After a three
    second pause the value ``0`` is sent on ``s2``, which ``ping`` receives.
    The function returns once both workers have exited.
    """
    logger.info('master start')
    p1, s1 = Pipe()
    p2, s2 = Pipe()
    # Process.start() returns None, so keep the Process objects themselves
    # in order to be able to join them below.
    ping_process = Process(name='ping', target=ping, args=(p2, s1))
    pong_process = Process(name='pong', target=pong, args=(p1, s2))
    ping_process.start()
    pong_process.start()
    try:
        time.sleep(3)
        s2.send(0)
    finally:
        ping_process.join()
        pong_process.join()
        for conn in (p1, s1, p2, s2):
            conn.close()
def ping(p2, s1):
    """Receive a value on ``p2``, add one and forward the result on ``s1``.

    Each round logs the received value and pauses for one second before
    replying.  The loop stops after a reply greater than 10 has been sent.
    """
    logger.info('ping init')
    keep_going = True
    while keep_going:
        received = p2.recv()
        logger.info('ping %s', received)
        time.sleep(1)
        reply = received + 1
        s1.send(reply)
        keep_going = not reply > 10
def pong(p1, s2):
    """Receive a value on ``p1``, add one and forward the result on ``s2``.

    Each round logs the received value and pauses for one second before
    replying.  Returns after a reply greater than 10 has been sent.
    """
    logger.info('pong init')
    while True:
        incoming = p1.recv()
        logger.info('pong %s', incoming)
        time.sleep(1)
        outgoing = incoming + 1
        s2.send(outgoing)
        if outgoing > 10:
            return
# Run the demo only when executed as a script: child processes created with
# the "spawn" start method re-import this module and must not start it again.
if __name__ == '__main__':
    master()
import multiprocessing
import time
def task(i, delay=1):
    """Return ``i`` squared after pausing for ``delay`` seconds.

    Args:
        i: number to square.
        delay: seconds to sleep before returning, standing in for real
            work.  Defaults to 1.

    Returns:
        ``i * i``.
    """
    time.sleep(delay)
    return i * i
def main():
    """Run ``task`` for ``0 .. cpu_count() - 1`` in a process pool and print each result.

    All tasks are submitted asynchronously first, then the pool is closed
    and joined, and finally the results are printed in submission order.
    """
    pool = multiprocessing.Pool(10)
    try:
        # Submit everything before fetching any result: calling get() right
        # after each apply_async() would run the tasks one at a time.
        # range() is used instead of xrange() so this runs on Python 2 and 3.
        pending = [
            pool.apply_async(task, args=(i,))
            for i in range(multiprocessing.cpu_count())
        ]
        pool.close()
        pool.join()
    except BaseException:
        # Do not leave worker processes behind when submitting or waiting
        # fails (including on KeyboardInterrupt).
        pool.terminate()
        pool.join()
        raise
    for result in pending:
        print(result.get())
# Run the demo only when executed as a script: pool workers created with the
# "spawn" start method re-import this module and must not start it again.
if __name__ == '__main__':
    main()
import multiprocessing as mp
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s|%(levelname)-5.5s|%(processName)-10s| %(message)s')
logger = logging.getLogger()
def bounded_queue_demo(maxsize=2):
    """Show how ``put`` and ``put_nowait`` behave on a bounded queue.

    Three items are offered to a queue holding at most ``maxsize`` items:
    the first with ``put`` and the other two with ``put_nowait``.  Items
    rejected because the queue is full are reported and skipped.  Every
    accepted item is then read back, printed and returned.

    Args:
        maxsize: capacity passed to ``mp.Queue``.  Defaults to 2.

    Returns:
        List of the items that were accepted, in insertion order.
    """
    try:
        from queue import Full  # Python 3
    except ImportError:
        from Queue import Full  # Python 2

    q = mp.Queue(maxsize)
    # put: blocks the process while the queue is full.
    # put_nowait: raises the Full exception when the queue is full.
    q.put([0])
    accepted = 1
    for item in ([0, 1], [0, 1, 2]):
        try:
            q.put_nowait(item)
        except Full:
            print('queue is full, skipped %r' % (item,))
        else:
            accepted += 1

    # Read back only as many items as were enqueued; one extra get() would
    # block forever on the empty queue.
    received = []
    for _ in range(accepted):
        item = q.get()
        print(item)
        received.append(item)
    q.close()
    q.join_thread()
    return received


if __name__ == '__main__':
    bounded_queue_demo()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment