Advertisement
Guest User

Untitled

a guest
Feb 28th, 2019
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.90 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "log"
  6. "os"
  7. "sync"
  8. "time"
  9.  
  10. uuid "github.com/satori/go.uuid"
  11. "github.com/streadway/amqp"
  12. )
  13.  
  // exit1 terminates the program via log.Fatalf when err is non-nil, logging
  // msg followed by the error text. A nil err makes it a no-op.
  func exit1(err error, msg string) {
  	if err == nil {
  		return
  	}
  	log.Fatalf("%s: %s", msg, err)
  }
  19.  
  20. func main() {
  21. rUSER := "bunny"
  22. rPASS := "test"
  23. rHOST := "my-rabbit"
  24. rPORT := "5672"
  25. rVHOST := "hole"
  26.  
  27. // read from ENV
  28. if e := os.Getenv("RABBITMQ_USER"); e != "" {
  29. rUSER = e
  30. }
  31. if e := os.Getenv("RABBITMQ_PASS"); e != "" {
  32. rPASS = e
  33. }
  34. if e := os.Getenv("RABBITMQ_HOST"); e != "" {
  35. rHOST = e
  36. }
  37. if e := os.Getenv("RABBITMQ_PORT"); e != "" {
  38. rPORT = e
  39. }
  40. if e := os.Getenv("RABBITMQ_VHOST"); e != "" {
  41. rVHOST = e
  42. }
  43.  
  44. conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/%s",
  45. rUSER, rPASS, rHOST, rPORT, rVHOST))
  46. exit1(err, "Failed to connect to RabbitMQ")
  47. defer conn.Close()
  48.  
  49. ch, err := conn.Channel()
  50. exit1(err, "Failed to open a channel")
  51. defer ch.Close()
  52.  
  53. // buggy part
  54. args := map[string]interface{}{
  55. "x-message-ttl": int32(3000),
  56. "x-expires": int32(8000), // <-- culprit
  57. }
  58.  
  59. concurrent := 500
  60.  
  61. wg := sync.WaitGroup{}
  62. semaphore := make(chan struct{}, concurrent)
  63.  
  64. for i := 0; i < 1000; i++ {
  65. semaphore <- struct{}{}
  66. wg.Add(1)
  67. go func() {
  68. queueName := fmt.Sprintf("carrot-%s-%s", time.Now().Format("2006-01-02"), uuid.Must(uuid.NewV4()))
  69. fmt.Printf("Creating queue: %sn", queueName)
  70. defer func() {
  71. <-semaphore
  72. wg.Done()
  73. }()
  74. q, err := ch.QueueDeclare(
  75. queueName,
  76. false, // durable
  77. false, // delete when usused
  78. false, // exclusive
  79. false, // no-wait
  80. args, // arguments
  81. )
  82. exit1(err, "Failed to declare a queue")
  83.  
  84. // how to measure here time elapsed between ch.Consume is called
  85.  
  86. _, err = ch.Consume(
  87. q.Name, // queue
  88. "", // consumer
  89. true, // auto-ack
  90. false, // exclusive
  91. false, // no-local
  92. false, // no-wait
  93. nil, // args
  94. )
  95. exit1(err, "Failed to register a consumer")
  96. }()
  97. }
  98. wg.Wait()
  99. }
  100.  
  101. "x-expires": int32(8000),
  102.  
  103. q, err := ch.QueueDeclare(
  104. queueName,
  105. false, // durable
  106. false, // delete when unused
  107. false, // exclusive
  108. false, // no-wait
  109. args, // arguments
  110. )
  111.  
  112. _, err = ch.Consume(
  113. q.Name, // queue
  114. "", // consumer
  115. true, // auto-ack
  116. false, // exclusive
  117. false, // no-local
  118. false, // no-wait
  119. nil, // args
  120. )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement