haskell conduit

Conduit pipeline skips some elements of a stream


I tried to implement a simple wordcount with Haskell's Conduit library:

wordcountCv2 :: IO ()
wordcountCv2 = do 
    hashMap <- runConduitRes $ sourceFile "input.txt"
        .| decodeUtf8C
        .| omapCE Data.Char.toLower
        .| peekForeverE (do
            word <- takeWhileCE isAlphaNum
            dropCE 1
            return word)
        .| foldMC insertInHashMap empty
    print (toList hashMap)

insertInHashMap x v = do
    return (insertWith (+) v 1 x)

The problem is that this function works fine with small and medium input files, but as the file grows it starts to break some of the words. For instance, with a small file containing the word "hello" 100 times, the result is [("hello",100)]. With 100000 hellos, the result is instead [("hello",99988),("he",6),("hell",6),("o",6),("llo",6)]. The larger the file, the more broken words there are. Is there something wrong in my implementation?


Solution

  • chi correctly commented that takeWhileCE returns () and sends the result downstream instead of returning it. They’re wrong about one thing, though: this is, in fact, the issue.

    Your conduit operates on a stream of chunks, and one of the reasons why takeWhileCE sends the result downstream is so it can leave the input split on the original chunk boundaries. That way it doesn’t force you to consume unbounded memory just because you might receive a long sequence of matching values.

    But if you want to combine the potentially multiple chunks that make up each word, you need to do a bit more work. Sending them through foldC is one way to do so.

            .| peekForeverE (do
                word <- takeWhileCE isAlphaNum .| foldC
                dropCE 1
                yield word)
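
    To see the chunk-boundary effect without a large file, here is a minimal sketch that feeds hand-made chunks through both versions of the inner conduit. The names splitWith, perChunk and wholeWords are made up for this illustration, and yieldMany stands in for sourceFile .| decodeUtf8C so that the chunk boundaries are under our control.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Data.Char (isAlphaNum)
import Data.Text (Text)

-- Hypothetical helper: run a word-splitting conduit over explicit chunks
-- and collect everything it sends downstream.
splitWith :: ConduitT Text Text Identity () -> [Text] -> [Text]
splitWith splitter chunks =
    runConduitPure $ yieldMany chunks .| splitter .| sinkList

-- Same shape as the question's code: takeWhileCE forwards each matching
-- piece downstream as soon as it sees it, one piece per input chunk.
perChunk :: ConduitT Text Text Identity ()
perChunk = peekForeverE (takeWhileCE isAlphaNum >> dropCE 1)

-- The fix from the answer: collect the pieces with foldC, then yield
-- the concatenated word once.
wholeWords :: ConduitT Text Text Identity ()
wholeWords = peekForeverE $ do
    word <- takeWhileCE isAlphaNum .| foldC
    dropCE 1
    yield word

main :: IO ()
main = do
    -- "hello hello\n" split so that both words straddle a chunk boundary.
    let chunks = ["hel", "lo hel", "lo\n"]
    print (splitWith perChunk chunks)   -- ["hel","lo","hel","lo"]
    print (splitWith wholeWords chunks) -- ["hello","hello"]
```

    With a real file the chunk size is decided by sourceFile, which is why the broken words only show up once the input is larger than a single chunk.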
    

    In your case, it’s easier to use the splitOnUnboundedE combinator, which does all of that for you:

            .| splitOnUnboundedE (not . isAlphaNum)
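
    Putting it together, a word count built on splitOnUnboundedE might look like the sketch below. It makes a few assumptions that are not in the question: it uses Data.Map.Strict instead of a hash map, a pure foldlC instead of foldMC, and yieldMany over in-memory chunks instead of sourceFile "input.txt" .| decodeUtf8C, so that it runs without an input file. countWords is a made-up name. The filterC step is there because splitOnUnboundedE emits an empty piece between two consecutive separators.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Data.Char (isAlphaNum, toLower)
import qualified Data.Map.Strict as Map
import Data.Text (Text)
import qualified Data.Text as T

-- Count lower-cased words in a stream of Text chunks. Words may straddle
-- chunk boundaries; splitOnUnboundedE glues the pieces back together.
countWords :: [Text] -> Map.Map Text Int
countWords chunks = runConduitPure $
    yieldMany chunks
        .| omapCE toLower
        .| splitOnUnboundedE (not . isAlphaNum)
        .| filterC (not . T.null)  -- drop empty pieces between separators
        .| foldlC (\counts w -> Map.insertWith (+) w 1 counts) Map.empty

main :: IO ()
main =
    -- "Hello  world hello\n" with awkward chunk boundaries and a double space.
    print (Map.toList (countWords ["Hel", "lo  wor", "ld hel", "lo\n"]))
    -- [("hello",2),("world",1)]
```

    To read from a file as in the question, replace yieldMany chunks with sourceFile "input.txt" .| decodeUtf8C and run the pipeline with runConduitRes in IO.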