I have some Haskell code that uses Pipes:
```haskell
module Main (main) where

import Pipes

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
    x <- await
    yield (x*2)
    b

c :: Consumer Int IO ()
c = do
    x <- await
    lift $ print x
    c

main :: IO ()
main = runEffect $ a >-> b >-> c
```
The Pipes.Concurrent tutorial demonstrates using multiple workers along with work stealing. How can I do something similar inside of `b`? I would like `b` to perform its work concurrently using a set number of workers.
Obviously, concurrency isn't useful in this exact case, but it's the simplest example I could come up with. In my real use case I'd like to make some web requests concurrently using a limited number of workers.
EDIT: I misunderstood what you were asking. You may be able to do this inside a pipe, but I'm not really sure what the motivation would be. I'd recommend building reusable pipe chains and dispatching to them from workers, rather than trying to build workers INSIDE the pipe. If you build the concurrency into the pipe itself, you'll lose the guarantee that the first value in is the first value out.
The Work Stealing section of the Pipes.Concurrent tutorial is what you're looking for. This code is basically verbatim from the tutorial, but let's break down how it works. Here's one way we could do what you want:
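To illustrate the "reusable pipe chain plus workers" recommendation, here is a minimal sketch that wraps the tutorial's pattern in a helper. `runWorkers` and `collectDoubled` are hypothetical names, not part of Pipes.Concurrent; results are collected in a `TVar` only so they can be inspected afterwards.

```haskell
module Main (main) where

import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (replicateM)
import Data.List (sort)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P
import System.Mem (performGC)

-- Hypothetical helper: run `n` copies of the same pipe chain (`worker`),
-- all pulling from one mailbox that `source` feeds.
runWorkers :: Int -> Producer a IO () -> Consumer a IO () -> IO ()
runWorkers n source worker = do
    (output, input) <- spawn unbounded
    feeder <- async $ do
        runEffect $ source >-> toOutput output
        performGC   -- lets the GC notice `output` is unreachable and seal the mailbox
    workers <- replicateM n $ async $ do
        runEffect $ fromInput input >-> worker
        performGC
    mapM_ wait (feeder : workers)

-- The question's `b` stage as a reusable chain that records each result.
collectDoubled :: Int -> [Int] -> IO [Int]
collectDoubled n xs = do
    results <- newTVarIO []
    let chain = P.map (* 2) >-> P.mapM_ (\y -> atomically (modifyTVar' results (y :)))
    runWorkers n (each xs) chain
    readTVarIO results

main :: IO ()
main = do
    ys <- collectDoubled 3 [1 .. 10]
    print (sort ys)   -- sorted, because the workers finish in any order
```

Because the workers race each other, the collected list comes back in arbitrary order, which is why `main` sorts it before printing.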
```haskell
module Main (main) where

import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Monad (forM)
import System.Mem (performGC)

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
    x <- await
    yield (x*2)
    b

c :: Consumer Int IO ()
c = do
    x <- await
    lift $ print x
    c

main :: IO ()
main = do
    -- Mailbox: we write to `output` and read from `input`
    (output, input) <- spawn unbounded
    -- One job feeds every value produced by `a` into the mailbox
    feeder <- async $ do
        runEffect $ a >-> toOutput output
        performGC
    -- Three workers share the mailbox; each value goes to exactly one of them
    workers <- forM [1..3] $ \i ->
        async $ do
            runEffect $ fromInput input >-> b >-> c
            performGC
    mapM_ wait (feeder : workers)
```
The first line, `spawn unbounded`, is from Pipes.Concurrent. It initializes a 'mailbox' that has two handles, an `Output` and an `Input`. It confused me at first, but in this case we send messages TO the output and pull them FROM the input. This resembles a push-pull message channel in languages like Go.
We pass a `Buffer` to say how many messages the mailbox can store; in this case we set no limit with `unbounded`.
Okay, so the mailbox is initialized, and we can now create `Effect`s that send messages to it. The mailbox is implemented using STM, which is how it can collect messages asynchronously.
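The push/pull direction is easiest to see by using the mailbox directly in STM, without any pipes. This sketch assumes pipes-concurrency's `send` and `recv` accessors (`send :: Output a -> a -> STM Bool`, `recv :: Input a -> STM (Maybe a)`); `roundTrip` is a hypothetical name, and `bounded 10` is used only to show the other common `Buffer` choice.

```haskell
module Main (main) where

import Control.Concurrent.STM (atomically)
import Pipes.Concurrent

-- Push one value TO the Output end, then pull it FROM the Input end.
roundTrip :: Int -> IO (Bool, Maybe Int)
roundTrip x = do
    -- bounded 10: at most 10 pending messages fit in the mailbox
    (output, input) <- spawn (bounded 10)
    accepted <- atomically $ send output x   -- False only if the mailbox is sealed
    received <- atomically $ recv input      -- Nothing only if sealed and empty
    return (accepted, received)

main :: IO ()
main = roundTrip 42 >>= print
```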
Let's create an asynchronous job that feeds the mailbox:

```haskell
feeder <- async $ do
    runEffect $ a >-> toOutput output
    performGC
```
`a >-> toOutput output` is just normal pipe composition; we need `toOutput` to turn `output` into a pipe (a `Consumer`). Note the `performGC` call that's also part of the IO action: Pipes.Concurrent relies on the garbage collector to detect when a mailbox's handles are no longer reachable and seal it, so forcing a collection once this job has completed lets the mailbox be cleaned up promptly. We could run this using `forkIO` if we liked, but here we use `async` so that we can wait for it to finish later on. Okay, so our mailbox should be asynchronously receiving messages; let's pull them out and do some work.
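If relying on `performGC` feels fragile, newer versions of pipes-concurrency also provide `spawn'`, which returns an STM `seal` action alongside the two handles. This is a swap from the tutorial's GC-based cleanup, not what the code above does; the sketch assumes `spawn'` is available in your version, and `feedAndDrain` is a hypothetical name.

```haskell
module Main (main) where

import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Feed a list into a mailbox and seal it explicitly (no performGC),
-- then drain everything with a single reader.
feedAndDrain :: [Int] -> IO [Int]
feedAndDrain xs = do
    (output, input, seal) <- spawn' unbounded
    feeder <- async $ do
        runEffect $ each xs >-> toOutput output
        atomically seal   -- readers see the end once the buffer is empty
    result <- P.toListM (fromInput input)
    wait feeder
    return result

main :: IO ()
main = feedAndDrain [1 .. 10] >>= print
```

With a single reader and an `unbounded` (FIFO) buffer the values come out in the order they went in; that ordering no longer holds once several workers share the `Input`.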
```haskell
workers <- forM [1..3] $ \i ->
    async $ do
        runEffect $ fromInput input >-> b >-> c
        performGC
```
Same idea as before, but this time we're spawning several of them. Each worker reads from the input with `fromInput`, just like a normal pipe, runs the values through the rest of our chain, and cleans up when it's done. `input` ensures that each value pulled out is received by only one worker. Once every job feeding `output` has finished and its handle has been garbage collected, the mailbox is sealed; `fromInput input` then stops after the remaining messages are drained, and the workers finish.
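To check that each value reaches only one worker, this sketch (using the same `spawn`/`performGC` pattern as above) tags every value with the id of the worker that pulled it. `distribute` is a hypothetical helper written for this illustration.

```haskell
module Main (main) where

import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (forM)
import Data.List (sort)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P
import System.Mem (performGC)

-- Run `n` workers over `xs`; each worker records (workerId, value)
-- for every value it pulls out of the shared mailbox.
distribute :: Int -> [Int] -> IO [(Int, Int)]
distribute n xs = do
    (output, input) <- spawn unbounded
    seen <- newTVarIO []
    feeder <- async $ do
        runEffect $ each xs >-> toOutput output
        performGC
    workers <- forM [1 .. n] $ \i -> async $ do
        runEffect $ fromInput input
            >-> P.mapM_ (\x -> atomically (modifyTVar' seen ((i, x) :)))
        performGC
    mapM_ wait (feeder : workers)
    readTVarIO seen

main :: IO ()
main = do
    tagged <- distribute 3 [1 .. 10]
    -- Each value appears exactly once, whichever worker happened to take it.
    print (sort (map snd tagged))
```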
If you're using this in a web-worker scenario, you would have a main loop that keeps sending requests into the `toOutput output` channel, and then spawn as many workers as you like, each pulling into its pipeline from `fromInput input`.
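A rough sketch of that web-worker setup: `fakeFetch` is a hypothetical placeholder that only sleeps, so swap in your real HTTP client call; `fetchAll` and `totalSize` are likewise illustrative names. It uses `spawn'` with a `bounded` buffer (an assumption beyond the answer's `unbounded`) so the sending loop blocks instead of queuing unlimited requests while every worker is busy.

```haskell
module Main (main) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (forM, forM_)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for a real HTTP request: sleeps 0.1 s, then
-- returns the URL's length as a fake "response size".
fakeFetch :: String -> IO Int
fakeFetch url = do
    threadDelay 100000
    return (length url)

-- At most `n` requests run at once. The `bounded n` mailbox makes the
-- sending loop block when the buffer is full.
fetchAll :: Int -> [String] -> (Int -> IO ()) -> IO ()
fetchAll n urls onResult = do
    (output, input, seal) <- spawn' (bounded n)
    workers <- forM [1 .. n] $ \_ -> async $
        runEffect $ fromInput input >-> P.mapM fakeFetch >-> P.mapM_ onResult
    -- The "main loop": push each request TO the Output end.
    forM_ urls $ \url -> atomically (send output url)
    atomically seal   -- idle workers stop once the buffer is drained
    mapM_ wait workers

-- Sum all fake response sizes.
totalSize :: Int -> [String] -> IO Int
totalSize n urls = do
    total <- newTVarIO 0
    fetchAll n urls (\size -> atomically (modifyTVar' total (+ size)))
    readTVarIO total

main :: IO ()
main = totalSize 3 ["https://example.com/a", "https://example.com/bb"] >>= print
```

With `n` workers, at most `n` requests are in flight at the same time, which is the "limited number of workers" from the question.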