haskell-pipes

How to use bidirectional pipes properly?


I am trying to use pipes to model a problem for which bidirectional Proxy instances would be ideal. Basically, I have something like the following architecture:

api   logic
  |   ^
  |   | 
  v   | 
  A   A'
 layer 1
  B   B' 
  |   ^
  |   |
  v   |
 layer 2     

so basically, I have layer 1 which is a bidirectional transformer. The model is pull-based so I message stream transformations to be triggered by pulls from the logic component.

So I should have layer1 :: Proxy A' A B' B m x, the idea being that layer1 pulls A from api, does some transformation A -> B then uses the B' from layer2, applies B' -> A' and pass it to the logic.

What is not clear is: I know how to request an A and respond a B but how do I produce the A' from the B'? There does not seem any combinator in the library that fits in here...


Solution

  • There are three types you need to be aware of: Clients can request but never respond, Servers can respond but never request and Proxy can do both.

    The argument to request/respond is the value to send out, and the result you bind is the response/request, respectively. This makes intuitive sense for request (you bind the response), but it took a little while before I got it to click for respond (you bind out the next request). It makes your processing steps neat little recursive functions. (My initial instinct was to use Control.Monad.forever, which works well for unidirectional pipes but is the wrong tool here.)

    The bit that gets confusing: because pipes themselves are synchronous, you need to get an initial value to pass around and kick things off. Either you pass it into request (making a pull pipeline which you compose with (>~>)) or you pass it into respond (making a push pipeline which you compose with (>+>)). Then you pass the initial value into the composed pipeline, giving you the Effect m r that can go to runEffect.

    I've used a pull pipeline in the example below, because it fits your API-request metaphor. It implements this three-stage bidirectional pipeline:

    +--------+  Yoken +-----------+  Token +--------+
    |        |<-------|           |<-------|        |
    | server |        | transform |        | client |
    |        |------->|           |------->|        |
    +--------+ String +-----------+ String +--------+
                                   (shouty)
    

    client generates request Tokens and prints out responses. transform turns the Tokens into Yokens (hey, the keys are right next to each other) and passes them upstream. It also turns the response into a shout by upcasing and appending a !. server receives Yokens and generates the requested number of yos.

    import Data.Char
    import Control.Monad
    import Control.Monad.IO.Class
    import Pipes.Core
    import System.IO
    
    data Token = Token Int
    data Yoken = Yoken Int
    
    main :: IO ()
    main = do
        hSetBuffering stdout NoBuffering
        -- We have to get an initial n outside the pipeline to kick things off.
        putStr "N? "
        n <- read <$> getLine
        runEffect $ server >+> transform >+> client $ Token n
    
    -- The "server" generates a string of "yo"s based on the number inside the Yoken
    server :: Monad m => Yoken -> Server Yoken String m a
    server (Yoken n) = (respond . concat $ replicate n "yo") >>= server
    
    -- A processing step just for the sake of having one, turn the Token into a 
    -- Yoken, upcase the string, and append a "!".
    transform :: Monad m => Token -> Proxy Yoken String Token String m a
    transform (Token t) = do
        resp <- request $ Yoken t
        next <- respond $ map toUpper resp ++ "!"
        transform next
    
    -- Clients request "yo"s, by sending `Token`s upstream.
    client :: Token -> Client Token String IO a
    client t = do
        resp <- request t
        n <- liftIO $ putStrLn resp *> putStr "N? " *> fmap read getLine
        client $ Token n