I am implementing a simple network protocol with conduit; the protocol is a stream of messages, with each message prefixed with a uint32 describing the length of the message. (The message data then has further internal structure, but this isn't important here since I'm okay with reading the whole message into memory before parsing it, as the expected message size is small). The protocol is the same in both directions, with the client sending messages containing requests to the server, and the server returning messages containing the responses (with no concurrency of operations).
My idea was to build the code on top of two simple conduits to go from Message
(my own type describing the various possible messages) to ByteString
and vice-versa:
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
data Message = ...
parseMessage :: LB.ByteString -> Message
serializeMessage :: Message -> LB.ByteString
messageReceiver :: Conduit B.ByteString IO Message
messageReceiver = loop
where
loop = do
lenBytes <- takeCE 4 =$= sinkLazy
message <- takeCE (runGet getWord32be' lenBytes) =$= sinkLazy
yield $ parseMessage message
loop
messageSender :: Conduit Message IO B.ByteString
messageSender = concatMapC $ \message ->
let messageBytes = serializeMessage message
lenBytes = runPut $ putWord32be' (LB.length messageBytes)
in map LB.toStrict [lenBytes, messageBytes]
So far, so good; or at least, the code typechecks, although I'm sure there's a more elegant way to write it (especially the loop in messageReceiver
). Now I want to write something to connect to the server, send a request, get a response, and disconnect. I wrote this:
runOneCommand request = do
yield request
response <- await
return response
However, I'm not sure how to actually hook this up to the network client source and sink in a way that I get the "response" value back. I tried this:
appSource agent $$ messageReceiver =$= runOneCommand =$= messageSender =$= appSink agent
which fails to compile:
Couldn't match type `Data.Maybe.Maybe SSH.Agent.Message' with `()'
Expected type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.Conduit
SSH.Agent.Message ghc-prim:GHC.Types.IO SSH.Agent.Message
Actual type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.ConduitM
SSH.Agent.Message
SSH.Agent.Message
ghc-prim:GHC.Types.IO
(Data.Maybe.Maybe SSH.Agent.Message)
In the return type of a call of `Main.runOneCommand'
In the first argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
`Main.runOneCommand SSH.Agent.RequestIdentities'
In the second argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
`Main.runOneCommand SSH.Agent.RequestIdentities
conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
Main.messageSender
conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
Data.Conduit.Network.appSink agent'
Couldn't match type `Data.Maybe.Maybe SSH.Agent.Message' with `()'
Expected type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.Conduit
SSH.Agent.Message ghc-prim:GHC.Types.IO SSH.Agent.Message
Actual type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.ConduitM
SSH.Agent.Message
SSH.Agent.Message
ghc-prim:GHC.Types.IO
(Data.Maybe.Maybe SSH.Agent.Message)
In the return type of a call of `Main.runOneCommand'
In the first argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
`Main.runOneCommand SSH.Agent.RequestIdentities'
In the second argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
`Main.runOneCommand SSH.Agent.RequestIdentities
conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
Main.messageSender
conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
Data.Conduit.Network.appSink agent'
Assuming I'm following the types correctly here, this is failing because the network client's sink expects a return type of ()
, not Message
, so I guess I need some other form of conduit composition here, but I don't know what.
After posting this question, I found https://stackoverflow.com/a/23925496/31490 which pointed me in the direction of fuseUpstream
:
response <- appSource agent $$ messageReceiver =$= runOneCommand RequestIdentities `fuseUpstream` messageSender `fuseUpstream` appSink agent
It seems the warning about the scariness of the type of fuseUpstream
in that answer no longer applies (since the Conduit types were simplfied?); compare:
(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2)
fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r