I'm currently playing around with Polysemy, rewriting a small toy project of mine to get used to it. I'm stumbling upon a piece of code that uses pooledMapConcurrentlyN
, so basically a parallel version of traverse with bounded concurrency.
I can strip my example down to this:
foo :: Sem r Int
foo = do
res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
pure $ sum res
action :: String -> Sem r Int
action = pure. length
This doesn't compile because there's no instance for MonadUnliftIO (Sem r)
. It does compile when I use traverse
, but I'm looking for a concurrent version. I'm not sure which way I should go now.
I see the following options:
MonadUnliftIO (Sem r)
instance. I see that there were some discussions about adding/implementing such an instance in this GitHub issue. However, it's not clear to me whether it's a good idea to do so.pooledMapConcurrentlyN
that gives me an equivalent behavior. I know that there's parTraverse
from the par-dual package, but that would require a ParDual
instance. The parallel
package could make a solution possible as well, but I'm not familiar with that so I can't tell if it's possible.data ParTraverse m a where
TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)
I'm not really familiar yet with neither GADTs nor Polysemy, so it's possible that I'm missing something obvious here.
EDIT: As pointed out in the answer below, the most appropriate solution is to model this as an effect and handle the concurrency in the effect interpretation as opposed to the business logic. This means that I'm looking for a higher order effect (?) similar to the ParTraverse
effect above:
data ParTraverse m a where
TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)
makeSem ''ParTraverse
parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
TraverseP f ta -> do
_something
I'm not sure whether this type signature is correct or not (should the action have type a -> Sem r b
? The signature for traverse
has an Applicative
constraint on m
, how would I model that?)
As for the ParTraverse
implementation, this is what I replied over on github, for a version specialized to []
for t
:
pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
...
data ParTraverse m a where
TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
makeSem ''ParTraverse
parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
interpretH \case
TraverseP f ta -> do
taT <- traverse pureT ta
fT <- bindT f
tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
ins <- getInspectorT
pureT (catMaybes (inspect ins <$> catMaybes tb))
Some explanations for the combinators used inside interpretH
, where we operate in the Tactical
environment:
a -> m b
, where m
is instantiated to Sem rInitial
inside the interpreter, we have to use bindT
to get a function that is something like f a -> Sem r (f b)
, with f
being the monadic state of the interpreters.pooledMapConcurrently
on the Sem rInitial
directly, because Member (Final IO)
is only given for r
.ta
contains the input for f
, but since we lifted that to expect f a
, we also have to call pureT
on each element of ta
, using traverse
since it is a monadic action.bindT
(and runT
) produce Sem
s that still have the current effect, ParTraverse
, at the head, because the effect has to be interpreted within the wrapped Sem
(passed in as a -> m b
). This even allows to use a different interpreter for the inner program. In our case, we simply run parTraverseToIO
on the result of f
again. After that, we have to lift this Sem
back into the Tactical
environment (which is just another effect at the head), so we use raise
.f
produces f (Maybe b)
as result, we need to unpack this in order to get the return type right. For that, we can use the inspector, which transforms f
to Maybe
, giving us Maybe (Maybe b)
, which we can then flatten into a list.For completeness, here's the implementation of pooledMapConcurrently
, written by KingoftheHomeless:
pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
(<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t