Distributing a computation using Cloud Haskell
-- in response to https://twitter.com/jfischoff/status/948768178070470656
{-# LANGUAGE TemplateHaskell #-}
module Main where

import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Data.Traversable (for)
import Text.Printf (printf)

import Control.Distributed.Process (Process, NodeId, spawn)
import Control.Distributed.Process (SendPort, newChan, sendChan, receiveChan)
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process (RemoteTable, Closure)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Control.Distributed.Process.Node (initRemoteTable)
import System.Environment (getArgs, getProgName)
import Control.Distributed.Process.Backend.SimpleLocalnet (startSlave, initializeBackend, startMaster)

-- We will run a distributed map over a number of nodes.
-- Here is the computation we will run on each node:
slowIncr :: Int -> IO Int
slowIncr n = do
  printf "incrementing %d...\n" n
  let seconds = 3
  threadDelay $ seconds * 1000 * 1000
  pure (n+1)

-- We cannot serialize arbitrary IO computations, so the set of remotable
-- computations has to be known in advance. This is not a big limitation in
-- practice, because we can construct complex computations by combining a
-- finite number of primitive remotable computations; but let's keep things
-- simple for now.
--
-- The type of each remotable computation must be a specialization of the type
-- @Serializable a => a -> Process ()@, for a _concrete_ type @a@. Polymorphism
-- is supported, but messier.
--
-- Since we want our computations to produce a value but we are forced to
-- return unit, let's ask for a SendPort to which we can send that value. Yes,
-- SendPort is Serializable.
remotableSlowIncr :: (Int, SendPort Int) -> Process ()
remotableSlowIncr (n, sendPort) = do
  r <- liftIO $ slowIncr n
  sendChan sendPort r

-- The idea is that the remote nodes are running the same code as we are, so if
-- we ask them to run the top-level definition called "remotableSlowIncr",
-- we'll both know which computation we mean. To do that, both sides need to
-- have a table mapping top-level names to the corresponding computation.
remotable ['remotableSlowIncr]

-- The above generated __remoteTable, which is a @RemoteTable -> RemoteTable@,
-- so we need to apply it to the empty table in order to get our remote table.
-- This is supposed to make it easy to combine remote tables from different
-- modules, but I wonder why they didn't simply write a Monoid instance for
-- RemoteTable?
remoteTable :: RemoteTable
remoteTable = __remoteTable initRemoteTable
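-- (If a second module, say a hypothetical Worker, also declared remotable
-- computations, we would chain the two generated functions by hand:
--   remoteTable = __remoteTable (Worker.__remoteTable initRemoteTable)
-- which is exactly the kind of composition a Monoid instance would capture.
-- Worker is only an illustration; it does not exist in this gist.)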

-- Now that we have a remote table, we can use it to construct a Closure, which
-- describes both which remotable computation we want to run and which
-- parameters we want to give it.
mkSlowIncrClosure :: (Int, SendPort Int) -> Closure (Process ())
mkSlowIncrClosure = $(mkClosure 'remotableSlowIncr)

-- We can finally write a higher-order function which distributes a computation
-- over a number of nodes.
distributedTraverse :: (Serializable a, Serializable b)
                    => [NodeId]
                    -> ((a, SendPort b) -> Closure (Process ()))
                    -> [a]
                    -> Process [b]
distributedTraverse [] _ _ = error "expected at least one node"
distributedTraverse nodes cAction xs = do
  receivePorts <- for (zip (cycle nodes) xs) $ \(node, x) -> do
    (sendPort, receivePort) <- newChan
    _ <- spawn node $ cAction (x, sendPort)
    pure receivePort
  traverse receiveChan receivePorts

-- One of the key ideas of Cloud Haskell (aka distributed-process) is that
-- instead of viewing your distributed application as a collection of
-- collaborating programs, you can view it as a single concurrent program in
-- which threads can be scheduled to run on other nodes instead of other CPUs.
-- This is a paradigm called "distributed concurrency".
distributedMain :: [NodeId] -> Process ()
distributedMain nodes = do
  liftIO $ printf "distributing 10 computations...\n"
  ys <- distributedTraverse nodes mkSlowIncrClosure [0..9]
  liftIO $ printf "result: %s\n" (show ys)

-- Under the hood, of course, distributedMain is implemented by a collection of
-- collaborating programs.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port remoteTable
      startMaster backend $ \slaveNodes -> do
        if null slaveNodes
          then liftIO $ printf "please start the slave nodes first\n"
          else distributedMain slaveNodes
    ["slave", host, port] -> do
      backend <- initializeBackend host port remoteTable
      printf "waiting for the master\n"
      startSlave backend
    _ -> do
      progName <- getProgName
      printf "usage:\n"
      printf " %s slave localhost 1234\n" progName
      printf " %s slave localhost 1235\n" progName
      printf " %s slave localhost 1236\n" progName
      printf " %s master localhost 1237\n" progName
      printf "\n"
      printf "To run this on different machines, replace localhost with the ip\n"
      printf "of the machine on which that node runs, so the other nodes can\n"
      printf "find it. The nodes find each other using broadcast packets, which\n"
      printf "are typically blocked by commercial routers, so this example should\n"
      printf "work on your home's local network, but not across the internet.\n"
One important aspect I did not cover is error-handling. When running distributed computations, there is always the possibility that some of the remote nodes will crash or that we will lose communication with them. Cloud Haskell supports sophisticated ways of dealing with those error conditions, but for simplicity I did not cover any of them.
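To give a flavour of the simplest option, here is a minimal sketch (not part of the gist above) of a variant of distributedTraverse which gives up on a slow or crashed node instead of blocking forever. It reuses the imports from the file and needs one extra one, receiveChanTimeout from Control.Distributed.Process, whose timeout argument is in microseconds; a more thorough solution would also monitor the remote nodes (e.g. with monitorNode) to learn about disconnections immediately rather than waiting for a timeout.

-- Sketch only: like distributedTraverse, but each result is awaited for at
-- most the given number of microseconds; a missing result becomes Nothing.
distributedTraverseTimeout :: (Serializable a, Serializable b)
                           => Int
                           -> [NodeId]
                           -> ((a, SendPort b) -> Closure (Process ()))
                           -> [a]
                           -> Process [Maybe b]
distributedTraverseTimeout _ [] _ _ = error "expected at least one node"
distributedTraverseTimeout micros nodes cAction xs = do
  receivePorts <- for (zip (cycle nodes) xs) $ \(node, x) -> do
    (sendPort, receivePort) <- newChan
    _ <- spawn node $ cAction (x, sendPort)
    pure receivePort
  traverse (receiveChanTimeout micros) receivePorts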
If SimpleLocalnet isn't working on your network (the master complains that you need to start the slaves first even though you did), try this reimplementation of the SimpleLocalnet API, which uses a hardcoded list of nodes instead of broadcast packets.
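That reimplementation is not reproduced here, but the underlying idea can be sketched. The addresses below are made up, and the endpoint-address format ("host:port:0", with the node's first endpoint numbered 0) is an assumption about the TCP transport SimpleLocalnet uses; the NodeId constructor only comes from an Internal module, so this is very much a hack.

-- Sketch only: build NodeIds from known host/port pairs instead of discovering
-- them with broadcast packets. Extra imports needed:
--   import qualified Data.ByteString.Char8 as BS8
--   import Control.Distributed.Process.Internal.Types (NodeId(..))
--   import Network.Transport (EndPointAddress(..))
mkNodeId :: String -> String -> NodeId
mkNodeId host port = NodeId . EndPointAddress . BS8.pack $ host ++ ":" ++ port ++ ":0"

-- The master can then ignore peer discovery and use a hardcoded list, e.g.
--   startMaster backend $ \_ -> distributedMain [ mkNodeId "192.168.1.11" "1234"
--                                               , mkNodeId "192.168.1.12" "1235" ]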