haskell, recursion, memory-leaks, conduit

Haskell-based file streaming causes memory leak


I'm fairly new to Haskell and I'm working on an existing code base that collects files from file shares. Conduit is used to parallelize the processing of the file shares. The scaffold is based on this tutorial. To read the file share continuously, I added a delay and a recursive call to the streamFile function. I'm not sure whether this is the problem, but the memory usage grows steadily, up to several gigabytes.

What could be the problem that causes the memory leak?

module FileScraper(runFileScraperFinal, FileScraper, watch, watchDirectories) where

import           Actions                         (PostProcAction)
import           Colog                           (LogAction, Msg, Severity)
import           Conduit                         (ConduitM, ConduitT, MonadIO (..), MonadResource, MonadTrans (lift), MonadUnliftIO (withRunInIO), ResourceT, await, bracketP, mapMC, mapM_C, runConduit, runResourceT, yield, (.|), takeWhileC)
import           Control.Concurrent              (threadDelay)
import qualified Control.Concurrent.Async        as Async
import qualified Control.Concurrent.STM          as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import           Data.ByteString                 (ByteString, readFile)
import           Data.Conduit.Combinators        (filterM, yieldMany)
import           Data.Functor                    ((<&>))
import           Data.Text                       (Text, unpack)
import           Filters                         (FileFilter, DirectoryFilter)
import           Polysemy                        (Final, Inspector (inspect), Member, Sem, makeSem)
import           Polysemy.Final                  (bindS, getInitialStateS, getInspectorS, interpretFinal, liftS)
import           Prelude                         hiding (filter, init, readFile)
import           System.FilePath.Find            (find, RecursionPredicate, (/~?), filePath, (&&?), (==?), fileType, FileType (RegularFile), always)
import           System.Posix                    (raiseSignal, sigTERM)

data FileScraper m a where
  Watch :: [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> (FilePath -> ByteString -> Text -> PostProcAction -> m Bool) -> FileScraper m ()

makeSem ''FileScraper

runFileScraperFinal :: forall m. (MonadUnliftIO m => forall r a. (Member (Final m) r) => LogAction m (Msg Severity) -> Sem (FileScraper ': r) a -> Sem r a)
runFileScraperFinal _ = do
  interpretFinal @m (\case
    Watch sources callback -> do
      is <- getInitialStateS
      ins <- getInspectorS
      cb' <- bindS $ uncurry4 callback
      liftS $ withRunInIO $ \runInIO -> liftIO $ do
        runResourceT . runConduit $ watchDirectories sources .| mapMC (\(fp,fc,dest,ppa) -> lift $ do
          eff <- runInIO $ cb' ((fp,fc,dest,ppa) <$ is)
          case inspect ins eff of
            Nothing -> do
              raiseSignal sigTERM
              pure False
            Just v -> do
              pure v
          ) .| takeWhileC id .| mapM_C (const $ pure ())
    )

uncurry4 :: (a -> b -> c -> d -> e) -> ((a, b, c, d) -> e)
uncurry4 f ~(a,b,c,d) = f a b c d

watchDirectories :: MonadResource m => [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
watchDirectories sourceToFilterMap = parSources (fmap (\(src, dest, filter, dirFilter, postProcActions) -> streamFile (unpack src) dest filter dirFilter postProcActions) sourceToFilterMap)

streamFile :: MonadResource m => FilePath -> Text -> FileFilter -> DirectoryFilter -> PostProcAction -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
streamFile baseDir destination filter dirFilter postProcActions = do
    newFiles <- liftIO $ find (recursionPredicate dirFilter) (fileType ==? RegularFile) baseDir
    yieldMany newFiles .| filterM (liftIO . filter) .| mapMC (\entry -> do
      liftIO $ readFile entry <&> (entry,,destination,postProcActions))
    let minutes :: Int = 60_000_000 -- microseconds in a minute
    liftIO $ threadDelay (5 * minutes)
    streamFile baseDir destination filter dirFilter postProcActions
    where
      recursionPredicate :: DirectoryFilter -> RecursionPredicate
      recursionPredicate df = case df of
        [] -> always
        excludes -> foldl1 (&&?) $ map ((/~?) filePath . unpack) excludes

parSources :: (MonadResource m, Foldable f) => f (ConduitM () o (ResourceT IO) ()) -> ConduitT i o m ()
parSources sources = bracketP init cleanup finalSource
  where
    init = do
        -- create the queue where all sources will put their items
        queue <- STM.newTBMQueueIO 100

        -- In a separate thread, run concurrently all conduits
        a <- Async.async $ do
            Async.mapConcurrently_ (\source -> runResourceT $ runConduit (source .| sinkQueue queue)) sources
            -- once all conduits are done, close the queue
            STM.atomically (STM.closeTBMQueue queue)
        pure (a, queue)
    cleanup (async, queue) = do
        -- upon exception or cancellation, close the queue and cancel the threads
        STM.atomically (STM.closeTBMQueue queue)
        Async.cancel async
    finalSource (_, queue) = sourceQueue queue

sourceQueue :: MonadIO m => STM.TBMQueue o -> ConduitT i o m ()
sourceQueue queue = do
        mbItem <- liftIO $ STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure ()  -- queue closed
            Just item -> yield item *> sourceQueue queue

sinkQueue :: MonadIO m => STM.TBMQueue a -> ConduitT a o m ()
sinkQueue queue = do
        mbItem <- await
        case mbItem of
            Nothing -> pure ()  -- no more items to come
            Just item -> do
                liftIO $ STM.atomically (STM.writeTBMQueue queue item)
                sinkQueue queue

Update (Added function that uses the callback):

...
void $ async $ watch normalisedPrefixedSources (\fp content dest ppa -> do
    log Info $ "Sending file " <> pack fp

    result <- await =<< send (unpack dest) content

    case result of
      Just True -> do
        log Info $ "File sent " <> pack fp
        res <- embed @m $ liftIO $ ppa fp
        if res then pure True else do
          log Error "Raise signal for graceful shutdown."
          embed @m $ liftIO $ raiseSignal sigTERM
          pure False
      _ -> do
        log Error $ "Error sending file " <> pack fp <> ". Raise signal for graceful shutdown."
        embed @m $ liftIO $ raiseSignal sigTERM
        pure False
    )
...

Update 2: After removing the idempotent filter from the configuration (the changes from @K. A. Buhr are still in place), the memory consumption is constant.


type FileFilter = FilePath -> IO Bool

createIdempotentFilter :: LogAction IO Message -> M.Idempotent -> IO FileFilter
createIdempotentFilter la filterConfig = do
    cache <- newIORef []
    let configuredCacheSize :: Int = fromIntegral $ M.lruCacheSize filterConfig
    pure $ \path -> do
        fileModificationEpoch <- getModificationTime path
        cache' <- readIORef cache
        if (path, fileModificationEpoch) `elem` cache' then do
            la <& logText Debug ("File already in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
            pure False
        else do
            la <& logText Debug ("File not in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
            let alreadyScanned' = cache' <> [(path, fileModificationEpoch)]
            writeIORef cache $ drop (length alreadyScanned' - configuredCacheSize) alreadyScanned'
            pure True

Is there any code in the function createIdempotentFilter that could cause a memory leak?
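
For comparison, here is a sketch of a stricter variant of the cache update. writeIORef does not force its argument, so the original stores the trimmed list as a thunk; whether that laziness is actually responsible for the growth here is not established. The helper name createIdempotentFilter' and the plain Int cache-size parameter are illustrative stand-ins for the LogAction/M.Idempotent plumbing, and the logging is omitted for brevity; the FileFilter type is the one defined above.

import           Data.IORef       (atomicModifyIORef', newIORef)
import           Data.Time.Clock  (UTCTime)
import           System.Directory (getModificationTime)

-- Sketch only: a stricter cache update, forcing the new cache before it is
-- stored so no drop/length/<> thunks accumulate between calls.
createIdempotentFilter' :: Int -> IO FileFilter
createIdempotentFilter' cacheSize = do
    cache <- newIORef ([] :: [(FilePath, UTCTime)])
    pure $ \path -> do
        mtime <- getModificationTime path
        atomicModifyIORef' cache $ \seen ->
            if (path, mtime) `elem` seen
                then (seen, False)   -- already scanned, reject the file
                else
                    -- append the new entry, trim to the configured size,
                    -- and force the spine of the new list before storing it
                    let seen' = drop (length seen + 1 - cacheSize)
                                     (seen <> [(path, mtime)])
                    in (length seen' `seq` seen', True)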


Solution

  • First, make sure you rule out the ByteStrings of file contents as a source of the leak. You will have a maximum number of files in flight equal to the length of the bounded queue, so your high-water mark will be the contents of some arbitrary collection of 100 files from the input filesystems. If you're processing filesystems with large video or image files, you could see erratic, transient spikes from that. Also, if your callback holds references to the pathnames and/or contents of (some or all of) those files, you'll see a very severe space leak as a result. Rule all this out by replacing readFile entry with return mempty and using a null callback (\_ _ _ _ -> return True); a sketch of both stubs appears at the end of this answer.

    After making a similar change myself, I was able to duplicate your space leak and tracked it down to two technical issues.

    The first was:

    .| takeWhileC id .| mapM_C (const $ pure ())
    

    Replacing this with:

    .| Control.Monad.void andC
    

    reduced the maximum residency for a single pass through a test filesystem from 130MB to 15MB, but still with a characteristic linear increase in heap usage on a heap profile.

    The second was:

    yield item *> sourceQueue queue
    

    Replacing this with:

    yield item >> sourceQueue queue
    

    removed the leak entirely. Maximum residency was only 2MB, and there was no discernible leak on a heap profile for multiple passes through the test filesystem.

    I'm not exactly sure what's going on here, for either issue. The *> versus >> issue is a problem I've seen before. While these are semantically equivalent, they don't necessarily have the same implementation, and sometimes *> leaks space where >> doesn't. However, the takeWhileC problem is a mystery to me.
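
    Finally, the diagnostic stubs mentioned in the first point might look like the following. They reuse the names from streamFile and the watch call site above, and they are only for ruling out retained file contents and the callback, not a fix:

    -- in streamFile: skip reading the file and yield an empty ByteString
    .| mapMC (\entry -> pure (entry, mempty, destination, postProcActions))

    -- at the watch call site: a callback that retains nothing
    watch normalisedPrefixedSources (\_fp _content _dest _ppa -> pure True)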