haskellconcurrencymonadscontinuation-passing

How to make Koen Claessen's concurrency monad with pure variables?


It's known how to make a pure concurrency monad based on ContT, based on a functional pearl of Koen Claessen:

data Action m where
  Atom :: m (Action m) -> Action m
  Fork :: [Action m] -> Action m
  Stop :: Action m

fork :: Applicative m => [ContT (Action m) m a] -> ContT (Action m) m ()
fork processes = ContT $ \next -> Fork <$> sequenceA (next () : [ process $ const $ pure $ Const | ContT process <- processes ])

How would I implement shared variables like IORefs or MVars? Or at least an async/await mechanism? Bonus points if it's polymorphic in the type of data passed.


Solution

  • I assume by “implement shared variables like IORefs or MVars” you mean in a way other than just having the underlying monad m include IO and using IORef/MVar. That’s straightforward, something like this:

    newVar :: a -> ContT (Action IO) IO (IORef a)
    newVar x = ContT $ \ k -> Atom $ do
      v <- newIORef x
      pure $ k v
    

    A conventional way to add mutable variables to the “poor man’s concurrency monad” purely is by adding additional actions to the Action type for creating, reading, and writing mutable variables. Suppose we have some type Var m a that identifies mutable variables of type a that can be created & accessed in m.

    data Action m where
      Atom :: m (Action m) -> Action m
      Fork :: [Action m] -> Action m
      Stop :: Action m
    
      New   :: (Var m a -> Action m) -> Action m
      Read  :: Var m a -> (a -> Action m) -> Action m
      Write :: Var m a -> a -> Action m -> Action m
    

    Notice that the type parameter a does not appear in the result type of these new constructors, so it’s existentially quantified, and variables may thus contain values of any type. New is an action that continues to another action with a fresh variable as an argument; Read, given a variable, continues to the next action with that variable’s value; and Write, given a variable and a new value, writes the value into the variable before continuing.

    Like fork, these would be constructed with helper functions that produce actions in ContT (Action m) m:

    newVar
      :: (Applicative m)
      => ContT (Action m) m (Var m a)
    newVar = ContT $ \ k -> pure (New (Atom . k))
    
    readVar
      :: (Applicative m)
      => Var m a -> ContT (Action m) m a
    readVar v = ContT $ \ k -> pure (Read v (Atom . k))
    
    writeVar
      :: (Applicative m)
      => Var m a -> a -> ContT (Action m) m ()
    writeVar v x = ContT $ \ k -> pure (Write v x (Atom (k ())))
    

    After that, you just have to decide on a suitable representation of Var. One method is a data family, which makes it relatively easy to use IORef/MVar when IO is available, and something else like an Int index into an IntMap otherwise.

    data family Var (m :: Type -> Type) (a :: Type) :: Type
    
    data instance Var IO a = IOVar { unIOVar :: !(MVar a) }
    

    Of course this is just a sketch; a much more fleshed out implementation can be found in the monad-par package, whose design is described in A Monad for Deterministic Parallelism (Marlow, Newton, & Peyton Jones 2011); its Par monad is basically a continuation monad around an action type like this, and its IVar abstraction is implemented similarly to this, with some additional constraints like extra strictness to enforce determinism and allow pure execution of internally impure code (an IVar secretly wraps an IORef).