Last active
August 20, 2024 18:42
-
-
Save LSLeary/da963da122946d981b4b143cbcf3dd73 to your computer and use it in GitHub Desktop.
A thread-safe IC implementation supporting both eager parallel propagation and lazy demand driven evaluation.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DerivingVia, BlockArguments, LambdaCase #-} | |
module Control.Concurrent.Incremental ( | |
Adaptive, adaptively, | |
static, dynamic, | |
ICVar, newICVar, newICVarIO, | |
demand, propagate, | |
compute, await, | |
writeICVar_, writeICVar, writeICVar', | |
modifyICVar_, modifyICVar, modifyICVar', | |
) where | |
-- base | |
import GHC.Exts (Any) | |
import Unsafe.Coerce (unsafeCoerce) | |
import Data.Type.Coercion(TestCoercion(..), Coercion(..)) | |
import Data.Unique (Unique, newUnique) | |
import Data.Monoid (Ap(..)) | |
import Data.Function (on) | |
import Data.Functor (void, ($>)) | |
import Data.Foldable (for_, traverse_) | |
import Control.Applicative (Alternative) | |
import Control.Monad (join) | |
import Control.Monad.Fix (MonadFix) | |
import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread) | |
-- stm | |
import Control.Concurrent.STM (STM, atomically, retry) | |
import Control.Concurrent.STM.TVar | |
(TVar, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar) | |
-- containers | |
import Data.Set (Set, (\\)) | |
import qualified Data.Set as S | |
-- transformers | |
import Control.Monad.Trans.Reader (ReaderT(..)) | |
-- The core state of an 'ICVar'. | |
data Value a | |
= Dirty | |
| Computing {-# UNPACK #-} !ThreadId | |
| Value !a | |
-- | The @Monad@ in which 'ICVar' recipes are specified. | |
newtype Adaptive a = Adaptive (AdaptEnv -> IO a) | |
deriving (Functor, Applicative, Alternative, Monad, MonadFix) | |
via ReaderT AdaptEnv IO | |
deriving (Semigroup, Monoid) | |
via Ap Adaptive a | |
newtype AdaptEnv = AdaptEnv{ observedDeps :: TVar (Set SomeICVar) } | |
runAdaptive :: Adaptive a -> AdaptEnv -> IO a | |
runAdaptive (Adaptive f) = f | |
inIO :: IO a -> Adaptive a | |
inIO = Adaptive . const | |
-- | Sidestep the 'ICVar' to execute a recipe directly. | |
adaptively :: Adaptive a -> IO a | |
adaptively ad = do | |
observedDeps <- newTVarIO S.empty | |
runAdaptive ad AdaptEnv{observedDeps} | |
-- | Depend statically upon an 'ICVar'. | |
static :: ICVar x -> Adaptive (x -> a) -> Adaptive a | |
static icv adxa = do | |
inIO (compute icv) | |
observe icv | |
adxa <*> inIO (atomically (await icv)) | |
-- | Depend dynamically upon an 'ICVar'. | |
dynamic :: ICVar a -> Adaptive a | |
dynamic icv = do | |
observe icv | |
inIO (demand icv) | |
observe :: ICVar a -> Adaptive () | |
observe icv = Adaptive \AdaptEnv{observedDeps} -> | |
(atomically . modifyTVar observedDeps . S.insert . some) icv | |
-- | An incrementally computed mutable variable @a@. | |
data ICVar a = ICVar | |
{ identifier :: {-# UNPACK #-} !Unique | |
, value :: {-# UNPACK #-} !(TVar (Value a)) | |
, recipe :: {-# UNPACK #-} !(TVar (Adaptive a)) | |
, depends :: {-# UNPACK #-} !(TVar (Set SomeICVar)) | |
, revdeps :: {-# UNPACK #-} !(TVar (Set SomeICVar)) | |
} | |
instance Eq (ICVar a) where (==) = (==) `on` identifier | |
instance Ord (ICVar a) where compare = compare `on` identifier | |
instance TestCoercion ICVar where | |
(icv1 :: ICVar a) `testCoercion` icv2 | |
| identifier icv1 == identifier icv2 = Just (unsafeCoerce refl) | |
| otherwise = Nothing | |
where refl = Coercion @a @a | |
-- | Given a recipe for computing an @a@ adaptively, produce an @'ICVar' a@ | |
-- in a recipe context. | |
newICVar :: Adaptive a -> Adaptive (ICVar a) | |
newICVar = inIO . newICVarIO | |
-- | Given a recipe for computing an @a@ adaptively, produce an @'ICVar' a@. | |
newICVarIO :: Adaptive a -> IO (ICVar a) | |
newICVarIO rcp = ICVar | |
<$> newUnique | |
<*> newTVarIO Dirty | |
<*> newTVarIO rcp | |
<*> newTVarIO S.empty | |
<*> newTVarIO S.empty | |
-- | Demand the value of an 'ICVar', triggering its computation and that of its | |
-- dependencies if necessary. | |
demand :: ICVar a -> IO a | |
demand icv = do | |
compute icv | |
atomically (await icv) | |
-- | Compute an 'ICVar's transitive reverse dependencies in the background. | |
propagate :: ICVar a -> IO () | |
propagate icv@ICVar{revdeps} = do | |
compute icv | |
readTVarIO revdeps >>= traverse_ (existentially propagate) | |
-- | Compute an 'ICVar' in the background. | |
compute :: ICVar a -> IO () | |
compute icv@ICVar{value} = (void . forkIO) do | |
threadId <- myThreadId | |
(join . atomically) do | |
readTVar value >>= \case | |
Dirty -> writeTVar value (Computing threadId) $> evaluate icv | |
_ -> mempty | |
evaluate :: ICVar a -> IO () | |
evaluate icv@ICVar{value,recipe,depends} = do | |
rcp <- readTVarIO recipe | |
observedDeps <- newTVarIO S.empty | |
result <- runAdaptive rcp AdaptEnv{observedDeps} | |
atomically do | |
writeTVar value (Value result) | |
olddeps <- readTVar depends | |
newdeps <- readTVar observedDeps | |
writeTVar depends newdeps | |
let deletions = olddeps \\ newdeps | |
insertions = newdeps \\ olddeps | |
for_ deletions $ existentially \ICVar{revdeps} -> | |
modifyTVar revdeps (S.delete (some icv)) | |
for_ insertions $ existentially \ICVar{revdeps} -> | |
modifyTVar revdeps (S.insert (some icv)) | |
-- | Wait on the computation of an 'ICVar'. | |
await :: ICVar a -> STM a | |
await ICVar{value} = readTVar value >>= \case | |
Value x -> pure x | |
_ -> retry | |
writeICVar_, writeICVar, writeICVar' :: ICVar a -> Adaptive a -> IO () | |
-- | Replace an 'ICVar's recipe lazily. | |
writeICVar_ icv = modifyICVar_ icv . const | |
-- | Replace an 'ICVar's recipe and recompute it in the background. | |
writeICVar icv = modifyICVar icv . const | |
-- | Replace an 'ICVar's recipe and 'propagate' the change. | |
writeICVar' icv = modifyICVar' icv . const | |
modifyICVar_, modifyICVar, modifyICVar' | |
:: ICVar a -> (Adaptive a -> Adaptive a) -> IO () | |
-- | Modify an 'ICVar's recipe lazily. | |
modifyICVar_ icv@ICVar{recipe} f = (join . atomically) do | |
modifyTVar recipe f | |
dirty icv | |
-- | Modify an 'ICVar's recipe and recompute it in the background. | |
modifyICVar icv f = do | |
modifyICVar_ icv f | |
compute icv | |
-- | Modify an 'ICVar's recipe and 'propagate' the change. | |
modifyICVar' icv f = do | |
modifyICVar_ icv f | |
propagate icv | |
dirty :: ICVar a -> STM (IO ()) | |
dirty icv@ICVar{value,revdeps} = readTVar value >>= \case | |
Dirty -> mempty | |
Computing threadId -> pure (killThread threadId >> compute icv) | |
<> doTheDirty | |
Value _ -> doTheDirty | |
where | |
doTheDirty = do | |
writeTVar value Dirty | |
readTVar revdeps >>= foldMap (existentially dirty) | |
-- Essentially @Data.Some.Newtype.Some ICVar@, however we cannot use the 'some' | |
-- package for this because the Eq and Ord instances for @Some f@ are based on | |
-- GEq and GOrd, which presume /nominality/ of @f@ in the hidden parameter. | |
-- That's too strong for us, as @ICVar a@ is only /representational/ in @a@. | |
newtype SomeICVar = UnsafeSome (ICVar Any) | |
instance Eq SomeICVar where (==) = (==) `on` existentially identifier | |
instance Ord SomeICVar where compare = compare `on` existentially identifier | |
some :: ICVar a -> SomeICVar | |
some = UnsafeSome . unsafeCoerce | |
existentially :: (forall x. ICVar x -> a) -> SomeICVar -> a | |
existentially k (UnsafeSome icv) = k (unsafeCoerce icv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment