Tags: sockets, haskell, conduit, network-conduit

conduit and sockets: allow multiple connections


Here's some code that implements a small receiving server using conduit, network-conduit, and stm-conduit. It receives data on a socket and then streams it through an STM channel to the main thread.

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class

import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
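import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)

import Network.Socket (Family (AF_UNIX), SockAddr (SockAddrUnix), Socket,
                       SocketType (Stream), accept, bind, close, listen, socket)
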
import System.Directory (removeFile)
import System.IO

type BSChan = TBMChan ByteString

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
      loop = do
        (conn, _) <- accept soc
        sourceSocket conn $$ sinkTBMChan chan
        close conn
        loop

main :: IO ()
main = do
  soc <- socket AF_UNIX Stream 0
  bind soc (SockAddrUnix "mysock")
  socChan <- listenSocket soc 8
  sourceTBMChan socChan $$ DCB.sinkHandle stdout
  removeFile "mysock"

(In the real application, the stream of data from the socket gets merged with some others, which is why I don't handle it directly in the listener).

The problem is that I expected this to stay open until the main thread is killed, but instead it exits after the first message is received on the socket. I cannot work out why, unless the sink (on the second-to-last line of main) exits once it sees the end of the first stream of data. Can I persuade it not to do this? There's some material in Conduit about making a source resumable, but not a sink.


Solution

  • From the documentation of sinkTBMChan:

    When the sink is closed, the channel will close too.

    So when the first connection closes, the Source from sourceSocket finishes, which completes the sinkTBMChan connected to it. That in turn closes the TBMChan, so sourceTBMChan in the main thread ends and sinkHandle stops.

    The simplest way to solve this is probably to change your loop into a custom source that doesn't close between connections and connect that source into the TBMChan.

    listenSocket :: Socket -> Int -> IO BSChan
    listenSocket soc bufSize = do
        chan <- atomically $ newTBMChan bufSize
        forkListener chan
        return chan
      where
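        forkListener chan = void . forkIO $ do
          listen soc 2
          -- One long-lived source feeds the sink, so the channel stays open
          -- across connections.
          loop $$ sinkTBMChan chan

        -- A source that never finishes: it yields each connection's data in turn.
        loop = do
          (conn, _) <- liftIO $ accept soc
          sourceSocket conn
          liftIO $ close conn
          loop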
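
To see why data from later connections never arrives in the original version, it can help to look at the channel on its own, without sockets or conduit. The following is a minimal sketch, assuming the TBMChan semantics documented in stm-chans: a write to a closed channel is discarded, and a read from a closed, empty channel returns Nothing. The name demo is made up for illustration, and the explicit closeTBMChan stands in for what sinkTBMChan does when its upstream source finishes.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan
  (closeTBMChan, newTBMChan, readTBMChan, writeTBMChan)

-- Hypothetical helper: replays what happens to the channel across two
-- "connections" when the sink closes it after the first one.
demo :: IO (Maybe String, Maybe String)
demo = do
  chan <- atomically $ newTBMChan 8
  -- First connection: one message is written, then the channel is closed
  -- (standing in for sinkTBMChan finishing with its source).
  atomically $ writeTBMChan chan "first"
  atomically $ closeTBMChan chan
  -- Second connection: this write goes to a channel that is already closed.
  atomically $ writeTBMChan chan "second"
  -- The reader drains what was buffered before the close, then sees the end.
  r1 <- atomically $ readTBMChan chan
  r2 <- atomically $ readTBMChan chan
  return (r1, r2)

main :: IO ()
main = demo >>= print
```

If that reading of the semantics is right, demo returns (Just "first", Nothing): the reader sees the end of the channel after the first message, which matches the main thread exiting. With the single long-lived source in the listing above, the close never happens between connections.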