Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env stack
- -- stack --resolver lts-11.10 script
- {-# LANGUAGE NoImplicitPrelude #-}
- import RIO
- import Conduit
- import Control.Concurrent.STM.TBMQueue
-- | Run @inner@ against a stand-in for @src@ that fails when the original
-- source stalls.
--
-- @src@ is drained on its own thread into a bounded queue (capacity 2), and
-- @inner@ is handed a source that reads from that queue. If neither an item
-- nor the end-of-stream marker arrives within @usec@ microseconds, the
-- stand-in source calls 'error'.
--
-- The call returns as soon as @inner@ returns; a producer thread that is
-- still running at that point is cancelled instead of being waited for, so
-- an @inner@ that stops consuming early cannot hang the caller. An exception
-- raised on either thread cancels the other one and is rethrown.
doesn'tStall
  :: MonadUnliftIO m
  => Int -- ^ number of microseconds
  -> ConduitT () o m () -- ^ original source
  -> (ConduitT () o m () -> m a) -- ^ what to do with modified source
  -> m a
doesn'tStall usec src inner = do
  queue <- liftIO $ newTBMQueueIO 2
  withAsync (filler queue) $ \producer ->
    withAsync (inner $ consumer queue) $ \worker -> do
      -- 'waitEither' rethrows if the first thread to finish failed; leaving
      -- the 'withAsync' scopes then cancels whichever thread is still alive.
      firstDone <- waitEither producer worker
      case firstDone of
        -- source exhausted: the worker still has to drain the queue
        Left () -> wait worker
        -- worker finished first: the producer is cancelled on scope exit
        Right result -> pure result
  where
    -- Push every item of the original source into the queue. The queue is
    -- always closed afterwards (normal end, exception or cancellation) so
    -- the consumer sees end-of-stream instead of waiting for a timeout.
    filler queue =
      runConduit (src .| mapM_C (atomically . writeTBMQueue queue))
        `finally` atomically (closeTBMQueue queue)

    -- Replacement source: re-yield queued items, bounding every wait for the
    -- next item by @usec@ microseconds.
    consumer queue =
      loop
      where
        loop = do
          res <- lift $ timeout usec $ atomically $ readTBMQueue queue
          case res of
            -- timeout occurred
            Nothing -> error "too slow Joe!" -- better exception type
            -- queue is closed
            Just Nothing -> pure ()
            -- more data available
            Just (Just o) -> yield o >> loop
-- | An endless source of the integers 1, 2, 3, ... that sleeps after every
-- value, with the pause growing linearly: @n * 100000@ microseconds after
-- yielding @n@.
slowSource :: ConduitT () Int IO ()
slowSource = mapM_ emitThenPause [1 ..]
  where
    -- Hand one value downstream, then sleep in proportion to it.
    emitThenPause n = yield n >> threadDelay (n * 100000)
-- | Demo: print the values of 'slowSource' through 'doesn''tStall' with a
-- one-second limit on the wait for each item.
main :: IO ()
main = doesn'tStall stallLimit slowSource printAll
  where
    -- Maximum wait for the next item, in microseconds.
    stallLimit :: Int
    stallLimit = 1000000

    -- Drain the guarded source, printing every value it yields.
    printAll guarded = runConduit (guarded .| printC)
Add Comment
Please, Sign In to add comment