While writing a deserialiser for a large (<bloblength><blob>)*-encoded binary file I got stuck with the various Haskell produce-transform-consume libraries. So far I'm aware of four streaming libraries, among them conduit and pipes (Haskell Cast #6 nicely reveals the differences between conduit and pipes).

Here's a stripped-down example of where things go wrong when I try to do Word32 streaming with conduit. A slightly more realistic example would first read a Word32 that determines the blob length and then yield a lazy ByteString of that length (which is then deserialised further).
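For concreteness, one record of that layout could be described with Data.Binary.Get roughly as follows. This is only a sketch: the name getBlob is made up for illustration, and a big-endian Word32 length prefix is an assumption, since the byte order of the format is not stated.

```haskell
import qualified Data.Binary.Get      as G
import qualified Data.ByteString.Lazy as BL

-- | Parse one <bloblength><blob> record (hypothetical helper).
-- Assumption: the length prefix is a big-endian Word32 counting the blob's bytes.
getBlob :: G.Get BL.ByteString
getBlob = do
  len <- G.getWord32be                    -- read the 4-byte length prefix
  G.getLazyByteString (fromIntegral len)  -- then take exactly that many bytes

main :: IO ()
main = do
  -- A hand-made record: length 2, payload bytes 65 and 66, then one unrelated byte.
  let input = BL.pack [0, 0, 0, 2, 65, 66, 67]
  print (G.runGet getBlob input)
```

Running such a parser over a whole file with runGet is not streaming by itself; it only pins down what a single record looks like.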
But here I just try to extract Word32's in streaming fashion from a binary file:

    module Main where

    -- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

    import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
    import qualified Data.Binary.Get              as G
    import qualified Data.ByteString              as BS
    import qualified Data.ByteString.Char8        as C
    import qualified Data.ByteString.Lazy         as BL
    import           Data.Conduit
    import qualified Data.Conduit.Binary          as CB
    import qualified Data.Conduit.List            as CL
    import           Data.Word                    (Word32)
    import           System.Environment           (getArgs)

    -- gets a Word32 from a ByteString.
    getWord32 :: C.ByteString -> Word32
    getWord32 bs = do
        G.runGet G.getWord32be $ BL.fromStrict bs

    -- should read ByteString and return Word32
    transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
    transform = do
        mbs <- await
        case mbs of
            Just bs -> do
                case C.null bs of
                    False -> do
                        yield $ getWord32 bs
                        leftover $ BS.drop 4 bs
                        transform
                    True -> return ()
            Nothing -> return ()

    main :: IO ()
    main = do
        filename <- fmap (!!0) getArgs  -- should check length getArgs
        result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
        print $ length result           -- is always 8188 for files larger than 32752 bytes

The output of the program is just the number of Word32's that were read. It turns out the stream terminates after reading the first chunk (about 32 KiB). For some reason mbs is never Nothing, so I must check null bs, which stops the stream when the chunk is consumed. Clearly, my conduit transform is faulty. I see two routes to a solution:

1. await doesn't want to go to the second chunk of the byte stream, so is there another function that pulls the next chunk? In examples I've seen (e.g. Conduit 101) this is not how it's done.
2. [...] transform.

How is this done properly? Is this the right way to go? (Performance does matter.)

Update: Here's a BAD way to do it using System.IO.Streams:

    module Main where

    import           Data.Word                (Word32)
    import           System.Environment       (getArgs)
    import           System.IO                (IOMode (ReadMode), openFile)
    import qualified System.IO.Streams        as S
    import           System.IO.Streams.Binary (binaryInputStream)
    import           System.IO.Streams.List   (outputToList)

    main :: IO ()
    main = do
        filename : _ <- getArgs
        h <- openFile filename ReadMode
        s <- S.handleToInputStream h
        i <- binaryInputStream s :: IO (S.InputStream Word32)
        r <- outputToList $ S.connect i
        print $ last r

'Bad' means: Very demanding in time and space, does not handle Decode exception.
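
Your immediate problem is caused by how you are using leftover. That function is used to "Provide a single piece of leftover input to be consumed by the next component in the current monadic binding", and so when you give it the remainder of bs before looping with transform, the next await hands that remainder back instead of pulling a fresh chunk. Once the chunk is used up, the remainder is an empty bytestring, your null check returns, and you are effectively throwing away the rest of the stream (i.e. what comes after bs).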
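
The effect can be reproduced without a file. The sketch below restates the question's transform against the newer conduit API (ConduitT, .| and runConduitPure from the Conduit module, which assumes conduit >= 1.3 is installed) and feeds it two hand-made chunks; only the words from the first chunk come out.

```haskell
import           Conduit
import qualified Data.Binary.Get      as G
import qualified Data.ByteString      as BS
import qualified Data.ByteString.Lazy as BL
import           Data.Word            (Word32)

-- The transform from the question, restated for conduit >= 1.3 (assumption).
transform :: Monad m => ConduitT BS.ByteString Word32 m ()
transform = do
  mbs <- await
  case mbs of
    Just bs
      | BS.null bs -> return ()   -- an empty leftover ends the whole conduit
      | otherwise  -> do
          yield (G.runGet G.getWord32be (BL.fromStrict bs))
          leftover (BS.drop 4 bs) -- pushed back even when nothing remains
          transform
    Nothing -> return ()

main :: IO ()
main = do
  let chunk1 = BS.pack [0, 0, 0, 1, 0, 0, 0, 2]  -- words 1 and 2
      chunk2 = BS.pack [0, 0, 0, 3]              -- word 3, never reached
  -- prints [1,2]
  print (runConduitPure (yieldMany [chunk1, chunk2] .| transform .| sinkList))
```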

A correct solution based on your code would use the incremental input interface of Data.Binary.Get to replace your yield/leftover combination with something that consumes each chunk fully. A more pragmatic approach, though, is using the binary-conduit package, which provides that in the shape of conduitGet (its source gives a good idea of what a "manual" implementation would look like):

    import Data.Conduit.Serialization.Binary
    -- etc.

    transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
    transform = conduitGet G.getWord32be

One caveat is that this will throw a parse error if the total number of bytes is not a multiple of 4 (i.e. the last Word32 is incomplete). In the unlikely case of that not being what you want, a lazy way out would be simply using \bs -> C.take (4 * (C.length bs `div` 4)) bs on the input bytestring.
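
For reference, a "manual" version built on the incremental interface of Data.Binary.Get (runGetIncremental, pushChunk and the Decoder constructors) might look roughly like the following. It is a sketch under assumptions: it targets conduit >= 1.3 (ConduitT, .|), the name word32s is made up, and silently stopping on a trailing incomplete word is a choice made here rather than the parse error described above for conduitGet.

```haskell
import           Conduit
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import           Data.Word       (Word32)

-- | Decode big-endian Word32 values from a stream of strict chunks,
-- carrying a partially read word over to the next chunk (hypothetical helper).
word32s :: Monad m => ConduitT BS.ByteString Word32 m ()
word32s = start
  where
    -- Between words: pull a chunk; end of stream here is a clean stop.
    start = do
      mbs <- await
      case mbs of
        Nothing -> return ()
        Just bs
          | BS.null bs -> start               -- skip empty chunks, do not stop
          | otherwise  -> go (fresh `G.pushChunk` bs)

    -- A decoder for one word that has not seen any input yet.
    fresh = G.runGetIncremental G.getWord32be

    go (G.Done rest _ w) = do
      yield w
      if BS.null rest
        then start                            -- chunk used up: fetch the next one
        else go (fresh `G.pushChunk` rest)    -- keep decoding the same chunk
    go (G.Partial k) = await >>= go . k       -- word spans chunks; Nothing = end of input
    go G.Fail{}      = return ()              -- trailing incomplete word: stop (sketch's choice)

main :: IO ()
main = do
  -- Words 1..4 split awkwardly across chunks, plus one stray trailing byte.
  let chunks = [ BS.pack [0, 0, 0, 1, 0, 0], BS.pack [0, 2, 0, 0, 0, 3]
               , BS.empty, BS.pack [0, 0, 0, 4, 9] ]
  -- prints [1,2,3,4]
  print (runConduitPure (yieldMany chunks .| word32s .| sinkList))
```

On a file, the same conduit could be run as runConduitRes (sourceFile filename .| word32s .| lengthC) to count the decoded words.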