multithreading, haskell, functional-programming, pipe, mkfifo

Understanding usage of withFileBlocking with named pipes


My program (full listing below) uses withAsync to run a thread that blocking-reads from a named pipe via cat, while the main thread prints "Update loop" once per second.

Provided I've created the pipe with mkfifo, the program prints this:

Update loop
Blocking-read from pipe
Update loop
Update loop
Update loop
Update loop
... goes on ...

(Whether Blocking-read from pipe is printed before or after the first Update loop line is a scheduling detail and could vary from run to run, I think.)

As soon as I write into the pipe, e.g. via echo -n ciao > /path/to/mypipe, I see this

... continuing from before ...
Update loop
Update loop
Read succeeded
success: ciao
Blocking-read from pipe
Update loop
Update loop
Update loop
... goes on ...

showing that the "async" thread unblocks, prints, and then blocks again.

Here's the current program:

import Control.Concurrent.Async (withAsync)
import Control.Monad
import GHC.IO.Exception (ExitCode(..))
import System.Process.Extra (readProcessWithExitCode)
import System.Time.Extra (sleep)

main :: IO ()
main = do
  let pipe = "/path/to/mypipe"
  withAsync (forever $ do print "Blocking-read from pipe"
                          (ret, out, _) <- readProcessWithExitCode "cat" [pipe] ""
                          print "Read succeeded"
                          case ret of
                            ExitSuccess -> print $ "success: " ++ out
                            ExitFailure _ -> error "how is this possible?")
            (const $ forever $ print "Update loop" >> sleep 1)

The thing is, I would like to avoid the external process and do the reading in-process, via readFile-like actions. I've tried using withFileBlocking, like this:

import Control.Concurrent.Async (withAsync)
import Control.Monad
import GHC.IO.Handle.FD (withFileBlocking)
import System.IO (hGetContents)
import System.IO.Extra (IOMode(ReadMode))
import System.Time.Extra (sleep)

main :: IO ()
main = do
  let pipe = "/home/enrico/deleteme/deleteme/mypipe"
  withAsync (forever $ do putStrLn "Blocking-read from pipe"
                          out <- withFileBlocking pipe ReadMode hGetContents
                          putStrLn "Read succeeded"
                          putStrLn $ "success: " ++ out)
            (const $ forever $ putStrLn "Update loop" >> sleep 1)

However, the behavior of this program is different, and I don't quite understand it:

  1. as soon as I execute it, it prints the following

    Update loop
    Blocking-read from pipe
    

    and sits there, which I don't understand given that the "main" thread is unchanged with respect to my original code; I don't see what could keep it from continuing the forever action;

  2. then, if I write something into the pipe, e.g. via the same command as above, I see this (I'm omitting the previous two lines)

    Update loop
    Read succeeded
    Update loop
    Update loop
    Update loop
    Update loop
    ... goes on ...
    

    which also surprises me, because I don't see how this can "unblock" the main thread, which has nothing to do with what happens to the pipe, nor do I see what could go wrong with putStrLn $ "success: " ++ out once out <- withFileBlocking pipe ReadMode hGetContents is done;

  3. furthermore, if I write to the pipe a second time (again via echo -n ciao > /path/to/mypipe), the echo command blocks, revealing that even though putStrLn "Read succeeded" executed successfully, putStrLn $ "success: " ++ out must have errored or blocked.

I've read the doc for withFileBlocking, and at first it does seem like the right tool for what I want to do. However, neither it nor the doc for openFileBlocking (withFileBlocking, I read, opens the file just like openFileBlocking does) explains the behavior I'm seeing. What am I missing?


Solution

  • With the default (single-threaded) runtime, a GHC-compiled program only launches one OS thread. If you launch green threads with forkIO, say, they are managed by the runtime to run on that single OS thread. And, if you block on a system call, the entire process is blocked, including all Haskell threads.

    For example, the program:

    import System.IO
    import Control.Concurrent
    import Control.Monad
    import GHC.IO.Handle.FD
    
    main :: IO ()
    main = do
      _ <- forkIO $ forever $ putStrLn "ping" >> threadDelay 1000000
      _ <- openFileBlocking "/tmp/fifo" ReadMode
      pure ()
    

    compiled with ghc -O2 will (typically) print "ping" once and then block indefinitely, assuming /tmp/fifo is an otherwise unused FIFO.

    However, the same program compiled with the threaded runtime ghc -threaded -O2 works fine, and keeps printing ping while blocking on the FIFO.
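
    If you are unsure which runtime a given binary was linked against, one quick check (a sketch of mine, not part of the original answer) is Control.Concurrent.rtsSupportsBoundThreads, which is True exactly when the program was built with -threaded:

    import Control.Concurrent (rtsSupportsBoundThreads)

    main :: IO ()
    main =
      putStrLn $
        if rtsSupportsBoundThreads
          then "threaded RTS (built with -threaded)"
          else "non-threaded RTS"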

    By the way, you don't even have to enable extra capabilities, say by running your program with +RTS -N4 or whatever. The threaded runtime is designed to launch extra OS threads, even in the default one-capability +RTS -N1 mode, precisely to handle blocking system calls. This is documented under the -threaded option:

    The threaded runtime system provides the following benefits:

      • It enables the -N ⟨x⟩ RTS option to be used, which allows threads to run in parallel on a multiprocessor or multicore machine. See Using SMP parallelism.

      • If a thread makes a foreign call (and the call is not marked unsafe), then other Haskell threads in the program will continue to run while the foreign call is in progress. Additionally, foreign exported Haskell functions may be called from multiple OS threads simultaneously.

    There is also a much more detailed discussion in Multi-threading and the FFI. Though it is about the FFI, it applies to something like openFileBlocking which is ultimately implemented via the FFI.
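
    To see the FFI connection concretely, here is a minimal sketch of mine (not from the answer, and assuming a POSIX system): it blocks in a safe foreign call to sleep(3) instead of in openFileBlocking. Built with -threaded it keeps printing "ping" during the ten-second sleep; built without -threaded, the pings stop while the foreign call is in progress.

    {-# LANGUAGE ForeignFunctionInterface #-}
    import Control.Concurrent (forkIO, threadDelay)
    import Control.Monad (forever)
    import Foreign.C.Types (CUInt)

    -- A "safe" foreign call: under -threaded it only blocks the OS thread it
    -- runs on; in the non-threaded runtime it blocks every Haskell thread.
    foreign import ccall safe "unistd.h sleep"
      c_sleep :: CUInt -> IO CUInt

    main :: IO ()
    main = do
      _ <- forkIO $ forever $ putStrLn "ping" >> threadDelay 1000000
      _ <- c_sleep 10
      pure ()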

    For your second program above, enabling -threaded prevents the blocking thread from stopping all the other Haskell threads, so the "Update loop" thread keeps running from the start:

    Update loop
    Blocking-read from pipe
    Update loop
    Update loop
    Update loop
    

    If you write to the FIFO, the first thread is unblocked and prints "Read succeeded":

    Update loop
    Blocking-read from pipe
    Update loop
    Update loop
    Read succeeded
    Update loop
    Update loop
    

    It doesn't print "success: <stuff read from pipe>" because you've committed the cardinal sin of trying to use the output of hGetContents... after the file handle has been closed, so your first thread dies with an exception that goes unnoticed, because nothing ever waits on the Async created by withAsync; see the sketch below for one way to make it visible.
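
    If you want that hidden exception to surface, one option (a sketch of mine using the async package's link, not something the answer does) is to link the Async to the calling thread; then the delayed-read error is re-thrown in the main thread and the program fails loudly instead of limping on:

    import Control.Concurrent.Async (link, withAsync)
    import Control.Monad
    import GHC.IO.Handle.FD (withFileBlocking)
    import System.IO (IOMode (ReadMode), hGetContents)
    import System.Time.Extra (sleep)

    main :: IO ()
    main = do
      let pipe = "/tmp/fifo"
      withAsync (forever $ do putStrLn "Blocking-read from pipe"
                              out <- withFileBlocking pipe ReadMode hGetContents
                              putStrLn "Read succeeded"
                              putStrLn $ "success: " ++ out)
                (\a -> do link a  -- exceptions from the reader are re-thrown here
                          forever $ putStrLn "Update loop" >> sleep 1)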

    You'd get exactly the same behavior without fifos, blocking I/O, or multiple threads. The following program fails, too:

    import System.IO
    
    main :: IO ()
    main = do
      out <- withFile "/etc/passwd" ReadMode hGetContents
      putStrLn "Read succeeded"
      putStrLn $ "success: " ++ out
    

    giving output:

    Read succeeded
    Three: /etc/passwd: hGetContents: illegal operation (delayed read on closed handle)
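
    One way to convince yourself that laziness is the whole story: force the string before withFile closes the handle and the same program succeeds. A sketch of mine (not from the answer), forcing the contents with evaluate from Control.Exception:

    import Control.Exception (evaluate)
    import System.IO

    main :: IO ()
    main = do
      out <- withFile "/etc/passwd" ReadMode $ \h -> do
        s <- hGetContents h
        _ <- evaluate (length s)  -- force the whole string while the handle is still open
        pure s
      putStrLn "Read succeeded"
      putStrLn $ "success: " ++ out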
    

    You should modify your program to consume the output completely before the handle is closed. It's enough to move the putStrLn calls inside the withFileBlocking callback:

    import Control.Concurrent.Async (withAsync)
    import Control.Monad
    import GHC.IO.Handle.FD (withFileBlocking)
    import System.IO (hGetContents)
    import System.IO.Extra (IOMode(ReadMode))
    import System.Time.Extra (sleep)
    
    main :: IO ()
    main = do
      let pipe = "/tmp/fifo"
      withAsync (forever $ do putStrLn "Blocking-read from pipe"
                              withFileBlocking pipe ReadMode $ \h -> do
                                out <- hGetContents h
                                putStrLn "Read succeeded"
                                putStrLn $ "success: " ++ out)
                (const $ forever $ putStrLn "Update loop" >> sleep 1)
    

    which gives:

    Update loop
    Blocking-read from pipe
    Update loop
    Update loop
    Read succeeded            <-- wrote to pipe here
    success: hello
    
    Blocking-read from pipe
    Update loop
    Update loop
    

    Alternatively, you could replace hGetContents with hGetContents' in your code above; that reads the contents strictly (in full), making it safe to use the out value outside the withFileBlocking callback, after the handle has been closed.
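
    A sketch of that variant (mine, not the answer's; hGetContents' requires base >= 4.15, i.e. GHC 9.0 or later):

    import Control.Concurrent.Async (withAsync)
    import Control.Monad
    import GHC.IO.Handle.FD (withFileBlocking)
    import System.IO (IOMode (ReadMode), hGetContents')
    import System.Time.Extra (sleep)

    main :: IO ()
    main = do
      let pipe = "/tmp/fifo"
      withAsync (forever $ do putStrLn "Blocking-read from pipe"
                              -- hGetContents' reads the file strictly, so `out` is
                              -- fully materialised before the handle is closed
                              out <- withFileBlocking pipe ReadMode hGetContents'
                              putStrLn "Read succeeded"
                              putStrLn $ "success: " ++ out)
                (const $ forever $ putStrLn "Update loop" >> sleep 1)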