haskell conduit haskell-warp typed-process

How to Pipe Typed Process to wai-conduit's responseSource?


I want warp to run a process and respond with that process's output. The output is assumed to be larger than the server's RAM, so loading the entire output before responding is not an option. I thought I could accomplish this with something like

withProcessWait_ (setStdout createSource "cat largefile") (pure . responseSource ok200 [] . getStdout)

but responseSource expects a ConduitT () (Flush Builder) IO (), whereas createSource provides a ConduitT i ByteString m (). I could not figure out how to convert a ByteString conduit to a Flush Builder conduit.


So I devised a solution that seems to work, but it is regrettably more verbose:

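{-# LANGUAGE ViewPatterns #-}
-- `in` is a reserved word, so the type variables are named stdin/stdout/stderr.
responseProcess :: Status -> ResponseHeaders -> ProcessConfig stdin stdout stderr -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
    withProcessWait_ (setStdout createPipe cfg) $ \(getStdout -> h) ->
        let loop = do
                bs <- hGetSome h defaultChunkSize
                -- An empty read means EOF; otherwise forward the chunk, flush, and continue.
                unless (BS.null bs) (send (byteString bs) *> flush *> loop)
        in loop *> hClose h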

Is this necessary, even if I could tidy it up by wrapping it in mkStreamSpec or something similar? Or is there a simpler method I'm missing?


edit: comments on the solution:

intersperseC lets me use Chunk and Flush together. That solves the Flush Builder/ByteString conversion problem. I haven't tested it, but it looks right and I trust it's been used.

However, I found that

withProcessWait_ (setStdout createSource "cat largefile") $ \p ->
    pure $ responseSource ok200 [] (getStdout p .| mapC (Chunk . byteString) .| intersperseC Flush)

closes the process handle too early: responseSource only builds a Response value, and warp runs the conduit later, after withProcessWait_ has already returned and cleaned up the process. Thus I need to manage the pipe myself, using createPipe instead of createSource. But this means that I need to call hClose at the end, which requires a response constructor whose body is an IO () action; the only one (excepting responseRaw) is responseStream, which uses StreamingBody as an alternative to Conduit. I therefore conclude that my original solution is needed and that Conduit cannot be used for streaming processes. Feel free to correct this if it's wrong.
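
The conclusion above may be stronger than necessary. A possible middle ground, sketched here and not tested against a real deployment, is to keep responseStream but run a conduit inside the StreamingBody, so the process stays alive for exactly as long as the response is being written. The sketch assumes createSource from conduit-extra's Data.Conduit.Process.Typed; the port number 3000 and the main wrapper are illustrative assumptions, and "cat largefile" is the command from the question.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Conduit (mapM_C, runConduit, (.|))
import Data.ByteString.Builder (byteString)
import Data.Conduit.Process.Typed (createSource)
import Network.HTTP.Types (ResponseHeaders, Status, ok200)
import Network.Wai (Response, responseStream)
import Network.Wai.Handler.Warp (run)
import System.Process.Typed (ProcessConfig, getStdout, setStdout, withProcessWait_)

-- Stream a process's stdout as the response body.
-- The process is started when warp runs the StreamingBody and is
-- waited on (and cleaned up) only after the conduit has drained stdout.
responseProcess :: Status -> ResponseHeaders -> ProcessConfig stdin stdout stderr -> Response
responseProcess s hs cfg = responseStream s hs $ \send flush ->
  withProcessWait_ (setStdout createSource cfg) $ \p ->
    runConduit $
      getStdout p
        -- Forward each chunk to the client and flush after it.
        .| mapM_C (\bs -> send (byteString bs) *> flush)

-- Hypothetical usage: every request streams the output of `cat largefile`.
-- Port 3000 is an arbitrary choice for this sketch.
main :: IO ()
main = run 3000 $ \_request respond ->
  respond (responseProcess ok200 [] "cat largefile")
```

This still goes through responseStream rather than responseSource, so it does not contradict the observation about handle lifetime; it only moves the read loop into Conduit.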


Solution

  • responseSource has type

    responseSource :: Status -> ResponseHeaders -> ConduitT () (Flush Builder) IO () -> Response

    and the definition of Flush is

    data Flush a = Chunk a | Flush

    That is, a value of type Flush Builder is either a Chunk wrapping a Builder or a Flush command that instructs warp to flush the output stream.

    Builder here is the one from the bytestring package (Data.ByteString.Builder). It's basically a representation of a chunk of bytes, optimized for efficient concatenation. It can be constructed from a ByteString using the byteString function (older code uses fromByteString from blaze-builder for the same purpose).

    Knowing that, and using mapC from conduit, we can define this adapter:

    adapter :: Monad m => ConduitT ByteString (Flush Builder) m ()
    adapter = mapC (Chunk . byteString)
    

    There's a problem though: the adapter never flushes. But we can intersperse flushing commands by means of intersperseC:

    adapter :: Monad m => ConduitT ByteString (Flush Builder) m ()
    adapter = mapC (Chunk . byteString) .| intersperseC Flush
    

    And what if we don't want to flush after every chunk? Perhaps we could use chunksOfCE to group the byte chunks before converting them into Flush values.
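
To make the two variants concrete, here is a minimal sketch that runs them on in-memory input instead of a process. The names chunkedAdapter and render, the sample input, and the chunk size of 4 are assumptions made for illustration; in practice the chunk size would be much larger. It relies on chunksOfCE regrouping the byte stream into pieces of the requested size, with only the last piece allowed to be shorter.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Conduit
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder, byteString, toLazyByteString)
import qualified Data.ByteString.Lazy as BL
import Data.Functor.Identity (Identity)

-- The adapter from the answer: one Flush between every pair of upstream chunks.
adapter :: Monad m => ConduitT ByteString (Flush Builder) m ()
adapter = mapC (Chunk . byteString) .| intersperseC Flush

-- Hypothetical variant: regroup the bytes into pieces of n bytes first,
-- so a Flush is emitted once per n bytes rather than once per read.
chunkedAdapter :: Monad m => Int -> ConduitT ByteString (Flush Builder) m ()
chunkedAdapter n = chunksOfCE n .| adapter

-- Run an adapter over a fixed sample input and make the result printable
-- (Builder has no Show instance, so each chunk is rendered to a lazy ByteString).
render :: ConduitT ByteString (Flush Builder) Identity () -> [Flush BL.ByteString]
render c =
  runConduitPure $
    yieldMany ["ab", "cd", "e"] .| c .| mapC (fmap toLazyByteString) .| sinkList

main :: IO ()
main = do
  print (render adapter)
  print (render (chunkedAdapter 4))
```

Note that intersperseC only places Flush between chunks, so neither variant ends with a trailing Flush; the remaining output is sent when the response body completes.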