Last active
March 26, 2019 18:06
-
-
Save akrisanov/3d86399a8bcaec7d341907b901bb4661 to your computer and use it in GitHub Desktop.
Python Concurrency
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fib(n): | |
if n <= 2: | |
return 1 | |
else: | |
return fib(n-1) + fib(n-2) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
from fib import fib | |
def fib_server(address): | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
sock.bind(address) | |
sock.listen(5) | |
while True: | |
client, addr = sock.accept() # blocking | |
print(f"Connection {addr}") | |
fib_handler(client) | |
def fib_handler(client): | |
while True: | |
req = client.recv(100) # blocking | |
if not req: | |
break | |
n = int(req) | |
result = fib(n) | |
resp = str(result).encode('ascii') + b'\n' | |
client.send(resp) # blocking (potentially) | |
print("Closed") | |
fib_server(("", 5000)) | |
# Run 2 connections simultaneously: nc localhost 5000 | |
# You will notice that second one is hanging. | |
# Our server handles only one client at the time. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
from fib import fib | |
from collections import deque | |
from select import select | |
tasks = deque() | |
recv_wait = {} # Mapping sockets -> tasks (generators) | |
send_wait = {} | |
def run(): | |
while any([tasks, recv_wait, send_wait]): | |
while not tasks: | |
# No active tasks to run | |
# wait for I/O | |
can_recv, can_send, _ = select(recv_wait, send_wait, []) | |
for s in can_recv: | |
tasks.append(recv_wait.pop(s)) | |
for s in can_send: | |
tasks.append(send_wait.pop(s)) | |
task = tasks.popleft() | |
try: | |
why, what = next(task) # Run to the yield | |
if why == 'recv': | |
recv_wait[what] = task | |
elif why == 'send': | |
send_wait[what] = task | |
else: | |
raise RuntimeError("ARG!") | |
except StopIteration: | |
print("Task done") | |
def fib_server(address): | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
sock.bind(address) | |
sock.listen(5) | |
while True: | |
yield 'recv', sock | |
client, addr = sock.accept() # blocking | |
print(f"Connection {addr}") | |
tasks.append(fib_handler(client)) | |
def fib_handler(client): | |
while True: | |
yield 'recv', client | |
req = client.recv(100) # blocking | |
if not req: | |
break | |
n = int(req) | |
result = fib(n) | |
resp = str(result).encode('ascii') + b'\n' | |
yield 'send', client | |
client.send(resp) # blocking (potentially) | |
print("Closed") | |
tasks.append(fib_server(("", 5000))) | |
run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
from fib import fib | |
from threading import Thread | |
def fib_server(address): | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
sock.bind(address) | |
sock.listen(5) | |
while True: | |
client, addr = sock.accept() | |
print(f"Connection {addr}") | |
Thread(target=fib_handler, args=(client,), daemon=True).start() | |
def fib_handler(client): | |
while True: | |
req = client.recv(100) | |
if not req: | |
break | |
n = int(req) | |
result = fib(n) | |
resp = str(result).encode('ascii') + b'\n' | |
client.send(resp) | |
print("Closed") | |
fib_server(("", 5000)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
from fib import fib | |
from threading import Thread | |
from concurrent.futures import ProcessPoolExecutor as Pool | |
def fib_server(address): | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
sock.bind(address) | |
sock.listen(5) | |
while True: | |
client, addr = sock.accept() | |
print(f"Connection {addr}") | |
Thread(target=fib_handler, args=(client,), daemon=True).start() | |
pool = Pool(4) | |
def fib_handler(client): | |
while True: | |
req = client.recv(100) | |
if not req: | |
break | |
n = int(req) | |
future = pool.submit(fib, n) | |
result = future.result() | |
resp = str(result).encode('ascii') + b'\n' | |
client.send(resp) | |
print("Closed") | |
fib_server(("", 5000)) | |
# One downside of this approach is overhead of working with (sub)processes itself. | |
# We can notice that perf2 results 10 times lower comparing to the fib_server_threads program. | |
# 2701 req/sec | |
# 2732 req/sec | |
# 2707 req/sec | |
# 2764 req/sec | |
# 2737 req/sec | |
# But at the same time, it process pool helps to balance computations and our server would not be | |
# hammered much by the heavy task |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
from threading import Thread | |
import time | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.connect(("localhost", 5000)) | |
n = 0 | |
def monitor(): | |
global n | |
while True: | |
time.sleep(1) | |
print(n, 'req/sec') | |
n = 0 | |
Thread(target=monitor).start() | |
while True: | |
sock.send(b'1') | |
resp = sock.recv(100) | |
n += 1 | |
# Example: running perf2 program and passing "big" number to our fib server | |
# 21717 req/sec | |
# 25164 req/sec | |
# 24865 req/sec | |
# 25188 req/sec | |
# 6954 req/sec <- we notice that GIL prioritize CPU extensive task and that affects our requests | |
# 148 req/sec <- it has to do with internal implementation of GIL itself and it's not how OS do things | |
# 147 req/sec | |
# 145 req/sec | |
# 148 req/sec | |
# 140 req/sec | |
# 146 req/sec | |
# 144 req/sec |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
import time | |
sock = socket(AF_INET, SOCK_STREAM) | |
sock.connect(("localhost", 5000)) | |
while True: | |
start = time.time() | |
sock.send(b'30') | |
resp = sock.recv(100) | |
end = time.time() | |
print(end-start) | |
# Running >= 1 copies of this program we can see how GIL affects the computation on a single core. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment