Skip to content

Instantly share code, notes, and snippets.

@LSLeary
Last active August 20, 2024 18:42
Show Gist options
  • Save LSLeary/da963da122946d981b4b143cbcf3dd73 to your computer and use it in GitHub Desktop.
Save LSLeary/da963da122946d981b4b143cbcf3dd73 to your computer and use it in GitHub Desktop.
A thread-safe incremental computation (IC) implementation supporting both eager parallel propagation and lazy, demand-driven evaluation.
{-# LANGUAGE DerivingVia, BlockArguments, LambdaCase #-}
module Control.Concurrent.Incremental (
Adaptive, adaptively,
static, dynamic,
ICVar, newICVar, newICVarIO,
demand, propagate,
compute, await,
writeICVar_, writeICVar, writeICVar',
modifyICVar_, modifyICVar, modifyICVar',
) where
-- base
import GHC.Exts (Any)
import Unsafe.Coerce (unsafeCoerce)
import Data.Type.Coercion(TestCoercion(..), Coercion(..))
import Data.Unique (Unique, newUnique)
import Data.Monoid (Ap(..))
import Data.Function (on)
import Data.Functor (void, ($>))
import Data.Foldable (for_, traverse_)
import Control.Applicative (Alternative)
import Control.Monad (join)
import Control.Monad.Fix (MonadFix)
import Control.Concurrent (forkIO, ThreadId, myThreadId, killThread)
-- stm
import Control.Concurrent.STM (STM, atomically, retry)
import Control.Concurrent.STM.TVar
  (TVar, newTVarIO, readTVar, readTVarIO, writeTVar, modifyTVar, modifyTVar')
-- containers
import Data.Set (Set, (\\))
import qualified Data.Set as S
-- transformers
import Control.Monad.Trans.Reader (ReaderT(..))
-- The core state of an 'ICVar'.
--
-- 'compute' moves a variable from 'Dirty' to 'Computing', 'evaluate' stores
-- the finished result as a 'Value', and 'dirty' resets it back to 'Dirty'.
data Value a
  = Dirty
    -- ^ No result is stored; the next 'compute' will start an evaluation.
  | Computing {-# UNPACK #-} !ThreadId
    -- ^ The thread with this id has claimed the evaluation of the recipe.
  | Value !a
    -- ^ The stored result of the last completed evaluation.
-- | The @Monad@ in which 'ICVar' recipes are specified.
--
-- Representationally this is a reader of 'AdaptEnv' over 'IO', so the
-- monadic instances are borrowed from 'ReaderT'. The 'Semigroup' and
-- 'Monoid' instances combine results applicatively (via 'Ap').
newtype Adaptive a = Adaptive (AdaptEnv -> IO a)
  deriving (Functor, Applicative, Alternative, Monad, MonadFix)
    via (ReaderT AdaptEnv IO)
  deriving (Semigroup, Monoid)
    via (Ap Adaptive a)
-- The environment threaded through a running recipe.
newtype AdaptEnv = AdaptEnv
  { observedDeps :: TVar (Set SomeICVar)
    -- ^ Accumulates every 'ICVar' passed to 'observe' while the recipe runs.
  }
-- Unwrap a recipe and run it against the given environment.
runAdaptive :: Adaptive a -> AdaptEnv -> IO a
runAdaptive (Adaptive run) env = run env
-- Lift an 'IO' action into 'Adaptive'; the environment is ignored.
inIO :: IO a -> Adaptive a
inIO action = Adaptive \_ -> action
-- | Sidestep the 'ICVar' to execute a recipe directly.
--
-- The recipe runs against a fresh, empty dependency set which is not
-- consulted afterwards, so nothing is registered as depending on the
-- 'ICVar's it reads.
adaptively :: Adaptive a -> IO a
adaptively ad =
  newTVarIO S.empty >>= \observedDeps ->
    runAdaptive ad AdaptEnv{observedDeps}
-- | Depend statically upon an 'ICVar'.
--
-- The computation of the 'ICVar' is started in the background /before/ the
-- continuation recipe runs, so the two proceed concurrently; only afterwards
-- do we wait for the variable's value and apply the resulting function to it.
static :: ICVar x -> Adaptive (x -> a) -> Adaptive a
static icv adxa = do
  inIO (compute icv)
  observe icv
  f <- adxa
  x <- inIO (atomically (await icv))
  pure (f x)
-- | Depend dynamically upon an 'ICVar'.
--
-- Records the variable as a dependency of the running recipe, then blocks
-- until its value is available.
dynamic :: ICVar a -> Adaptive a
dynamic icv = observe icv *> inIO (demand icv)
-- Record an 'ICVar' as a dependency of the currently running recipe by
-- inserting it into the environment's 'observedDeps' set.
--
-- The strict 'modifyTVar'' is used so that the set is updated immediately
-- rather than accumulating a chain of @S.insert@ thunks inside the 'TVar'
-- when a recipe observes many variables.
observe :: ICVar a -> Adaptive ()
observe icv = Adaptive \AdaptEnv{observedDeps} ->
  atomically (modifyTVar' observedDeps (S.insert (some icv)))
-- | An incrementally computed mutable variable @a@.
data ICVar a = ICVar
  { identifier :: {-# UNPACK #-} !Unique
    -- ^ Identity of the variable; the sole basis of equality and ordering.
  , value      :: {-# UNPACK #-} !(TVar (Value a))
    -- ^ Current evaluation state.
  , recipe     :: {-# UNPACK #-} !(TVar (Adaptive a))
    -- ^ How to (re)compute the value.
  , depends    :: {-# UNPACK #-} !(TVar (Set SomeICVar))
    -- ^ The variables observed by the most recently committed evaluation.
  , revdeps    :: {-# UNPACK #-} !(TVar (Set SomeICVar))
    -- ^ The variables whose 'depends' currently include this one.
  }
-- Two 'ICVar's are equal exactly when they carry the same 'identifier'.
instance Eq (ICVar a) where
  lhs == rhs = identifier lhs == identifier rhs

-- 'ICVar's are ordered by their 'identifier'.
instance Ord (ICVar a) where
  compare lhs rhs = compare (identifier lhs) (identifier rhs)
-- Two 'ICVar's with the same 'identifier' are treated as the same variable,
-- and hence as having coercible type parameters.
--
-- The 'unsafeCoerce' turns the trivially valid @Coercion a a@ into the
-- required @Coercion a b@. This relies on identifiers never being shared
-- between distinct variables: the 'ICVar' constructor is not exported and
-- 'newICVarIO' draws a fresh 'Unique' for every variable.
instance TestCoercion ICVar where
  testCoercion (icv1 :: ICVar a) icv2 =
    if identifier icv1 == identifier icv2
      then Just (unsafeCoerce (Coercion :: Coercion a a))
      else Nothing
-- | Given a recipe for computing an @a@ adaptively, produce an @'ICVar' a@
--   in a recipe context.
--
-- This is 'newICVarIO' lifted into 'Adaptive'.
newICVar :: Adaptive a -> Adaptive (ICVar a)
newICVar rcp = inIO (newICVarIO rcp)
-- | Given a recipe for computing an @a@ adaptively, produce an @'ICVar' a@.
--
-- The new variable starts out 'Dirty' with no recorded dependencies or
-- reverse dependencies; the recipe is not run here.
newICVarIO :: Adaptive a -> IO (ICVar a)
newICVarIO rcp = do
  identifier <- newUnique
  value      <- newTVarIO Dirty
  recipe     <- newTVarIO rcp
  depends    <- newTVarIO S.empty
  revdeps    <- newTVarIO S.empty
  pure ICVar{identifier, value, recipe, depends, revdeps}
-- | Demand the value of an 'ICVar', triggering its computation and that of
--   its dependencies if necessary.
--
-- Blocks the calling thread until a value is available.
demand :: ICVar a -> IO a
demand icv = compute icv *> atomically (await icv)
-- | Compute an 'ICVar's transitive reverse dependencies in the background.
--
-- Starts the computation of the variable itself, then recurses into every
-- variable currently registered in its 'revdeps'.
propagate :: ICVar a -> IO ()
propagate icv = do
  compute icv
  dependents <- readTVarIO (revdeps icv)
  for_ dependents (existentially propagate)
-- | Compute an 'ICVar' in the background.
--
-- A fresh thread is forked unconditionally. It atomically claims the
-- variable (@Dirty@ becomes @Computing@ on that thread) and then evaluates
-- it; if the variable is already being computed or already has a value,
-- the thread exits without doing anything.
compute :: ICVar a -> IO ()
compute icv = void (forkIO worker)
  where
    worker = do
      self <- myThreadId
      -- Decide inside the transaction, act outside of it.
      next <- atomically do
        state <- readTVar (value icv)
        case state of
          Dirty -> do
            writeTVar (value icv) (Computing self)
            pure (evaluate icv)
          _ -> pure (pure ())
      next
-- Run the recipe of an 'ICVar' on the calling thread and publish the result.
--
-- This is only invoked by 'compute', on the thread that has just marked the
-- variable as @Computing@.
--
-- The result and the dependency bookkeeping are committed only if the
-- variable is /still/ @Computing@ on this very thread. 'dirty' resets the
-- state to @Dirty@ inside its transaction but kills the evaluating thread
-- only afterwards, so without this check a superseded evaluation could win
-- the race and overwrite the state with a value computed from an outdated
-- recipe or outdated inputs; the recomputation queued by 'dirty' would then
-- find a @Value@ and do nothing.
--
-- NOTE(review): there is no exception handler here. If the recipe throws,
-- the state remains @Computing@ and threads blocked in 'await' are not
-- woken by this function.
evaluate :: ICVar a -> IO ()
evaluate icv@ICVar{value,recipe,depends} = do
  self <- myThreadId
  rcp <- readTVarIO recipe
  -- Fresh accumulator for the dependencies observed during this run.
  observedDeps <- newTVarIO S.empty
  result <- runAdaptive rcp AdaptEnv{observedDeps}
  atomically do
    readTVar value >>= \case
      Computing owner | owner == self -> do
        writeTVar value (Value result)
        olddeps <- readTVar depends
        newdeps <- readTVar observedDeps
        writeTVar depends newdeps
        -- Keep the reverse-dependency sets of our inputs in sync with the
        -- dependencies actually observed by this run.
        let deletions  = olddeps \\ newdeps
            insertions = newdeps \\ olddeps
        for_ deletions $ existentially \ICVar{revdeps} ->
          modifyTVar' revdeps (S.delete (some icv))
        for_ insertions $ existentially \ICVar{revdeps} ->
          modifyTVar' revdeps (S.insert (some icv))
      -- Superseded: the variable was dirtied (and possibly re-claimed by
      -- another thread) while we were running. Discard the stale result.
      _ -> pure ()
-- | Wait on the computation of an 'ICVar'.
--
-- Retries the transaction until the variable holds a value. This does not
-- itself start a computation.
await :: ICVar a -> STM a
await icv = do
  state <- readTVar (value icv)
  case state of
    Value x     -> pure x
    Dirty       -> retry
    Computing _ -> retry
-- | Replace an 'ICVar's recipe lazily.
writeICVar_ :: ICVar a -> Adaptive a -> IO ()
writeICVar_ icv rcp = modifyICVar_ icv (const rcp)

-- | Replace an 'ICVar's recipe and recompute it in the background.
writeICVar :: ICVar a -> Adaptive a -> IO ()
writeICVar icv rcp = modifyICVar icv (const rcp)

-- | Replace an 'ICVar's recipe and 'propagate' the change.
writeICVar' :: ICVar a -> Adaptive a -> IO ()
writeICVar' icv rcp = modifyICVar' icv (const rcp)
-- | Modify an 'ICVar's recipe lazily.
--
-- The recipe update and the invalidation of the variable (and its transitive
-- reverse dependencies) happen in one transaction; any follow-up 'IO'
-- produced by the invalidation is run right after it commits.
modifyICVar_ :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar_ icv f = do
  followUp <- atomically do
    modifyTVar (recipe icv) f
    dirty icv
  followUp

-- | Modify an 'ICVar's recipe and recompute it in the background.
modifyICVar :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar icv f = modifyICVar_ icv f *> compute icv

-- | Modify an 'ICVar's recipe and 'propagate' the change.
modifyICVar' :: ICVar a -> (Adaptive a -> Adaptive a) -> IO ()
modifyICVar' icv f = modifyICVar_ icv f *> propagate icv
-- Invalidate an 'ICVar' together with its transitive reverse dependencies.
--
-- Runs in 'STM' and returns the 'IO' work to perform once the transaction
-- has committed: for every affected variable that was mid-computation, kill
-- the evaluating thread and start a fresh computation. A variable that is
-- already @Dirty@ is left alone and its reverse dependencies are not visited.
dirty :: ICVar a -> STM (IO ())
dirty icv = do
  state <- readTVar (value icv)
  case state of
    Dirty -> pure (pure ())
    Computing worker -> do
      downstream <- invalidate
      pure ((killThread worker >> compute icv) <> downstream)
    Value _ -> invalidate
  where
    -- Mark this variable dirty and recurse into everything depending on it,
    -- collecting the follow-up actions of the whole subtree.
    invalidate = do
      writeTVar (value icv) Dirty
      dependents <- readTVar (revdeps icv)
      foldMap (existentially dirty) dependents
-- An 'ICVar' with its type parameter hidden.
--
-- This is essentially @Data.Some.Newtype.Some ICVar@. The 'some' package is
-- not used because its Eq and Ord instances for @Some f@ are based on GEq
-- and GOrd, which presume /nominality/ of @f@ in the hidden parameter.
-- That is too strong here, as @ICVar a@ is only /representational/ in @a@.
newtype SomeICVar = UnsafeSome (ICVar Any)

-- Equality of the wrapped variables' identifiers.
instance Eq SomeICVar where
  UnsafeSome lhs == UnsafeSome rhs = identifier lhs == identifier rhs

-- Ordering of the wrapped variables' identifiers.
instance Ord SomeICVar where
  compare (UnsafeSome lhs) (UnsafeSome rhs) =
    compare (identifier lhs) (identifier rhs)
-- Forget the type parameter of an 'ICVar'. The parameter is replaced by
-- 'Any' via 'unsafeCoerce'; the result is only meant to be consumed through
-- 'existentially', which never assumes a concrete parameter.
some :: ICVar a -> SomeICVar
some icv = UnsafeSome (unsafeCoerce icv)
-- Consume a 'SomeICVar' with a continuation that works for every possible
-- type parameter. Because the continuation is fully polymorphic it can be
-- instantiated directly at the stored 'Any' parameter, without a coercion.
existentially :: (forall x. ICVar x -> a) -> SomeICVar -> a
existentially k (UnsafeSome icv) = k icv
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment