The withTimeout function is supposed to pipe ConsoleEvent values through, with a CeTimeout sent every s :: Int seconds if nothing has been received. Instead, it fails to send the CeTimeout events at the appropriate times. When more than s seconds have passed, a single CeTimeout event is emitted in place of the next event, and the original event is lost. Also, instead of one CeTimeout event, there should be n CeTimeout events after n*s seconds, with n counting each s-second period that has passed. Where is the mistake, and what would be the correction? Thanks!

    withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
    withTimeout ((* 1000000) -> s) = join . liftIO $ work
      where
        work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ())
        work =
          do
            (oSent, iKept) <- spawn $ bounded 1
            (oKept, iSent) <- spawn $ unbounded
            (oTimeout, iTimeout) <- spawn $ bounded 1
            tid <- launchTimeout oTimeout >>= newMVar
            forkIO $ do
              runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept
            forkIO $ do
              runEffect . forever $ fromInput iTimeout >-> toOutput oKept
            return $ do
              await >>= (liftIO . guardedSend oSent)
              (liftIO . guardedRecv $ iSent) >>= yield

        guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
        guardedSend o ce =
          (atomically $ send o ce) >>= \case
            True -> return ()
            otherwise -> die $ "withTimeout can not send"

        guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
        guardedRecv i =
          (atomically $ recv i) >>= \case
            Just a -> return a
            otherwise -> die $ "withTimeout can not recv"

        launchTimeout :: Output ConsoleEvent -> IO ThreadId
        launchTimeout o =
          forkIO . forever $ do
            threadDelay $ s
            (atomically $ send o CeTimeout) >>= \case
              True -> return ()
              otherwise -> die "withTimeout can not send timeout"

        relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
        relaunchTimeout o oldTid =
          do
            tid <- launchTimeout o
            killThread oldTid
            return tid

        factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
        factorTimeout v o =
          do
            ce <- await
            liftIO . modifyMVar_ v $ relaunchTimeout o
            yield ce

Here is a fully executable script.
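
For reference, the behaviour described at the top (one CeTimeout for every s seconds of silence, with real events passed through untouched) can be sketched without pipes at all, using System.Timeout from base. This is only an illustration of the intended semantics, not the code in question: the ConsoleEvent constructors other than CeTimeout, the nextOrTimeout helper, and the MVar used as a mailbox are all assumptions made for the example.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.Maybe (fromMaybe)
import System.Timeout (timeout)

-- Hypothetical stand-in for the real ConsoleEvent type.
data ConsoleEvent = CeBegan | CeChar Char | CeTimeout
  deriving (Show, Eq)

-- Wait up to s seconds for the next event in the mailbox.
-- If nothing arrives in time, report CeTimeout instead.
-- Each call starts a fresh wait, so the timer restarts after every event.
nextOrTimeout :: Int -> MVar ConsoleEvent -> IO ConsoleEvent
nextOrTimeout s box =
  fromMaybe CeTimeout <$> timeout (s * 1000000) (takeMVar box)

main :: IO ()
main = do
  box <- newEmptyMVar
  -- Simulated input: a single key press after 2.5 seconds.
  _ <- forkIO $ do
    threadDelay 2500000
    putMVar box (CeChar 'a')
  -- Poll four times with a one-second timeout.
  events <- mapM (const (nextOrTimeout 1 box)) [1 .. 4 :: Int]
  print events
```

With a one-second timeout and a key press at 2.5 seconds, this should print [CeTimeout,CeTimeout,CeChar 'a',CeTimeout]: two idle periods produce two timeouts, the real event is not lost, and the timer starts over afterwards.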
It seems that the Pipe returned here performs exactly one yield per await, and it blocks in await until something arrives from upstream. This means that a CeTimeout cannot arbitrarily be sent down the pipe, because nothing came into the pipe to cause the flow. I will have to go through the source to confirm this; in the meantime, the function has been refactored to return a Pipe and a Producer instead of just a Pipe. The Producer can then be joined back in the calling function. The initial plan was to return just a Pipe, so that the calling function would not have to do any additional work to make timeouts work. That would have been a more self-contained solution. This alternative is nice in that it is more explicit: the timeouts won't look like they are appearing out of thin air to someone who is not familiar with the pipeline.

    -- Returns a Pipe that restarts the timer on every event it passes through,
    -- and a Producer that emits the CeTimeout events.
    withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
    withTimeout ((* 1000000) -> s) =
      do
        (oTimeout, iTimeout) <- spawn $ bounded 1
        vTid <- launchTimeout oTimeout >>= newMVar
        return (factorTimeout vTid oTimeout, fromInput iTimeout)
      where
        -- Fork a thread that sends a CeTimeout every s microseconds.
        launchTimeout :: Output ConsoleEvent -> IO ThreadId
        launchTimeout o =
          forkIO . forever $ do
            threadDelay $ s
            (atomically $ send o CeTimeout) >>= \case
              True -> return ()
              otherwise -> die "withTimeout can not send timeout"

        -- Start a fresh timer thread, then kill the old one.
        relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
        relaunchTimeout o oldTid =
          do
            tid <- launchTimeout o
            killThread oldTid
            return tid

        -- Pass one event through, restarting the timer first.
        factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
        factorTimeout v o =
          do
            ce <- await
            liftIO . modifyMVar_ v $ relaunchTimeout o
            yield ce

    main :: IO ()
    main =
      do
        hSetBuffering stdin NoBuffering
        hSetEcho stdin False
        exitSemaphore <- newEmptyMVar
        (o1, i1) <- spawn $ bounded 1
        (o2, i2) <- spawn $ bounded 1
        (timeoutTrap, timeoutRender) <- withTimeout 2
        runEffect $ yield CeBegan >-> toOutput o1
        forkIO $ do
          runEffect . forever $ chars >-> toOutput o1
          putMVar exitSemaphore ()
        -- other inputs would be piped to o1 here
        forkIO $ do
          runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
          putMVar exitSemaphore ()
        forkIO $ do
          runEffect . forever $ timeoutRender >-> toOutput o2
          putMVar exitSemaphore ()
        forkIO $ do
          -- logic would be done before dumpPipe
          runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
          putMVar exitSemaphore ()
        takeMVar exitSemaphore

Here is a fully executable script.
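
The idea of joining a separate timeout Producer back into the stream can be seen in isolation in the stripped-down sketch below. It assumes the pipes and pipes-concurrency packages; the ConsoleEvent constructors other than CeTimeout, and the slowEvents and ticks producers, are made up for the example. Unlike factorTimeout above, the ticker here is free-running and is not restarted when an event arrives, so it only shows the merging step.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever)
import Pipes
import Pipes.Concurrent (bounded, fromInput, spawn, toOutput)
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for the real ConsoleEvent type.
data ConsoleEvent = CeBegan | CeChar Char | CeTimeout
  deriving (Show, Eq)

-- Simulated upstream: one event right away, another after 2.5 seconds.
slowEvents :: Producer ConsoleEvent IO ()
slowEvents = do
  yield CeBegan
  lift (threadDelay 2500000)
  yield (CeChar 'a')

-- Independent source of timeouts, one per second.
ticks :: Producer ConsoleEvent IO ()
ticks = forever $ do
  lift (threadDelay 1000000)
  yield CeTimeout

main :: IO ()
main = do
  -- Both producers write into the same mailbox, so the reader sees a
  -- single merged stream and never has to wait on upstream to get a timeout.
  (out, inp) <- spawn (bounded 1)
  _ <- forkIO $ runEffect $ slowEvents >-> toOutput out
  _ <- forkIO $ runEffect $ ticks >-> toOutput out
  -- Read the first four merged events and print them.
  runEffect $ fromInput inp >-> P.take 4 >-> P.print
```

With these delays the output should be CeBegan, CeTimeout, CeTimeout, CeChar 'a', one per line. The timeouts reach the consumer while slowEvents is still sleeping, which a single Pipe blocked in await could not do.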