First, I apologize for not having a minimal example (I can attempt to construct one, but for now I have a "before and after" example).
Here is the "after" version, which has the memory leak:
protoReceiver :: RIO FdsEnv ()
protoReceiver = do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv
  loopBody <- pure $ bracketOnError
    (runResourceT $ protoServe fdsEnv tMap readFarmPCMessage)
    (\(_,w) -> do
      logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
    )
    (\(server,_) -> do
      logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
      server
        .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
        .| mapMC ((logLogIt Info lgr) . pure)
        .| sinkUnits & runConduitRes
    )
  liftIO loopBody
Here's the "before" code, which does not leak memory:
protoReceiver :: RIO FdsEnv ()
protoReceiver = do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv
  (dmgrProtoServe, tcpWorker) <- liftIO $ runResourceT
    $ protoServe fdsEnv tMap readFarmPCMessage
  liftIO $ runResourceT $ dmgrProtoServe
    .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
    .| mapMC ((logLogIt Info lgr) . pure)
    .| sinkUnits & runConduit
I did some profiling of the leak, though I'm not sure it is particularly helpful (any suggestions for better profiling charts are appreciated).
The problem is a variant of the classic leak scenario in which we retain a reference to the head of a lazy list while it's being consumed:
import Data.Foldable (traverse_)

main :: IO ()
main = do
  let xs = [1..]
  traverse_ print xs
  traverse_ print xs -- commenting out this statement solves the leak
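Here, the conduit Source is working as a "lazy list" of sorts. We need to retain a reference to the original source value (server) even as we consume it, because bracketOnError must be able to pass the whole value returned by the allocation action (the (server, w) tuple) to the exception handler in case of error. And yet the exception handler doesn't make use of the source at all: it only pattern-matches on the second component.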
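To see why the tuple stays reachable, it helps to look at the shape of bracketOnError. The sketch below restates, roughly, how Control.Exception.bracketOnError is written in base (renamed bracketOnError' to avoid a clash); the small demo in main is illustrative only. The key point is that the handler action `release a` is a closure over `a`, so `a` cannot be collected until `use a` finishes.

```haskell
import Control.Exception (mask, onException)

-- Simplified restatement of Control.Exception.bracketOnError.
bracketOnError' :: IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracketOnError' acquire release use =
  mask $ \restore -> do
    a <- acquire
    -- `release a` captures `a` (in the question, the whole (server, w)
    -- tuple), so it stays reachable for as long as `use a` runs, even if
    -- `release` never looks at the first component.
    restore (use a) `onException` release a

main :: IO ()
main = do
  r <- bracketOnError'
         (pure (10 :: Int, "worker"))
         (\(_, w) -> putStrLn ("cleanup " ++ w)) -- only runs on exception
         (\(n, _) -> pure (n * 2))
  print r
```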
The solution is to cut that reference as soon as the main computation we pass to bracketOnError gets hold of the value. We can use an MVar for that purpose, not because of its synchronization abilities, but because it's a mutable reference that can be "left empty".
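A minimal illustration of the "left empty" property, using only Control.Concurrent.MVar: after takeMVar the MVar holds nothing, so it no longer keeps the value it used to contain alive. The string "source" is just a placeholder for the conduit source.

```haskell
import Control.Concurrent.MVar (isEmptyMVar, newMVar, takeMVar)

main :: IO ()
main = do
  -- Start with a full MVar holding a placeholder value.
  ref <- newMVar "source"
  -- takeMVar returns the value and leaves the MVar empty, so from now on
  -- the MVar itself no longer references the value.
  v <- takeMVar ref
  emptyNow <- isEmptyMVar ref
  print (v, emptyNow) -- prints ("source",True)
```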
The allocation action, instead of returning a (Source m r, a) value, could return an (MVar (Source m r), a) value. Then the main computation would perform a takeMVar to get hold of the conduit source. Once we start consuming the source, the original value can be garbage collected, because no other references to it remain.
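The same pattern can be sketched without conduit, under the assumption that a plain lazy list is a fair stand-in for the Source and a String for the worker handle. The names acquire, release and consume are hypothetical, not taken from the question. The exception handler still retains the tuple, but after takeMVar the tuple only points at an empty MVar, so the list can be collected as it is consumed.

```haskell
import Control.Concurrent.MVar (MVar, newMVar, takeMVar)
import Control.Exception (bracketOnError)
import Data.List (foldl')

-- Hypothetical stand-in for protoServe: a large lazy "source" wrapped in
-- an MVar, plus a worker handle. Taking n as an argument keeps the list
-- from being floated out as a shared top-level constant.
acquire :: Int -> IO (MVar [Int], String)
acquire n = do
  srcVar <- newMVar [1 .. n]
  pure (srcVar, "worker")

-- Cleanup (run only on exception) needs nothing but the worker handle.
release :: (MVar [Int], String) -> IO ()
release (_, worker) = putStrLn ("cleaning up " ++ worker)

-- Empty the MVar first, then consume the source. The tuple captured by the
-- handler now refers to an empty MVar rather than to the head of the list.
consume :: (MVar [Int], String) -> IO Int
consume (srcVar, _) = do
  src <- takeMVar srcVar
  pure $! foldl' (+) 0 src

main :: IO ()
main = bracketOnError (acquire 1000000) release consume >>= print
```

The final code in the question uses bracket rather than bracketOnError, but the retention argument is the same, since both pass the acquired value to the cleanup action.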
Here is the working code the OP used after following these suggestions:
protoReceiver :: RIO FdsEnv ()
protoReceiver = retryForever $ do
  logItS Info ["Entering FarmPCMessage protoReceiver"]
  tMap <- liftIO $ newThreadMap
  fdsEnv <- ask
  let lgr = fdsLogger fdsEnv
  loopBody <- pure $ bracket
    (runResourceT $ do
      swTup <- protoServe fdsEnv tMap readFarmPCMessage
      serverMVar <- newMVar $ fst swTup
      pure (serverMVar, snd $! swTup)
    )
    (\(_, worker) -> do
      logLogItS Debug lgr ["Entering cleanup for protoReceiver"]
      killChildThreads tMap
      cancel worker
    )
    (\(serverMVar, _) -> do
      logLogItS Debug lgr ["Entering FarmPCMessage protoReceiver bracket"]
      server <- takeMVar serverMVar
      logLogItS Debug lgr ["FarmPCMessage protoReceiver bracket: got server"]
      server
        .| mapMC (liftIO . traverse_ (persistFarmEntry fdsEnv))
        .| mapMC ((logLogIt Info lgr) . pure)
        .| sinkUnits & runConduitRes
    )
  liftIO $ retryForever $ loopBody
  where
    killChildThreads = liftIO . killThreadHierarchy