Last active
August 6, 2020 03:56
-
-
Save songron/4335629 to your computer and use it in GitHub Desktop.
How to use gevent?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Gevent 在 Python 中的简单用法;多线程控制利器。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding=utf8 | |
import gevent.monkey | |
gevent.monkey.patch_all() | |
import gevent.queue | |
import gevent | |
import os | |
# Tunables and paths shared by the greenlets below.
MAX_TASK = 50  # cap on queued tasks; without it memory may be used up
N = 10  # number of workers
WORKER_PAUSE = 0.05  # passed to gevent.sleep() by each worker after a task
outdir = './outdir'  # directory the numbered result files are written into
infile = './infile'  # input file; each line is parsed with int() as one task
# Tasks flow add_task() -> worker(); the queue is capped at MAX_TASK items.
task_queue = gevent.queue.JoinableQueue(maxsize=MAX_TASK)
# Results flow worker() -> pipeline(); no size limit is set on this queue.
result_queue = gevent.queue.JoinableQueue()
def _do_something():  # up to you
    """Placeholder job: sleep for a random fraction of a second.

    Draws a float ``n`` from ``random.random()``, yields to other greenlets
    for ``n / 10`` seconds via ``gevent.sleep`` and returns ``n``.
    Replace the body with the real work.
    """
    from random import random as draw

    value = draw()
    gevent.sleep(value / 10)
    return value
def worker():
    """Consume tasks from ``task_queue`` until a ``None`` sentinel arrives.

    For every task the result of ``_do_something()`` is pushed onto
    ``result_queue`` as a ``(task, result)`` tuple, the task is marked done,
    and the worker sleeps for ``WORKER_PAUSE`` before fetching the next one.
    The sentinel itself is also marked done before the worker exits.
    """
    task = task_queue.get()
    while task is not None:
        # do something for the task
        outcome = _do_something()  # up to you
        result_queue.put((task, outcome))
        task_queue.task_done()
        gevent.sleep(WORKER_PAUSE)
        task = task_queue.get()

    # Account for the None sentinel that ended the loop.
    task_queue.task_done()
    print('a worker quit!')
LINE_PER_FILE = 100  # number of result lines written to one output file


def pipeline():
    """Write results from ``result_queue`` to numbered files in ``outdir``.

    Runs as an independent greenlet. Files are named ``0``, ``1``, ``2``, ...
    and a new one is started once the current file holds ``LINE_PER_FILE``
    lines. Each result is written as ``str(result)`` followed by a newline
    and then marked done on ``result_queue``.

    The loop has no natural end: it stops when the greenlet is killed
    (``gevent.GreenletExit``). Whatever the reason for leaving the loop, the
    file currently open is closed so buffered lines reach the disk.
    """
    # open() does not create missing directories; without this the first
    # write would fail and every later result would pile up in the queue.
    if not os.path.isdir(outdir):
        os.makedirs(outdir)

    fp = None
    filename = None
    fno = 0
    counter = 0
    try:
        while True:
            if fp is not None and counter >= LINE_PER_FILE:
                # it's time to close the current file
                fp.close()
                print('close file: ' + filename)
                counter = 0
                fp = None
                fno += 1
            r = result_queue.get()
            if fp is None:
                # Open lazily so no empty file is created when nothing follows.
                filename = os.path.join(outdir, str(fno))
                fp = open(filename, 'w')
            fp.write(str(r) + '\n')  # how do you want to output your result?
            counter += 1
            result_queue.task_done()
    except gevent.GreenletExit:
        # Being killed is the normal way for this greenlet to be stopped.
        pass
    finally:
        if fp is not None:
            fp.close()
            print('close file: ' + filename)
    print('pipeline quit!')
#an independent thread: write to disk | |
def add_task():
    """Feed tasks from ``infile`` into ``task_queue``, then stop the workers.

    Every non-blank line of ``infile`` is stripped, parsed with ``int()`` and
    queued as one task. Afterwards ``N`` ``None`` sentinels are queued, one
    per worker, to tell them to quit.

    Raises:
        IOError: if ``infile`` cannot be opened.
        ValueError: if a non-blank line is not an integer.

    The sentinels are queued even when one of these errors occurs, so the
    workers still terminate instead of blocking forever on an empty queue.
    """
    try:
        with open(infile) as fp:
            for line in fp:
                t = line.strip()
                if not t:
                    # Blank lines (e.g. a trailing newline) are not tasks;
                    # int('') would raise and abort the whole feed.
                    continue
                task_queue.put(int(t))
    finally:
        for _ in range(N):
            task_queue.put(None)  # inform workers to quit
    print('add task quit!')
def manager():
    """Start the writer, the task feeder and ``N`` workers, then wait for them.

    Waits for the workers and the feeder to finish, then for every queued
    result to be marked done by ``pipeline``. Only after that is the writer
    greenlet killed, so no result is dropped at shutdown.
    """
    writer = gevent.spawn(pipeline)
    feeder = gevent.spawn(add_task)
    workers = [gevent.spawn(worker) for _ in range(N)]

    gevent.joinall(workers)  # block until all workers quit
    feeder.join()

    # Workers being done does not mean their results are on disk yet.
    # Skip the wait if the writer already finished (it only does so by
    # failing), because nothing would ever drain the queue.
    if not writer.ready():
        result_queue.join()
        writer.kill()
# Script entry point: run the whole job, then announce completion.
if __name__ == "__main__":
    manager()
    print('Game over!!')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment