I want to have warp run a process, then respond with that process' output. The output is assumed to be larger than the server's RAM; loading the entire output then responding is not an option. I'd thought that I could accomplish this using something like
withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)
but responseSource
uses ConduitT i (Flush Builder) IO ()
and createSource
uses ConduitT i ByteString m ()
. I could not figure how to convert a ByteString
conduit to a Flush Builder
conduit.
So I devised a solution that seems to work, but it's regrettably less simply defined:
responseProcess :: Status -> ResponseHeaders -> ProcessConfig in out err -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
withProcessWait_ (setStdout createPipe cfg) $ \p@(getStdout -> h) ->
let loop = do
bs <- hGetSome h defaultChunkSize
unless (BS.null bs) (send (byteString bs) *> flush *> loop)
in loop *> hClose h
. Is this necessary, even if I may try prettying-it-up by wrapping in mkStreamSpec
or something? Or is there a simpler method I'm missing?
edit: comments on the solution:
intersperseC
lets me use Chunk
and Flush
together. That solves the Flush Builder
/ByteString
conversion problem. I haven't tested it, but it looks right and I trust it's been used.
However, I found that
withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)
closes the process handle too early. Thus I need to manage the pipe myself: using createPipe
instead of createSource
. But this means that I need to call hClose
at the end, which means that I need a response handler that returns IO ()
; the only one that does (excepting responseRaw
) is responseStream
, which uses StreamingBody
as an alternative to Conduit. Thus I conclude that my original solution is needed and that Conduit cannot be used for streaming processes. Feel free to correct this if it's incorrect.
responseSource
has type
responseSource :: Status -> ResponseHeaders -> Source IO (Flush Builder) -> Response
and the definition of Flush
is
data Flush a = Chunk a | Flush
That is, a value of type Flush Builder
is either a Builder
or a command that instructs warp to flush the output stream.
Builder
is from the binary package. It's basically a representation of a chunk of bytes, optimized for efficient concatenation. And it can be constructed from a ByteString
, using the fromByteString
function.
Knowing that, and using mapC
from conduit, we can define this adapter:
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString)
There's a problem though, the adapter never flushes. But we can intersperse flusing commands by means of intersperseC
:
adapter :: :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . fromByteString) .| intersperseC Flush
And what if we don't want to flush after every chunk? Perhaps we could use chunksOfCE
to group the byte chunks before converting them into Flush
values.