Created
May 1, 2022 12:29
-
-
Save Bulat-Ziganshin/a593b11bc6febd2ec201a6bef095bc25 to your computer and use it in GitHub Desktop.
CSP implemented in Haskell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--------------------------------------------------------------------------------------------------- | |
---- "Взаимодействующие последовательные процессы", как описано в книге Хоара. ---- | |
--------------------------------------------------------------------------------------------------- | |
-- | | |
-- Module : Process | |
-- Copyright : (c) Bulat Ziganshin <[email protected]> | |
-- License : Public domain | |
-- | |
-- Maintainer : [email protected] | |
-- Stability : experimental | |
-- Portability : GHC | |
-- | |
----------------------------------------------------------------------------- | |
module Process where | |
{- | |
Процессы соединяются в цепочку операторами "|>" или "|>>>" и запускаются на выполнение функцией runP: | |
runP( read_files |>>> compress |> write_data ) | |
Процессы исполняются параллельно благодаря тому, что для их запуска используется функция forkOS. | |
Каждый процесс описывается обычной функцией, которая получает дополнительный параметр типа Pipe. | |
С этой переменной можно выполнять операцию receiveP для получения данных от предыдущего | |
процесса в списке, и операцию sendP для посылки данных следующему процессу в списке: | |
compress pipe = foreverM (do data <- receiveP pipe; .....; sendP pipe compressed_data) | |
Данные от процесса к процессу передаются "слева направо". В зависимости от использованной | |
при создании связи между процессами операции - "|>" или "|>>>" - в канал между этими процессами | |
можно поместить только одно или неограниченное кол-во значений (реализуется с помощью MVar/Chan, | |
соответственно). | |
Данные также можно посылать в обратную сторону ("справа налево") операциями send_backP и receive_backP. | |
Канал обратной связи всегда имеет неограниченную ёмкость. Его можно использовать, например, | |
для подтверждения выполнения операций, синхронизации, возвращения использованных ресурсов | |
(например, буферов ввода/вывода): | |
производитель: sendP pipe (buf,len); receive_backP pipe; теперь буфер свободен | |
потребитель: (buf,len) <- receiveP pipe; hPutBuf file buf len; send_backP pipe () | |
Операция runP выполняется синхронно, она завершается по окончании выполнения последнего процесса | |
в цепочке (даже если остальные процессы ещё не завершились). Если первый процесс в списке | |
запускаемых пытается обмениваться с предыдущим (т.е. выполняет операции receiveP/send_backP) или | |
последний процесс пытается обмениваться со следующим - то сигнализируется ошибка. | |
Операция runAsyncP запускает процесс или цепочку процессов асинхронно и возвращает Pipe для обмена | |
с ним(и). В этом случае и первый процесс в цепочке может общаться с "предыдущим", и последний - со | |
"следующим", хотя это и не обязательно: | |
pipe <- runAsyncP compress; sendP pipe data; compressed_data <- receiveP pipe | |
pipe <- runAsyncP( compress |> write_data ); sendP pipe data | |
pipe <- runAsyncP( read_files |>>> compress ); compressed_data <- receiveP pipe | |
runAsyncP( read_files |>>> compress |> write_data ) | |
Входная и выходная очереди асинхронно запущенного процесса - (пока) одноэлементные. | |
-} | |
import Prelude hiding (catch) | |
import Control.Concurrent | |
import Control.OldException | |
import Control.Monad | |
import Data.IORef | |
-- |Операция соединения двух последовательных процессов: | |
-- выходной канал первого становится входным каналом второго. | |
-- "|>" создаёт одноэлементную очередь, а "|>>>" - очередь неограниченной длины | |
infixl 1 |>, |>>> | |
p1 |> p2 = createP p1 p2 newEmptyMVar | |
p1 |>>> p2 = createP p1 p2 newChan | |
createP p1 p2 create_inner (Pipe pid finished income income_back outcome outcome_back) = do | |
inner <- create_inner -- Канал между p1 и p2 (MVar или Chan) | |
inner_back <- newChan -- Обратный канал между p1 и p2 | |
p1_finished <- newEmptyMVar -- Признак завершения выполнения p1 | |
-- Запустим первый процесс в отдельном треде, а второй исполним напрямую | |
p1_id <- forkOS$ (p1 (Pipe pid finished income income_back inner inner_back) >> return ()) | |
`finally` (putMVar p1_finished ()) | |
-- | |
p2 (Pipe (Just p1_id) (Just p1_finished) inner inner_back outcome outcome_back) | |
takeMVar p1_finished | |
return () | |
-- |Запустить комбинированный процесс, созданный операциями "|>" и "|>>>" | |
runP p = do | |
p (Pipe Nothing | |
Nothing | |
(error "First process in runP tried to receive") | |
(error "First process in runP tried to send_back") | |
(error "Last process in runP tried to send") | |
(error "Last process in runP tried to receive_back")) | |
-- |Запустить процесс асинхронно и возвратить канал для обмена с ним | |
runAsyncP p = do | |
income <- newEmptyMVar | |
outcome <- newEmptyMVar | |
income_back <- newChan | |
outcome_back <- newChan | |
parent_id <- myThreadId | |
p_finished <- newEmptyMVar | |
p_id <- forkOS (p (Pipe Nothing Nothing income income_back outcome outcome_back) | |
-- `catch` (\e -> do killThread parent_id; throwIO e) | |
`finally` putMVar p_finished ()) | |
return (Pipe (Just p_id) (Just p_finished) outcome outcome_back income income_back) | |
-- Создадим тред и гарантируем его корректное завершение посылкой спец. значения | |
bracketedRunAsyncP process value = | |
bracket (runAsyncP process) | |
(\pipe -> do sendP pipe value; joinP pipe) | |
-- |Канал обмена с соседними процессами, который получает в своё распоряжение каждый процесс. | |
-- Канал имеет 6 элементов - ИД предыдущего (запущенного асинхронно) процесса, | |
-- MVar-переменная, сигнализирующая о его завершении, | |
-- входные данные, отсылка подтверждений, | |
-- выходные данные, получение подтверждений | |
data Pipe a b c d = Pipe (Maybe ThreadId) (Maybe (MVar ())) a b c d | |
killP pipe@(Pipe (Just pid) _ _ _ _ _) = killThread pid >> joinP pipe | |
joinP (Pipe _ (Just finished) _ _ _ _) = takeMVar finished | |
receiveP (Pipe pid finished income income_back outcome outcome_back) = getP income | |
sendP (Pipe pid finished income income_back outcome outcome_back) = putP outcome | |
receive_backP (Pipe pid finished income income_back outcome outcome_back) = getP outcome_back | |
send_backP (Pipe pid finished income income_back outcome outcome_back) = putP income_back | |
-- |Довольно странная операция - "возвращение" сообщений самому себе - так, как если бы это сделал | |
-- последующий процесс в очереди. Но она нужна для создания начального пула ресурсов, используемых | |
-- процессом | |
send_back_itselfP (Pipe pid finished income income_back outcome outcome_back) = putP outcome_back | |
-- |Элемент канала между процессами - может иметь тип как MVar, так и Chan | |
class PipeElement e where | |
getP :: e a -> IO a | |
putP :: e a -> a -> IO () | |
instance PipeElement MVar where | |
getP = takeMVar | |
putP = putMVar | |
instance PipeElement Chan where | |
getP = readChan | |
putP = writeChan | |
-- |Псевдо-канал процесса - состоит из двух явно заданных функций для получения и посылки данных | |
data PairFunc a = PairFunc (IO a) (a -> IO ()) | |
instance PipeElement PairFunc where | |
getP (PairFunc get_f put_f) = get_f | |
putP (PairFunc get_f put_f) = put_f | |
-- |Процедура запуска процесса с 4 функциями для эмуляции каналов ввода/вывода | |
runFuncP p receive_f send_back_f send_f receive_back_f = | |
p (Pipe Nothing | |
Nothing | |
(PairFunc receive_f undefined) | |
(PairFunc undefined send_back_f) | |
(PairFunc undefined send_f) | |
(PairFunc receive_back_f undefined)) | |
-- |Создать pipe, состоящий просто из каналов "туда" и "обратно" | |
newPipe = do | |
chan <- newChan -- Канал "туда" | |
chan_back <- newChan -- Канал "обратно" | |
return (Pipe Nothing Nothing chan chan_back chan chan_back) | |
{-# NOINLINE createP #-} | |
{-# NOINLINE runP #-} | |
{-# NOINLINE runAsyncP #-} | |
{-# NOINLINE runFuncP #-} | |
{-# NOINLINE newPipe #-} | |
-- Пример использования: | |
{- | |
exampleP = do | |
-- Demonstrates using of "runP" | |
print "runP: before" | |
runP( producer 5 |> transformer (++"*2") |> transformer (++"+1") |> printer "runP" ) | |
print "runP: after" | |
-- Demonstrates using of "runAsyncP" to run computation as parallelly computed function | |
pipe <- runAsyncP (transformer (++" modified")) | |
sendP pipe "value" | |
n <- receiveP pipe | |
print n | |
-- Demonstrates using of "runAsyncP" with "|>" | |
pipe <- runAsyncP( transformer (++"*2") |> transformer (++"+1") ) | |
sendP pipe "7" | |
n <- receiveP pipe | |
print n | |
-- Demonstrates using of "runAsyncP" to run asynchronous process | |
print "runAsyncP: before" | |
pipe <- runAsyncP( producer 7 |> printer "runAsyncP" ) | |
print "runAsyncP: after?" | |
producer n pipe = do | |
mapM_ (sendP pipe.show) [1..n] | |
sendP pipe "0" | |
transformer f pipe = do | |
n <- receiveP pipe | |
sendP pipe (f n) | |
transformer f pipe | |
printer str pipe = do | |
n <- receiveP pipe | |
when (head n/='0')$ do print$ str ++ ": " ++ n | |
printer str pipe | |
-} | |
{- Design principles: | |
1. Процессы в runP должны запускаться слева направо. При небольшом объёме | |
обрабатываемых данных это приведёт к тому, что первый процесс в | |
транспортёре произведёт все необходимые данные и завершится прежде, чем | |
второй и последующие процессы вообще будут запущены | |
2. runP должен запустить все процессы в дополнительных тредах и дожидаться | |
их завершения. Выход из runP желательно производить только после | |
завершения работы всех процессов | |
3. При завершении процесса предыдущий процесс в транспортёре должен получить | |
исключение для того, чтобы завершиться как можно быстрее (после чего | |
предшествующий ему процесс должен получить исключение в свою очередь). | |
Следующий же процесс должен получить только информацию о завершении | |
входных данных при попытке их прочитать (tryReceiveP, eofP) | |
4. При возникновении необработанного исключения в одном из процессов все | |
остальные процессы в транспортёре должны быть прекращены (посылкой сигнала | |
KillThread) и это исключение перевозбуждено в основном процессе | |
5. runP (p1 |> p2 |> protectP p) защищает процесс `p` от возбуждения в нём исключений, | |
вместо этого возникающие ситуации только сигнализируются в состоянии канала | |
6. Для ожидания завершения процесса, запущенного по runAsyncP или | |
предыдущего процесса в транспортёре ввести операцию joinP pipe | |
7. "p |> yP p1 p2" посылает вывод одного процесса двум другим | |
8. killP pipe убивает все процессы из запущенного асинхронно транспортёра | |
9. Нужны удобные и эффективные средства для создания процессов, имеющих | |
несколько входных и/или выходных каналов (использовать getP/putP?) | |
10. new_pipe <- insertOnInputP old_pipe process - вставить новый процесс перед своим входом | |
new_pipe <- insertOnOutputP old_pipe process - вставить новый процесс после своего выхода | |
p1 |> p2 --> PChain p1 p2 ? | |
(p1 |> p2) pipe{ MainThreadId, ref_threads... } | |
p2_threadId <- forkIO $ (p2 pipe2 >> writeIORef pipe.isEof True - по окончании второго процесса) | |
`catch` (throwTo MainThreadId) | |
addToMVar ref_threads p2_threadId | |
p1 pipe1 | |
p1 |> (p2 |> p3) | |
forkIO (forkIO p3; p2) | |
p1 | |
runP p = | |
p_threadId <- forkIO$ p pipe{ MainThreadId = MyThreadId, ref_threads = newIORef [], ...} | |
addToMVar ref_threads p_threadId | |
wait them all `catch` (\e -> mapM killThread ref_threads; throw e) | |
11. Вокруг запущенного треда - катч, который убивает сына, дожидается его завершения и посылает | |
полученное исключение отцу | |
-} | |
{-New design guidelines: | |
1. a|>b запускается как "fork a; b" | |
2. При завершении b дождаться завершения a | |
3. При возникновении в любом из процессов необработанного сигнала | |
надо убивать все порцессы в транспортёре и перевозбуждать этот сигнал в основной порграмме | |
-} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment