Skip to content

Instantly share code, notes, and snippets.

@akrisanov
Last active March 26, 2019 18:06
Show Gist options
  • Save akrisanov/3d86399a8bcaec7d341907b901bb4661 to your computer and use it in GitHub Desktop.
Save akrisanov/3d86399a8bcaec7d341907b901bb4661 to your computer and use it in GitHub Desktop.
Python Concurrency
def fib(n):
    """Return the n-th Fibonacci number using plain double recursion.

    Any ``n`` of 2 or less yields 1; larger values are the sum of the two
    preceding terms. No caching is applied, so the number of recursive
    calls grows exponentially with ``n``.
    """
    return 1 if n <= 2 else fib(n - 1) + fib(n - 2)
from socket import *
from fib import fib
def fib_server(address):
    """Accept TCP connections on ``address`` and serve them one at a time.

    ``address`` is the value passed to ``socket.bind`` (a ``(host, port)``
    tuple). The function loops forever; each accepted connection is handed
    to ``fib_handler`` inline, so no further connection is accepted until
    that call returns.
    """
    # The context manager closes the listening socket when the loop is left
    # through an exception (for example KeyboardInterrupt).
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()  # blocking
            print(f"Connection {addr}")
            fib_handler(client)
def fib_handler(client):
    """Serve Fibonacci requests on one connected socket until the peer closes.

    Each chunk received (up to 100 bytes) is parsed as a decimal integer
    ``n``; the reply is ``fib(n)`` in ASCII followed by a newline.

    The socket is closed on every exit path. A request that is not a valid
    integer ends the connection instead of raising ``ValueError`` into the
    caller.
    """
    with client:  # guarantees close() even if recv/send raises
        while True:
            req = client.recv(100)  # blocking
            if not req:
                break  # peer closed its end
            try:
                n = int(req)
            except ValueError:
                print(f"Bad request {req!r}")
                break
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            # sendall keeps writing until the whole reply has been sent;
            # a bare send() may transmit only part of it.
            client.sendall(resp)  # blocking (potentially)
    print("Closed")
# Start the server only when this file is run as a script, so that importing
# the module does not bind port 5000 and block forever.
if __name__ == "__main__":
    fib_server(("", 5000))
# Run 2 connections simultaneously: nc localhost 5000
# You will notice that the second one hangs:
# our server handles only one client at a time.
from socket import *
from fib import fib
from collections import deque
from select import select
# Scheduler state.
tasks = deque()  # generators that are ready to be resumed
# Parked generators keyed by the socket they wait on:
# recv_wait until the socket is readable, send_wait until it is writable.
recv_wait, send_wait = {}, {}
def run():
    """Drive the generator tasks until none is left, runnable or parked.

    A task is resumed with ``next()`` and must yield a ``(why, what)`` pair:
    ``'recv'`` parks it in ``recv_wait`` and ``'send'`` in ``send_wait``,
    keyed by ``what``. Any other ``why`` raises ``RuntimeError``. When no
    task is runnable, ``select`` blocks until one of the parked sockets is
    ready and the matching tasks are queued again.
    """
    while tasks or recv_wait or send_wait:
        while not tasks:
            # Nothing is runnable: wait for I/O on the parked sockets.
            readable, writable, _ = select(recv_wait, send_wait, [])
            tasks.extend(recv_wait.pop(ready) for ready in readable)
            tasks.extend(send_wait.pop(ready) for ready in writable)

        current = tasks.popleft()
        try:
            why, what = next(current)  # run up to the next yield
        except StopIteration:
            print("Task done")
            continue

        if why == 'recv':
            parked = recv_wait
        elif why == 'send':
            parked = send_wait
        else:
            raise RuntimeError("ARG!")
        parked[what] = current
def fib_server(address):
    """Generator task that accepts connections on ``address``.

    Yields ``('recv', sock)`` before every ``accept`` so the scheduler can
    park the task until a connection is pending. Each accepted client gets
    its own ``fib_handler`` generator appended to ``tasks``.
    """
    # The context manager closes the listening socket when the generator is
    # closed or an exception leaves the loop.
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            yield 'recv', sock
            client, addr = sock.accept()  # blocking
            print(f"Connection {addr}")
            tasks.append(fib_handler(client))
def fib_handler(client):
    """Generator task serving Fibonacci requests on one connected socket.

    Yields ``('recv', client)`` before each ``recv`` and ``('send', client)``
    before each ``send``. Every chunk received (up to 100 bytes) is parsed as
    a decimal integer ``n`` and answered with ``fib(n)`` in ASCII plus a
    newline.

    The socket is closed on every exit path. A request that is not a valid
    integer ends this connection; left unhandled, the ``ValueError`` would
    propagate out of ``next()`` in ``run()``, which catches only
    ``StopIteration``, and stop the loop for every client.
    """
    with client:  # guarantees close() when the task finishes or is closed
        while True:
            yield 'recv', client
            req = client.recv(100)  # blocking
            if not req:
                break  # peer closed its end
            try:
                n = int(req)
            except ValueError:
                print(f"Bad request {req!r}")
                break
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            yield 'send', client
            client.send(resp)  # blocking (potentially)
    print("Closed")
# Schedule the server task and start the loop only when this file is run as
# a script, so that importing the module has no side effects.
if __name__ == "__main__":
    tasks.append(fib_server(("", 5000)))
    run()
from socket import *
from fib import fib
from threading import Thread
def fib_server(address):
    """Accept TCP connections on ``address``, one daemon thread per client.

    ``address`` is the value passed to ``socket.bind`` (a ``(host, port)``
    tuple). The function loops forever; every accepted connection is served
    by ``fib_handler`` running in its own daemon thread.
    """
    # The context manager closes the listening socket when the loop is left
    # through an exception (for example KeyboardInterrupt).
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print(f"Connection {addr}")
            Thread(target=fib_handler, args=(client,), daemon=True).start()
def fib_handler(client):
    """Serve Fibonacci requests on one connected socket until the peer closes.

    Each chunk received (up to 100 bytes) is parsed as a decimal integer
    ``n``; the reply is ``fib(n)`` in ASCII followed by a newline.

    The socket is closed on every exit path. A request that is not a valid
    integer ends the connection instead of raising ``ValueError``.
    """
    with client:  # guarantees close() even if recv/send raises
        while True:
            req = client.recv(100)
            if not req:
                break  # peer closed its end
            try:
                n = int(req)
            except ValueError:
                print(f"Bad request {req!r}")
                break
            result = fib(n)
            resp = str(result).encode('ascii') + b'\n'
            # sendall keeps writing until the whole reply has been sent;
            # a bare send() may transmit only part of it.
            client.sendall(resp)
    print("Closed")
# Start the server only when this file is run as a script, so that importing
# the module does not bind port 5000 and block forever.
if __name__ == "__main__":
    fib_server(("", 5000))
from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool
def fib_server(address):
    """Accept TCP connections on ``address``, one daemon thread per client.

    ``address`` is the value passed to ``socket.bind`` (a ``(host, port)``
    tuple). The function loops forever; every accepted connection is served
    by ``fib_handler`` running in its own daemon thread.
    """
    # The context manager closes the listening socket when the loop is left
    # through an exception (for example KeyboardInterrupt).
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind(address)
        sock.listen(5)
        while True:
            client, addr = sock.accept()
            print(f"Connection {addr}")
            Thread(target=fib_handler, args=(client,), daemon=True).start()
# Process pool with four workers, used by fib_handler to run fib().
pool = Pool(max_workers=4)
def fib_handler(client):
    """Serve Fibonacci requests on one socket, computing in the process pool.

    Each chunk received (up to 100 bytes) is parsed as a decimal integer
    ``n``. ``fib(n)`` is submitted to ``pool`` and this thread waits for the
    result, then replies with it in ASCII followed by a newline.

    The socket is closed on every exit path. A request that is not a valid
    integer ends the connection instead of raising ``ValueError``.
    """
    with client:  # guarantees close() even if recv/send raises
        while True:
            req = client.recv(100)
            if not req:
                break  # peer closed its end
            try:
                n = int(req)
            except ValueError:
                print(f"Bad request {req!r}")
                break
            future = pool.submit(fib, n)
            result = future.result()  # blocks this thread until the worker is done
            resp = str(result).encode('ascii') + b'\n'
            # sendall keeps writing until the whole reply has been sent;
            # a bare send() may transmit only part of it.
            client.sendall(resp)
    print("Closed")
# The guard is required with a process pool: worker processes may re-import
# the main module (spawn start method), and without it each worker would try
# to start the server again instead of running fib().
if __name__ == "__main__":
    fib_server(("", 5000))
# One downside of this approach is the overhead of working with (sub)processes.
# The perf2 results are about 10 times lower than with the fib_server_threads program:
# 2701 req/sec
# 2732 req/sec
# 2707 req/sec
# 2764 req/sec
# 2737 req/sec
# At the same time, the process pool helps to balance the computations, so our server is not
# hammered as much by a heavy task.
from socket import *
from threading import Thread
import time
# Requests completed since the last report; reset by monitor().
n = 0

# TCP connection to the server listening on localhost:5000.
sock = socket(family=AF_INET, type=SOCK_STREAM)
sock.connect(("localhost", 5000))
def monitor():
    """Print the global request counter once per second, then reset it.

    Runs forever. The value printed is whatever ``n`` holds after each
    one-second sleep, followed by the text ``req/sec``.
    """
    global n
    while True:
        time.sleep(1)
        completed, n = n, 0  # take the current count and start a new window
        print(completed, 'req/sec')
# Daemon thread: the reporter must not keep the process alive once the
# request loop below ends or is interrupted.
Thread(target=monitor, daemon=True).start()

# Send the request b'1' repeatedly, counting one completed request per reply.
while True:
    sock.send(b'1')
    resp = sock.recv(100)
    if not resp:
        break  # server closed the connection; do not count it as a reply
    n += 1
# Example: running the perf2 program and passing a "big" number to our fib server
# 21717 req/sec
# 25164 req/sec
# 24865 req/sec
# 25188 req/sec
# 6954 req/sec <- the GIL prioritizes the CPU-intensive task, and that slows down our requests
# 148 req/sec <- this comes from the internal implementation of the GIL itself; it is not how the OS schedules things
# 147 req/sec
# 145 req/sec
# 148 req/sec
# 140 req/sec
# 146 req/sec
# 144 req/sec
from socket import *
import time
# Connect to the server on localhost:5000 and print how long each
# request/reply round trip for the request b'30' takes, in seconds.
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(("localhost", 5000))
while True:
    # perf_counter() is monotonic and high-resolution, so the measured
    # interval is not affected by system clock adjustments.
    start = time.perf_counter()
    sock.send(b'30')
    resp = sock.recv(100)
    end = time.perf_counter()
    if not resp:
        break  # server closed the connection
    print(end - start)
# By running one or more copies of this program, we can see how the GIL affects computation on a single core.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment