
Chunk data with Conduit

Here is an example of a conduit combinator that is supposed to yield downstream once a complete message has been received from upstream:

{-# LANGUAGE OverloadedStrings #-}  -- needed for the "|" and "!4" literals

import qualified Data.ByteString as BS
import Data.Conduit
import Data.Conduit.Combinators
import Data.Conduit.Network

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = loop
  where
    loop = await >>= maybe (return ()) go
    go x = if (BS.isSuffixOf "|" x)
        then yield (BS.init x) >> loop
        else leftover x

The server code itself looks like the following:

main :: IO ()
main = do
  runTCPServer (serverSettings 5000 "!4") $ \ appData -> runConduit $
    (appSource appData)
    .| message
    .| (appSink appData)

For some reason, telnet 5000 disconnects as soon as I send any message:

telnet 5000
Connected to
Escape character is '^]'.
Connection closed by foreign host.

Please advise: what am I doing wrong here?
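One thing worth checking can be sketched without a network at all. The `else` branch of `message` calls `leftover` and then returns, so the conduit finishes as soon as it sees a chunk that does not end in `|`; once the pipeline finishes, the handler returns and `runTCPServer` closes the connection. The sketch below assumes that telnet terminates each typed line with `\r\n`, so even `hello|` arrives as a chunk that fails the suffix test. The names `telnetLine`, `received`, and `wellFormed` are hypothetical and exist only for this illustration.

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import Data.Conduit
import qualified Data.Conduit.List as CL

-- The combinator from the question, with the "|" literal packed explicitly.
message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = loop
  where
    loop = await >>= maybe (return ()) go
    go x = if BS.isSuffixOf (BC.pack "|") x
        then yield (BS.init x) >> loop
        else leftover x   -- returns without looping, so the conduit ends here

-- Assumption: this is what telnet sends for the typed line "hello|".
telnetLine :: [BS.ByteString]
telnetLine = [BC.pack "hello|\r\n"]

-- Nothing is yielded: the chunk ends in "\r\n", not "|".
received :: [BS.ByteString]
received = runConduitPure (CL.sourceList telnetLine .| message .| CL.consume)

-- Chunks that end exactly in "|" are passed on with the delimiter stripped.
wellFormed :: [BS.ByteString]
wellFormed =
  runConduitPure (CL.sourceList (map BC.pack ["a|", "b|"]) .| message .| CL.consume)
```

Here `received` is the empty list, while `wellFormed` contains `a` and `b`, which suggests the combinator only works when every chunk happens to end exactly at a delimiter.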


More importantly, what I am trying to do here is wait for the completion signal | and then yield the complete message downstream. Here is the next iteration of the message combinator:

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = do
  minput <- await
  case minput of
    Nothing    -> return ()
    Just input -> do
      case BS.breakSubstring "|" input of
        ("", "")  -> return ()
        ("", "|") -> return ()
        ("", xs)  -> leftover $ BS.tail xs
        (x, "")   -> leftover x -- problem is in this leftover
        (x, xs)   -> do
          yield x
          leftover $ BS.tail xs

My idea was that if nothing is coming from upstream, the combinator would wait until something arrives, so that it can send a complete message downstream. But it seems that conduit starts spinning on the CPU at that leftover call in the message combinator above.
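The spinning is consistent with how `leftover` behaves: it pushes the chunk back onto the front of the input, so the next `await` returns that same chunk immediately instead of waiting for new data from upstream. A minimal sketch of that behaviour follows; `leftoverDemo`, `demoResult`, and the sample chunks are hypothetical names used only for illustration.

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Hypothetical helper: await a chunk, push it back, then await again.
leftoverDemo :: Monad m
             => ConduitT BS.ByteString o m (Maybe BS.ByteString, Maybe BS.ByteString)
leftoverDemo = do
  first <- await
  mapM_ leftover first   -- same call as the (x, "") branch of message
  second <- await        -- does not wait for upstream; sees the pushed-back chunk
  return (first, second)

-- Both components are Just "abc"; the chunk "def" is never reached.
demoResult :: (Maybe BS.ByteString, Maybe BS.ByteString)
demoResult =
  runConduitPure (CL.sourceList (map BC.pack ["abc", "def"]) .| leftoverDemo)
```

If `message` is run in a loop, a chunk without a `|` would therefore be awaited and pushed back over and over without ever consuming new input.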


  • I finally figured out that the base case needs to await more input instead of calling leftover. Here is what the working message combinator looks like:

    message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
    message = do
      minput <- await
      case minput of
        Nothing    -> return ()
        Just input -> process input >> message
      where
        process input =
          case BS.breakSubstring "|" input of
            ("", "")  -> return ()
            ("", "|") -> return ()
            ("", xs)  -> leftover $ BS.tail xs
            (x, "")   -> do
              minput <- await
              case minput of
                Nothing -> return ()
                Just newInput -> process $ BS.concat [x, newInput]
            (x, xs)   -> do
              yield x
              leftover $ BS.tail xs

    There is a bit of boilerplate that can probably be cleaned up, but it works.
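One possible cleanup, offered as a sketch rather than a drop-in replacement: carry the incomplete part of the message in an explicit buffer instead of using `leftover`, and split on the delimiter with `break`. The names `messages` and `splitDemo` are hypothetical. Like the version above, this sketch skips empty messages and drops an unterminated tail when upstream closes.

```haskell
import Control.Monad (unless)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BC
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Yield each '|'-terminated message, buffering partial input across chunks.
messages :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
messages = go BS.empty
  where
    -- Wait for the next chunk and append it to what is buffered so far.
    go buffer = do
      mchunk <- await
      case mchunk of
        Nothing    -> return ()              -- upstream closed; drop the tail
        Just chunk -> emit (buffer <> chunk)

    -- Yield every complete message in the input, then buffer the remainder.
    emit input =
      case BC.break (== '|') input of
        (partial, rest)
          | BS.null rest -> go partial       -- no delimiter yet; need more input
          | otherwise    -> do
              unless (BS.null partial) (yield partial)
              emit (BS.drop 1 rest)          -- skip the '|' and keep scanning

-- Messages split across chunk boundaries are reassembled: ["hello", "world"].
splitDemo :: [BS.ByteString]
splitDemo =
  runConduitPure
    (CL.sourceList (map BC.pack ["he", "llo|wor", "ld|", "|tail"]) .| messages .| CL.consume)
```

Keeping the buffer as a function argument means the conduit only ever blocks in `await`, so there is no chunk to push back and re-read.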