haskell lazy-evaluation conduit

Why does this code leak memory after adding `bracketOnError`?


First, I apologize for not having a minimal example (I can attempt to construct one, but for now I have a "before and after" example):

First, the "after" version, which has the memory leak:

protoReceiver :: RIO FdsEnv ()
protoReceiver = do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv
  loopBody <- pure $ bracketOnError
    (runResourceT $ protoServe fdsEnv tMap readFarmPCMessage)
    (\(_,w) -> do
      logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
      )
    (\(server,_) -> do
      logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
      server
        .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
        .| mapMC ((logLogIt Info lgr) . pure)
        .| sinkUnits & runConduitRes
      )
  liftIO loopBody

Here's the "before" code, which does not leak memory:

protoReceiver :: RIO FdsEnv ()
protoReceiver = do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv

  (dmgrProtoServe, tcpWorker) <- liftIO $ runResourceT
    $ protoServe fdsEnv tMap readFarmPCMessage
  liftIO $ runResourceT $ dmgrProtoServe
    .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
    .| mapMC ((logLogIt Info lgr) . pure)
    .| sinkUnits & runConduit

I did some profiling of the leak, though I'm not sure if it is particularly helpful (any suggestions on better profiling charts appreciated):

[Images: heap profile from profiling RTS with -hc; heap profile from profiling RTS with -hd]


Solution

  • The problem is a variant of the classic leak scenario in which we retain a reference to the head of a lazy list while it's being consumed:

    import Data.Foldable (traverse_)
    
    main :: IO ()
    main = do
        let xs = [1..]
        traverse_ print xs 
        traverse_ print xs -- commenting this statement solves the leak 
    

    Here, the conduit Source is working as a "lazy list" of sorts. bracketOnError has to retain a reference to the original source value (server) even as we consume it, because that value must be passed to the exception handler in case of error. This happens even though the exception handler here never actually makes use of it.

    The solution is to cut that reference as soon as the main computation we pass to bracketOnError gets hold of the value. We can use an MVar for that purpose, not because of its synchronization abilities, but because it's a mutable reference that can be "left empty".

    The allocation action, instead of returning a (Source m r, a) value, could return an (MVar (Source m r), a) value. Then, the main computation would perform a takeMVar to get hold of the conduit source. Once we start consuming the source, the original value can be garbage collected because no other references to it remain.

    Here is the working code the OP used after following these suggestions:

    protoReceiver :: RIO FdsEnv ()
    protoReceiver = retryForever $ do
      logItS Info ["Entering FarmPCMessage protoReceiver"]
      tMap <- liftIO $ newThreadMap
      fdsEnv <- ask
      let lgr = fdsLogger fdsEnv
      loopBody <- pure $ bracket
        (runResourceT $ do
          swTup <- protoServe fdsEnv tMap readFarmPCMessage
          serverMVar <- newMVar $ fst swTup
          pure (serverMVar, snd $! swTup)
          )
        (\(_, worker) -> do
          logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
          killChildThreads tMap
          cancel worker
          )
        (\(serverMVar, _) -> do
          logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
          server <- takeMVar serverMVar
          logLogItS Debug lgr ["FarmPCMessage protoReceiver bracket: got server"]
          server
            .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
            .| mapMC ((logLogIt Info lgr) . pure)
            .| sinkUnits & runConduitRes
          )
      liftIO $ retryForever $ loopBody
      where
        killChildThreads = liftIO . killThreadHierarchy
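
The MVar hand-off described in this answer can also be tried in isolation, without conduit or the OP's environment. The following is only a minimal sketch under stated assumptions: a lazy list stands in for the conduit source, and the names acquire and consume as well as the "worker" label are hypothetical and not part of the original code. It shows the shape of the technique (the value retained by bracketOnError holds only the MVar, which is empty once the body has taken the list out); whether memory is actually reclaimed still depends on no other reference to the list being kept alive.

```haskell
import Control.Concurrent.MVar (MVar, isEmptyMVar, newMVar, takeMVar)
import Control.Exception (bracketOnError)
import Data.List (foldl')

-- Hypothetical stand-in for protoServe: returns a lazily produced
-- "source" wrapped in an MVar, plus a label standing in for the worker.
acquire :: IO (MVar [Integer], String)
acquire = do
  ref <- newMVar [1 .. 1000000]
  pure (ref, "worker")

-- bracketOnError keeps the acquired pair reachable for the handler, but
-- the pair only references the MVar, which is empty once the body has
-- taken the list out of it.
consume :: IO (Integer, Bool)
consume =
  bracketOnError
    acquire
    (\(_, name) -> putStrLn ("cleanup for " ++ name))
    (\(ref, _) -> do
      xs <- takeMVar ref            -- the MVar is left empty from here on
      emptied <- isEmptyMVar ref    -- True: the pair no longer leads to xs
      let total = foldl' (+) 0 xs   -- strict fold consumes the list
      total `seq` pure (total, emptied))

main :: IO ()
main = do
  (total, emptied) <- consume
  putStrLn ("sum = " ++ show total ++ ", MVar emptied = " ++ show emptied)
```

The handler still receives the pair if the body throws, so cleanup of the second component works as before; only the path from the pair to the large lazy structure is cut.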