Advertisement
Guest User

Untitled

a guest
Aug 19th, 2019
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.82 KB | None | 0 0
  1. //MBRPopLPush : Multiple Blocking Right Pop + L Push
  2. //Returns the source queue, data, and (optionally) an error
  3. func MBRPopLPush(cli *redis.Client, queues ...string) (sourceQ string, val string, err error) {
  4. eg, ctx := errgroup.WithContext(cli.Context())
  5. cli = cli.WithContext(ctx)
  6. semaphore := make(chan interface{}, 1)
  7. for _, iq := range queues {
  8. q := iq
  9. eg.Go(func() error {
  10. cmd, err := cli.BRPopLPush(q, q+":acked", 0).Result()
  11. if err != nil {
  12. return err
  13. }
  14. _, semAquired := <-semaphore
  15. if !semAquired {
  16. tx := cli.TxPipeline()
  17. tx.RPush(q, cmd)
  18. tx.LRem(q+":acked", 1, cmd)
  19. _, err := tx.Exec()
  20. if err != nil {
  21. return err
  22. }
  23. }
  24. sourceQ = q
  25. val = cmd
  26. return context.Canceled
  27. })
  28. }
  29. semaphore <- true
  30. err = eg.Wait()
  31. if err == context.Canceled {
  32. err = nil
  33. }
  34. return
  35. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement