I have a value body :: BS.ByteString (ResourceT IO) (), from a function based on BS.readFile. I want to stream that value as the response body from a Wai Application. There's a helper, streamingResponse, that takes a value of the type Stream (Of ByteString) IO r. I'm able to convert my BS.ByteString (ResourceT IO) () to Stream (Of ByteString) (ResourceT IO) () through the use of BS.toChunks, but it contains an extra ResourceT monad layer. Passing the body to streamingResponse gives me:
Couldn't match type ‘ResourceT IO’ with ‘IO’
Expected type: Stream (Of ByteString) IO ()
Actual type: Stream (Of ByteString) (ResourceT IO) ()
I've tried various things, like wrapping things in runResourceT, binding and hoisting values, etc., but I really have no idea how to proceed. Here's the line in the full project if extra context is required.
hoist runResourceT body seems to type check. Someone also referred me to a Haskell Pipes thread, which may be a closely related problem and a possible hint toward a solution.
If we want to allow Streams that live in ResourceT, we can do without the functions from streaming-wai (which only work for Streams based on IO) and instead build on top of functions like responseStream from the wai package (module Network.Wai):
import Control.Monad.Trans.Resource
import Data.ByteString (ByteString)
import Data.ByteString.Builder (byteString, Builder)
import Network.HTTP.Types (ResponseHeaders, Status)
import Network.Wai
import Streaming
import qualified Streaming.Prelude as S

streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
                   -> Status
                   -> ResponseHeaders
                   -> Response
streamingResponseR stream status headers =
    responseStream status headers streamingBody
  where
    streamingBody writeBuilder flush =
        let writer a =
                do liftIO (writeBuilder (byteString a))
                   -- flushes for every produced bytestring, perhaps not optimal
                   liftIO flush
         in runResourceT $ void $ S.effects $ S.for stream writer
streamingBody has type StreamingBody, which is actually a type synonym for a function (Builder -> IO ()) -> IO () -> IO () that takes a write callback and a flush callback as parameters, and uses them to write the response using some source of data that is in scope. (Note that these callbacks are provided by WAI, not by the user.)
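To make the shape of StreamingBody concrete, here is a minimal sketch that stays in plain IO. The names helloBody and recordBody are hypothetical; recordBody only plays the role of WAI by supplying callbacks that record what the body does, which is handy for experimenting in GHCi.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString.Builder (byteString, toLazyByteString)
import qualified Data.ByteString.Lazy.Char8 as L
import Data.IORef (modifyIORef, newIORef, readIORef)
import Network.Wai (StreamingBody)

-- A StreamingBody is just a function of the write and flush callbacks.
helloBody :: StreamingBody
helloBody write flush = do
  write (byteString "hello, ")
  flush
  write (byteString "world")
  flush

-- Hypothetical harness: stands in for WAI by passing callbacks that
-- record each write and each flush, in order.
recordBody :: StreamingBody -> IO [String]
recordBody body = do
  events <- newIORef []
  let write builder = modifyIORef events (L.unpack (toLazyByteString builder) :)
      flush = modifyIORef events ("<flush>" :)
  body write flush
  reverse <$> readIORef events
```

In a real server the callbacks come from WAI itself; the body never constructs them.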
In our case, the source of data is a Stream that lives in ResourceT. We need to lift the write and flush callbacks (that live in IO) using liftIO, and also remember to invoke runResourceT to return a plain IO action at the end.
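The following small sketch illustrates why runResourceT goes around the whole consumption of the stream: a resource allocated inside the stream is only released once every element has been processed. The names demo and say are hypothetical, and the "resource" is just a pair of log messages rather than a real file handle.

```haskell
import Control.Monad.Trans.Resource (ResourceT, allocate, runResourceT)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Streaming (Of, Stream, lift, liftIO)
import qualified Streaming.Prelude as S

-- Returns the order in which the events happened.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let say msg = modifyIORef logRef (msg :)
      -- A stream in ResourceT that acquires a (fake) resource first.
      source :: Stream (Of Int) (ResourceT IO) ()
      source = do
        _ <- lift (allocate (say "acquire") (\_ -> say "release"))
        S.each [1, 2]
  -- The IO callback is lifted with liftIO; runResourceT wraps the
  -- whole traversal, so the release runs only after the last element.
  runResourceT (S.mapM_ (liftIO . say . show) source)
  reverse <$> readIORef logRef
```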
What if we wanted to flush the response only after the accumulated length of the emitted bytestrings reached some limit?
We would need a function (not implemented here) to create a division each time the limit is reached:
breaks' :: Monad m
        => Int
        -> Stream (Of ByteString) m r
        -> Stream (Stream (Of ByteString) m) m r
breaks' breakSize = undefined
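For what it's worth, here is one way such a function might be written. This is only a sketch under my own assumptions, not something taken from the streaming library: it is called breaksBySize (a hypothetical name) to avoid clashing with the stub above, it never splits an individual bytestring, and it closes a group as soon as the accumulated length reaches the limit, so a group can overshoot the limit by part of its last chunk.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Functor.Identity (runIdentity)
import Streaming (Of, Stream, effect, lift, mapped, wrap)
import qualified Streaming.Prelude as S

breaksBySize :: Monad m
             => Int
             -> Stream (Of ByteString) m r
             -> Stream (Stream (Of ByteString) m) m r
breaksBySize limit = outer
  where
    -- Start a new group only if the source still has a chunk; the first
    -- chunk is always emitted, so every group makes progress.
    outer stream = effect $ do
      step <- S.next stream
      pure $ case step of
        Left r -> return r
        Right (bs, rest) ->
          wrap (fmap outer (S.yield bs >> inner (B.length bs) rest))
    -- Emit chunks until the accumulated length reaches the limit, then
    -- return the remainder of the source as the group's result.
    inner acc stream
      | acc >= limit = return stream
      | otherwise = do
          step <- lift (S.next stream)
          case step of
            Left r -> return (return r)
            Right (bs, rest) -> S.yield bs >> inner (acc + B.length bs) rest

-- Pure helper for experimenting: collect the groups as lists.
groupsOf :: Int -> [ByteString] -> [[ByteString]]
groupsOf limit =
  runIdentity . S.toList_ . mapped S.toList . breaksBySize limit . S.each
```

With a limit of 5, the chunks "ab", "cd", "ef", "g" end up as one group of the first three (total length 6) followed by a group holding just "g".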
And then we could intercalate the flushing action between each group using intercalates, before writing the stream:
streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) ()
                  -> Int
                  -> StreamingBody
streamingBodyFrom stream breakSize writeBuilder flush =
    let writer a = liftIO (writeBuilder (byteString a))
        flusher = liftIO flush
        broken = breaks' breakSize stream
     in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken