haskell conduit

Merging sources using Haskell Conduit


Is it possible to build a function (say zipC2) in Conduit that would turn the following sources:

series1 = yieldMany [2, 4, 6, 8, 16 :: Int]

series2 = yieldMany [1, 5, 6 :: Int]

into one that would produce the following pairs (shown here as a list):

[(Nothing, Just 1), (Just 2, Just 1), (Just 4, Just 1), (Just 4, Just 5), (Just 6, Just 6), (Just 8, Just 6), (Just 16, Just 6)]

It would be called with a comparison function in the following way:

runConduitPure ( zipC2 (<=) series1 series2 .| sinkList )

There used to be a mergeSources function in earlier versions that did something similar (though without remembering the previous value on each side), but it is no longer present in the most recent version (1.3.1).

Clarification about how the function works: The idea is to take 2 sources A (generating values a) and B (generating values b).

We then generate pairs:

If a < b, we update the left side and yield (Just a, Nothing)

If b < a, we update the right side and yield (Nothing, Just b)

If a == b, we update both sides and yield (Just a, Just b)

The value from the source that was not updated is not consumed and is used for the next round of comparisons. Only updated values are consumed.

We then keep updating the pair, according to the values from A and B relative to each other.

In other words: we update the left side of the pair if a < b, the right side if b < a, or both sides if a == b. Any unconsumed value is kept in memory for the next round of comparison.
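
To pin down the rule described above, here is a minimal sketch of the same merge on plain lists rather than conduits. The name mergeModel is hypothetical, and the sketch assumes an Ord instance in place of the comparison function; it is only meant as a reference for the expected pairs, not as the requested Conduit function.

```haskell
-- Pure list model of the merge rule (mergeModel is a hypothetical name).
-- The first argument of 'go' is the pair remembered from the previous round.
mergeModel :: Ord a => [a] -> [a] -> [(Maybe a, Maybe a)]
mergeModel = go (Nothing, Nothing)
  where
    -- Both inputs exhausted: nothing more to emit.
    go _ [] [] = []
    -- Only the left input has values: keep updating the left side.
    go (_, r) (a : restA) [] = (Just a, r) : go (Just a, r) restA []
    -- Only the right input has values: keep updating the right side.
    go (l, _) [] (b : restB) = (l, Just b) : go (l, Just b) [] restB
    -- Both inputs have a head: consume only the side(s) being updated.
    go (l, r) xs@(a : restA) ys@(b : restB) =
      case compare a b of
        LT -> (Just a, r) : go (Just a, r) restA ys
        GT -> (l, Just b) : go (l, Just b) xs restB
        EQ -> (Just a, Just b) : go (Just a, Just b) restA restB
```

Evaluating mergeModel [2, 4, 6, 8, 16 :: Int] [1, 5, 6] gives the list of pairs shown earlier, starting with (Nothing, Just 1) and ending with (Just 16, Just 6).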


Solution

  • I've managed to create your zipC2 function:

    import Conduit
    
    zipC2Def :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> (Maybe a, Maybe a) -> ConduitT () (Maybe a, Maybe a) m ()
    zipC2Def f c1 c2 (s1, s2) = do
      ma <- c1 .| peekC
      mb <- c2 .| peekC
      case (ma, mb) of
        (Just a, Just b) ->
          case (f a b, f b a) of
            (True, True) -> do
              yield (ma, mb)
              zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, mb)
            (_, True) -> do
              yield (s1, mb)
              zipC2Def f c1 (c2 .| drop1) (s1, mb)
            (True, _) -> do
              yield (ma, s2)
              zipC2Def f (c1 .| drop1) c2 (ma, s2)
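            -- Neither f a b nor f b a holds: both heads are dropped without
            -- yielding. Not reached for a total order such as (<=).
            _ ->
              zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, s2)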
        (Just a, Nothing) -> do
          yield (ma, s2)
          zipC2Def f (c1 .| drop1) c2 (ma, s2)
        (Nothing, Just b) -> do
          yield (s1, mb)
          zipC2Def f c1 (c2 .| drop1) (s1, mb)
        _ -> return ()
      where
        -- Discard the first element, then pass the rest through unchanged.
        drop1 = dropC 1 >> takeWhileC (const True)
    
    zipC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (Maybe a, Maybe a) m ()
    zipC2 f c1 c2 = zipC2Def f c1 c2 (Nothing, Nothing)
    
    main :: IO ()
    main = 
      let
        series1 = yieldMany [2, 4, 6, 8, 16 :: Int] :: ConduitT () Int Identity ()
        series2 = yieldMany [1, 5, 6 :: Int] :: ConduitT () Int Identity ()
      in
      print $ runConduitPure $
        zipC2 (<=) series1 series2
        .| sinkList
    

    Output:

    [(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]
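
One thing to be aware of: the version above runs c1 .| peekC and c2 .| peekC again on every step, so each source is restarted from the beginning each time, with an ever longer chain of drop1 stages. That is harmless for pure sources like yieldMany, but it repeats work and would repeat any effects in a monadic source. As a possible alternative, here is a sketch that pulls from each source only once, using sealConduitT and ($$++) from Data.Conduit. The names zipC2Once and pull are hypothetical, and the sketch assumes an Ord instance instead of the comparison function.

```haskell
import Conduit
import Control.Monad.Trans.Class (lift)
import Data.Conduit (SealedConduitT, sealConduitT, ($$++))

-- Pull one element from a sealed source and return the rest of the source.
pull :: Monad m => SealedConduitT () a m () -> m (SealedConduitT () a m (), Maybe a)
pull src = src $$++ await

-- Single-pass variant (hypothetical name); uses Ord rather than a predicate.
zipC2Once
  :: (Monad m, Ord a)
  => ConduitT () a m ()
  -> ConduitT () a m ()
  -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Once c1 c2 = do
  (s1, ma) <- lift (pull (sealConduitT c1))
  (s2, mb) <- lift (pull (sealConduitT c2))
  go (Nothing, Nothing) s1 ma s2 mb
  where
    -- (l, r) is the pair remembered from the previous round;
    -- ma and mb are the current, not yet consumed, heads of each source.
    go (l, r) s1 ma s2 mb =
      case (ma, mb) of
        (Nothing, Nothing) -> pure ()
        (Just a, Nothing) -> left a
        (Nothing, Just b) -> right b
        (Just a, Just b) ->
          case compare a b of
            LT -> left a
            GT -> right b
            EQ -> do
              yield (ma, mb)
              (s1', ma') <- lift (pull s1)
              (s2', mb') <- lift (pull s2)
              go (ma, mb) s1' ma' s2' mb'
      where
        -- Update the left side only; the right head stays unconsumed.
        left a = do
          yield (Just a, r)
          (s1', ma') <- lift (pull s1)
          go (Just a, r) s1' ma' s2 mb
        -- Update the right side only; the left head stays unconsumed.
        right b = do
          yield (l, Just b)
          (s2', mb') <- lift (pull s2)
          go (l, Just b) s1 ma s2' mb'
```

With the two example series, runConduitPure (zipC2Once series1 series2 .| sinkList) should produce the same list of pairs as the output above.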