Last active
August 29, 2015 13:56
-
-
Save jbpotonnier/9180033 to your computer and use it in GitHub Desktop.
Using semaphore to allocate resources.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Type ipython -i client.py and interract with surfer_queue and agent in the command line | |
""" | |
from connector import ConnectorManager, Agent | |
ConnectorManager.register('get_surfer_queue') | |
ConnectorManager.register('get_agent_pool') | |
if __name__ == '__main__': | |
address = ('localhost', 8888) | |
manager = ConnectorManager(address, 'password') | |
manager.connect() | |
print 'client connected to ConnectorManager on {}:{}'.format(*address) | |
surfer_queue = manager.get_surfer_queue() | |
agent_pool = manager.get_agent_pool() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Network (withSocketsDo, accept, listenOn, PortID(PortNumber)) | |
import Control.Concurrent (forkIO) | |
import System.IO (Handle, hClose, hGetLine, hSetBuffering, BufferMode (..)) | |
import Control.Applicative ((<$>), (<*>)) | |
import Control.Monad (forever) | |
import Control.Monad.STM (atomically) | |
import Control.Concurrent.STM (STM) | |
import Control.Concurrent.STM.TChan (TChan, newTChan, writeTChan, readTChan) | |
import Control.Concurrent.STM.TVar (TVar, newTVar, writeTVar, readTVar) | |
import Control.Concurrent.STM.SSem (SSem) | |
import qualified Control.Concurrent.STM.SSem as SSem | |
import Data.List (sort) | |
type Surfer = String | |
type Agent = String | |
data AgentPool = AgentPool (TVar [(Int, Agent)]) SSem | |
enqueue :: TChan Surfer -> String -> IO () | |
enqueue surferChan surfer = do | |
atomically $ writeTChan surferChan surfer | |
putStrLn $ "surfer " ++ surfer ++ " enqueued" | |
addAgent :: AgentPool -> Agent -> Int -> IO () | |
addAgent (AgentPool agentsTVar slots) agent nbSlots = do | |
atomically $ do | |
agents <- readTVar agentsTVar | |
writeTVar agentsTVar $ (nbSlots, agent):agents | |
SSem.signalN slots nbSlots | |
putStrLn $ "agent " ++ agent ++ " added" | |
acquireAgent :: AgentPool -> IO String | |
acquireAgent (AgentPool agentsTVar slots) = | |
atomically $ do | |
SSem.wait slots | |
agents <- readTVar $ agentsTVar | |
let (nbSlots, name) = (!! 0) . reverse . sort $ agents | |
let newAgents = (nbSlots - 1, name):filter ((/= name) . snd) agents | |
writeTVar agentsTVar $ newAgents | |
return name | |
handleRequest :: AgentPool -> TChan Surfer -> Handle -> IO () | |
handleRequest agentPool surferChan h = loop | |
where | |
loop = do | |
hSetBuffering h NoBuffering | |
command <- hGetLine h | |
case words command of | |
["enqueue", surfer] -> do | |
enqueue surferChan surfer | |
loop | |
["add_agent", name, nb_max_slots] -> do | |
addAgent agentPool name (read nb_max_slots) | |
loop | |
["bye"] -> | |
hClose h | |
_ -> loop | |
connect :: AgentPool -> TChan Surfer -> IO () | |
connect agentPool surferChan = do | |
surfer <- atomically $ readTChan surferChan | |
agent <- acquireAgent agentPool | |
putStrLn $ "connected " ++ agent ++ " and " ++ surfer | |
serve :: AgentPool -> TChan Surfer -> IO () | |
serve agentPool surferChan = withSocketsDo $ do | |
sock <- listenOn $ PortNumber 5002 | |
forever $ do | |
(h, _ , _) <- accept sock | |
forkIO $ handleRequest agentPool surferChan h | |
newAgentPool :: STM AgentPool | |
newAgentPool = AgentPool <$> newTVar [] <*> SSem.new 0 | |
main :: IO () | |
main = do | |
surferChan <- atomically newTChan | |
agentPool <- atomically newAgentPool | |
_ <- forkIO $ forever (connect agentPool surferChan) | |
serve agentPool surferChan | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing.managers import BaseManager | |
from multiprocessing import Process, Queue, Lock, Semaphore | |
from collections import namedtuple, defaultdict | |
Agent = namedtuple('Agent', 'name nb_max_slots') | |
class AgentPool(object): | |
def __init__(self): | |
self.agent_collection = AgentCollection() | |
self.conversations = {} | |
self.lock = Lock() | |
self.slots = Semaphore(0) | |
def add_agent(self, agent): | |
with self.lock: | |
self.agent_collection.add(agent) | |
print '{agent} added'.format(**locals()) | |
for _ in xrange(agent.nb_max_slots): | |
self.slots.release() | |
print '1 slot for {agent} added'.format(**locals()) | |
def remove_agent(self, agent): | |
# TODO | |
pass | |
def connect(self, surfer): | |
self.slots.acquire() | |
print 'an agent is available' | |
with self.lock: | |
agent = self.agent_collection.acquire() | |
self.conversations[surfer] = agent | |
print '{agent} and {surfer} connected'.format(**locals()) | |
def end_chat(self, surfer): | |
with self.lock: | |
agent = self.conversations[surfer] | |
self.agent_collection.release(agent) | |
del self.conversations[surfer] | |
self.slots.release() | |
print 'chat between {agent} and {surfer} ended'.format(**locals()) | |
class AgentCollection(object): | |
def __init__(self): | |
self.agents_by_nb_slots = defaultdict(list) | |
self.nb_slot_by_agent = {} | |
def add(self, agent): | |
self.agents_by_nb_slots[agent.nb_max_slots].append(agent) | |
self.nb_slot_by_agent[agent] = agent.nb_max_slots | |
def remove(self, agent): | |
# TODO | |
pass | |
def acquire(self): | |
max_free_slots = max(self.agents_by_nb_slots.iterkeys()) | |
agent = self.agents_by_nb_slots[max_free_slots].pop(0) | |
if not self.agents_by_nb_slots[max_free_slots]: | |
del self.agents_by_nb_slots[max_free_slots] | |
self.agents_by_nb_slots[max_free_slots - 1].append(agent) | |
self.nb_slot_by_agent[agent] -= 1 | |
return agent | |
def release(self, agent): | |
nb_slots = self.nb_slot_by_agent[agent] | |
self.agents_by_nb_slots[nb_slots].remove(agent) | |
self.agents_by_nb_slots[nb_slots + 1].append(agent) | |
self.nb_slot_by_agent[agent] += 1 | |
def connect(surfer_queue, agent_pool): | |
print 'connect loop started' | |
while True: | |
surfer = surfer_queue.get() | |
agent_pool.connect(surfer) | |
class ConnectorManager(BaseManager): | |
pass | |
if __name__ == '__main__': | |
surfer_queue = Queue() | |
ConnectorManager.register('get_surfer_queue', lambda: surfer_queue) | |
agent_pool = AgentPool() | |
ConnectorManager.register('get_agent_pool', lambda: agent_pool) | |
address = ('localhost', 8888) | |
manager = ConnectorManager(address, 'password') | |
manager.start() | |
print 'ConnectorManager started on {}:{}'.format(*address) | |
connector = Process(target=connect, args=(surfer_queue, manager.get_agent_pool())) | |
connector.start() | |
connector.join() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment