I'm writing a server, and one of the requirements is that it must be able to push data to clients without the client having requested it. I'm using conduits, but this feels like it is beyond their capabilities. The problem I run into is that there doesn't appear to be a way to tell whether the socket has data available, and await blocks execution until data arrives. Let's say I have the following functions:
getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket
Then I connect the conduits together with the source and sink from Data.Conduit.Network:
appSource appData $$ getPacket =$= processPacket =$= putPacket =$ appSink appData
Now, I introduce a source of data from outside the conduit and I want to incorporate that data into the conduit. For example, if this were a chat server the outside data would be messages sent by other clients. The problem is that no matter where I try to introduce this outside data, it'll get blocked by calls to await. Essentially, I'll end up with code that looks like this.
yield processOutsideData --deal with the outside data
input <- await --await data from upstream
The only way that more outside data will get processed is if an upstream component yields something, but upstream will only yield if it gets data from a client, which is exactly what I'm trying to avoid. I've tried using multiple threads and a TChan to solve this, but it appears that appSource and appSink must be used in the same thread; otherwise I get invalid file descriptor exceptions from recv (which makes sense).
However, if the socket source and sink are running in the same thread I once again run into the problem that await is blocking and I have no way of checking if data is available from the socket. At this point it seems like I have hit a wall with conduits.
But I really do enjoy using conduits and would prefer to continue using them. So my question is: is there a way to do what I'm trying to achieve with conduits?
Michael Snoyman's conduit network examples use concurrency. The telnet client example runs one thread for sending input and another for displaying what's received. I've adapted it to send and receive entire lines:
{-# LANGUAGE OverloadedStrings #-}

import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad (liftM, void)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (unpack)
import Data.Conduit.Network
import Data.String (IsString, fromString)
import Network (withSocketsDo)

getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine

putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack

main :: IO ()
main = withSocketsDo $
    runTCPClient (clientSettings 4000 "localhost") $ \server ->
        void $ concurrently
            (getLines $$ appSink server)
            (appSource server $$ putLines)
We can do the same thing on the server: create an STM channel, write received data into the channel, and send data from the channel to the clients. This uses the stm-conduit package's simple wrappers around an STM channel, sourceTBMChan and sinkTBMChan.
{-# LANGUAGE OverloadedStrings #-}

import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan (sourceTBMChan, sinkTBMChan)
import Network (withSocketsDo)

main :: IO ()
main = withSocketsDo $ do
    channel <- atomically $ newTBMChan 10
    runTCPServer (serverSettings 4000 "*") $ \server ->
        void $ concurrently
            (appSource server $$ sinkTBMChan channel False)
            (sourceTBMChan channel $$ appSink server)
If we run the server with only a single client connected, it echoes back what the client sent.
----------
| a | (sent)
| a | (received)
| b | (sent)
| b | (received)
| c | (sent)
| c | (received)
----------
If we run the server with multiple clients connected, the messages are distributed among the clients with one client getting each message.
----------               ----------
| 1 | (sent)             | 1 | (received)
| 2 | (sent)             | 3 | (received)
| 2 | (received)         |   |
| 3 | (sent)             |   |
|   |                    |   |
|   |                    |   |
----------               ----------
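The messages are split up because every handler reads from the same queue, so each message is taken by exactly one reader. If every client should see every message, as a chat server would need, one option is a broadcast TChan from the stm package, with each handler reading from its own copy made by dupTChan. The following is only a sketch under that assumption, written in the same older operator style as the examples here. The names publish, subscribe, handleClient, and runChatServer are hypothetical and not part of any library. Unlike the TBMChan, this TChan is unbounded, and a sender also gets its own messages back.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM
    (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Monad (void)
import Data.ByteString (ByteString)
import Data.Conduit.Network

-- Hypothetical helper: write everything received from upstream to the
-- shared broadcast channel.
publish :: TChan a -> Sink a IO ()
publish chan = mapM_C (atomically . writeTChan chan)

-- Hypothetical helper: yield everything that arrives on one client's
-- private copy of the channel. readTChan blocks until a message exists.
subscribe :: TChan a -> Source IO a
subscribe chan = repeatMC (atomically $ readTChan chan)

-- Hypothetical per-connection handler.
handleClient :: TChan ByteString -> AppData -> IO ()
handleClient broadcast client = do
    -- Every message written to 'broadcast' after this point is
    -- delivered to 'mine' as well as to every other client's copy.
    mine <- atomically $ dupTChan broadcast
    void $ concurrently
        (appSource client $$ publish broadcast)
        (subscribe mine $$ appSink client)

-- Call this from main (wrapped in withSocketsDo, as in the examples
-- above, if your platform needs it).
runChatServer :: IO ()
runChatServer = do
    broadcast <- newBroadcastTChanIO
    runTCPServer (serverSettings 4000 "*") (handleClient broadcast)
```

A channel created with newBroadcastTChanIO is write-only, so messages nobody has subscribed to are dropped rather than accumulating.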
The TBMChan server above does not handle a client closing the connection.
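One possible approach, offered as a sketch rather than a tested fix: concurrently waits for both halves of the handler, so after a client hangs up and appSource ends, the sending half can stay blocked on the channel. Replacing it with race_ from the async package stops the handler as soon as either half finishes and cancels the other. The names serveClient and simulateDisconnect below are hypothetical, and the simulation stands in for a real socket.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race_)
import Control.Exception (finally)
import Control.Monad (forever)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical wrapper (not part of conduit or async): run the receiving
-- and sending halves of one client handler. race_ returns as soon as
-- either half finishes and cancels the other; finally runs the cleanup
-- action even if one half threw an exception.
--
-- With the server above, the call might look like:
--   serveClient (appSource server $$ sinkTBMChan channel False)
--               (sourceTBMChan channel $$ appSink server)
--               (return ())
serveClient :: IO () -> IO () -> IO () -> IO ()
serveClient receiving sending cleanup =
    race_ receiving sending `finally` cleanup

-- Stand-in for a real connection: the receiving half ends at once, the
-- way a socket source ends when the peer disconnects, while the sending
-- half would otherwise wait forever for channel data.
simulateDisconnect :: IO Bool
simulateDisconnect = do
    cleanedUp <- newIORef False
    serveClient
        (return ())
        (forever $ threadDelay 1000000)
        (writeIORef cleanedUp True)
    readIORef cleanedUp
```

The cleanup argument is a natural place to announce that the client has left or to release per-client state.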