Skip to content

Instantly share code, notes, and snippets.

@munro
Last active August 29, 2015 14:22
Show Gist options
  • Save munro/2b60ba19ddfaa47b0990 to your computer and use it in GitHub Desktop.
Save munro/2b60ba19ddfaa47b0990 to your computer and use it in GitHub Desktop.
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE GADTs #-}
module Main where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Exception
import qualified Data.Map as Map
import qualified Data.Set as Set
import Network
import System.IO.Temp
import GHC.IO.Handle
import Control.Monad.State.Lazy
-- | Abstract API of a simple message-passing network.
--
-- The functional dependency @network -> node message@ states that the network
-- type alone determines the node type and the message type.
class Monad m => SimpleNetwork m network node message | network -> node message where
  -- | Create a node on the given network.
  simpleCreateNode :: network -> m node

  -- | Send a message. After the network, the two node arguments are the
  -- sender and the recipient, in that order.
  simpleSendMessage :: network -> node -> node -> message -> m ()

  -- | Wait for a message addressed to the given node.
  simpleWaitMessage :: network -> node -> m message
{- Abstract model for the network client API. This could probably be made shorter by building on the State monad (StateT). -}
-- | Maps over the result only; the network and client produced by the
-- wrapped step are passed through untouched.
instance SimpleNetwork m network node message => Functor (NetworkClient m network client) where
  fmap f (NetworkClient step) = NetworkClient $ \env ->
    step env >>= \(value, network, client) -> return (f value, network, client)
-- | 'pure' returns the value and leaves network and client unchanged.
--
-- '<*>' runs the function-producing computation first and the
-- argument-producing computation second, threading the network and client
-- from the first into the second. This left-to-right order is what the
-- 'Monad' instance's '>>=' implies (@(\<*\>) = ap@); running the argument
-- first would make applicative and monadic code perform their sends and
-- waits in different orders.
instance SimpleNetwork m network node message => Applicative (NetworkClient m network client) where
  pure a = NetworkClient (\(network, client) -> return (a, network, client))

  NetworkClient runFunction <*> NetworkClient runArgument = NetworkClient $ \env -> do
    (apply, network1, client1) <- runFunction env
    (a, network2, client2) <- runArgument (network1, client1)
    return (apply a, network2, client2)
-- | Sequencing: run the first step, feed its result to the continuation, and
-- run the resulting client computation on the network and client that the
-- first step returned.
instance SimpleNetwork m network node message => Monad (NetworkClient m network client) where
  NetworkClient step >>= next = NetworkClient $ \env -> do
    (value, network, client) <- step env
    runNetworkClient (next value) (network, client)
-- | A computation performed on behalf of a client: given a network and a
-- client it yields, inside @m@, a result together with a network and a
-- client to continue with.
data NetworkClient m network client a = NetworkClient
  { runNetworkClient :: (network, client) -> m (a, network, client)
    -- ^ Run the computation from the given network and client.
  }
-- | Send @message@ to the node @to@, using the client carried by the
-- computation as the sender. Network and client are returned unchanged.
simpleClientMessage :: SimpleNetwork m network node message => node -> message -> NetworkClient m network node ()
simpleClientMessage to message = NetworkClient $ \(network, from) ->
  simpleSendMessage network from to message >> return ((), network, from)
-- | Wait for a message addressed to the client carried by the computation
-- and return it. Network and client are returned unchanged.
simpleClientWaitMessage :: SimpleNetwork m network node message => NetworkClient m network node message
simpleClientWaitMessage = NetworkClient $ \(network, self) ->
  simpleWaitMessage network self >>= \msg -> return (msg, network, self)
{- Usage testing -}
-- | Scenario: create a single node and return it.
simpleCreateNodeTest :: SimpleNetwork m network node message => network -> m node
simpleCreateNodeTest = simpleCreateNode
-- | Scenario: create two nodes and have each send one message to the other
-- ("hello" from the first to the second, then "world" back) without anyone
-- waiting for them.
simpleSendMessageTestQueue :: SimpleNetwork m network node String => network -> m ()
simpleSendMessageTestQueue network = do
  nodeA <- simpleCreateNode network
  nodeB <- simpleCreateNode network
  mapM_
    (\(from, to, text) -> simpleSendMessage network from to text)
    [(nodeA, nodeB, "hello"), (nodeB, nodeA, "world")]
