haskellconduit

Can I combine a sink and a source in order to produce a ConduitT with inputs and outputs decoupled?


When using Akka Streams I can create a a Flow with function fromSinkAndSource. The rationale is because you require a Flow but you want to provide a Sink and Source whose flows of elements are decoupled.

This is my case since I want to provide a ConduitT for a servant websocket but the ingestion and generation will be mostly decoupled and driven by the underlying monad doing concurrent reads and writes in a multithreaded environment.

Is there some combinator or usage of the ConduitT monad that can provide this?

Edit:

I'll provide more context. Basically I'm trying to implement a game server communicating with the user through a ConduitT. So let's think that our game is FizzBuzz and our engine introduces a delay when computing the result. The user input commands are integers, and the game outputs Strings (sometimes). This is my attempt (does not work):

#!/usr/bin/env stack
-- stack --resolver lts-18.28 script --package conduit --package stm

{-# LANGUAGE NumericUnderscores #-}

-- We are going to create an effectful fizzbuzz

import Conduit
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad
import Data.Conduit.List as CL
import Data.Void

producer :: TQueue String -> ConduitT String String IO ()
producer q = repeatMC readFromQueue
  where
    readFromQueue = atomically $ readTQueue q

consumer :: TQueue String -> ConduitT Int String IO ()
consumer q = awaitForever $ \elem ->
  lift $ fizz elem
  where
    fizz :: Int -> IO ()
    fizz n
      | n `mod` 15 == 0 = delayedWrite 15 "FizzBuzz"
      | n `mod` 3 == 0 = delayedWrite 3 "Fizz"
      | n `mod` 5 == 0 = delayedWrite 5 "Buzz"
      | otherwise = return ()

    delayedWrite :: Int -> String -> IO ()
    delayedWrite i s = void . forkIO $ do
      threadDelay (i * 1_000_000)
      atomically $ writeTQueue q s

flowSinkAndSource :: TQueue String -> ConduitT Int String IO ()
flowSinkAndSource q = consumer q .| producer q

main :: IO ()
main = do
  let source = CL.sourceList [1 .. 30]
      sink = CL.mapM_ print
   in do
        q <- newTQueueIO
        runConduit $ source .| flowSinkAndSource q .| sink

Since producer and consumer are decoupled I thought that having a queue and waiting for an element would be the solution, but it seems that producer is not "pulling" from consumer and I get a blocking error:

❯ ./fromSinkAndSource-in-conduit.hs
fromSinkAndSource-in-conduit.hs: thread blocked indefinitely in an STM transaction

I've tried also to try to consume all input with:

flowSinkAndSource :: TQueue String -> ConduitT Int String IO ()
flowSinkAndSource q = consumer q .| CL.sinkNull .| producer q

but the problem persists.


Solution

  • I believe the problem is that pipeline composition, .|, only runs upstream conduits if downstream conduits require their output. Instead, you really want to enforce sequential composition. You can do that by using *> instead of .|:

    flowSinkAndSource :: TQueue String -> ConduitT Int String IO ()
    flowSinkAndSource q = consumer q *> producer q
    

    For the types to work out you also need to change the input type of producer. The producer doesn't use its inputs so that is not a problem:

    producer :: TQueue String -> ConduitT Int String IO ()
    

    Or more generally:

    producer :: TQueue String -> ConduitT i String IO ()
    

    One thing to consider is how the producer knows that all input has been processed. Bypassing the normal flow of data in this way also hides that information. So running your example with my changes does print the Fizz, Buzz and FizzBuzz, but after that it still produces the "thread blocked" exception.