Skip to content

Instantly share code, notes, and snippets.

@23Skidoo
Created May 20, 2011 07:14
Show Gist options
  • Save 23Skidoo/982483 to your computer and use it in GitHub Desktop.
Save 23Skidoo/982483 to your computer and use it in GitHub Desktop.
GSoC project prototype
module Main
where
import Control.Concurrent
import Control.Monad
import Data.Map((!))
import qualified Data.Map as M
import System.Environment(getArgs)
-- The program consists of several threads which communicate via Chans. There
-- are several worker threads, which compile the packages (threadDelay is used
-- to simulate actual operations). A single control thread maintains the package
-- graph and assigns tasks to the worker threads. A single logger thread prints
-- out messages received from the worker threads. A single install thread
-- installs the packages into the target directory (this is done serially, but
-- can also be parallelized if deemed safe).
-- After the install thread installs a package, it notifies the controller
-- thread, which then updates the package graph and adds new tasks for the
-- worker threads (if possible). The control thread terminates when the last
-- package has been installed (which leads to the termination of all other
-- threads).
-- | Name identifying a package.
type PackageName = String

-- | Dependency graph keyed by package name. Each entry is a pair of
-- (list of packages depending on this one,
--  number of packages this package depends on).
type PackageGraph = M.Map PackageName ([PackageName], Int)
-- | worker -> installer communication: a request to install the named
-- package. Written by 'worker_thread' after a build and read by
-- 'install_thread'.
data InstallTask = Install PackageName
-- | Channel carrying 'InstallTask' values.
type InstallChan = Chan InstallTask
-- | (worker, installer) -> logger communication: a line of text for
-- 'log_thread' to print.
data LogTask = Message String
-- | Channel carrying 'LogTask' values.
type LogChan = Chan LogTask
-- | controller -> workers communication: a request to build the named
-- package. Written by 'control_thread' and read by 'worker_thread'.
data WorkerTask = BuildPackage PackageName
-- | Channel carrying 'WorkerTask' values; all workers read from the same one.
type WorkerChan = Chan WorkerTask
-- | installer -> controller communication: notification that the named
-- package has been installed. Written by 'install_thread' and read by
-- 'control_thread'.
data ControlMessage = PackageInstalled PackageName
-- | Channel carrying 'ControlMessage' values.
type ControlChan = Chan ControlMessage
-- | One second expressed in microseconds; passed to 'threadDelay' to
-- simulate the time a build or an install takes.
oneSecond :: Int
oneSecond = 1000000
-- | Logger loop: repeatedly takes the next 'Message' from the log channel
-- and prints its text on a line of its own. The loop never returns.
log_thread :: LogChan -> IO ()
log_thread lch = forever $ do
    Message line <- readChan lch
    putStrLn line
-- | Installer loop: for every 'Install' request read from the install
-- channel, wait 'oneSecond' (standing in for the real installation), log
-- that the package was installed, and then notify the controller.
-- The loop never returns.
install_thread :: InstallChan -> LogChan -> ControlChan -> IO ()
install_thread ich lch cch = forever installNext
  where
    installNext = do
        Install pkg <- readChan ich
        threadDelay oneSecond
        -- Log first, notify second: same order as the messages are sent.
        writeChan lch (Message (pkg ++ " installed."))
        writeChan cch (PackageInstalled pkg)
