Created
September 20, 2021 16:14
-
-
Save khibino/d21370b441fe06b29eac0758b9741160 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{--# OPTIONS_GHC -Wno-name-shadowing #-} | |
import Control.Concurrent (forkIO) | |
import Control.Concurrent (threadDelay) | |
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan) | |
import Control.Monad (void, replicateM, replicateM_) | |
import System.IO (BufferMode (LineBuffering), hSetBuffering, stdout) | |
----- | |
-- | An unbounded FIFO channel tagged with a phantom kind @k@.
--
-- The tag never appears at the value level; it only keeps queues that serve
-- different purposes from being passed in each other's place.
newtype Queue k a = Queue (Chan a)

-- | Allocate a fresh, empty queue.
newQueue :: IO (Queue k a)
newQueue = fmap Queue newChan

-- | Take the next element, blocking while the queue is empty.
readQueue :: Queue k a -> IO a
readQueue (Queue ch) = readChan ch

-- | Append one element to the queue.
writeQueue :: Queue k a -> a -> IO ()
writeQueue (Queue ch) x = writeChan ch x

-- | Append the 'Nothing' sentinel to a queue of optional values.
--
-- Nothing is actually closed: readers must treat 'Nothing' as end-of-stream.
closeQueue :: Queue k (Maybe a) -> IO ()
closeQueue q = writeQueue q Nothing
----- | |
-- | Identifier attached to a job; sent along with the job's result when
-- dependents are notified.
type JobId = Int

-- Each empty data type below is a phantom tag for 'Queue'; it has no values
-- and exists only to tell the queue roles apart in type signatures.

-- | Tag for dependency-notification queues.
data Dep

-- | Carries @(producer id, producer result)@ pairs to a waiting job.
type DepQ a = Queue Dep (JobId, a)

-- | Tag for the capability queue.
data Cap

-- | Holds unit tokens; 'invokeJobs' takes one before each dispatch and every
-- finished job puts one back.
type CapQ = Queue Cap ()

-- | Tag for the runnable-job queue.
data Runnable

-- | Jobs whose dependencies are satisfied; 'Nothing' is the stop sentinel.
type RunnableQ a = Queue Runnable (Maybe (Job a))

-- | Tag for the resolution-notice queue.
data Resolve

-- | Receives one unit per job once that job has been queued as runnable.
type ResolveQ = Queue Resolve ()

-- | Tag for the result queue.
data Result

-- | Collects the value produced by each finished job.
type ResultQ a = Queue Result a
-- | What a job must wait for before it becomes runnable.
data WaitDepends a = WaitDepends
  { depQueue :: DepQ a
    -- ^ queue on which upstream jobs announce completion
  , depCount :: Int
    -- ^ how many announcements to consume from 'depQueue' before proceeding
  }
-- | A unit of work together with its dependency wiring.
data Job a = Job
  { jobId :: JobId
    -- ^ identifier sent to dependents together with the result
  , action :: IO a
    -- ^ the work itself
  , depends :: Maybe (WaitDepends a)
    -- ^ upstream notifications to wait for; 'Nothing' means no dependencies
  , notifyList :: [DepQ a]
    -- ^ queues to write @(jobId, result)@ to once 'action' has finished
  }
-- | Dispatcher loop: run each runnable job in its own thread, never holding
-- more capability tokens than were placed in the capability queue.
--
-- The loop ends when it reads the 'Nothing' sentinel from the runnable queue.
-- The token taken just before reading the sentinel is not returned.
invokeJobs :: CapQ -> RunnableQ a -> ResultQ a -> IO ()
invokeJobs capQ runQ resQ = dispatch
  where
    dispatch = do
      readQueue capQ -- block until a capability token is available
      next <- readQueue runQ
      case next of
        Nothing -> pure () -- sentinel: stop dispatching
        Just rj -> do
          _ <- forkIO (runJob rj)
          dispatch

    -- Body of the worker thread for a single job.
    runJob rj = do
      x <- action rj
      -- tell every dependent which job finished and what it produced
      sequence_ [writeQueue dq (jobId rj, x) | dq <- notifyList rj]
      writeQueue resQ x -- publish the job's result
      writeQueue capQ () -- hand the capability token back
-- | Block until the job's dependencies have reported in, then queue the job
-- as runnable and post one resolution notice.
waitDepends :: RunnableQ a -> ResolveQ -> Job a -> IO ()
waitDepends runQ rsvQ job = do
  case depends job of
    Nothing -> pure () -- nothing to wait for
    Just dep ->
      -- consume exactly 'depCount' notifications; their payloads are dropped
      replicateM_ (depCount dep) (readQueue (depQueue dep))
  writeQueue runQ (Just job) -- the job may now be dispatched
  writeQueue rsvQ () -- report that this job has been resolved
-- | Consume @count@ resolution notices, then write the stop sentinel to the
-- runnable queue so that 'invokeJobs' terminates.
waitResolved :: Int -> ResolveQ -> RunnableQ a -> IO ()
waitResolved count rsvQ runQ =
  replicateM_ count (readQueue rsvQ) >> closeQueue runQ
-- | Read @count@ values from the result queue, in the order they were
-- written to it.
waitResult :: Int -> ResultQ a -> IO [a]
waitResult count = replicateM count . readQueue
----- | |
-- | Phantom tag for the log queue.
data Log

-- | Queue of log lines; 'Nothing' marks the end of the log stream.
type LogQ = Queue Log (Maybe String)
-- | Build a queue-backed logger and return its three operations:
--
-- 1. an action that prints queued lines to stdout until it reads 'Nothing',
-- 2. a function that enqueues one line,
-- 3. an action that enqueues the 'Nothing' terminator.
getLogger :: IO (IO (), String -> IO (), IO ())
getLogger = do
  logQ <- newQueue :: IO LogQ
  let drain = do
        msg <- readQueue logQ
        case msg of
          Nothing -> pure ()
          Just line -> putStrLn line >> drain
      emit line = writeQueue logQ (Just line)
      shutdown = closeQueue logQ
  return (drain, emit, shutdown)
----- | |
-- | Handle to a job runner: its queues, the number of jobs submitted so far,
-- and the operations of its logger.
data Runner a = Runner
  { runnable :: RunnableQ a
    -- ^ jobs ready to be dispatched
  , resolve :: ResolveQ
    -- ^ one notice per job that has become runnable
  , result :: ResultQ a
    -- ^ values produced by finished jobs
  , jobCount :: !Int
    -- ^ total number of jobs added so far
  , loggerThread :: IO ()
    -- ^ action that prints queued log lines until the logger is closed
  , putLogLn :: String -> IO ()
    -- ^ enqueue one log line
  , closeLogger :: IO ()
    -- ^ enqueue the end-of-log marker
  }
-- | Create a runner and start its dispatcher thread.
--
-- All queues are created here, 'invokeJobs' is forked, and the capability
-- queue is seeded with @cap@ tokens, which bounds how many jobs run at once.
-- The returned runner has no jobs yet ('jobCount' is 0).
--
-- NOTE: with @cap <= 0@ no token is ever written, so 'invokeJobs' blocks on
-- its first read of the capability queue and no job is ever dispatched.
prunner :: Int -- ^ capability count
        -> IO (Runner a)
prunner cap = do
  capQ <- newQueue
  runQ <- newQueue
  rsvQ <- newQueue
  resQ <- newQueue
  (logTh, putLn, close) <- getLogger
  void $ forkIO $ invokeJobs capQ runQ resQ
  -- replicateM_ discards the results directly instead of building a list of
  -- units only to throw it away with 'void'.
  replicateM_ cap (writeQueue capQ ())
  return Runner
    { runnable = runQ
    , resolve = rsvQ
    , result = resQ
    , jobCount = 0
    , loggerThread = logTh
    , putLogLn = putLn
    , closeLogger = close
    }
-- | Submit jobs to the runner.
--
-- One thread is forked per job to wait for that job's dependencies. The
-- returned runner is the input with 'jobCount' increased by the number of
-- jobs submitted.
addJob :: Runner a -> [Job a] -> IO (Runner a)
addJob r jobs = do
  mapM_ spawnWaiter jobs
  return r { jobCount = jobCount r + length jobs }
  where
    spawnWaiter j = forkIO (waitDepends (runnable r) (resolve r) j)
-- | Wait until every submitted job has been resolved, stop the dispatcher,
-- then collect one result per job.
waitJobResult :: Runner a -> IO [a]
waitJobResult r = do
  waitResolved total (resolve r) (runnable r)
  waitResult total (result r)
  where
    total = jobCount r
-- | Run the given jobs with at most @cap@ of them executing at once and
-- return their results.
runParallel :: Int -> [Job a] -> IO [a]
runParallel cap jobs = do
  runner <- prunner cap
  loaded <- addJob runner jobs
  waitJobResult loaded
----- | |
-- | Demo workload: log a start line, sleep, then log a finish line.
--
-- The sleep lasts @i `rem` 2 + 1@ seconds.
delayJob :: (String -> IO ()) -> Int -> IO ()
delayJob logLn i = do
  say " started"
  threadDelay (seconds * 1000000) -- threadDelay takes microseconds
  say " finished"
  where
    say suffix = logLn ("job " ++ show i ++ suffix)
    seconds = i `rem` 2 + 1
-- | Build a demo job whose action is 'delayJob' for the given id.
job_ :: (String -> IO ()) -> JobId -> Maybe (WaitDepends ()) -> [DepQ ()] -> Job ()
job_ logLn i deps notify =
  Job
    { jobId = i
    , action = delayJob logLn i
    , depends = deps
    , notifyList = notify
    }
-- | Build the demo job graph: ten independent producer jobs (ids 3..12) and
-- two consumer jobs (ids 1 and 2) that each wait for all of the producers.
--
-- Every producer notifies both consumers' dependency queues. The number of
-- notifications each consumer waits for is derived from the producer list
-- rather than repeated as a literal, so changing the number of producers
-- cannot leave a consumer waiting for notifications that never arrive.
exampleJobs :: (String -> IO ()) -> IO [Job ()]
exampleJobs logLn = do
  depQ1 <- newQueue
  depQ2 <- newQueue
  let producers = [job_ logLn i Nothing [depQ1, depQ2] | i <- take 10 [3 ..]]
      awaited = length producers
      job1 = job_ logLn 1 (Just (WaitDepends depQ1 awaited)) []
      job2 = job_ logLn 2 (Just (WaitDepends depQ2 awaited)) []
  return ([job1, job2] ++ producers)
-- | Run the demo: execute the example jobs with four capabilities while the
-- calling thread prints their log lines.
runExample :: IO ()
runExample = do
  hSetBuffering stdout LineBuffering
  runner <- prunner 4
  jobs <- exampleJobs (putLogLn runner)
  let worker = do
        loaded <- addJob runner jobs
        _ <- waitJobResult loaded
        -- all jobs are done: let the logger loop below terminate
        closeLogger runner
  _ <- forkIO worker
  -- print log lines on this thread until the worker closes the logger
  loggerThread runner
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment