
Composing Pipes into a loop or cycle in Haskell


This question is about the Haskell library Pipes.

This question is related to Day 11 of the 2019 Advent of Code (possible spoiler warning).

I have two pipes of type Pipe Int Int m r, brain and robot, that need to pass information to each other in a continuous loop. That is, the output of brain needs to go to the input of robot, and the output of robot needs to go to the input of brain. When brain finishes, I need the result of the computation.

How do I compose brain and robot into a loop? Ideally, the result would be a loop with the type Effect m r that I can pass to runEffect.

Edit: The result should look like this:

   +-----------+     +-----------+   
   |           |     |           |   
   |           |     |           |   
a ==>    f    ==> b ==>    g    ==> a=|
^  |           |     |           |    |
|  |     |     |     |     |     |    |
|  +-----|-----+     +-----|-----+    |
|        v                 v          |
|        ()                r          |
+=====================================+

Solution

  • The answer

    The easiest solution would be to use Client and Server, as danidiaz suggested in the comments, since pipes doesn't have any built-in support for cyclic pipes, and it would be incredibly difficult, if not impossible, to add it correctly. This is mostly because we need to handle cases where the number of awaits doesn't match the number of yields.
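    A minimal sketch of the Client/Server approach, assuming brain can be rewritten as a Client (it drives the loop: it sends a command with request and receives the reply) and robot as a Server (a function of the first command that answers with respond and gets the next command back as the result of respond). The names, the doubling logic and the three-round stopping rule are hypothetical placeholders, not the actual Advent of Code logic.

```haskell
import Pipes.Core

-- Hypothetical stand-in for "brain": a Client that sends a command
-- upstream with 'request' and receives the robot's reply. It stops
-- after three round trips and returns the sum of the replies.
brain :: Monad m => Client Int Int m Int
brain = go (3 :: Int) 0 0
  where
    go 0 _   acc = return acc
    go n cmd acc = do
      reply <- request cmd
      go (n - 1) (cmd + 1) (acc + reply)

-- Hypothetical stand-in for "robot": a Server started with the first
-- command. It answers with 'respond' and receives the next command as
-- the result of 'respond'. It never returns on its own.
robot :: Monad m => Int -> Server Int Int m r
robot cmd = do
  next <- respond (cmd * 2)
  robot next

-- (+>>) plugs the server into the client, closing the loop. The result
-- is an Effect, so it can be run with runEffect; its return value is
-- whatever the client (brain) returns.
runDemo :: IO Int
runDemo = runEffect (robot +>> brain)
```

    Because the Client always makes the first move, no initial value is needed, and every request is answered by exactly one respond, so nothing is dropped or duplicated. In this sketch runDemo returns 6 (replies 0, 2 and 4).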

    Edit: I added a section about the problems with the other answer. See the section "Another problematic alternative".

    Edit 2: I have added a less problematic possible solution below. See the section "A possible solution".

    A problematic alternative

    It is, however, possible to simulate it with the help of the Proxy framework (with Client and Server) and the neat function generalize from Pipes.Prelude, which turns a unidirectional Pipe into a bidirectional Proxy.

                                           generalize f x0
       +-----------+                   +---------------------+
       |           |                   |                     |
       |           |                x <======================== x
    a ==>    f    ==> b   becomes      |                     |
       |           |                a ==>         f         ==> b
       |     |     |                   |                     |
       +-----|-----+                   +----------|----------+
             v                                    v     
             r                                    r     
    

    Now we can use //> and >\\ to plug the ends and make the flow cyclic:

    import Pipes.Core
    import Pipes.Prelude (generalize)
    loop :: Monad m => Pipe a a m r -> a -> Effect m r
    loop p x0 = pure >\\ generalize p x0 //> pure
    

    which has this shape

                loop f
    
                  a 
            +-----|-----+
            |     |     |
     /====<=======/===<========\
     |      |           |      |
     \=> a ==>    f    ==> a ==/
            |           |
            +-----|-----+
                  v    
                  r    
    

    As you can see, we have to supply an initial value of type a. This is because there is no guarantee that the pipe won't await before it yields, in which case it would wait forever.

    Note, however, that this throws away data if the pipe yields several times before awaiting, since generalize is internally implemented with a state monad that saves the last value when yielding and retrieves it when awaiting. Conversely, if the pipe awaits several times without yielding, it receives the same value again.
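    A small sketch of this behaviour, assuming generalize is the one exported by Pipes.Prelude. The probe pipe is a hypothetical test case: it awaits once before yielding (so it receives the initial value), then yields twice before awaiting again (so it only sees the second value).

```haskell
import Pipes
import Pipes.Core ((>\\), (//>))
import qualified Pipes.Prelude as P

-- The loop combinator from this answer, built on Pipes.Prelude.generalize.
loop :: Monad m => Pipe a a m r -> a -> Effect m r
loop p x0 = pure >\\ P.generalize p x0 //> pure

-- Hypothetical probe: await, yield twice, await again.
probe :: Monad m => Pipe Int Int m (Int, Int)
probe = do
  x1 <- await   -- nothing yielded yet, so this receives the initial value
  yield 1
  yield 2       -- replaces the 1 stored in generalize's state
  x2 <- await   -- only the most recent yield is visible
  return (x1, x2)

runProbe :: IO (Int, Int)
runProbe = runEffect (loop probe 0)
```

    runProbe returns (0, 2): the first await sees the initial value 0, and the yielded 1 is overwritten before the second await can read it.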

    Usage (of the problematic idea)

    To use it with your pipes, simply compose them and pass them to loop together with an initial value x0:

    runEffect $ loop (f >-> g) x0
    

    But please don't use it: it silently throws away data whenever the pipe yields more than once between two awaits.

    Another problematic alternative

    You could also build a lazily infinite chain of pipes, as mingmingrr suggested:

    import Pipes
    
    infiniteChain :: Monad m => Pipe a a m r -> Producer a m r
    infiniteChain f = infiniteChain f >-> f
    

    This solves the problem of discarded or duplicated values, but introduces several other problems. The first is that awaiting before yielding causes an infinite loop with unbounded memory usage, but that is already addressed in mingmingrr's answer.

    Another issue, which is harder to solve, is that every action before the corresponding yield is re-executed once for each await. We can see this by modifying their example to log what is happening:

    import Pipes
    import qualified Pipes.Prelude as P
    
    f :: Monad m => Pipe Int Int m r
    f = P.map (* 2)
    
    -- g logs with putStrLn, so it has to run in IO
    g :: Int -> Pipe Int Int IO ()
    g 0 = return ()
    g n = do
      lift . putStrLn $ "Awaiting. n = " ++ show n
      x <- await
      lift . putStrLn $ "Got: x = " ++ show x ++ " and n = " ++ show n
      yield (x + 1)
      g (n - 1)
    
    -- The whole chain returns whatever g returns, i.e. ()
    cyclic' :: Int -> Producer Int IO ()
    cyclic' input = let pipe = (yield input >> pipe) >-> f >-> g 6 in pipe
    

    Now, running runEffect (cyclic' 0 >-> P.print) will print the following:

    Awaiting. n = 6
    Got: x = 0 and n = 6
    1
    Awaiting. n = 5
    Awaiting. n = 6
    Got: x = 0 and n = 6
    Got: x = 2 and n = 5
    3
    Awaiting. n = 4
    Awaiting. n = 5
    Awaiting. n = 6
    Got: x = 0 and n = 6
    Got: x = 2 and n = 5
    Got: x = 6 and n = 4
    7
    Awaiting. n = 3
    Awaiting. n = 4
    Awaiting. n = 5
    Awaiting. n = 6
    Got: x = 0 and n = 6
    Got: x = 2 and n = 5
    Got: x = 6 and n = 4
    Got: x = 14 and n = 3
    15
    Awaiting. n = 2
    Awaiting. n = 3
    Awaiting. n = 4
    Awaiting. n = 5
    Awaiting. n = 6
    Got: x = 0 and n = 6
    Got: x = 2 and n = 5
    Got: x = 6 and n = 4
    Got: x = 14 and n = 3
    Got: x = 30 and n = 2
    31
    Awaiting. n = 1
    Awaiting. n = 2
    Awaiting. n = 3
    Awaiting. n = 4
    Awaiting. n = 5
    Awaiting. n = 6
    Got: x = 0 and n = 6
    Got: x = 2 and n = 5
    Got: x = 6 and n = 4
    Got: x = 14 and n = 3
    Got: x = 30 and n = 2
    Got: x = 62 and n = 1
    63
    

    As you can see, for each await we re-execute everything up to the corresponding yield. More specifically, each await starts a new copy of the pipe and runs it until it reaches a yield. When we await again, that copy runs until its next yield, and if it awaits along the way, it creates yet another copy and runs it until its first yield, and so on.

    This means that, in the best case, we get O(n^2) instead of linear running time (and O(n) instead of O(1) memory), since we repeat everything for each action. In the worst case, for example when reading from or writing to a file, we can get completely wrong results, because the side effects are repeated.
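    Counting the trace above makes the quadratic cost concrete: producing the k-th output value takes k awaits (one per nested copy of the pipe), so producing n values takes

```latex
T(n) = \sum_{k=1}^{n} k = \frac{n(n+1)}{2} \in O(n^2)
```

    For g 6 this gives 6 * 7 / 2 = 21, which matches the 21 "Awaiting" lines printed above.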

    A possible solution

    If you really must use Pipes rather than request/respond, and you are sure that your code will never await more often than (or before) it yields, or you have a good default to supply in those cases, we can build on my earlier attempt above to get a solution that at least handles the case where the pipe yields more often than it awaits.

    The trick is adding a buffer to the implementation of generalize, so the excess values are stored instead of being thrown away. We can also keep the extra argument as a default value for when the buffer is empty.

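    import Pipes
    import Pipes.Core
    import Pipes.Lift (evalStateP)
    import Control.Monad.Trans.State.Strict (state, modify)
    import qualified Data.Sequence as Seq
    
    -- Like generalize, but keeps every value sent back from downstream in a
    -- FIFO buffer instead of only the most recent one.
    generalize' :: Monad m => Pipe a b m r -> x -> Proxy x a x b m r
    generalize' p x0 = evalStateP Seq.empty $ up >\\ hoist lift p //> dn
      where
        -- On await: pop the oldest buffered value, or use x0 if the buffer is empty
        up () = do
            x <- lift $ state (takeHeadDef x0)
            request x
        -- On yield: pass the value on and enqueue whatever comes back
        dn a = do
            x <- respond a
            lift $ modify (Seq.|> x)
        takeHeadDef :: a -> Seq.Seq a -> (a, Seq.Seq a)
        takeHeadDef def xs = (foldr const def xs, Seq.drop 1 xs)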
    

    If we now plug this into our definition of loop, we have solved the problem of discarding excess values (at the memory cost of keeping a buffer). It also avoids duplicating any value other than the default, and the default is only used when the buffer is empty.

    loop' :: Monad m => a -> Pipe a a m r -> Effect m r
    loop' x0 p = pure >\\ generalize' p x0 //> pure
    

    If we want awaiting before yielding to be an error, we can simply pass error as the default value: loop' (error "Await without yield") somePipe. Because of laziness, the error is only raised once the pipe actually evaluates the awaited value.

    TL;DR

    Use Client and Server from Pipes.Core. They will solve your problem without causing a ton of strange bugs.

    If that is not possible, my "Possible solution" section with a modified version of generalize should do the job in most cases.