Guest User

Untitled

a guest
Jun 22nd, 2018
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.25 KB | None | 0 0
  1. #!/usr/bin/env stack
  2. -- stack --resolver lts-11.10 script
  3. {-# LANGUAGE NoImplicitPrelude #-}
  4. import RIO
  5. import Conduit
  6. import Control.Concurrent.STM.TBMQueue
  7.  
  -- | Run @inner@ against a wrapped copy of @src@ that fails when the original
  -- source takes longer than @usec@ microseconds to produce its next value.
  --
  -- The original source is drained on a separate thread into a bounded queue
  -- (capacity 2); the source handed to @inner@ reads from that queue, applying
  -- the timeout to every read.
  --
  -- The result is whatever @inner@ returns. As soon as @inner@ finishes, the
  -- thread draining @src@ is cancelled, so an @inner@ that stops consuming
  -- early cannot leave the producer blocked forever on a full queue. An
  -- exception on either side cancels the other side and is rethrown.
  --
  -- When a read times out, the wrapped source calls
  -- @error "too slow Joe!"@.
  doesn'tStall
    :: MonadUnliftIO m
    => Int -- ^ number of microseconds
    -> ConduitT () o m () -- ^ original source
    -> (ConduitT () o m () -> m a) -- ^ what to do with modified source
    -> m a
  doesn'tStall usec src inner = do
      queue <- liftIO $ newTBMQueueIO 2
      -- 'race' rather than running both sides to completion: the left branch
      -- never finishes on its own (after the source is exhausted it just
      -- parks), so the race can only be won by @inner@, whose completion then
      -- cancels the filler. A filler exception still wins the race by being
      -- rethrown, which cancels @inner@.
      either id id <$>
        race
          (filler queue *> forever (threadDelay maxBound))
          (inner $ consumer queue)
    where
      -- Push every value of the original source into the queue; close the
      -- queue on normal completion, on failure and on cancellation so the
      -- consumer never waits on a dead producer.
      filler queue =
        runConduit (src .| mapM_C (atomically . writeTBMQueue queue))
          `finally` atomically (closeTBMQueue queue)

      -- Source that replays the queue, enforcing the timeout on each read.
      consumer queue = loop
        where
          loop = do
            res <- lift $ timeout usec $ atomically $ readTBMQueue queue
            case res of
              -- timeout occurred
              Nothing -> error "too slow Joe!" -- better exception type

              -- queue is closed
              Just Nothing -> pure ()

              -- more data available
              Just (Just o) -> yield o >> loop
  38.  
  -- | Endless source of the integers 1, 2, 3, ... that pauses after each value:
  -- after yielding @n@ it sleeps for @n * 100000@ microseconds, so the gap
  -- between consecutive values keeps growing.
  slowSource :: ConduitT () Int IO ()
  slowSource = mapM_ emitThenSleep [1..]
    where
      -- Emit one value downstream, then wait proportionally to its size.
      emitThenSleep n = yield n >> threadDelay (n * 100000)
  43.  
  -- | Print the values of 'slowSource' through 'doesn'tStall' with a
  -- 1000000-microsecond limit on each wait for the next value.
  main :: IO ()
  main = doesn'tStall limitMicros slowSource printAll
    where
      -- Longest tolerated wait for the next value, in microseconds.
      limitMicros = 1000000

      -- Consume the guarded source by printing every value it yields.
      printAll guarded = runConduit (guarded .| printC)
Add Comment
Please, Sign In to add comment