haskell conduit network-conduit

Getting a result from a streaming network protocol with conduit


I am implementing a simple network protocol with conduit; the protocol is a stream of messages, with each message prefixed with a uint32 describing the length of the message. (The message data then has further internal structure, but this isn't important here since I'm okay with reading the whole message into memory before parsing it, as the expected message size is small). The protocol is the same in both directions, with the client sending messages containing requests to the server, and the server returning messages containing the responses (with no concurrency of operations).
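
To make the framing concrete, here is a minimal sketch of the length-prefix scheme on plain lazy ByteStrings, using only the binary and bytestring packages. The names frame and unframe are hypothetical, and big-endian byte order is an assumption (suggested by the getWord32be' helper used further down, not stated by the protocol description).

```haskell
import qualified Data.ByteString.Lazy as LB
import Data.Binary.Get (Get, getLazyByteString, getWord32be, runGetOrFail)
import Data.Binary.Put (putLazyByteString, putWord32be, runPut)

-- | Prefix a payload with its length as a uint32.
-- Assumption: the length is encoded big-endian.
frame :: LB.ByteString -> LB.ByteString
frame payload = runPut $ do
  putWord32be (fromIntegral (LB.length payload))
  putLazyByteString payload

-- | Split one frame off the front of a buffer. Returns the payload and the
-- unconsumed remainder, or Nothing if the buffer does not yet hold a
-- complete frame.
unframe :: LB.ByteString -> Maybe (LB.ByteString, LB.ByteString)
unframe buf =
  case runGetOrFail getFrame buf of
    Right (rest, _, payload) -> Just (payload, rest)
    Left _ -> Nothing
  where
    getFrame :: Get LB.ByteString
    getFrame = do
      len <- getWord32be
      getLazyByteString (fromIntegral len)
```

Since unframe returns the remainder, it can be applied repeatedly to a buffer holding several messages back to back.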

My idea was to build the code on top of two simple conduits to go from Message (my own type describing the various possible messages) to ByteString and vice-versa:

import Conduit
import Data.Binary.Get (runGet)
import Data.Binary.Put (runPut)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB

data Message = ...
parseMessage :: LB.ByteString -> Message
serializeMessage :: Message -> LB.ByteString

messageReceiver :: Conduit B.ByteString IO Message
messageReceiver = loop
  where
    loop = do
      lenBytes <- takeCE 4 =$= sinkLazy
      message <- takeCE (runGet getWord32be' lenBytes) =$= sinkLazy
      yield $ parseMessage message
      loop

messageSender :: Conduit Message IO B.ByteString
messageSender = concatMapC $ \message ->
  let messageBytes = serializeMessage message
      lenBytes = runPut $ putWord32be' (LB.length messageBytes)
  in map LB.toStrict [lenBytes, messageBytes]
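
One thing to note about the loop in messageReceiver is that it has no exit condition, so it keeps going after the input is exhausted. Below is a hedged sketch of a variant that stops at end of input. It yields raw payloads rather than Message values so that it stays self-contained; framePayloads is a hypothetical name, getWord32be from the binary package stands in for the getWord32be' helper, and runConduitPure (used in the usage note) assumes a conduit release newer than 1.2.3.1. On newer releases, .| is the preferred spelling of =$=.

```haskell
import Conduit
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Binary.Get (getWord32be, runGet)

-- | Yield the payload of each length-prefixed frame and stop cleanly once
-- the input ends. Assumption: the length prefix is a big-endian uint32.
framePayloads :: Monad m => ConduitM B.ByteString LB.ByteString m ()
framePayloads = loop
  where
    loop = do
      -- takeCE passes on at most 4 bytes and pushes the rest of the chunk
      -- back as leftovers, so chunk boundaries do not matter.
      lenBytes <- takeCE 4 =$= sinkLazy
      if LB.length lenBytes < 4
        then return () -- end of input (or a truncated header): stop
        else do
          let len = fromIntegral (runGet getWord32be lenBytes)
          -- If the stream ends mid-payload, this yields a short payload;
          -- a real implementation would probably want to report that.
          payload <- takeCE len =$= sinkLazy
          yield payload
          loop
```

For example, runConduitPure (yieldMany chunks =$= framePayloads =$= sinkList) collects the payloads from a list of strict chunks, even when a frame straddles two chunks.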

So far, so good; or at least, the code typechecks, although I'm sure there's a more elegant way to write it (especially the loop in messageReceiver). Now I want to write something to connect to the server, send a request, get a response, and disconnect. I wrote this:

runOneCommand request = do
  yield request
  response <- await
  return response

However, I'm not sure how to actually hook this up to the network client source and sink in a way that I get the "response" value back. I tried this:

appSource agent $$ messageReceiver =$= runOneCommand =$= messageSender =$= appSink agent

which fails to compile:

Couldn't match type `Data.Maybe.Maybe SSH.Agent.Message' with `()'
Expected type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.Conduit
                 SSH.Agent.Message ghc-prim:GHC.Types.IO SSH.Agent.Message
  Actual type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.ConduitM
                 SSH.Agent.Message
                 SSH.Agent.Message
                 ghc-prim:GHC.Types.IO
                 (Data.Maybe.Maybe SSH.Agent.Message)
In the return type of a call of `Main.runOneCommand'
In the first argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
  `Main.runOneCommand SSH.Agent.RequestIdentities'
In the second argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
  `Main.runOneCommand SSH.Agent.RequestIdentities
   conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
     Main.messageSender
     conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
       Data.Conduit.Network.appSink agent'

Assuming I'm following the types correctly here, this is failing because the network client's sink expects a return type of (), not Message, so I guess I need some other form of conduit composition here, but I don't know what.


Solution

  • After posting this question, I found https://stackoverflow.com/a/23925496/31490 which pointed me in the direction of fuseUpstream:

      response <- appSource agent $$ messageReceiver =$= runOneCommand RequestIdentities `fuseUpstream` messageSender `fuseUpstream` appSink agent
    

    It seems the warning in that answer about the scariness of fuseUpstream's type no longer applies (since the Conduit types were simplified?); compare:

    (=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
    fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2)
    fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r
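
To illustrate the difference between these combinators away from the network, here is a small pure sketch. The names firstInput, upstreamResult, and bothResults are hypothetical, and runConduitPure assumes a conduit release newer than the 1.2.3.1 shown in the error message above.

```haskell
import Conduit

-- | Consume one input, pass it downstream, and return it as the result.
-- Like runOneCommand, this returns something other than (), so it cannot
-- be the left-hand operand of =$=.
firstInput :: Monad m => ConduitM Int Int m (Maybe Int)
firstInput = do
  mx <- await
  case mx of
    Just x -> yield x
    Nothing -> return ()
  return mx

-- fuseUpstream keeps the upstream result and requires the downstream
-- component to return ().
upstreamResult :: Maybe Int
upstreamResult = runConduitPure $
  yieldMany [10, 20, 30] =$= (firstInput `fuseUpstream` sinkNull)

-- fuseBoth keeps both results as a pair (upstream, downstream).
bothResults :: (Maybe Int, [Int])
bothResults = runConduitPure $
  yieldMany [10, 20, 30] =$= (firstInput `fuseBoth` sinkList)
```

Here upstreamResult evaluates to Just 10 and bothResults to (Just 10, [10]): the sink only ever sees the single value that firstInput yields.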