Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //MBRPopLPush : Multiple Blocking Right Pop + L Push
- //Returns the source queue, data, and (optionally) an error
- func MBRPopLPush(cli *redis.Client, queues ...string) (sourceQ string, val string, err error) {
- eg, ctx := errgroup.WithContext(cli.Context())
- cli = cli.WithContext(ctx)
- semaphore := make(chan interface{}, 1)
- for _, iq := range queues {
- q := iq
- eg.Go(func() error {
- cmd, err := cli.BRPopLPush(q, q+":acked", 0).Result()
- if err != nil {
- return err
- }
- _, semAquired := <-semaphore
- if !semAquired {
- tx := cli.TxPipeline()
- tx.RPush(q, cmd)
- tx.LRem(q+":acked", 1, cmd)
- _, err := tx.Exec()
- if err != nil {
- return err
- }
- }
- sourceQ = q
- val = cmd
- return context.Canceled
- })
- }
- semaphore <- true
- err = eg.Wait()
- if err == context.Canceled {
- err = nil
- }
- return
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement