I'm working with the streaming library but would accept an answer using pipes or conduit.
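Say I have:

```haskell
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Data.Function (fix)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Assumes highLatencyGet :: Int -> IO Thing; liftIO runs it inside the Stream.
streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID =
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- liftIO (highLatencyGet thingID)
      S.yield thing
      go (thingID + 1)
```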
To reduce latency I'd like to fork `highLatencyGet` to retrieve the next `Thing` in parallel with processing the previous `Thing` in the consumer.
Obviously I could transform my function above by creating a new `MVar` and forking the next batch before calling `yield`, etc.

But I want to know if there is an idiomatic (composable) way to do this, such that it could be packaged in a library and used on arbitrary IO `Stream`s. Ideally we could configure the prefetch value as well, like:
```haskell
prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
```
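For comparison, here is a minimal sketch of the hand-fused approach mentioned above, prefetching exactly one item ahead. It swaps the raw `MVar` plus fork for `async`/`wait` from the async package, and `highLatencyGet` is a hypothetical stand-in that returns a `String` instead of a `Thing`. It shows why this is not composable: the prefetch is wired into this one generator, and if the consumer stops early, the pending `async` is never waited on or cancelled, so it finishes in the background and its result is thrown away.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)
import Control.Monad.IO.Class (liftIO)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Hypothetical stand-in for highLatencyGet: sleeps 10 ms, then returns a label.
highLatencyGet :: Int -> IO String
highLatencyGet i = threadDelay 10000 >> pure ("thing " ++ show i)

-- Yields IDs 0..lastID like streamChunks, but starts fetching item i+1
-- before yielding item i to the consumer.
streamChunksPrefetched :: Int -> Stream (Of String) IO ()
streamChunksPrefetched lastID
  | lastID < 0 = pure ()
  | otherwise = liftIO (async (highLatencyGet 0)) >>= go 0
  where
    go i pending = do
      -- Block until the already-started fetch of item i is done.
      thing <- liftIO (wait pending)
      if i >= lastID
        then S.yield thing
        else do
          -- Kick off the next fetch, then hand the current item downstream.
          next <- liftIO (async (highLatencyGet (i + 1)))
          S.yield thing
          go (i + 1) next

main :: IO ()
main = S.toList_ (streamChunksPrefetched 3) >>= print
```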
This solution uses pipes, but it could easily be adapted to use streaming. To be precise, it requires the pipes, pipes-concurrency and async packages.
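It doesn't work in a "direct" style. Instead of simply transforming the `Producer`, it also takes a "folding function" that consumes a `Producer`. This continuation-passing style is necessary for setting up and tearing down the concurrency mechanism: the background thread and the channel must be cleaned up once consumption ends, and a plain `Producer`-to-`Producer` function cannot guarantee that, because its caller may simply stop pulling from the result.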
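```haskell
import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
  -- A bounded channel, plus an STM action that seals it.
  (outbox, inbox, seal) <- spawn' (bounded bufsize)
  -- Run an action, then always seal the channel, even if it throws.
  let cutcord :: IO x -> IO x
      cutcord effect = effect `finally` atomically seal
  runConcurrently $
    -- Writer: copy the original producer into the channel.
    Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
    -- Reader: fold over a producer fed by the channel; its result is returned.
    *> Concurrently (cutcord (foldfunc (fromInput inbox)))
```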
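A minimal usage sketch: the "folding function" is just whatever you would otherwise have run on the original producer, here `Pipes.Prelude.toListM`. `slowSource` is a hypothetical producer that sleeps before each element to stand in for `highLatencyGet`, and the definition of `prefetching` is repeated so the file compiles on its own.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)
import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import qualified Pipes.Prelude as P

-- Same definition as above, repeated so this file is self-contained.
prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
  (outbox, inbox, seal) <- spawn' (bounded bufsize)
  let cutcord :: IO x -> IO x
      cutcord effect = effect `finally` atomically seal
  runConcurrently $
    Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
    *> Concurrently (cutcord (foldfunc (fromInput inbox)))

-- Hypothetical slow producer: each element costs a 10 ms "fetch".
slowSource :: Producer Int IO ()
slowSource = each [1 .. 10] >-> P.mapM (\i -> threadDelay 10000 >> pure i)

-- Up to 4 elements are fetched ahead while the consumer collects them.
demo :: IO [Int]
demo = prefetching 4 slowSource P.toListM

main :: IO ()
main = demo >>= print
```

Any consumer that takes a `Producer` fits, e.g. `prefetching 4 slowSource (\p -> runEffect (p >-> P.print))`.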
The output of the original producer is redirected into a bounded queue. At the same time, we apply the producer-folding function to a producer that reads from the queue.

Whenever either concurrent action completes, normally or by throwing an exception, we promptly seal the channel so the other side isn't left hanging: once the channel is sealed, the writer stops sending, and the reader ends as soon as it has drained what is left in the queue.
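As a sketch of the adaptation to streaming, assuming only the streaming, stm and async packages: pipes-concurrency is not available there, so this hand-rolls the channel as a `TBQueue` plus a `TVar Bool` "sealed" flag standing in for `spawn' (bounded n)`. That channel is my own construction, not a library API. The structure is otherwise the same continuation-passing shape as above; `bufsize` is assumed to be positive.

```haskell
import Control.Concurrent.Async (Concurrently(..))
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (when)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

prefetching :: Int -> Stream (Of a) IO () -> (Stream (Of a) IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
  queue  <- newTBQueueIO (fromIntegral bufsize)
  sealed <- newTVarIO False
  let seal = atomically (writeTVar sealed True)
      cutcord :: IO x -> IO x
      cutcord effect = effect `finally` seal
      -- Enqueue one element; return False (dropping it) once sealed.
      -- A writer blocked on a full queue is woken when `sealed` changes.
      send x = atomically $ do
        s <- readTVar sealed
        if s then pure False else True <$ writeTBQueue queue x
      -- Writer: copy the source into the queue until it ends or is sealed.
      feed str = do
        step <- S.next str
        case step of
          Left () -> pure ()
          Right (x, rest) -> do
            alive <- send x
            when alive (feed rest)
      -- Reader: take the next element, or Nothing once empty and sealed.
      recv = atomically $
        (Just <$> readTBQueue queue)
          `orElse` (readTVar sealed >>= check >> pure Nothing)
      drain = S.unfoldr (\() -> maybe (Left ()) (\x -> Right (x, ())) <$> recv) ()
  runConcurrently $
    Concurrently (cutcord (feed source))
    *> Concurrently (cutcord (foldfunc drain))

main :: IO ()
main = do
  xs <- prefetching 2 (S.each [1 .. 10 :: Int]) S.toList_
  print xs
  -- The consumer may stop early; sealing unblocks the writer on an infinite source.
  ys <- prefetching 2 (S.each [1 :: Int ..]) (S.toList_ . S.take 5)
  print ys
```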