Here is an example of a conduit combinator that is supposed to yield downstream once a complete message has been received from upstream:
{-# LANGUAGE OverloadedStrings #-}

import qualified Data.ByteString as BS
import Data.Conduit
import Data.Conduit.Combinators
import Data.Conduit.Network

message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = loop
  where
    loop = await >>= maybe (return ()) go
    go x = if (BS.isSuffixOf "|" x)
             then yield (BS.init x) >> loop
             else leftover x
The server code itself looks like this:
main :: IO ()
main = do
  runTCPServer (serverSettings 5000 "!4") $ \ appData -> runConduit $
    (appSource appData)
    .| message
    .| (appSink appData)
For some reason, telnet 127.0.0.1 5000 disconnects after sending any message:
telnet 127.0.0.1 5000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
123|
Connection closed by foreign host.
Please advise: what am I doing wrong here?
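One possible cause, sketched below under the assumption that a line-mode telnet client sends the typed text followed by "\r\n": the chunk then does not end in "|", so go takes the else branch, which calls leftover and returns without looping. The conduit finishes and the connection is closed. The names telnetChunk, endsWithBar and splitAtBar are made up for this illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as BS

-- Assumed chunk as sent by a line-mode telnet client: typed text plus CRLF.
telnetChunk :: BS.ByteString
telnetChunk = "123|\r\n"

-- The same check that `go` performs; False here because the chunk ends in "\r\n".
endsWithBar :: Bool
endsWithBar = BS.isSuffixOf "|" telnetChunk

-- The delimiter is still in the chunk, just not at the very end:
-- splitAtBar == ("123", "|\r\n")
splitAtBar :: (BS.ByteString, BS.ByteString)
splitAtBar = BS.breakSubstring "|" telnetChunk
```

Searching for the delimiter anywhere in the chunk, rather than only at its end, avoids depending on how the client terminates a line.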
Update
More importantly, what I am trying to do here is wait for the completion signal | and then yield the complete message downstream. Here is the evolution of the message combinator:
message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = do
  minput <- await
  case minput of
    Nothing    -> return ()
    Just input -> do
      case BS.breakSubstring "|" input of
        ("", "")  -> return ()
        ("", "|") -> return ()
        ("", xs)  -> leftover $ BS.tail xs
        (x, "")   -> leftover x -- problem is in this leftover
        (x, xs)   -> do
          yield x
          leftover $ BS.tail xs
      message
My idea was that if nothing is coming from upstream, the combinator would have to wait until something arrives, so that it can send a complete message downstream. But it seems that conduit starts spinning on the CPU at that leftover call in the message combinator above.
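The spinning is consistent with how leftover behaves: it pushes the chunk back onto the conduit's own input, so the next await returns that same chunk immediately instead of waiting for upstream. A minimal sketch of this, using a pure source in place of the socket (peekTwice and demo are hypothetical names used only here):

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as BS
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Await a chunk, push it back with leftover, then await again.
-- The second await returns the pushed-back chunk, not the next upstream one.
peekTwice :: Monad m
          => ConduitT BS.ByteString o m (Maybe BS.ByteString, Maybe BS.ByteString)
peekTwice = do
  first <- await
  mapM_ leftover first   -- no-op if upstream was already exhausted
  second <- await
  return (first, second)

-- demo == (Just "12", Just "12"); the chunk "3|" is never reached.
demo :: (Maybe BS.ByteString, Maybe BS.ByteString)
demo = runConduitPure (CL.sourceList ["12", "3|"] .| peekTwice)
```

In the combinator above, the (x, "") branch does exactly this on every iteration, so the recursive call to message keeps receiving the same incomplete chunk.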
I finally figured out that the base case needs to await rather than call leftover. Here is what the working message combinator looks like:
message :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
message = do
  minput <- await
  case minput of
    Nothing    -> return ()
    Just input -> process input >> message
  where
    process input =
      case BS.breakSubstring "|" input of
        ("", "")  -> return ()
        ("", "|") -> return ()
        ("", xs)  -> leftover $ BS.tail xs
        (x, "")   -> do
          minput <- await
          case minput of
            Nothing       -> return ()
            Just newInput -> process $ BS.concat [x, newInput]
        (x, xs)   -> do
          yield x
          leftover $ BS.tail xs
A bit of boilerplate that can probably be cleaned up, but it works.
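One way the boilerplate might be reduced is to carry the incomplete data in an explicit buffer instead of using leftover. This is only a sketch, not a drop-in replacement: messages, go, emit and demo are hypothetical names, and it assumes the same behaviour as above (empty messages are skipped, and a trailing fragment without "|" is dropped when upstream ends).

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (unless)
import qualified Data.ByteString as BS
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Yield each '|'-terminated message, without the delimiter.
messages :: Monad m => ConduitT BS.ByteString BS.ByteString m ()
messages = go BS.empty
  where
    -- Wait for the next chunk and prepend whatever is buffered so far.
    go buffer = await >>= maybe (return ()) (emit . BS.append buffer)

    emit chunk =
      case BS.breakSubstring "|" chunk of
        (partial, rest)
          | BS.null rest -> go partial             -- no delimiter yet: keep buffering
          | otherwise    -> do
              unless (BS.null partial) (yield partial)
              emit (BS.drop 1 rest)                -- skip the '|' and continue

-- demo == ["123", "45"]; the trailing "6" has no delimiter and is dropped.
demo :: [BS.ByteString]
demo = runConduitPure (CL.sourceList ["12", "3|45", "|6"] .| messages .| CL.consume)
```

Because the buffer is passed as an argument, the only place the conduit blocks is the single await in go.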