haskell, conduit, network-conduit

Chunk data with Conduit


Here is an example of a conduit combinator that is supposed to yield downstream once a complete message has been received from upstream:

{-# LANGUAGE OverloadedStrings #-}  -- needed for the ByteString literal "|"
import qualified Data.ByteString as BS
import Data.Conduit
import Data.Conduit.Combinators
import Data.Conduit.Network

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = loop
  where
    loop = await >>= maybe (return ()) go
    go x = if (BS.isSuffixOf "|" x)
        then yield (BS.init x) >> loop
        else leftover x

The server code itself looks like the following:

main :: IO ()
main = do
  runTCPServer (serverSettings 5000 "!4") $ \ appData -> runConduit $
    (appSource appData)
    .| message
    .| (appSink appData)

For some reason, a telnet 127.0.0.1 5000 session disconnects after I send any message:

telnet 127.0.0.1 5000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
123|
Connection closed by foreign host.

Please advise: what am I doing wrong here?
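
One way to see what happens, as a minimal sketch rather than a definitive diagnosis: the else branch calls leftover and then returns without looping, so the whole conduit finishes as soon as a chunk does not end in |. Assuming telnet terminates the line with \r\n, the first chunk arrives as 123|\r\n, that branch is taken, the pipeline completes, and runTCPServer closes the connection. The helper runOn below is hypothetical (not from the original code); it feeds a fixed list of chunks through the unchanged message combinator without any network involved.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as BS
import Data.Conduit
import qualified Data.Conduit.Combinators as C

-- The first version of 'message' from the question, unchanged.
message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = loop
  where
    loop = await >>= maybe (return ()) go
    go x = if BS.isSuffixOf "|" x
        then yield (BS.init x) >> loop
        else leftover x  -- no recursive call: the conduit returns here

-- Hypothetical helper: feed a fixed list of chunks and collect the output.
runOn :: [BS.ByteString] -> [BS.ByteString]
runOn chunks = runConduitPure (mapM_ yield chunks .| message .| C.sinkList)
```

In GHCi, runOn ["123|", "456|"] evaluates to ["123","456"], while runOn ["123|\r\n", "456|"] evaluates to []: the first chunk does not end in |, so message stops and the second chunk is never read.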

Update

More importantly, what I am trying to do here is wait for the completion signal | and then yield the complete message downstream. Here is the evolution of the message combinator:

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = do
  minput <- await
  case minput of
    Nothing    -> return ()
    Just input -> do
      case BS.breakSubstring "|" input of
        ("", "")  -> return ()
        ("", "|") -> return ()
        ("", xs)  -> leftover $ BS.tail xs
        (x, "")   -> leftover x -- problem is in this leftover
        (x, xs)   -> do
          yield x
          leftover $ BS.tail xs
      message

My idea was that if nothing is coming from upstream, the combinator has to wait until something arrives, so that it can send a complete message downstream. But it seems that conduit starts spinning on the CPU at that leftover call in the message combinator above.
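
The spinning can be reproduced in isolation. A value pushed back with leftover is the next thing await returns, so the recursive call to message sees the same incomplete chunk again and never gets to read new data from the socket. The sketch below uses only Data.Conduit; peekTwice and demo are hypothetical names introduced for illustration.

```haskell
import Data.Conduit

-- Await a value, push it back with 'leftover', then await again.
-- Returns both results so they can be compared.
peekTwice :: Monad m => ConduitT Int o m (Maybe Int, Maybe Int)
peekTwice = do
  first <- await
  case first of
    Nothing -> return (Nothing, Nothing)
    Just x  -> do
      leftover x          -- put the value back at the front of the input
      second <- await     -- this receives the same value, not the next one
      return (Just x, second)

-- Feed three numbers upstream; only the first is ever observed.
demo :: (Maybe Int, Maybe Int)
demo = runConduitPure (mapM_ yield [1, 2, 3] .| peekTwice)
```

Here demo evaluates to (Just 1,Just 1). In the message combinator the same thing happens on every iteration of the (x, "") case, which is why it loops without waiting for upstream.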


Solution

  • I finally figured out that the base case needs to await more input instead of calling leftover. Here is what the working message combinator looks like:

    message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
    message = do
      minput <- await
      case minput of
        Nothing    -> return ()
        Just input -> process input >> message
      where
        process input =
          case BS.breakSubstring "|" input of
            ("", "")  -> return ()
            ("", "|") -> return ()
            ("", xs)  -> leftover $ BS.tail xs
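            (x, "")   -> do
              -- No delimiter yet: wait for more input and retry on the
              -- concatenation instead of pushing x back with leftover.
              mnext <- await
              case mnext of
                Nothing -> return ()
                Just newInput -> process $ BS.concat [x, newInput]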
            (x, xs)   -> do
              yield x
              leftover $ BS.tail xs
    

    There is a bit of boilerplate that can probably be cleaned up, but it works.
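
One possible cleanup, offered as a sketch rather than a drop-in replacement: carry the unterminated tail in an explicit buffer argument instead of mixing await and leftover. The name messageBuffered is hypothetical. Like the version above, it skips empty messages and silently drops an unterminated tail when upstream closes.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (unless)
import qualified Data.ByteString as BS
import Data.Conduit
import qualified Data.Conduit.Combinators as C

-- Hypothetical alternative to 'message': keep the incomplete tail in 'buf'.
messageBuffered :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
messageBuffered = go BS.empty
  where
    -- Wait for the next chunk and append it to what is buffered so far.
    go buf = await >>= maybe (return ()) (\chunk -> emit (buf <> chunk))

    -- Yield every complete message in the buffer, then go back to waiting
    -- with whatever is left.
    emit buf =
      case BS.breakSubstring "|" buf of
        (rest, "")  -> go rest                 -- no delimiter yet
        (msg, more) -> do
          unless (BS.null msg) (yield msg)     -- skip empty messages
          emit (BS.drop 1 more)                -- drop the '|' and keep splitting

-- Hypothetical helper for trying it out without a socket.
runBuffered :: [BS.ByteString] -> [BS.ByteString]
runBuffered chunks =
  runConduitPure (mapM_ yield chunks .| messageBuffered .| C.sinkList)
```

For example, runBuffered ["12", "3|45", "6|", "|7"] evaluates to ["123","456"]: messages split across chunks are reassembled, the empty message is skipped, and the trailing "7" is dropped because no | ever follows it.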