-- | Worker loop: for every 'BuildPackage' request read from the worker
-- channel, wait 'oneSecond' (standing in for the real build), log that the
-- package was built (tagged with this worker's number), and hand the
-- package over to the installer. The loop never returns.
worker_thread :: Int -> WorkerChan -> LogChan -> InstallChan -> IO ()
worker_thread num wch lch ich = forever buildNext
  where
    buildNext = do
        BuildPackage pkg <- readChan wch
        threadDelay oneSecond
        writeChan lch (builtMessage pkg)
        writeChan ich (Install pkg)

    -- Log line of the form "[<worker number>] <package> built."
    builtMessage pkg = Message (concat ["[", show num, "] ", pkg, " built."])
-- | Scheduler loop. Queues every package with no outstanding dependencies
-- (see 'getLeaves'), then waits for 'PackageInstalled' notifications. Each
-- notification releases the installed package's dependents via 'update',
-- and those whose dependency count dropped to zero are queued for building.
--
-- Returns once as many notifications have been received as there are
-- packages in the initial graph. An empty graph therefore returns
-- immediately instead of waiting for a notification that can never arrive.
--
-- NOTE(review): assumes the graph is acyclic and its dependency counts
-- match the dependents lists; otherwise some package is never queued and
-- the loop keeps waiting on the control channel.
control_thread :: ControlChan -> WorkerChan -> PackageGraph -> IO ()
control_thread cch wch g0 = do
    schedule (getLeaves g0)
    go g0 0
  where
    numPackages = M.size g0

    -- Hand each of the given packages to the worker pool.
    schedule names = forM_ names (writeChan wch . BuildPackage)

    -- 'installed' counts the notifications processed so far.
    go :: PackageGraph -> Int -> IO ()
    go g installed
      | installed >= numPackages = return ()
      | otherwise = do
          PackageInstalled pn <- readChan cch
          children <- case M.lookup pn g of
            Just (dependents, _) -> return dependents
            Nothing ->
              error ("control_thread: unknown package " ++ pn)
          -- With no dependents 'update' returns the graph unchanged and
          -- nothing to queue, so no separate branch is needed.
          let (newG, activated) = update g children
          schedule activated
          go newG (installed + 1)
-- | Record that one dependency of each listed package has been satisfied.
-- Returns the updated graph together with the packages whose dependency
-- count reached zero, in reverse order of their appearance in the list.
update :: PackageGraph -> [PackageName] -> (PackageGraph, [PackageName])
update graph children = update' graph children []

-- | Worker for 'update': walks the list left to right, threading the graph
-- and consing newly released packages onto the accumulator.
update' :: PackageGraph -> [PackageName] -> [PackageName]
        -> (PackageGraph, [PackageName])
update' graph children ready = foldl release (graph, ready) children
  where
    release (g, acc) child
      | remaining == 0 = (g', child : acc)
      | otherwise      = (g', acc)
      where
        g'        = decrNumParents child g
        remaining = snd (g' ! child)
-- | Decrease by one the dependency count stored for the given package.
-- The graph is returned unchanged when the package is not a key in it
-- (that is how 'M.adjust' treats a missing key).
decrNumParents :: PackageName -> PackageGraph -> PackageGraph
decrNumParents k g = M.adjust oneFewer k g
  where
    -- 'fmap' on a pair maps over the second component only, i.e. the count.
    oneFewer = fmap (subtract 1)
-- | Names of all packages whose dependency count is zero, in ascending
-- key order.
getLeaves :: PackageGraph -> [PackageName]
getLeaves m = [name | (name, (_, 0)) <- M.toList m]
-- | Sample dependency graph. Packages 0-4 have no dependencies and can be
-- built in parallel. Package 5 depends on all of them. Package 6 depends
-- only on package 4.
packageGraph :: PackageGraph
packageGraph = M.fromList (independent ++ dependent)
  where
    -- Packages with nothing to wait for, paired with their dependents.
    independent =
      [ (name, (dependents, 0))
      | (name, dependents) <-
          [ ("Package0", ["Package5"])
          , ("Package1", ["Package5"])
          , ("Package2", ["Package5"])
          , ("Package3", ["Package5"])
          , ("Package4", ["Package5", "Package6"])
          ]
      ]
    -- Packages that wait for others; the number is how many they wait for.
    dependent =
      [ ("Package5", ([], 5))
      , ("Package6", ([], 1))
      ]
-- | Entry point. Run as @:main NUM_THREADS@, where NUM_THREADS >= 1.
--
-- Creates the four communication channels, forks NUM_THREADS worker
-- threads plus one logger and one installer thread, and then runs the
-- controller in the main thread until every package of 'packageGraph' has
-- been installed.
--
-- Fails with a usage message (as an 'IOError') when the first argument is
-- missing, is not an integer, or is smaller than 1. Without at least one
-- worker nothing would ever be built and the controller would wait forever.
-- Any further arguments are ignored.
--
-- NOTE(review): the program ends as soon as the controller returns; the
-- logger is not waited for, so the last log line may not get printed.
main :: IO ()
main = do
    args <- getArgs
    numWorkers <- case args of
      (arg : _) | Just n <- parseWorkers arg -> return n
      _ -> ioError (userError "usage: main NUM_THREADS (an integer >= 1)")
    -- Init comm. channels
    workerChan <- newChan
    logChan <- newChan
    installChan <- newChan
    controlChan <- newChan
    -- Fork off worker threads
    forM_ [1 .. numWorkers] $ \n ->
      forkIO (worker_thread n workerChan logChan installChan)
    _ <- forkIO (log_thread logChan)
    _ <- forkIO (install_thread installChan logChan controlChan)
    -- Start the control thread
    control_thread controlChan workerChan packageGraph
  where
    -- Total replacement for 'read': accepts exactly the strings 'read'
    -- would parse as an Int (surrounding whitespace allowed) and
    -- additionally requires the result to be at least 1.
    parseWorkers :: String -> Maybe Int
    parseWorkers arg =
      case [n | (n, rest) <- reads arg, ("", "") <- lex rest] of
        [n] | n >= 1 -> Just n
        _ -> Nothing
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment