
Idiomatic prefetching in a streaming library


I'm working with the streaming library but would accept an answer using pipes or conduit.

Say I have

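import Control.Monad (unless)
import Control.Monad.Trans.Class (lift)
import Data.Function (fix)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID =
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      -- assumes highLatencyGet :: Int -> IO Thing
      thing <- lift (highLatencyGet thingID)
      S.yield thing
      go (thingID+1)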

To reduce latency I'd like to fork highLatencyGet to retrieve the next Thing in parallel with processing the previous Thing in the consumer.

Obviously I could rewrite the function above by hand, creating a new MVar and forking the fetch of the next batch before calling yield, etc.
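
A minimal sketch of that manual approach, using only base, might look like the following. It drives the consumer directly instead of yielding into a Stream, and it keeps only one fetch in flight. The names startFetch, awaitFetch and consumeWithPrefetch are hypothetical, and fetch stands in for highLatencyGet.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, throwIO, try)

-- Start one fetch in a background thread. The MVar receives either the
-- result or the exception raised by the fetch.
startFetch :: IO a -> IO (MVar (Either SomeException a))
startFetch fetch = do
  var <- newEmptyMVar
  _ <- forkIO (try fetch >>= putMVar var)
  pure var

-- Wait for a background fetch, rethrowing its exception if it failed.
awaitFetch :: MVar (Either SomeException a) -> IO a
awaitFetch var = takeMVar var >>= either throwIO pure

-- Fetch ids 0..lastID and pass each result to `consume`, in order.
-- The fetch for the next id is forked before the current result is consumed.
consumeWithPrefetch :: (Int -> IO a) -> (a -> IO ()) -> Int -> IO ()
consumeWithPrefetch fetch consume lastID
  | lastID < 0 = pure ()
  | otherwise = startFetch (fetch 0) >>= go 0
  where
    go thingID pending = do
      thing <- awaitFetch pending
      if thingID < lastID
        then do
          next <- startFetch (fetch (thingID + 1))
          consume thing
          go (thingID + 1) next
        else consume thing
```

This hard-codes a prefetch depth of one and ties the fetching logic to the consumer loop, which is exactly the lack of composability described here.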

But I want to know if there is an idiomatic (composable) way to do this, such that it could be packaged in a library and could be used on arbitrary IO Streams. Ideally we could configure the prefetch value as well, like:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()

Solution

  • This solution uses pipes but it could be easily adapted to use streaming. To be precise, it requires the pipes, pipes-concurrency and async packages.

    It doesn't work in a "direct" style. Instead of simply transforming the Producer, it also takes a "folding function" that consumes a Producer. This continuation-passing style is necessary for setting up and tearing down the concurrency mechanism.

    import Pipes
    import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
    import Control.Concurrent.Async (Concurrently(..))
    import Control.Exception (finally)
    
    prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
    prefetching bufsize source foldfunc = do
        (outbox,inbox,seal) <- spawn' (bounded bufsize)
        let cutcord effect = effect `finally` atomically seal
        runConcurrently $
            Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
            *>
            Concurrently (cutcord (foldfunc (fromInput inbox)))
    

    The output of the original producer is redirected to a bounded queue. At the same time, we apply the producer-folding function to a producer that reads from the queue.

    When either of the concurrent actions completes, we promptly seal the channel so that the other side is not left hanging.
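
As a usage illustration, the sketch below repeats the definition so that it is self-contained and runs it against a slow producer. The names slowGet, slowSource and collectPrefetched are hypothetical: slowGet merely simulates highLatencyGet with a 100 ms delay, and the folding function is P.toListM from Pipes.Prelude.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)
import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import qualified Pipes.Prelude as P

-- Same definition as in the answer, repeated to keep the example self-contained.
prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (outbox, inbox, seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))

-- Hypothetical stand-in for highLatencyGet: sleeps 100 ms, then returns a value.
slowGet :: Int -> IO Int
slowGet n = threadDelay 100000 >> pure (n * 10)

-- Producer that fetches ids 0..lastID one at a time.
slowSource :: Int -> Producer Int IO ()
slowSource lastID = each [0 .. lastID] >-> P.mapM slowGet

-- Collect everything that the prefetched producer yields, in order.
collectPrefetched :: Int -> Int -> IO [Int]
collectPrefetched bufsize lastID =
    prefetching bufsize (slowSource lastID) P.toListM
```

A folding function that stops early, such as \p -> P.toListM (p >-> P.take 3), should also terminate cleanly, because sealing the channel makes toOutput on the producer side finish.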