haskellhaskell-pipes

Pipes unfolds composition


I am currently in the process of learning pipes. While playing around with bidirectional pipes I noticed that unfold composition looks pretty similar:

(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy x' x c' c m b') -> Proxy x' x c' c m r
-- instead of:
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r

But we have to share the x' and x types because of the way we wire up the continuation:

(>>=) :: Monad m => Proxy a' a b' b m r -> (r -> Proxy a' a b' b m r') -> Proxy a' a b' b m r'
case p0 of
    Request x' fx  -> Request x' (\x -> go (fx x))
    Respond b  fb' -> fb b >>= \b' -> (fb' b')
    ...

But that's pretty easy to get around:

import Pipes
import Pipes.Core hiding ((//>))

main :: IO ()
main = runEffect $ lhs //> rhs

infixl 4 //>
(//>) :: Monad m => Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
p //> f = p >>~ go
    where go x = go =<< request =<< f x

lhs :: Proxy x' x String String IO ()
lhs = each [1..10::Int] //> \i -> do
    r <- respond $ "response nr. " ++ show i
    lift . putStrLn $ "lhs: " ++ show r

rhs :: String -> Proxy String String  x x' IO String
rhs x = do
    lift . putStrLn $ "rhs 1: " ++ show x
    y <- request "yield manually to upstream!"
    lift . putStrLn $ "rhs 2: " ++ show y
    return "return to upstream"

With the expected output:

rhs 1: "response nr. 1"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 2"
lhs: "return to upstream"
rhs 1: "response nr. 3"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 4"
lhs: "return to upstream"
rhs 1: "response nr. 5"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 6"
lhs: "return to upstream"
rhs 1: "response nr. 7"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 8"
lhs: "return to upstream"
rhs 1: "response nr. 9"
lhs: "yield manually to upstream!"
rhs 2: "response nr. 10"
lhs: "return to upstream"

Best I can tell this doesn't break any laws either.

So finally here is my question: Why does Pipes use the current definition?


Solution

  • I believe the relevant part of (//>)'s contract is...

    (p //> f) replaces each respond in p with f.

    ... which implies that f will handle all values received from p in the same manner. That, however, is exactly what your combinator circumvents -- in your example, you alternate between sets of messages as you go through the elements of each [1..10]. To further illustrate the point, here is a slightly modified version of your code (in particular, I have picked a different name for your combinator, and used plain old (//>) immediately after each [1..10], as your combinator behaves the same in that case):

    infixl 4 //>*
    (//>*) :: Monad m =>
        Proxy x' x b' b m r -> (b -> Proxy b' b c' c m b') -> Proxy x' x c' c m r
    p //>* f = p >>~ go
        where go x = f x >>= request >>= go
    
    src :: Monad m => Producer Int m ()
    src = each [1..10]
    
    -- The types of lhs and rhs are more restrictive than yours, but for this
    -- usage pattern (and with the adjustments I made) that is not a problem. 
    lhs :: Show a => a -> Server String String IO ()
    lhs = \i -> do
        r <- respond $ "response nr. " ++ show i
        lift . putStrLn $ "lhs: " ++ r
    
    rhs :: String -> Client String String IO String
    rhs x = do
        lift . putStrLn $ "rhs 0: Will this happen for every value?"
        lift . putStrLn $ "rhs 1: " ++ x
        y <- request "yield manually to upstream!"
        lift . putStrLn $ "rhs 2: " ++ y
        return "return to upstream"
    

    The answer to the question I slipped in at the beginning of rhs...

    GHCi> runEffect $ (src //> lhs) //>* rhs
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 1
    lhs: yield manually to upstream!
    rhs 2: response nr. 2
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 3
    lhs: yield manually to upstream!
    rhs 2: response nr. 4
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 5
    lhs: yield manually to upstream!
    rhs 2: response nr. 6
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 7
    lhs: yield manually to upstream!
    rhs 2: response nr. 8
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 9
    lhs: yield manually to upstream!
    rhs 2: response nr. 10
    lhs: return to upstream
    

    ... is no. Contrast that with what happens when I wire your functions using (//>) as the outermost combinator, like this:

    GHCi> runEffect $ src //> (\x -> lhs x //>* rhs)
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 1
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 2
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 3
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 4
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 5
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 6
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 7
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 8
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 9
    lhs: yield manually to upstream!
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 10
    lhs: yield manually to upstream!
    

    Instead of setting a server that will give at most ten responses (src //> lhs), here every value gives rise to a single-response server, whose response is handled by the rhs client. Given that there is no second response to be gotten out of the server, the code in rhs after the request is never ran. As a consequence, the values from src are handled uniformly. To further emphasise that, note that using your combinator to do that is unnecessary: src //> (lhs >~> void . rhs) does the same thing.

    (Another thing to note is that, if we change back the types of lhs and rhs to what you had them to be at first, we can write the pipeline just above as src //>* (\x -> lhs x //>* rhs). However, that is not the same as (src //>* lhs) //>* rhs. That is an associativity failure, and so your combinator does not give rise to a category.)

    It also helps to clarify what is going on to replace your combinator with (>>~) (something that I'm sure you have tried in your tests):

    GHCi> runEffect $ (src //> lhs) >>~ void . rhs
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 1
    lhs: yield manually to upstream!
    rhs 2: response nr. 2
    

    src //> lhs offers up to ten responses; rhs, however, only makes two requests, and so the other eight responses are left unused. To me, that suggests your combinator is best expressed as a way to make a client carry on requesting indefinitely:

    -- requestForever :: Monad m => (b -> Client b' b m b') -> b -> Client b' b m r
    requestForever :: Monad m => 
        (b -> Proxy b' b c' c m b') -> b -> Proxy b' b c' c m r
    requestForever f = go
        where go x = f x >>= request >>= go
    
    GHCi> runEffect $ (src //> lhs) >>~ requestForever rhs
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 1
    lhs: yield manually to upstream!
    rhs 2: response nr. 2
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 3
    lhs: yield manually to upstream!
    rhs 2: response nr. 4
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 5
    lhs: yield manually to upstream!
    rhs 2: response nr. 6
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 7
    lhs: yield manually to upstream!
    rhs 2: response nr. 8
    lhs: return to upstream
    rhs 0: Will this happen for every value?
    rhs 1: response nr. 9
    lhs: yield manually to upstream!
    rhs 2: response nr. 10
    lhs: return to upstream