Last active
August 29, 2015 14:22
-
-
Save munro/2b60ba19ddfaa47b0990 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE FlexibleInstances #-} | |
{-# LANGUAGE FunctionalDependencies #-} | |
{-# LANGUAGE FlexibleContexts #-} | |
{-# LANGUAGE UndecidableInstances #-} | |
{-# LANGUAGE GADTs #-} | |
module Main where | |
import Control.Concurrent | |
import Control.Concurrent.STM | |
import Control.Monad.IO.Class | |
import Control.Exception | |
import qualified Data.Map as Map | |
import qualified Data.Set as Set | |
import Network | |
import System.IO.Temp | |
import GHC.IO.Handle | |
import Control.Monad.State.Lazy | |
{- Abstract simple network API -}
-- | Operations every simple network backend has to provide.
--
-- @m@ is the monad the operations run in.  The functional dependency says
-- that the network type alone determines the node and message types.
class Monad m => SimpleNetwork m net node msg | net -> node msg where
  -- | Create a new node on the given network.
  simpleCreateNode :: net -> m node

  -- | Send a message: network, sending node, receiving node, payload.
  simpleSendMessage :: net -> node -> node -> msg -> m ()

  -- | Wait for a message addressed to the given node.
  simpleWaitMessage :: net -> node -> m msg
{- Abstract model for the network client API.  'NetworkClient' is a
   hand-rolled state-passing computation over a (network, client) pair; a
   StateT-based formulation could probably make these instances shorter. -}
-- | Map a pure function over the result; the network and client values
-- produced by the wrapped computation are passed through untouched.
instance SimpleNetwork m network node message => Functor (NetworkClient m network client) where
  fmap f (NetworkClient run) = NetworkClient $ \st ->
    run st >>= \(result, network, client) -> return (f result, network, client)
-- | Applicative instance for 'NetworkClient'.
--
-- 'pure' returns a value without touching the network or the client.
-- '<*>' runs the computation on the left (the one producing the function)
-- first and then the computation on the right, threading the
-- (network, client) pair from left to right.  This is the same order the
-- Monad instance uses, so @(\<*\>)@ agrees with 'ap'.
instance SimpleNetwork m network node message => Applicative (NetworkClient m network client) where
  pure a = NetworkClient (\(network, client) -> return (a, network, client))
  NetworkClient applyM <*> NetworkClient transform = NetworkClient (\state -> do
    -- Effects of the function side happen first ...
    (apply, network, client) <- applyM state
    -- ... and the argument side sees the state the function side left behind.
    (a, network2, client2) <- transform (network, client)
    return (apply a, network2, client2))
-- | Sequence two client computations: run the first, feed its result to the
-- continuation, and run the resulting computation with the (network, client)
-- pair the first one returned.
instance SimpleNetwork m network node message => Monad (NetworkClient m network client) where
  NetworkClient run >>= continue = NetworkClient $ \st -> do
    (result, network, client) <- run st
    case continue result of
      NetworkClient next -> next (network, client)
-- | A client computation: given the current (network, client) pair it
-- produces, in the underlying monad @m@, a result together with the
-- network and client values to continue with.
--
-- Declared as a @newtype@ because it has exactly one constructor with
-- exactly one field, so the wrapper carries no runtime cost.
newtype NetworkClient m network client a = NetworkClient
  { runNetworkClient :: (network, client) -> m (a, network, client)
    -- ^ Run the computation from an initial (network, client) pair.
  }
-- | Client action that sends @message@ to the node @to@.  The sender is the
-- client node the computation is run with; the network and client are
-- returned unchanged.
simpleClientMessage :: SimpleNetwork m network node message => node -> message -> NetworkClient m network node ()
simpleClientMessage to message = NetworkClient $ \(network, from) ->
  simpleSendMessage network from to message >> return ((), network, from)
-- | Client action that waits for a message addressed to the client node the
-- computation is run with and returns it.
simpleClientWaitMessage :: SimpleNetwork m network node message => NetworkClient m network node message
simpleClientWaitMessage = NetworkClient $ \(network, self) ->
  simpleWaitMessage network self >>= \msg -> return (msg, network, self)
{- Usage testing -}
-- | Scenario: create a single node and return it.
simpleCreateNodeTest :: SimpleNetwork m network node message => network -> m node
simpleCreateNodeTest network = simpleCreateNode network
-- | Scenario: two nodes each send one message to the other and nobody
-- waits for them.
simpleSendMessageTestQueue :: SimpleNetwork m network node String => network -> m ()
simpleSendMessageTestQueue network = do
  first <- simpleCreateNode network
  second <- simpleCreateNode network
  simpleSendMessage network first second "hello"
  simpleSendMessage network second first "world"
-- | Scenario: two nodes exchange one message each, then each node waits for
-- its message.  Returns what the first node received followed by what the
-- second node received.
simpleWaitMessageTest :: SimpleNetwork m network node String => network -> m [String]
simpleWaitMessageTest network = do
  first <- simpleCreateNode network
  second <- simpleCreateNode network
  simpleSendMessage network first second "hello"
  simpleSendMessage network second first "world"
  receivedByFirst <- simpleWaitMessage network first
  receivedBySecond <- simpleWaitMessage network second
  return (receivedByFirst : receivedBySecond : [])
-- | Scenario: like 'simpleWaitMessageTest', but every wait and every send
-- runs in its own forked thread.  The received messages are handed back to
-- the calling thread through one 'MVar' per node.
simpleWaitMessageTestConcurrent :: SimpleNetwork IO network node String => network -> IO [String]
simpleWaitMessageTestConcurrent network = do
  userA <- simpleCreateNode network
  userB <- simpleCreateNode network
  inboxA <- newEmptyMVar
  inboxB <- newEmptyMVar
  -- Receivers: publish whatever arrives into the matching inbox.
  _ <- forkIO $ simpleWaitMessage network userA >>= putMVar inboxA
  _ <- forkIO $ simpleWaitMessage network userB >>= putMVar inboxB
  -- Senders.
  _ <- forkIO $ simpleSendMessage network userA userB "hello"
  _ <- forkIO $ simpleSendMessage network userB userA "world"
  -- Block until both receivers have delivered.
  fromA <- takeMVar inboxA
  fromB <- takeMVar inboxB
  return [fromA, fromB]
-- | Scenario: exercise the 'NetworkClient' wrapper.  Running as the first
-- node, send "world" to the first node; then, running as the second node,
-- send "foo" and "bar" to the second node.  Nothing is received.
simpleClientSendMessageTestQueue :: SimpleNetwork m network node String => network -> m ()
simpleClientSendMessageTestQueue network = do
  nodeA <- simpleCreateNode network
  nodeB <- simpleCreateNode network
  _ <- runNetworkClient (simpleClientMessage nodeA "world") (network, nodeA)
  _ <- runNetworkClient
         (simpleClientMessage nodeB "foo" >> simpleClientMessage nodeB "bar")
         (network, nodeB)
  return ()
-- | Run a scenario against a fresh in-memory simple network and return the
-- scenario's result together with the final network state.
--
-- >>> runMemoryTest simpleCreateNodeTest
-- (0,MemorySimple {memory_counter = 1, memory_connections = fromList [0], memory_handlers = fromList [], memory_messageQueue = fromList []})
-- >>> runMemoryTest simpleSendMessageTestQueue
-- ((),MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList [(0,["world"]),(1,["hello"])]})
-- >>> runMemoryTest simpleWaitMessageTest
-- (["world","hello"],MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList []})
-- >>> runMemoryTest simpleWaitMessageTestConcurrent
-- (["world","hello"],MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList []})
-- >>> runMemoryTest simpleClientSendMessageTestQueue
-- ((),MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList [(0,["world"]),(1,["bar","foo"])]})
runMemoryTest :: (TVar (MemorySimple a) -> IO b) -> IO (b, MemorySimple a)
runMemoryTest test = do
  network <- newTVarIO memorySimpleEmpty
  outcome <- test network
  finalState <- readTVarIO network
  return (outcome, finalState)
-- | Run a scenario against a fresh TCP simple network and return the
-- scenario's result together with the final network state.
--
-- >>> runTCPTest simpleCreateNodeTest
-- (<TCPSimpleNode>,TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleSendMessageTestQueue
-- ((),TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleWaitMessageTest
-- (["world","hello"],TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleWaitMessageTestConcurrent
-- (["world","hello"],TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleClientSendMessageTestQueue
-- ((),TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
runTCPTest :: (TVar TCPSimple -> IO b) -> IO (b, TCPSimple)
runTCPTest test = do
  network <- newTVarIO tcpSimpleEmpty
  outcome <- test network
  finalState <- readTVarIO network
  return (outcome, finalState)
{- In memory implementation -}
-- | In-memory backend: the network is a 'TVar' holding a 'MemorySimple',
-- nodes are 'MemorySimpleNode' identifiers, and messages can be any type.
instance SimpleNetwork IO (TVar (MemorySimple a)) MemorySimpleNode a where
  simpleCreateNode  = memorySimpleCreateNode
  simpleSendMessage = memorySimpleSendMessage
  simpleWaitMessage = memoryWaitMessage
-- | Whole state of the in-memory network, with messages of type @a@.
data MemorySimple a = MemorySimple
  { memory_counter :: MemorySimpleNode
    -- ^ Identifier that the next created node will receive.
  , memory_connections :: Set.Set MemorySimpleNode
    -- ^ Identifiers of all nodes created so far.
  , memory_handlers :: Map.Map MemorySimpleNode (MVar a)
    -- ^ For a node that is currently waiting: the 'MVar' to deliver into.
  , memory_messageQueue :: Map.Map MemorySimpleNode [a]
    -- ^ Messages sent to a node that nobody was waiting for (newest first).
  } deriving Show
-- | Nodes of the in-memory network are identified by a plain number.
type MemorySimpleNode = Integer
{- Empty network state -}
-- | In-memory network with no nodes, no waiting handlers and no queued
-- messages; the node counter starts at 0.
memorySimpleEmpty :: MemorySimple a
memorySimpleEmpty = MemorySimple 0 Set.empty Map.empty Map.empty
{- Create new node -}
-- | Atomically allocate a node: the current counter value becomes the new
-- node's identifier, it is recorded in the connection set, and the counter
-- is incremented.
memorySimpleCreateNode :: TVar (MemorySimple a) -> IO MemorySimpleNode
memorySimpleCreateNode network = atomically $ do
  st <- readTVar network
  let nodeId = memory_counter st
  writeTVar network (st
    { memory_counter = nodeId + 1
    , memory_connections = Set.insert nodeId (memory_connections st)
    })
  return nodeId
{- Send a message -}
-- | Deliver @message@ to node @to@.
--
-- If a handler is registered for @to@, it is removed from the network state
-- and the message is put into its 'MVar' after the transaction commits.
-- Otherwise the message is prepended to the recipient's queue.  The sender
-- (@from@) is currently not used.
memorySimpleSendMessage :: TVar (MemorySimple a) -> MemorySimpleNode -> MemorySimpleNode -> a -> IO ()
memorySimpleSendMessage network from to message = do
  waiting <- atomically $ do
    st <- readTVar network
    let handlers = memory_handlers st
    -- @TODO error if connection does not exist
    case Map.lookup to handlers of
      Just handler -> do -- @TODO test this
        writeTVar network (st { memory_handlers = Map.delete to handlers })
        return (Just handler)
      Nothing -> do
        let queued = adjustDefault (message :) [] to (memory_messageQueue st)
        writeTVar network (st { memory_messageQueue = queued })
        return Nothing
  -- The hand-off to a waiting receiver happens outside the STM transaction.
  maybe (return ()) (`putMVar` message) waiting
{- Wait for a message -}
-- | Receive the next message addressed to node @conn@.
--
-- If messages are already queued for the node, the oldest one is removed
-- from the queue and returned immediately.  'memorySimpleSendMessage'
-- prepends to the queue, so the oldest message is the last list element;
-- taking it from that end gives first-in, first-out delivery.
--
-- If nothing is queued (no entry, or an empty entry), a fresh 'MVar' is
-- registered as the node's handler and the call blocks until a sender
-- fills it.
memoryWaitMessage :: TVar (MemorySimple a) -> MemorySimpleNode -> IO a
memoryWaitMessage network conn = do
  waitVar <- newEmptyMVar
  result <- atomically $ do
    MemorySimple counter conns handlers messageQueue <- readTVar network
    -- Reverse so that the oldest message is at the head and can be taken
    -- with a total pattern match instead of 'head'/'last'.
    case fmap reverse (Map.lookup conn messageQueue) of
      Just (oldest : newerReversed) -> do
        let remaining = reverse newerReversed
            updatedQueue
              -- Drop the entry entirely once the last message is consumed.
              | null remaining = Map.delete conn messageQueue
              | otherwise = Map.insert conn remaining messageQueue
        writeTVar network (MemorySimple counter conns handlers updatedQueue)
        return (Just oldest)
      _ -> do
        writeTVar network (MemorySimple counter conns (Map.insert conn waitVar handlers) messageQueue)
        return Nothing
  -- Either we already have a message, or we block on our handler.
  maybe (takeMVar waitVar) return result
{- TCP implementation -}
-- | Socket-based backend: the network is a 'TVar' holding a 'TCPSimple',
-- nodes are 'TCPSimpleNode' values, and messages are 'String's.
instance SimpleNetwork IO (TVar TCPSimple) TCPSimpleNode String where
  simpleCreateNode  = tcpSimpleCreateNode
  simpleSendMessage = tcpSimpleSendMessage
  simpleWaitMessage = tcpSimpleWaitMessage
-- | Shared state of the socket-based network, keyed by listening port.
data TCPSimple = TCPSimple
  { tcp_message_queue :: Map.Map TCPSimplePort [String]
    -- ^ Messages received on a port while nobody was waiting (newest first).
  , tcp_message_handlers :: Map.Map TCPSimplePort [MVar String]
    -- ^ 'MVar's of the callers currently waiting for a message on a port.
  } deriving Show
-- | A node of the socket-based network: a wrapper around its listening
-- 'Socket'.  A @newtype@ because it has one constructor with one field.
newtype TCPSimpleNode = TCPSimpleNode Socket
-- | Map key identifying a node by the 'PortID' it listens on.  A @newtype@
-- because it has one constructor with one field.
newtype TCPSimplePort = TCPSimplePort PortID
{- Empty network state -}
-- | Socket-based network state with no queued messages and no waiting
-- handlers.
tcpSimpleEmpty :: TCPSimple
tcpSimpleEmpty = TCPSimple Map.empty Map.empty
{- Create new node -}
-- | Create a node that listens on a UNIX socket at a freshly generated
-- path, and fork a thread that accepts connections on it forever.
--
-- Every accepted connection is read to the end in its own thread and the
-- text is treated as one message.  If callers are waiting on this node's
-- port, all of them receive the message and their registrations are
-- removed; otherwise the message is prepended to the port's queue.
tcpSimpleCreateNode :: TVar TCPSimple -> IO TCPSimpleNode
tcpSimpleCreateNode network = do
  tempFile <- createTimeFile "ghc_tcp_simple_"
  let port = UnixSocket tempFile
  socket <- listenOn port
  _ <- forkIO $ acceptLoop port socket
  return (TCPSimpleNode socket)
  where
    -- Accept connections forever, handling each one in a new thread.
    acceptLoop :: PortID -> Socket -> IO ()
    acceptLoop port socket = do
      (conn, _, _) <- accept socket
      _ <- forkIO $ readMessage port conn
      acceptLoop port socket

    -- Read one complete message from a connection and dispatch it.
    readMessage :: PortID -> Handle -> IO ()
    readMessage port conn = do
      let key = TCPSimplePort port
      -- 'hGetContents' is lazy: force the whole message before it is
      -- stored in shared state, and close the handle whatever happens, so
      -- the connection is not kept open by an unevaluated thunk.
      msg <- readAll conn `finally` hClose conn
      maybeMVar <- atomically $ do
        TCPSimple queue handlers <- readTVar network
        case Map.lookup key handlers of
          Just mvars -> do
            writeTVar network (TCPSimple queue (Map.delete key handlers))
            return (Just mvars)
          Nothing -> do
            writeTVar network (TCPSimple (adjustDefault (msg :) [] key queue) handlers)
            return Nothing
      case maybeMVar of
        Just mvars -> mapM_ (\m -> putMVar m msg) mvars
        Nothing -> return ()

    -- Read a handle's entire contents strictly.
    readAll :: Handle -> IO String
    readAll conn = do
      contents <- hGetContents conn
      _ <- evaluate (length contents)
      return contents
{- Send a message -}
-- | Send @message@ to the node @to@ by connecting to the port its socket
-- listens on, writing the text, and closing the connection.
--
-- The connection is managed with 'bracket', so the handle is closed even
-- when writing fails.  The network state and the sending node are not
-- used by this backend.
tcpSimpleSendMessage :: TVar TCPSimple -> TCPSimpleNode -> TCPSimpleNode -> String -> IO ()
tcpSimpleSendMessage _network (TCPSimpleNode _from) (TCPSimpleNode to) message = do
  toPort <- socketPort to
  bracket (connectTo "localhost" toPort) hClose $ \conn -> do
    hPutStr conn message -- @TODO should serialize better
    hFlush conn
{- Wait for a message -}
-- | Receive the next message that arrived on the given node's port.
--
-- If messages are already queued for the port, the oldest one is removed
-- from the queue and returned immediately.  The reader thread started by
-- 'tcpSimpleCreateNode' prepends to the queue, so the oldest message is the
-- last list element; taking it from that end gives first-in, first-out
-- delivery.
--
-- If nothing is queued (no entry, or an empty entry), a fresh 'MVar' is
-- added to the port's handler list and the call blocks until it is filled.
tcpSimpleWaitMessage :: TVar TCPSimple -> TCPSimpleNode -> IO String
tcpSimpleWaitMessage network (TCPSimpleNode socket) = do
  waitVar <- newEmptyMVar
  fromPort <- socketPort socket
  let key = TCPSimplePort fromPort
  result <- atomically $ do
    TCPSimple queue handlers <- readTVar network
    -- Reverse so that the oldest message is at the head and can be taken
    -- with a total pattern match instead of 'head'/'last'.
    case fmap reverse (Map.lookup key queue) of
      -- pop the oldest message off the queue
      Just (oldest : newerReversed) -> do
        let remaining = reverse newerReversed
            updatedQueue
              -- Drop the entry entirely once the last message is consumed.
              | null remaining = Map.delete key queue
              | otherwise = Map.insert key remaining queue
        writeTVar network (TCPSimple updatedQueue handlers)
        return (Just oldest)
      -- add our handler to wait for a message
      _ -> do
        writeTVar network (TCPSimple queue (adjustDefault (waitVar :) [] key handlers))
        return Nothing
  -- Either we already have a message, or we block on our handler.
  maybe (takeMVar waitVar) return result
{- Instances -}
-- | Nodes are rendered as a fixed placeholder; the socket is not shown.
instance Show TCPSimpleNode where
  showsPrec _ _ = showString "<TCPSimpleNode>"
-- | Ports are rendered as a fixed placeholder; the 'PortID' is not shown.
instance Show TCPSimplePort where
  showsPrec _ _ = showString "<TCPSimplePort>"
-- | Ports are ordered by the 'PortID' they wrap.
instance Ord TCPSimplePort where
  compare left right =
    case (left, right) of
      (TCPSimplePort a, TCPSimplePort b) -> compare a b
-- | Two ports are equal when the 'PortID's they wrap are equal.
instance Eq TCPSimplePort where
  left == right =
    case (left, right) of
      (TCPSimplePort a, TCPSimplePort b) -> a == b
{- Helper functions -}
-- | Apply @fn@ to the value stored under @key@ and store the result back.
-- When the key is absent, @fn@ is applied to @def@ instead, so the key is
-- always present in the returned map.
adjustDefault :: Ord k => (a -> a) -> a -> k -> Map.Map k a -> Map.Map k a
adjustDefault fn def key m = Map.insert key updated m
  where
    current = maybe def id (Map.lookup key m)
    updated = fn current
-- | Listen on the given port number; whenever that fails with an
-- 'IOException', retry with the next port number.
listenOnNext :: PortNumber -> IO Socket
listenOnNext port = listenOn (PortNumber port) `catch` retryOnNextPort
  where
    retryOnNextPort :: IOException -> IO Socket
    retryOnNextPort _ = listenOnNext (port + 1)
-- | Produce a path under @/tmp/@ whose file name starts with @prefix@.
--
-- The path is obtained from 'withTempFile' and only the name is returned.
-- NOTE(review): 'withTempFile' is documented to remove the file once its
-- action finishes, so the returned path should not exist any more, which
-- is presumably what a caller binding a UNIX socket to it needs; confirm.
createTimeFile :: String -> IO FilePath
createTimeFile prefix = withTempFile "/tmp/" (prefix ++ ".tmp") (\name _ -> return name)
-- | Total ordering on 'PortID'.
--
-- Values built with the same constructor are compared by their payload.
-- Values built with different constructors are ordered by constructor:
-- 'Service' before 'PortNumber' before 'UnixSocket'.  Without this last
-- case 'compare' would fail with a pattern-match error as soon as a map or
-- set contained keys of two different kinds.
instance Ord PortID where
  compare (Service a) (Service b) = compare a b
  compare (PortNumber a) (PortNumber b) = compare a b
  compare (UnixSocket a) (UnixSocket b) = compare a b
  compare a b = compare (constructorRank a) (constructorRank b)
    where
      constructorRank :: PortID -> Int
      constructorRank (Service _) = 0
      constructorRank (PortNumber _) = 1
      constructorRank (UnixSocket _) = 2
-- | Lets records containing 'MVar's derive 'Show'; every 'MVar' is rendered
-- as the same placeholder and its contents are never inspected.
instance Show (MVar a) where
  showsPrec _ _ = showString "<mvar>"
-- | Program entry point: prints the 'show'n form of the greeting string
-- (that is, including the surrounding double quotes).
main :: IO ()
main = putStrLn (show "hello world")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment