
Using pooledMapConcurrentlyN in polysemy


I'm currently playing around with Polysemy, rewriting a small toy project of mine to get used to it. I've run into a piece of code that uses pooledMapConcurrentlyN (from unliftio), which is basically a parallel version of traverse with bounded concurrency.

I can strip my example down to this:

foo :: Sem r Int
foo = do
  res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
  pure $ sum res

action :: String -> Sem r Int
action = pure . length

This doesn't compile because there's no instance for MonadUnliftIO (Sem r). It does compile when I use traverse, but I'm looking for a concurrent version. I'm not sure which way I should go now.
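
For reference, here is a minimal sketch of the sequential version that does compile. It assumes nothing beyond the polysemy package, and since no effects are involved it can be evaluated with run:

```haskell
module Main where

import Polysemy (Sem, run)

-- Pure action: no effects are required, so r stays fully polymorphic.
action :: String -> Sem r Int
action = pure . length

-- Sequential version: traverse only needs the Applicative instance of Sem r,
-- so no MonadUnliftIO instance is involved.
foo :: Sem r Int
foo = sum <$> traverse action ["foo", "bar", "baz"]

main :: IO ()
main = print (run foo)  -- prints 9
```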

I see the following options:

data ParTraverse m a where
  TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)

I'm not really familiar with either GADTs or Polysemy yet, so it's possible that I'm missing something obvious here.


EDIT: As pointed out in the answer below, the most appropriate solution is to model this as an effect and handle the concurrency in the effect interpretation rather than in the business logic. This means that I'm looking for a higher-order effect (?) similar to the ParTraverse effect above:

data ParTraverse m a where
  TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)

makeSem ''ParTraverse

parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
  TraverseP f ta -> do
    _something

I'm not sure whether this type signature is correct or not (should the action have type a -> Sem r b? The signature for traverse has an Applicative constraint on m, how would I model that?)


Solution

  • As for the ParTraverse implementation, this is what I replied over on GitHub, for a version specialized to [] for t:

    pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
    pooledMapConcurrently num f ta =
      ...
    
    data ParTraverse m a where
      TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
    
    makeSem ''ParTraverse
    
    parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
    parTraverseToIO =
      interpretH \case
       TraverseP f ta -> do
         taT <- traverse pureT ta
         fT <- bindT f
         tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
         ins <- getInspectorT
         pureT (catMaybes (inspect ins <$> catMaybes tb))
    

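    Some explanations for the combinators used inside interpretH, where we operate in the Tactical environment: pureT lifts a plain value into the interpreter's effect state f; bindT turns the higher-order argument a -> m b into a function f a -> Sem (ParTraverse ': r) (f b) that can be run in the interpreter's stack; and getInspectorT returns an Inspector whose inspect function extracts a Maybe b from an f b.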
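
To look at pureT, bindT and getInspectorT in isolation, here is a rough sketch of a purely sequential interpreter for the same effect. The name parTraverseSequential and the use of plain traverse are assumptions made for illustration and are not part of the GitHub reply. As far as I understand, the effect state produced by the individual actions is discarded here, just as in the concurrent version:

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts, GADTs, LambdaCase, PolyKinds #-}
{-# LANGUAGE RankNTypes, ScopedTypeVariables, TemplateHaskell #-}
{-# LANGUAGE TypeApplications, TypeFamilies, TypeOperators #-}
module Main where

import Data.Maybe (catMaybes)
import Polysemy

-- Same effect as in the answer, specialized to lists.
data ParTraverse m a where
  TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]

makeSem ''ParTraverse

-- Hypothetical sequential interpreter (illustration only, no concurrency).
parTraverseSequential :: Sem (ParTraverse ': r) a -> Sem r a
parTraverseSequential = interpretH $ \case
  TraverseP f ta -> do
    -- Lift every input into the effect state f.
    taT <- traverse pureT ta
    -- Turn a -> m b into f a -> Sem (ParTraverse ': r) (f b).
    fT <- bindT f
    -- Run the actions one by one, interpreting nested ParTraverse uses.
    tbT <- raise (parTraverseSequential (traverse fT taT))
    -- Extract the results that are actually available.
    ins <- getInspectorT
    pureT (catMaybes (inspect ins <$> tbT))

foo :: Member ParTraverse r => Sem r Int
foo = sum <$> traverseP (pure . length) ["foo", "bar", "baz"]

main :: IO ()
main = print (run (parTraverseSequential foo))
```

Swapping traverse for a bounded concurrent map is what forces the Final IO constraint and the extra Maybe layer in the interpreter above.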


    For completeness, here's the implementation of pooledMapConcurrently, written by KingoftheHomeless:

    pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
    pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
      (<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t
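
A minimal usage sketch, under the assumption that pooledMapConcurrentlyIO stands for pooledMapConcurrentlyN from the unliftio package used at IO (the snippet above does not say where it comes from, so that mapping is a guess). The program is run with runFinal:

```haskell
{-# LANGUAGE DataKinds, FlexibleContexts #-}
module Main where

import Data.Maybe (catMaybes)
import Polysemy (Final, Member, Sem, runFinal)
import Polysemy.Final (withWeavingToFinal)
-- Assumption: this is the function the answer calls pooledMapConcurrentlyIO.
import UnliftIO.Async (pooledMapConcurrentlyN)

pooledMapConcurrently
  :: (Member (Final IO) r, Traversable t)
  => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
  -- Each action is lowered to IO from the same initial state s;
  -- ins turns the resulting state into a Maybe b.
  (<$ s) <$> pooledMapConcurrentlyN i (\a -> ins <$> wv (f a <$ s)) t

-- The example from the question, with at most 3 actions running at once.
foo :: Member (Final IO) r => Sem r Int
foo = do
  res <- pooledMapConcurrently 3 (pure . length) ["foo", "bar", "baz"]
  pure (sum (catMaybes res))

main :: IO ()
main = runFinal foo >>= print
```

The Maybe in the result reflects that an action may fail to produce a value once other effects are woven through it, which is why the caller has to decide what to do with missing results (here they are dropped with catMaybes).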