Last active
October 8, 2015 18:08
-
-
Save billdozr/3368959 to your computer and use it in GitHub Desktop.
Distributed ping (with boilerplate code, i.e. without Template Haskell)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DeriveDataTypeable, GeneralizedNewtypeDeriving #-} | |
import Control.Concurrent (threadDelay)
import Control.Monad (forM_, replicateM_)
import Data.Typeable (Typeable)
import System.Environment (getArgs, getProgName)
import System.Exit (exitFailure)
import System.IO (hPutStrLn, stderr)

import Control.Distributed.Process
import Control.Distributed.Process.Backend.SimpleLocalnet
  ( Backend
  , startMaster
  , initializeBackend
  , newLocalNode
  , findPeers
  , findSlaves
  )
import Control.Distributed.Process.Node (initRemoteTable, runProcess)
import Control.Distributed.Process.Serializable (SerializableDict(..))
import Control.Distributed.Static
  ( Static
  , Closure(..)
  , RemoteTable
  , registerStatic
  , staticLabel
  , staticCompose
  )
import Data.Binary (Binary, encode, decode)
import Data.ByteString.Lazy (ByteString)
import Data.Rank1Dynamic (toDynamic)

-- | Request message handed to a 'worker'. The wrapped 'ProcessId' is the
-- process the worker sends its 'Pong' back to.
newtype Ping = Ping ProcessId
  deriving (Show, Typeable, Binary)

-- | Reply message sent by a 'worker'. The wrapped 'ProcessId' is the pid of
-- the worker process that produced the reply.
newtype Pong = Pong ProcessId
  deriving (Show, Typeable, Binary)

-- | Handle a single 'Ping': log its arrival, then answer the process named
-- in the 'Ping' with a 'Pong' that carries this process's own pid.
worker :: Ping -> Process ()
worker (Ping replyTo) =
  getSelfPid >>= \self -> do
    say "Got a Ping!"
    send replyTo (Pong self)

-- // Explicitly construct Closures (by hand, i.e. without Template Haskell)
-- | Static reference to 'worker', identified by the label @"$ping.worker"@.
-- The same label is registered against 'worker' in the remote table built
-- in 'main', so the two strings must stay in sync.
workerStatic :: Static (Ping -> Process ())
workerStatic =
  staticLabel "$ping.worker"

-- | Static reference to the 'Ping' decoder, identified by the label
-- @"$ping.decodePing"@. The same label is registered against
-- @decode :: ByteString -> Ping@ in the remote table built in 'main'.
decodePingStatic :: Static (ByteString -> Ping)
decodePingStatic =
  staticLabel "$ping.decodePing"

-- | Package a 'Ping' as a 'Closure' that runs 'worker' on it.
--
-- The closure's environment is the binary encoding of the 'Ping'; its
-- static part first decodes that environment ('decodePingStatic') and then
-- feeds the result to the worker ('workerStatic').
workerClosure :: Ping -> Closure (Process ())
workerClosure = closure runEncodedPing . encode
  where
    -- Decode-then-run pipeline, expressed entirely with static values.
    runEncodedPing :: Static (ByteString -> Process ())
    runEncodedPing = staticCompose workerStatic decodePingStatic

-- // End of explicit Closure construction
-- | Top-level process for a node; the first argument selects the role.
--
-- * @\"WORKER\"@ – logs the given peers, registers the current process under
--   the name @\"slaveController\"@ and then blocks forever.
--   NOTE(review): presumably this is the name 'findSlaves' looks up on the
--   master side – confirm against the SimpleLocalnet backend.
-- * @\"MASTER\"@ – spawns 'worker' on every given node with a 'Ping' carrying
--   the master's pid, waits for one 'Pong' per node, logs the node each reply
--   came from, and finally sleeps two seconds before returning.
-- * any other role – logs the unrecognised role and returns, instead of
--   crashing with a pattern-match failure.
initialProcess :: String -> [NodeId] -> Process ()
initialProcess "WORKER" peers = do
  say $ "Peers: " ++ show peers
  pid <- getSelfPid
  register "slaveController" pid
  -- With no matches to try, this call never returns, so the process (and
  -- the 'runProcess' call hosting it) stays alive to serve spawn requests.
  receiveWait []
initialProcess "MASTER" workers = do
  say $ "Workers: " ++ show workers
  pid <- getSelfPid
  forM_ workers $ \w -> do
    say $ "Sending a Ping to " ++ show w ++ "..."
    spawn w (workerClosure (Ping pid))
  say $ "Waiting for reply from " ++ show workerCount ++ " worker(s)"
  -- Blocks until exactly one Pong per spawned worker has arrived; there is
  -- no timeout, so a worker that never answers stalls the master here.
  replicateM_ workerCount $ do
    wId <- receiveWait [match (\(Pong sender) -> return sender)]
    say $ "Got back a Pong from " ++ show (processNodeId wId) ++ "!"
  liftIO (threadDelay 2000000) -- Wait a bit before return
  where
    workerCount = length workers
initialProcess role _ =
  say $ "Unknown role " ++ show role ++ "; expected \"WORKER\" or \"MASTER\""

-- | Command-line entry point.
--
-- Usage: @PROG (master | worker) host port@
--
-- * @master@ – starts a local node, discovers slaves via 'findSlaves' and
--   runs 'initialProcess' in the @\"MASTER\"@ role on their node ids.
-- * @worker@ – starts a local node, discovers peers via 'findPeers' and runs
--   'initialProcess' in the @\"WORKER\"@ role.
-- * anything else – prints the usage line to stderr and exits with a
--   failure status.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port rtable
      node <- newLocalNode backend
      runProcess node $ do
        slaves <- findSlaves backend
        initialProcess "MASTER" (map processNodeId slaves)
    ["worker", host, port] -> do
      backend <- initializeBackend host port rtable
      node <- newLocalNode backend
      -- NOTE(review): 50000 is the discovery timeout handed to findPeers
      -- (microseconds according to the backend docs – confirm).
      peers <- findPeers backend 50000
      runProcess node (initialProcess "WORKER" peers)
    _ -> do
      -- A malformed command line is an error: report on stderr and exit
      -- non-zero so scripts can detect it.
      prog <- getProgName
      hPutStrLn stderr $ "usage: " ++ prog ++ " (master | worker) host port"
      exitFailure
  where
    -- Remote table mapping the labels used by 'workerStatic' and
    -- 'decodePingStatic' to the actual functions; both node roles must be
    -- started with the same table for closures to resolve.
    rtable :: RemoteTable
    rtable =
      registerStatic "$ping.worker" (toDynamic worker)
        . registerStatic "$ping.decodePing"
            (toDynamic (decode :: ByteString -> Ping))
        $ initRemoteTable
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@cepete
Make sure you don't have more than one version of the `binary` package registered. For instance, I had both `binary-0.7.1.0` and `binary-0.6.4.0`. I removed `binary-0.7.1.0` with `ghc-pkg unregister binary-0.7.1.0`, and the same compile-time error you got above went away.