SHARE
TWEET

Untitled

a guest Aug 19th, 2019 47 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. //MBRPopLPush : Multiple Blocking Right Pop + L Push
  2. //Returns the source queue, data, and (optionally) an error
  3. func MBRPopLPush(cli *redis.Client, queues ...string) (sourceQ string, val string, err error) {
  4.     eg, ctx := errgroup.WithContext(cli.Context())
  5.     cli = cli.WithContext(ctx)
  6.     semaphore := make(chan interface{}, 1)
  7.     for _, iq := range queues {
  8.         q := iq
  9.         eg.Go(func() error {
  10.             cmd, err := cli.BRPopLPush(q, q+":acked", 0).Result()
  11.             if err != nil {
  12.                 return err
  13.             }
  14.             _, semAquired := <-semaphore
  15.             if !semAquired {
  16.                 tx := cli.TxPipeline()
  17.                 tx.RPush(q, cmd)
  18.                 tx.LRem(q+":acked", 1, cmd)
  19.                 _, err := tx.Exec()
  20.                 if err != nil {
  21.                     return err
  22.                 }
  23.             }
  24.             sourceQ = q
  25.             val = cmd
  26.             return context.Canceled
  27.         })
  28.     }
  29.     semaphore <- true
  30.     err = eg.Wait()
  31.     if err == context.Canceled {
  32.         err = nil
  33.     }
  34.     return
  35. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top