haskell concurrency haskell-pipes

How would I pipe with a timeout that resets with each incoming event?


The withTimeout function is supposed to pipe ConsoleEvent values through, with a CeTimeout sent every s :: Int seconds if nothing has been received. Instead, it fails to send the CeTimeout events at the appropriate times. If more than s seconds have passed, one CeTimeout event is emitted in place of the next incoming event, and the original event is lost. Also, instead of a single CeTimeout event, there should be one CeTimeout event for each s-second period that has passed (that is, n events after n*s seconds). Where is the mistake, and what would be the correction? Thanks!

-- | Pipe that forwards 'ConsoleEvent's and is meant to interleave a
-- 'CeTimeout' whenever no event has arrived for the given number of seconds.
--
-- The argument is a number of seconds; the view pattern scales it by
-- 1000000 to the value handed to 'threadDelay'.
--
-- NOTE: 'work' is lifted into the pipe with 'liftIO' and flattened with
-- 'join', so its setup (mailboxes and forked threads) runs every time this
-- pipe is run. The pipe it returns performs exactly one 'await' before its
-- single 'yield', so anything waiting in the outgoing mailbox (a 'CeTimeout'
-- included) can only be emitted after the next upstream event has arrived.
withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    -- Sets up the mailboxes and helper threads, then returns the pipe step
    -- that talks to them.
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ()) 
    work =
      do
        -- Mailbox carrying upstream events from the pipe to the worker thread.
        (oSent, iKept) <- spawn $ bounded 1
        -- Mailbox carrying everything (events and timeouts) back to the pipe.
        (oKept, iSent) <- spawn $ unbounded
        -- Mailbox the timer thread writes 'CeTimeout' into.
        (oTimeout, iTimeout) <- spawn $ bounded 1

        -- Start the first timer thread and keep its id so it can be replaced.
        tid <- launchTimeout oTimeout >>= newMVar

        -- Worker: pass each upstream event through 'factorTimeout' (which
        -- restarts the timer) and on to the outgoing mailbox.
        forkIO $ do
          runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept

        -- Forwarder: move timer output into the same outgoing mailbox.
        forkIO $ do
          runEffect . forever $ fromInput iTimeout >-> toOutput oKept

        -- The pipe step itself: one await, then one (blocking) receive and
        -- one yield. Nothing is received from 'iSent' until an upstream
        -- event has been awaited.
        return $ do
          await >>= (liftIO . guardedSend oSent)
          (liftIO . guardedRecv $ iSent) >>= yield

    -- Send one event to a mailbox; calls 'die' if 'send' reports failure.
    guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
    guardedSend o ce =
      (atomically $ send o ce) >>= \case
        True -> return ()
        -- 'otherwise' here is a fresh catch-all variable, i.e. the False case.
        otherwise -> die $ "withTimeout can not send"

    -- Receive one event from a mailbox; calls 'die' if 'recv' yields no value.
    guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
    guardedRecv i =
      (atomically $ recv i) >>= \case
        Just a -> return a
        -- Catch-all variable again, i.e. the Nothing case.
        otherwise -> die $ "withTimeout can not recv"

    -- Fork a thread that loops forever: sleep for 's', then send 'CeTimeout'
    -- to the given mailbox.
    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    -- Start a fresh timer thread, then kill the previous one; returns the
    -- new thread's id.
    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    -- One-shot pipe step: await an event, replace the timer thread stored in
    -- the MVar, then yield the event unchanged.
    factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

Here is a fully executable script.


Solution

  • It seems like a Pipe will only allow one yield per await. This means that a CeTimeout cannot arbitrarily be sent down the pipe, because nothing came into the pipe to cause the flow. I will have to go through the source to confirm this; in the meantime, this function has been refactored to return a Pipe and a Producer instead of just a Pipe. The Producer can then be joined back in by the calling function. The initial plan was to return just a Pipe so that the calling function would not have to do any additional work to make timeouts work. That would have been a more self-contained solution. This alternative is nice in that it is more explicit: the timeouts won't look like they are appearing out of thin air to someone who is not familiar with the pipeline.

    -- | Build the two halves of a resettable timeout.
    --
    -- The argument is the timeout period in seconds; the view pattern scales
    -- it by 1000000 to the value handed to 'threadDelay'.
    --
    -- Running the action starts the first timer thread and returns a pair:
    --
    --   * a one-shot 'Pipe' step that awaits a single event, replaces the
    --     timer thread, and yields the event unchanged (callers re-run it,
    --     e.g. under 'forever', to keep processing events);
    --
    --   * a 'Producer' that reads from the mailbox the timer thread writes
    --     'CeTimeout' into.
    withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
    withTimeout ((* 1000000) -> s) =
      do
        (oTimeout, iTimeout) <- spawn $ bounded 1
        vTid <- launchTimeout oTimeout >>= newMVar

        return (factorTimeout vTid oTimeout, fromInput iTimeout)
      where
        -- Fork a thread that loops forever: sleep for 's', then send
        -- 'CeTimeout' to the given mailbox.
        launchTimeout :: Output ConsoleEvent -> IO ThreadId
        launchTimeout o =
          forkIO . forever $ do
            threadDelay s
            atomically (send o CeTimeout) >>= \case
              True -> return ()
              -- Match the failure case explicitly; a bare 'otherwise' in a
              -- pattern is just a catch-all variable shadowing the Prelude name.
              False -> die "withTimeout can not send timeout"

        -- Start a fresh timer thread, then kill the previous one; returns the
        -- new thread's id so it can be stored for the next reset.
        relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
        relaunchTimeout o oldTid =
          do
            tid <- launchTimeout o
            killThread oldTid
            return tid

        -- One-shot pipe step: await an event, swap the timer thread held in
        -- the MVar for a new one, then pass the event downstream unchanged.
        factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
        factorTimeout v o =
          do
            ce <- await
            liftIO . modifyMVar_ v $ relaunchTimeout o
            yield ce
    
    -- | Wire keyboard characters and timeout events into a single event
    -- stream and dump it.
    --
    -- Four threads are forked, each re-running one pipeline forever; the
    -- main thread then blocks on an 'MVar' that each thread would fill after
    -- its loop.
    main :: IO ()
    main =
      do
        hSetBuffering stdin NoBuffering
        hSetEcho stdin False

        finished <- newEmptyMVar
        (rawOut, rawIn) <- spawn (bounded 1)
        (mergedOut, mergedIn) <- spawn (bounded 1)

        (resetOnEvent, timeoutSource) <- withTimeout 2

        -- Seed the stream with the initial event before any thread starts.
        runEffect (yield CeBegan >-> toOutput rawOut)

        -- Fork a thread that re-runs the given pipeline forever, with the
        -- completion signal sequenced after the loop.
        -- NOTE(review): 'forever' does not return normally, so the signal
        -- looks unreachable as written -- confirm the intended shutdown path.
        let stage pipeline =
              forkIO (runEffect (forever pipeline) >> putMVar finished ())

        _ <- stage (chars >-> toOutput rawOut)

        -- other inputs would be piped to rawOut here

        _ <- stage (fromInput rawIn >-> resetOnEvent >-> toOutput mergedOut)

        _ <- stage (timeoutSource >-> toOutput mergedOut)

        -- logic would be done before dumpPipe
        _ <- stage (fromInput mergedIn >-> dumpPipe >-> (await >> return ()))

        takeMVar finished
    

    Here is a fully executable script.