Last active
August 3, 2016 15:48
-
-
Save phizaz/c62162f45ca1f453ce265d09222ee96d to your computer and use it in GitHub Desktop.
Python Getting A Free Port Number : A Multiprocess-safe Recipe
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fasteners | |
import threading | |
class BindFreePort(object): | |
def __init__(self, start, stop): | |
self.port = None | |
import random, socket | |
self.sock = socket.socket() | |
while True: | |
port = random.randint(start, stop) | |
try: | |
self.sock.bind(('', port)) | |
self.port = port | |
break | |
except Exception: | |
continue | |
def release(self): | |
assert self.port is not None | |
self.sock.close() | |
class FreePort(object): | |
used_ports = set() | |
def __init__(self, start=4000, stop=6000): | |
self.lock = None | |
self.bind = None | |
self.port = None | |
from fasteners.process_lock import InterProcessLock | |
import time | |
while True: | |
bind = BindFreePort(start, stop) | |
if bind.port in self.used_ports: | |
bind.release() | |
continue | |
''' | |
Since we cannot be certain the user will bind the port 'immediately' (actually it is not possible using | |
this flow. We must ensure that the port will not be reacquired even it is not bound to anything | |
''' | |
lock = InterProcessLock(path='/tmp/socialdna/port_{}_lock'.format(bind.port)) | |
success = lock.acquire(blocking=False) | |
if success: | |
self.lock = lock | |
self.port = bind.port | |
self.used_ports.add(bind.port) | |
bind.release() | |
break | |
bind.release() | |
time.sleep(0.01) | |
def release(self): | |
assert self.lock is not None | |
assert self.port is not None | |
self.used_ports.remove(self.port) | |
self.lock.release() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from freeport import FreePort | |
def get_and_bind_freeport(*args): | |
freeport = FreePort(start=4000, stop=4009) | |
import time | |
time.sleep(1) | |
return freeport.port | |
class FreePortClassTest(unittest.TestCase): | |
def test_one_port(self): | |
freeport = FreePort(start=4000, stop=4000) | |
self.assertEqual(freeport.port, 4000) | |
freeport.release() | |
def test_many_ports(self): | |
freeport = FreePort(start=4000, stop=4000) | |
self.assertEqual(freeport.port, 4000) | |
freeport.release() | |
freeport = FreePort(start=4000, stop=4000) | |
self.assertEqual(freeport.port, 4000) | |
freeport.release() | |
def test_many_ports_conflict(self): | |
def get_port(): | |
freeport = FreePort(start=4000, stop=4000) | |
return freeport.port | |
def run(): | |
self.assertEqual(get_port(), 4000) | |
freeport = FreePort(start=4000, stop=4000) | |
self.assertEqual(freeport.port, 4000) | |
from multiprocessing import Process | |
p = Process(target=run) | |
p.start() | |
p.join(0.1) | |
self.assertTrue(p.is_alive(), 'the process should find it hard to acquire a free port') | |
p.terminate() | |
p.join() | |
freeport.release() | |
def test_multithread_race_condition(self): | |
from multiprocessing.pool import ThreadPool | |
jobs = 100 | |
def get_and_bind_freeport(*args): | |
freeport = FreePort(start=4000, stop=4000 + jobs - 1) | |
import time | |
time.sleep(1) | |
freeport.release() # needed because thread will not turn back the file descriptor | |
return freeport.port | |
p = ThreadPool(jobs) | |
ports = p.map(get_and_bind_freeport, range(jobs)) | |
self.assertEqual(len(ports), len(set(ports))) | |
def test_multiprocess_race_condition(self): | |
from multiprocessing.pool import Pool | |
p = Pool(10) | |
ports = p.map(get_and_bind_freeport, range(10)) | |
self.assertEqual(len(ports), len(set(ports))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment