Last active
March 31, 2018 20:06
-
-
Save nad2000/8230720 to your computer and use it in GitHub Desktop.
Threading with Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fib(n):
    """Return the n-th Fibonacci number using naive double recursion.

    Every ``n <= 2`` (including zero and negative values) yields 1, so the
    sequence starts 1, 1, 2, 3, 5, ...

    The running time grows exponentially with ``n``.
    NOTE(review): presumably kept slow on purpose as a CPU-bound workload
    for the fib server examples -- confirm before optimising.
    """
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib2 | |
from multiprocessing.dummy import Pool as ThreadPool | |
import time | |
# Pages fetched by the thread-pool and single-thread runs in this script.
urls = [
    'http://www.python.org',
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/',
    # etc..
]
# Make the Pool of workers
pool = ThreadPool(4)
try:
    # Open the urls in their own threads
    # and return the results
    results = pool.map(urllib2.urlopen, urls)
finally:
    # Close the pool and wait for the work to finish, even when a fetch
    # raised, so the worker threads do not outlive the failure.
    pool.close()
    pool.join()
#%% | |
class Timer(object):
    """Minimal class-level stopwatch.

    All state lives on the class itself, so there is a single shared timer:
    call ``Timer.start()``, then ``Timer.elapsed()`` or ``Timer.show()``.
    Calling ``elapsed()``/``show()`` before ``start()`` raises
    ``AttributeError`` because no start time has been recorded yet.
    """

    @classmethod
    def start(cls):
        """Record the current wall-clock time as the start of the interval."""
        cls._start = time.time()

    @classmethod
    def elapsed(cls):
        """Return the seconds elapsed since the last ``start()`` call."""
        return time.time() - cls._start

    @classmethod
    def show(cls):
        """Print the elapsed time with five decimal places."""
        # Single-argument call form prints identically on Python 2 and 3.
        print("*** Elapsed: %0.5f" % cls.elapsed())
#%% --- Single thread ----- #
Timer.start()
results = [urllib2.urlopen(url) for url in urls]
Timer.show()

#%% ------- 4 Pool ------- #
Timer.start()
pool = ThreadPool(4)
results = pool.map(urllib2.urlopen, urls)
Timer.show()
# Release the worker threads once the timing has been reported; otherwise
# every benchmark run leaves its pool threads alive.
pool.close()
pool.join()

#%% ------- 8 Pool ------- #
Timer.start()
pool = ThreadPool(8)
results = pool.map(urllib2.urlopen, urls)
Timer.show()
pool.close()
pool.join()

#%% ------- 13 Pool ------- #
Timer.start()
pool = ThreadPool(13)
results = pool.map(urllib2.urlopen, urls)
Timer.show()
pool.close()
pool.join()

# Timings recorded by the author:
# Single thread: 9.35082 sec
# 4 Pool: 3.09248 sec
# 8 Pool: 2.08774 sec
# 13 Pool: 1.67926 sec
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# perf1.py | |
# Time of a long running request | |
from socket import * | |
import time | |
# Connect once and reuse the same connection for every timed request.
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25000))
try:
    while True:
        start = time.time()
        # sendall() keeps writing until the whole request has been sent.
        sock.sendall(b'30')
        resp = sock.recv(100)
        end = time.time()
        if not resp:
            # An empty read means the server closed the connection.
            break
        print(end - start)
finally:
    sock.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# perf2.py | |
# requests/sec of fast requests | |
from socket import * | |
import time | |
# Connect once and reuse the same connection for every request.
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25000))

# Requests completed since the monitor thread last printed and reset it.
n = 0
from threading import Thread | |
def monitor():
    """Print the module-level request counter once per second, then reset it.

    Runs forever; intended to be the target of a background thread.

    The print and the reset are separate steps and are not synchronised with
    the code that increments ``n``, so increments that land between them are
    dropped from the next report.
    """
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0
# Daemon thread: the monitor loops forever, so a non-daemon thread would
# keep the process alive after the request loop below stops or is interrupted.
Thread(target=monitor, daemon=True).start()

while True:
    # sendall() keeps writing until the whole request has been sent.
    sock.sendall(b'1')
    resp = sock.recv(100)
    if not resp:
        # An empty read means the server closed the connection.
        break
    n += 1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A more realistic thread pool example | |
Practical threaded programming with Python | |
http://www.ibm.com/developerworks/aix/library/au-threadingpython/ | |
""" | |
import time | |
import threading | |
import Queue | |
import urllib2 | |
#%% | |
class Consumer(threading.Thread):
    """Worker thread that fetches URLs taken from a shared queue.

    The worker keeps pulling items until it receives the string ``'quit'``
    (the poison pill), at which point it prints a farewell and exits.
    """

    def __init__(self, queue):
        """Store the queue this worker pulls its URLs from."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Fetch each queued URL until the poison pill arrives."""
        while True:
            # get() blocks until an item is available.
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break
            response = urllib2.urlopen(content)
            # Only the fetch itself matters here; close the response so the
            # underlying connection is released right away.
            response.close()
        print('Bye byes!')
def Producer():
    """Fetch a fixed list of URLs with four worker threads and time the run.

    Queues every URL, then one ``'quit'`` poison pill per worker, waits for
    all workers to finish and prints the elapsed wall-clock time.
    """
    # Each URL needs its own trailing comma: adjacent string literals are
    # silently concatenated into a single (invalid) URL otherwise.
    urls = [
        'http://www.python.org',
        'http://www.yahoo.com',
        'http://www.scala.org',
        'http://www.google.com',
        # etc..
    ]
    queue = Queue.Queue()
    worker_threads = build_worker_pool(queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls:
        queue.put(url)
    # Add the poison pills: one per worker, so that every thread sees one.
    for _ in worker_threads:
        queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print('Done! Time taken: {}'.format(time.time() - start_time))
def build_worker_pool(queue, size):
    """Create and start ``size`` Consumer threads that all read from ``queue``.

    Returns the list of started workers so the caller can join them.
    """
    workers = []
    for _ in range(size):
        worker = Consumer(queue)
        worker.start()
        workers.append(worker)
    return workers
#%% | |
# Run the demo only when executed as a script.
if __name__ == '__main__':
    Producer()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# server.py | |
# Fib microservice | |
from socket import * | |
from fib import fib | |
from threading import Thread | |
def fib_server(address):
    """Listen on ``address`` and serve Fibonacci clients one at a time.

    ``address`` is the ``(host, port)`` pair passed to ``bind``. The handler
    runs inline, so the next connection is not accepted until the current
    client has disconnected. The accept loop never returns normally.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    try:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print("Connection", addr)
            fib_handler(client)
    finally:
        # Release the listening socket when the loop is interrupted or fails.
        sock.close()
def fib_handler(client):
    """Answer Fibonacci requests on one client connection until it ends.

    Each chunk received (up to 100 bytes) is parsed as a decimal integer and
    answered with the ASCII result followed by a newline.
    NOTE(review): assumes one request arrives per ``recv`` call; there is no
    message framing -- confirm against the clients.
    """
    try:
        while True:
            req = client.recv(100)
            if not req:
                # Empty read: the peer closed the connection.
                break
            try:
                n = int(req)
            except ValueError:
                # Malformed request: drop this client instead of letting the
                # exception propagate out of the handler.
                break
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            # sendall() keeps writing until the whole reply has been sent.
            client.sendall(resp)
    finally:
        client.close()
    print("Closed")
# Start the server on all interfaces, port 25000, only when run as a script.
if __name__ == '__main__':
    fib_server(('', 25000))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# server.py | |
# Fib microservice | |
from socket import * | |
from fib import fib | |
from threading import Thread | |
def fib_server(address):
    """Listen on ``address`` and serve each Fibonacci client in its own thread.

    ``address`` is the ``(host, port)`` pair passed to ``bind``. Every accepted
    connection is handed to a daemon thread running ``fib_handler``, so the
    accept loop is immediately free for the next client. The loop never
    returns normally.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    try:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print("Connection", addr)
            Thread(target=fib_handler, args=(client,), daemon=True).start()
    finally:
        # Release the listening socket when the loop is interrupted or fails.
        sock.close()
def fib_handler(client):
    """Answer Fibonacci requests on one client connection until it ends.

    Each chunk received (up to 100 bytes) is parsed as a decimal integer and
    answered with the ASCII result followed by a newline.
    NOTE(review): assumes one request arrives per ``recv`` call; there is no
    message framing -- confirm against the clients.
    """
    try:
        while True:
            req = client.recv(100)
            if not req:
                # Empty read: the peer closed the connection.
                break
            try:
                n = int(req)
            except ValueError:
                # Malformed request: drop this client instead of ending the
                # handler with an unhandled exception.
                break
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            # sendall() keeps writing until the whole reply has been sent.
            client.sendall(resp)
    finally:
        client.close()
    print("Closed")
# Start the server on all interfaces, port 25000, only when run as a script.
if __name__ == '__main__':
    fib_server(('', 25000))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# server.py | |
# Fib microservice | |
from socket import * | |
from fib import fib | |
from threading import Thread | |
from concurrent.futures import ProcessPoolExecutor as Pool | |
# Process pool with four workers; handlers submit their fib() calls to it.
pool = Pool(max_workers=4)
def fib_server(address):
    """Listen on ``address`` and serve each Fibonacci client in its own thread.

    ``address`` is the ``(host, port)`` pair passed to ``bind``. Every accepted
    connection is handed to a daemon thread running ``fib_handler``, so the
    accept loop is immediately free for the next client. The loop never
    returns normally.
    """
    sock = socket(AF_INET, SOCK_STREAM)
    try:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print("Connection", addr)
            Thread(target=fib_handler, args=(client,), daemon=True).start()
    finally:
        # Release the listening socket when the loop is interrupted or fails.
        sock.close()
def fib_handler(client):
    """Answer Fibonacci requests on one client connection until it ends.

    Each chunk received (up to 100 bytes) is parsed as a decimal integer. The
    computation is submitted to the module-level process pool and this thread
    blocks on the future until the result is ready, then replies with the
    ASCII result followed by a newline.
    NOTE(review): assumes one request arrives per ``recv`` call; there is no
    message framing -- confirm against the clients.
    """
    try:
        while True:
            req = client.recv(100)
            if not req:
                # Empty read: the peer closed the connection.
                break
            try:
                n = int(req)
            except ValueError:
                # Malformed request: drop this client instead of ending the
                # handler with an unhandled exception.
                break
            future = pool.submit(fib, n)
            result = future.result()
            resp = str(result).encode('ascii') + b'\n'
            # sendall() keeps writing until the whole reply has been sent.
            client.sendall(resp)
    finally:
        client.close()
    print("Closed")
# Start the server only when run as a script. Pool worker processes may
# re-import this module (e.g. under the spawn start method) and must not try
# to start a second server on the same port.
if __name__ == '__main__':
    fib_server(('', 25000))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Standard Producer/Consumer Threading Pattern | |
""" | |
import time | |
import threading | |
import Queue | |
#%% | |
class Consumer(threading.Thread):
    """Worker thread that prints every message taken from a shared queue.

    The worker keeps pulling items until it receives the string ``'quit'``
    (the poison pill), at which point it prints a farewell and exits.
    """

    def __init__(self, queue):
        """Store the queue this worker pulls its messages from."""
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        """Print each queued message until the poison pill arrives."""
        while True:
            # queue.get() blocks the current thread until
            # an item is retrieved.
            msg = self._queue.get()
            # Checks if the current message is
            # the "Poison Pill"
            if isinstance(msg, str) and msg == 'quit':
                # if so, exits the loop
                break
            # "Processes" (or in our case, prints) the queue item
            print("I'm a thread, and I received %s!!" % msg)
        # Always be friendly!
        print('Bye byes!')
def Producer():
    """Feed one Consumer thread a message per second for about five seconds.

    Afterwards the ``'quit'`` poison pill is queued and the function waits
    for the worker thread to finish.
    """
    # Queue is used to share items between
    # the threads.
    queue = Queue.Queue()
    # Create an instance of the worker
    worker = Consumer(queue)
    # start calls the internal run() method to
    # kick off the thread
    worker.start()
    # variable to keep track of when we started
    start_time = time.time()
    # While under 5 seconds..
    while time.time() - start_time < 5:
        # "Produce" a piece of work and stick it in
        # the queue for the Consumer to process
        queue.put('something at %s' % time.time())
        # Sleep a bit just to avoid an absurd number of messages
        time.sleep(1)
    # This is the "poison pill" method of killing a thread.
    queue.put('quit')
    # wait for the thread to close down
    worker.join()
#%% | |
# Run the demo only when executed as a script.
if __name__ == '__main__':
    Producer()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment