Consider the following Haskell program (I'm doing this mostly for learning purposes):

    import qualified Control.Concurrent.MSem as Sem
    import System.Environment (getArgs)
    import Control.Concurrent (forkIO)
    import Control.Monad

    -- Traverse with maximum n threads
    parallelTraverse :: Foldable a => Int -> (b -> IO ()) -> a b -> IO ()
    parallelTraverse n action values = do
      sem <- Sem.new n
      forM_ values $ \value -> Sem.with sem (forkIO $ action value)

    main :: IO ()
    main = do
      args <- getArgs
      let nThreads = read . head $ args :: Int
      parallelTraverse nThreads print [(1::Int)..]

When I run it, memory usage quickly climbs to several GB. I tried various combinations to make sure the results of intermediate computations (the print actions) are discarded. Why is it still leaking space?
First of all, there is an evident mistake in the following piece:

    Sem.with sem (forkIO $ action value)

Here the main thread acquires the semaphore around the fork operation rather than around the action. Since forkIO returns immediately, the semaphore is released right away and never constrains the action. To guard the action itself, the semaphore has to be taken inside the forked thread:

    forkIO (Sem.with sem (action value))

I.e., the semaphore is acquired from the context of the forked thread.
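A minimal sketch of what taking the semaphore inside the forked thread achieves. It uses QSem from base in place of MSem (a substitution, so that it runs without extra packages), and the names peakConcurrency, running and peak are made up for illustration. It forks one thread per value and records how many actions are inside the guarded section at the same time:

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.QSem (newQSem, waitQSem, signalQSem)
import Control.Exception (bracket_, finally)
import Control.Monad (forM)
import Data.IORef

-- Fork one thread per value, guard each action with the semaphore inside
-- the forked thread, and return the peak number of simultaneous actions.
peakConcurrency :: Int -> [Int] -> IO Int
peakConcurrency n values = do
  sem     <- newQSem n
  running <- newIORef (0 :: Int)
  peak    <- newIORef (0 :: Int)
  let action _ = do
        now <- atomicModifyIORef' running (\r -> (r + 1, r + 1))
        atomicModifyIORef' peak (\p -> (max p now, ()))
        threadDelay 1000  -- pretend to do some work
        atomicModifyIORef' running (\r -> (r - 1, ()))
  dones <- forM values $ \v -> do
    done <- newEmptyMVar
    _ <- forkIO $
      bracket_ (waitQSem sem) (signalQSem sem) (action v)
        `finally` putMVar done ()
    return done
  mapM_ takeMVar dones  -- wait until every forked thread has finished
  readIORef peak

main :: IO ()
main = peakConcurrency 3 [1 .. 20] >>= print
```

The reported peak never exceeds 3, yet 20 threads were created: this placement limits how many actions run at once, not how many threads exist.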
Secondly, in the following code you're calling parallelTraverse on an infinite list:

    parallelTraverse nThreads print [(1::Int)..]

This results in threads being forked without end. Since forkIO returns almost instantly in the calling thread, it's no surprise that you run out of resources quite soon.
To use the semaphore to limit the number of worker threads, the with pattern simply won't do in your case: the semaphore has to be acquired in the main thread before forking and released in the worker once the action is done. Instead, you should use an explicit combination of wait and signal, and not forget to handle exceptions properly (in case you expect them). E.g.:
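
    -- Needed in addition to the imports from the question.
    import Control.Exception (finally)

    parallelTraverse :: Foldable a => Int -> (b -> IO ()) -> a b -> IO ()
    parallelTraverse n action values = do
      sem <- Sem.new n
      forM_ values $ \value -> do
        -- Block in the main thread until a slot is free, then fork.
        Sem.wait sem
        -- Release the slot when the action finishes, even if it throws.
        forkIO $ finally (action value) (Sem.signal sem)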
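
One thing to keep in mind with a finite input: the function above returns as soon as the last thread is forked, and a Haskell program exits when main returns, even if workers are still running. Below is a sketch of one way to handle that, not the only one. It uses QSem from base instead of MSem (a substitution, to keep it free of extra packages), the name boundedTraverse is hypothetical, and it assumes n is at least 1. After the loop it takes all n slots back, which can only succeed once every worker has signalled:

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.QSem (newQSem, waitQSem, signalQSem)
import Control.Exception (finally)
import Control.Monad (forM_, replicateM_)
import Data.IORef

-- At most n workers at a time; returns only after all of them are done.
-- Assumes n >= 1 (with n = 0 the first wait would block forever).
boundedTraverse :: Foldable t => Int -> (a -> IO ()) -> t a -> IO ()
boundedTraverse n action values = do
  sem <- newQSem n
  forM_ values $ \value -> do
    waitQSem sem                                    -- wait for a free slot
    forkIO (action value `finally` signalQSem sem)  -- free it when done
  -- Reclaim every slot: this succeeds only after all workers have signalled.
  replicateM_ n (waitQSem sem)

main :: IO ()
main = do
  total <- newIORef (0 :: Int)
  boundedTraverse 4 (\x -> atomicModifyIORef' total (\s -> (s + x, ()))) [1 .. 100 :: Int]
  readIORef total >>= print  -- prints 5050
```

Because the main thread blocks in the wait before each fork, no more than n workers exist at any moment, so memory stays flat even for a very long input.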