-- | Scenario: two nodes exchange one message each, then each node waits for
-- its message. The result lists the first node's message, then the second's.
simpleWaitMessageTest :: SimpleNetwork m network node String => network -> m [String]
simpleWaitMessageTest network = do
  nodeA <- simpleCreateNode network
  nodeB <- simpleCreateNode network
  simpleSendMessage network nodeA nodeB "hello"
  simpleSendMessage network nodeB nodeA "world"
  -- mapM waits on nodeA first and nodeB second, in list order.
  mapM (simpleWaitMessage network) [nodeA, nodeB]
-- | Scenario: one forked thread waits for a message on each of two nodes
-- (first node, then second) while another forked thread sends the messages.
-- The calling thread collects both received messages through 'MVar's.
simpleWaitMessageTestConcurrent :: SimpleNetwork IO network node String => network -> IO [String]
simpleWaitMessageTestConcurrent network = do
  userA <- simpleCreateNode network
  userB <- simpleCreateNode network
  aVar <- newEmptyMVar
  bVar <- newEmptyMVar
  -- Receiver thread: waits sequentially, publishing each message as it arrives.
  _ <- forkIO $ do
    simpleWaitMessage network userA >>= putMVar aVar
    simpleWaitMessage network userB >>= putMVar bVar
  -- Sender thread.
  _ <- forkIO $ do
    simpleSendMessage network userA userB "hello"
    simpleSendMessage network userB userA "world"
  mapM takeMVar [aVar, bVar]
-- | Scenario: build two 'NetworkClient' programs and run them by hand.
--
-- The program that sends "world" to the first node runs first, as the first
-- node; the program that sends "foo" then "bar" to the second node runs
-- afterwards, as the second node. Their results are discarded.
simpleClientSendMessageTestQueue :: SimpleNetwork m network node String => network -> m ()
simpleClientSendMessageTestQueue network = do
  nodeA <- simpleCreateNode network
  nodeB <- simpleCreateNode network
  let toNodeB = simpleClientMessage nodeB "foo" >> simpleClientMessage nodeB "bar"
      toNodeA = simpleClientMessage nodeA "world"
  _ <- runNetworkClient toNodeA (network, nodeA)
  _ <- runNetworkClient toNodeB (network, nodeB)
  return ()
-- | Run a scenario against a fresh, empty in-memory network and return the
-- scenario's result paired with the final network state.
-- >>> runMemoryTest simpleCreateNodeTest
-- (0,MemorySimple {memory_counter = 1, memory_connections = fromList [0], memory_handlers = fromList [], memory_messageQueue = fromList []})
-- >>> runMemoryTest simpleSendMessageTestQueue
-- ((),MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList [(0,["world"]),(1,["hello"])]})
-- >>> runMemoryTest simpleWaitMessageTest
-- (["world","hello"],MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList []})
-- >>> runMemoryTest simpleWaitMessageTestConcurrent
-- (["world","hello"],MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList []})
-- >>> runMemoryTest simpleClientSendMessageTestQueue
-- ((),MemorySimple {memory_counter = 2, memory_connections = fromList [0,1], memory_handlers = fromList [], memory_messageQueue = fromList [(0,["world"]),(1,["bar","foo"])]})
runMemoryTest :: (TVar (MemorySimple a) -> IO b) -> IO (b, MemorySimple a)
runMemoryTest test = do
  network <- newTVarIO memorySimpleEmpty
  outcome <- test network
  finalState <- readTVarIO network
  return (outcome, finalState)
-- | Run a scenario against a fresh, empty TCP simple network and return the
-- scenario's result paired with the final network state.
-- >>> runTCPTest simpleCreateNodeTest
-- (<TCPSimpleNode>,TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleSendMessageTestQueue
-- ((),TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleWaitMessageTest
-- (["world","hello"],TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleWaitMessageTestConcurrent
-- (["world","hello"],TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
-- >>> runTCPTest simpleClientSendMessageTestQueue
-- ((),TCPSimple {tcp_message_queue = fromList [], tcp_message_handlers = fromList []})
runTCPTest :: (TVar TCPSimple -> IO b) -> IO (b, TCPSimple)
runTCPTest test = do
  network <- newTVarIO tcpSimpleEmpty
  -- The scenario runs first; the state is read only after it has finished.
  (,) <$> test network <*> readTVarIO network
{- In memory implementation -}
-- | In-memory network: the network is a 'TVar' holding a 'MemorySimple',
-- nodes are 'MemorySimpleNode' ids and messages are of any type @a@.
instance SimpleNetwork IO (TVar (MemorySimple a)) MemorySimpleNode a where
  simpleCreateNode network = memorySimpleCreateNode network
  simpleSendMessage network from to message = memorySimpleSendMessage network from to message
  simpleWaitMessage network node = memoryWaitMessage network node
-- | State of the in-memory network, parameterised by the message type.
data MemorySimple a = MemorySimple
  { memory_counter :: MemorySimpleNode
    -- ^ Id that the next created node will receive.
  , memory_connections :: Set.Set MemorySimpleNode
    -- ^ Ids of the nodes created so far.
  , memory_handlers :: Map.Map MemorySimpleNode (MVar a)
    -- ^ Per node, the 'MVar' of a receiver currently waiting for a message.
  , memory_messageQueue :: Map.Map MemorySimpleNode [a]
    -- ^ Per node, messages sent but not yet received (senders prepend, so
    -- the newest message is first).
  }
  deriving (Show)
-- | Identifier of a node on the in-memory network.
type MemorySimpleNode = Integer
-- | Initial in-memory network state: counter at 0, no nodes, no waiting
-- receivers and no queued messages.
memorySimpleEmpty :: MemorySimple a
memorySimpleEmpty = MemorySimple 0 Set.empty Map.empty Map.empty
-- | Create a node on the in-memory network.
--
-- In a single transaction the current counter value becomes the new node's
-- id, is added to the set of connections, and the counter is incremented.
memorySimpleCreateNode :: TVar (MemorySimple a) -> IO MemorySimpleNode
memorySimpleCreateNode network = atomically $ do
  state@(MemorySimple nextId conns _ _) <- readTVar network
  writeTVar network state
    { memory_counter = nextId + 1
    , memory_connections = Set.insert nextId conns
    }
  return nextId
-- | Send a message to node @to@ on the in-memory network.
--
-- If a receiver is currently registered as waiting on @to@, its handler is
-- removed inside the transaction and the message is handed to it with
-- 'putMVar' after the transaction commits. Otherwise the message is
-- prepended to the recipient's queue. The sender argument is not used.
memorySimpleSendMessage :: TVar (MemorySimple a) -> MemorySimpleNode -> MemorySimpleNode -> a -> IO ()
memorySimpleSendMessage network _from to message = do
  waiter <- atomically $ do
    state@(MemorySimple _ _ handlers messageQueue) <- readTVar network
    -- @TODO error if connection does not exist
    case Map.lookup to handlers of
      Just handler -> do -- @TODO test this
        writeTVar network state { memory_handlers = Map.delete to handlers }
        return (Just handler)
      Nothing -> do
        writeTVar network state
          { memory_messageQueue = adjustDefault (message :) [] to messageQueue }
        return Nothing
  maybe (return ()) (`putMVar` message) waiter
-- | Wait for a message addressed to node @conn@ on the in-memory network.
--
-- Senders prepend to the per-node list, so the oldest queued message sits at
-- the end of that list; it is the one removed and returned, which gives
-- first-in-first-out delivery. When nothing is queued, a fresh 'MVar' is
-- registered as the node's handler (replacing any handler already stored
-- for that node) and the call blocks until a sender fills it.
memoryWaitMessage :: TVar (MemorySimple a) -> MemorySimpleNode -> IO a
memoryWaitMessage network conn = do
  waitVar <- newEmptyMVar
  queued <- atomically $ do
    state@(MemorySimple _ _ handlers messageQueue) <- readTVar network
    -- Reversing puts the oldest message first so it can be taken with a
    -- total pattern match instead of head/length.
    case fmap reverse (Map.lookup conn messageQueue) of
      Just (oldest : newerOldestFirst) -> do
        let remaining = reverse newerOldestFirst
            queue'
              | null remaining = Map.delete conn messageQueue
              | otherwise = Map.insert conn remaining messageQueue
        writeTVar network state { memory_messageQueue = queue' }
        return (Just oldest)
      -- No entry (or an empty one): register our MVar as the handler.
      _ -> do
        writeTVar network state { memory_handlers = Map.insert conn waitVar handlers }
        return Nothing
  -- Block outside the transaction; a sender fills waitVar with putMVar.
  maybe (takeMVar waitVar) return queued
{- TCP implementation -}
-- | Socket-backed network: the network is a 'TVar' holding a 'TCPSimple',
-- nodes are 'TCPSimpleNode's and messages are 'String's.
instance SimpleNetwork IO (TVar TCPSimple) TCPSimpleNode String where
  simpleCreateNode network = tcpSimpleCreateNode network
  simpleSendMessage network from to message = tcpSimpleSendMessage network from to message
  simpleWaitMessage network node = tcpSimpleWaitMessage network node
-- | Shared state of the socket-backed network.
data TCPSimple = TCPSimple
  { tcp_message_queue :: Map.Map TCPSimplePort [String]
    -- ^ Per port, messages received but not yet consumed (the reader
    -- prepends, so the newest message is first).
  , tcp_message_handlers :: Map.Map TCPSimplePort [MVar String]
    -- ^ Per port, the 'MVar's of receivers currently waiting for a message.
  }
  deriving (Show)
-- | A node of the socket-backed network, wrapping its listening 'Socket'.
data TCPSimpleNode
  = TCPSimpleNode Socket
-- | Wrapper around 'PortID' used as the key of the maps in 'TCPSimple'.
data TCPSimplePort
  = TCPSimplePort PortID
-- | Initial state of the socket-backed network: no queued messages and no
-- waiting receivers.
tcpSimpleEmpty :: TCPSimple
tcpSimpleEmpty = TCPSimple Map.empty Map.empty
-- | Create a node of the socket-backed network.
--
-- A path is obtained from 'createTimeFile', a listening socket is opened on
-- it as a 'UnixSocket', and a background thread accepts connections forever.
-- Each accepted connection is read in its own thread: its whole content is
-- one message, which is either handed to every receiver currently waiting
-- on this node's port or, if none is waiting, prepended to the port's queue.
tcpSimpleCreateNode :: TVar TCPSimple -> IO TCPSimpleNode
tcpSimpleCreateNode network = do
  socketPath <- createTimeFile "ghc_tcp_simple_"
  let port = UnixSocket socketPath
  listener <- listenOn port
  _ <- forkIO (acceptLoop port listener)
  return (TCPSimpleNode listener)
  where
    -- Accept connections forever, spawning one reader thread per connection.
    acceptLoop :: PortID -> Socket -> IO ()
    acceptLoop port listener = do
      (conn, _, _) <- accept listener
      _ <- forkIO (readMessage port conn)
      acceptLoop port listener

    -- Read one connection's content and deliver or queue it.
    readMessage :: PortID -> Handle -> IO ()
    readMessage port conn = do
      let key = TCPSimplePort port
      msg <- hGetContents conn
      waiters <- atomically $ do
        TCPSimple queue handlers <- readTVar network
        case Map.lookup key handlers of
          Just mvars -> do
            writeTVar network (TCPSimple queue (Map.delete key handlers))
            return mvars
          Nothing -> do
            writeTVar network (TCPSimple (adjustDefault (msg :) [] key queue) handlers)
            return []
      -- Performed after the transaction; a no-op when nobody was waiting.
      mapM_ (`putMVar` msg) waiters
-- | Send a message to the node @to@ over a fresh connection.
--
-- The recipient's port is looked up from its socket, a connection is opened
-- to it, the message is written and flushed, and the connection is closed.
-- 'bracket' guarantees the handle is closed even when writing or flushing
-- throws. The network state and the sender are not used.
tcpSimpleSendMessage :: TVar TCPSimple -> TCPSimpleNode -> TCPSimpleNode -> String -> IO ()
tcpSimpleSendMessage _network (TCPSimpleNode _from) (TCPSimpleNode to) message = do
  toPort <- socketPort to
  bracket (connectTo "localhost" toPort) hClose $ \conn -> do
    hPutStr conn message -- @TODO should serialize better
    hFlush conn
-- | Wait for a message addressed to the given node.
--
-- The node's port (from 'socketPort') is the key into the shared state.
-- Messages are prepended to the per-port list as they arrive, so the oldest
-- one sits at the end; it is the one removed and returned, which gives
-- first-in-first-out delivery. When nothing is queued, a fresh 'MVar' is
-- added to the port's handlers and the call blocks until it is filled.
tcpSimpleWaitMessage :: TVar TCPSimple -> TCPSimpleNode -> IO String
tcpSimpleWaitMessage network (TCPSimpleNode socket) = do
  waitVar <- newEmptyMVar
  fromPort <- socketPort socket
  let key = TCPSimplePort fromPort
  queued <- atomically $ do
    TCPSimple queue handlers <- readTVar network
    -- Reversing puts the oldest message first so it can be taken with a
    -- total pattern match instead of head/length.
    case fmap reverse (Map.lookup key queue) of
      -- pop the oldest message off the queue
      Just (oldest : newerOldestFirst) -> do
        let remaining = reverse newerOldestFirst
            queue'
              | null remaining = Map.delete key queue
              | otherwise = Map.insert key remaining queue
        writeTVar network (TCPSimple queue' handlers)
        return (Just oldest)
      -- add our handler to wait for a message
      _ -> do
        writeTVar network (TCPSimple queue (adjustDefault (waitVar :) [] key handlers))
        return Nothing
  -- Block outside the transaction until a reader thread fills waitVar.
  maybe (takeMVar waitVar) return queued
{- Instances -}
-- | Nodes are rendered as a fixed placeholder.
instance Show TCPSimpleNode where
  showsPrec _ _ = showString "<TCPSimpleNode>"
-- | Ports are rendered as a fixed placeholder.
instance Show TCPSimplePort where
  showsPrec _ _ = showString "<TCPSimplePort>"
-- | Ordering is delegated to the wrapped 'PortID'.
instance Ord TCPSimplePort where
  compare left right = case (left, right) of
    (TCPSimplePort a, TCPSimplePort b) -> compare a b
-- | Equality is delegated to the wrapped 'PortID'.
instance Eq TCPSimplePort where
  left == right = case (left, right) of
    (TCPSimplePort a, TCPSimplePort b) -> a == b
{- Helper functions -}
-- | Apply @fn@ to the value stored under @key@, or to @def@ when the key is
-- absent, and store the result under @key@.
adjustDefault :: Ord k => (a -> a) -> a -> k -> Map.Map k a -> Map.Map k a
adjustDefault fn def key m = Map.alter update key m
  where
    update current = Just (fn (maybe def id current))
-- | Listen on the given port number; if 'listenOn' throws an 'IOException',
-- try again with the next port number. Other exceptions propagate, and the
-- code puts no upper bound on the number of retries.
listenOnNext :: PortNumber -> IO Socket
listenOnNext port = listenOn (PortNumber port) `catch` retryOnNextPort
  where
    retryOnNextPort :: IOException -> IO Socket
    retryOnNextPort _ = listenOnNext (port + 1)
-- | Produce a fresh file path under @/tmp/@ whose name is derived from
-- @prefix@.
--
-- The prefix is passed to 'withTempFile' as the file-name template; it was
-- previously ignored in favour of a hard-coded template.
--
-- NOTE(review): per the @temporary@ package documentation, 'withTempFile'
-- deletes the file once the callback returns, so the returned path
-- presumably no longer exists on disk. The visible caller binds a Unix
-- socket to it, which appears to rely on that — confirm before changing.
createTimeFile :: String -> IO FilePath
createTimeFile prefix = withTempFile "/tmp/" prefix (\name _ -> return name)
-- | Total ordering on 'PortID' (an orphan instance, needed so that
-- 'TCPSimplePort' can be used as a map key).
--
-- Values built with the same constructor compare by their payload. Values
-- built with different constructors compare by a fixed constructor rank
-- ('Service' < 'PortNumber' < 'UnixSocket'); without this fallback, 'compare'
-- on mixed constructors failed with a pattern-match error.
instance Ord PortID where
  compare (Service a) (Service b) = compare a b
  compare (PortNumber a) (PortNumber b) = compare a b
  compare (UnixSocket a) (UnixSocket b) = compare a b
  compare a b = compare (rank a) (rank b)
    where
      rank :: PortID -> Int
      rank (Service _) = 0
      rank (PortNumber _) = 1
      rank (UnixSocket _) = 2
-- | Every 'MVar' is rendered as a fixed placeholder, which lets records
-- that contain 'MVar's derive 'Show'.
instance Show (MVar a) where
  showsPrec _ _ = showString "<mvar>"
-- | Program entry point: writes the 'show' rendering of the greeting string
-- (so the output includes the surrounding double quotes) and a newline.
main :: IO ()
main = putStrLn (show "hello world")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment