I'm fairly new to Haskell and I'm working on an existing code base that collects files from file shares. Conduit is used to parallelize the processing of the file shares. The scaffold is based on this tutorial. To read the file shares continuously, I added a delay and a recursive call to the streamFile function. I'm not sure whether this is the cause, but memory usage grows steadily, up to several gigabytes.
What could be causing the memory leak?
module FileScraper(runFileScraperFinal, FileScraper, watch, watchDirectories) where
import Actions (PostProcAction)
import Colog (LogAction, Msg, Severity)
import Conduit (ConduitM, ConduitT, MonadIO (..), MonadResource, MonadTrans (lift), MonadUnliftIO (withRunInIO), ResourceT, await, bracketP, mapMC, mapM_C, runConduit, runResourceT, yield, (.|), takeWhileC)
import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import Data.ByteString (ByteString, readFile)
import Data.Conduit.Combinators (filterM, yieldMany)
import Data.Functor ((<&>))
import Data.Text (Text, unpack)
import Filters (FileFilter, DirectoryFilter)
import Polysemy (Final, Inspector (inspect), Member, Sem, makeSem)
import Polysemy.Final (bindS, getInitialStateS, getInspectorS, interpretFinal, liftS)
import Prelude hiding (filter, init, readFile)
import System.FilePath.Find (find, RecursionPredicate, (/~?), filePath, (&&?), (==?), fileType, FileType (RegularFile), always)
import System.Posix (raiseSignal, sigTERM)
data FileScraper m a where
  Watch :: [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> (FilePath -> ByteString -> Text -> PostProcAction -> m Bool) -> FileScraper m ()
makeSem ''FileScraper
runFileScraperFinal :: forall m. (MonadUnliftIO m => forall r a. (Member (Final m) r) => LogAction m (Msg Severity) -> Sem (FileScraper ': r) a -> Sem r a)
runFileScraperFinal _ = do
  interpretFinal @m (\case
    Watch sources callback -> do
      is <- getInitialStateS
      ins <- getInspectorS
      cb' <- bindS $ uncurry4 callback
      liftS $ withRunInIO $ \runInIO -> liftIO $ do
        runResourceT . runConduit $ watchDirectories sources .| mapMC (\(fp,fc,dest,ppa) -> lift $ do
          eff <- runInIO $ cb' ((fp,fc,dest,ppa) <$ is)
          case inspect ins eff of
            Nothing -> do
              raiseSignal sigTERM
              pure False
            Just v -> do
              pure v
          ) .| takeWhileC id .| mapM_C (const $ pure ())
    )
uncurry4 :: (a -> b -> c -> d -> e) -> ((a, b, c, d) -> e)
uncurry4 f ~(a,b,c,d) = f a b c d
watchDirectories :: MonadResource m => [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
watchDirectories sourceToFilterMap = parSources (fmap (\(src, dest, filter, dirFilter, postProcActions) -> streamFile (unpack src) dest filter dirFilter postProcActions) sourceToFilterMap)
streamFile :: MonadResource m => FilePath -> Text -> FileFilter -> DirectoryFilter -> PostProcAction -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
streamFile baseDir destination filter dirFilter postProcActions = do
  newFiles <- liftIO $ find (recursionPredicate dirFilter) (fileType ==? RegularFile) baseDir
  yieldMany newFiles .| filterM (liftIO . filter) .| mapMC (\entry -> do
    liftIO $ readFile entry <&> (entry,,destination,postProcActions))
  let minutes :: Int = 60_000_000
  liftIO $ threadDelay (5 * minutes)
  streamFile baseDir destination filter dirFilter postProcActions
  where
    recursionPredicate :: DirectoryFilter -> RecursionPredicate
    recursionPredicate df = case df of
      [] -> always
      excludes -> foldl1 (&&?) $ map ((/~?) filePath . unpack) excludes
parSources :: (MonadResource m, Foldable f) => f (ConduitM () o (ResourceT IO) ()) -> ConduitT i o m ()
parSources sources = bracketP init cleanup finalSource
  where
    init = do
      -- create the queue where all sources will put their items
      queue <- STM.newTBMQueueIO 100
      -- in a separate thread, run all conduits concurrently
      a <- Async.async $ do
        Async.mapConcurrently_ (\source -> runResourceT $ runConduit (source .| sinkQueue queue)) sources
        -- once all conduits are done, close the queue
        STM.atomically (STM.closeTBMQueue queue)
      pure (a, queue)
    cleanup (async, queue) = do
      -- upon exception or cancellation, close the queue and cancel the threads
      STM.atomically (STM.closeTBMQueue queue)
      Async.cancel async
    finalSource (_, queue) = sourceQueue queue
sourceQueue :: MonadIO m => STM.TBMQueue o -> ConduitT i o m ()
sourceQueue queue = do
  mbItem <- liftIO $ STM.atomically (STM.readTBMQueue queue)
  case mbItem of
    Nothing -> pure () -- queue closed
    Just item -> yield item *> sourceQueue queue
sinkQueue :: MonadIO m => STM.TBMQueue a -> ConduitT a o m ()
sinkQueue queue = do
  mbItem <- await
  case mbItem of
    Nothing -> pure () -- no more items to come
    Just item -> do
      liftIO $ STM.atomically (STM.writeTBMQueue queue item)
      sinkQueue queue
Update (Added function that uses the callback):
...
void $ async $ watch normalisedPrefixedSources (\fp content dest ppa -> do
  log Info $ "Sending file " <> pack fp
  result <- await =<< send (unpack dest) content
  case result of
    Just True -> do
      log Info $ "File sent " <> pack fp
      res <- embed @m $ liftIO $ ppa fp
      if res then pure True else do
        log Error "Raise signal for graceful shutdown."
        embed @m $ liftIO $ raiseSignal sigTERM
        pure False
    _ -> do
      log Error $ "Error sending file " <> pack fp <> ". Raise signal for graceful shutdown."
      embed @m $ liftIO $ raiseSignal sigTERM
      pure False
  )
...
Update 2: After removing the idempotent filter from the configuration (the changes from @K. A. Buhr are still in place), the memory consumption is constant.
type FileFilter = FilePath -> IO Bool
createIdempotentFilter :: LogAction IO Message -> M.Idempotent -> IO FileFilter
createIdempotentFilter la filterConfig = do
  cache <- newIORef []
  let configuredCacheSize :: Int = fromIntegral $ M.lruCacheSize filterConfig
  pure $ \path -> do
    fileModificationEpoch <- getModificationTime path
    cache' <- readIORef cache
    if (path, fileModificationEpoch) `elem` cache' then do
      la <& logText Debug ("File already in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
      pure False
    else do
      la <& logText Debug ("File not in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
      let alreadyScanned' = cache' <> [(path, fileModificationEpoch)]
      writeIORef cache $ drop (length alreadyScanned' - configuredCacheSize) alreadyScanned'
      pure True
Is there any problematic code in the function createIdempotentFilter that could cause a memory leak?
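One thing I plan to try, in case the lazy writeIORef update is accumulating thunks (writeIORef never forces what it stores, so each call could leave a chain of drop/(<>) thunks in the IORef). A sketch of a strict cache update, with the logging omitted:

pure $ \path -> do
  fileModificationEpoch <- getModificationTime path
  isNew <- atomicModifyIORef' cache $ \cache' ->
    if (path, fileModificationEpoch) `elem` cache'
      then (cache', False)
      else
        let alreadyScanned' = cache' <> [(path, fileModificationEpoch)]
            trimmed = drop (length alreadyScanned' - configuredCacheSize) alreadyScanned'
        in length trimmed `seq` (trimmed, True) -- force the spine before storing
  pure isNew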
First, make sure you rule out the ByteStrings of file contents as a source of the leak. You will have a maximum number of files in flight equal to the length of the bounded queue, so your high-water mark will be the contents of some arbitrary collection of 100 files from the input filesystems. If you're processing filesystems with large video/image files, you could see erratic, transient spikes from that. Also, if your callback is holding references to the pathnames and/or contents of (some or all of) those files, you'll see a very severe space leak as a result. Rule all this out by replacing readFile entry with return mempty and using a null callback (\_ _ _ _ -> return True).
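As a concrete sketch against the code above, those two substitutions would look like:

-- in streamFile: keep yielding entries, but never retain file contents
yieldMany newFiles .| filterM (liftIO . filter) .| mapMC (\entry ->
  pure (entry, mempty, destination, postProcActions))

-- at the watch call site: a null callback that holds no references
void $ async $ watch normalisedPrefixedSources (\_ _ _ _ -> pure True)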
After making a similar change myself, I was able to duplicate your space leak and tracked it down to two technical issues.
The first was:
.| takeWhileC id .| mapM_C (const $ pure ())
Replacing this with:
.| Control.Monad.void andC
reduced the maximum residency for a single pass through a test filesystem from 130MB to 15MB, but still with a characteristic linear increase in heap usage on a heap profile.
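If it helps to see the replacement in isolation, here is a minimal, self-contained toy (assuming just the conduit package) showing that void andC consumes a stream of Bools and short-circuits at the first False, which is the behaviour takeWhileC id .| mapM_C (const $ pure ()) was emulating:

import Conduit (andC, mapMC, runConduit, yieldMany, (.|))
import Control.Monad (void)

-- prints True, True, False and then stops: andC stops demanding input
-- at the first False, and void discards its Bool result
main :: IO ()
main = runConduit $
     yieldMany [True, True, False, True]
  .| mapMC (\b -> print b >> pure b)
  .| void andC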
The second was:
yield item *> sourceQueue queue
Replacing this with:
yield item >> sourceQueue queue
removed the leak entirely. Maximum residency was only 2MB, and there was no discernible leak on a heap profile for multiple passes through the test filesystem.
I'm not exactly sure what's going on here, for either issue. The *> versus >> issue is a problem I've seen before. While these are semantically equivalent, they don't necessarily have the same implementation, and sometimes *> leaks space where >> doesn't. However, the takeWhileC problem is a mystery to me.