Advertisement
Guest User

Untitled

a guest
Jul 23rd, 2019
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.60 KB | None | 0 0
  1. package messaging
  2.  
  3. import (
  4. "strings"
  5.  
  6. "github.com/streadway/amqp"
  7. )
  8.  
  9.  
// change in pkg/messaging/client.go

// IMessageingClient is the set of broker operations exposed by the messaging
// client. *MessagingClient provides the Declare* methods shown in this file.
//
// NOTE(review): the identifier is misspelled ("Messageing"); it is left as-is
// because renaming it would break existing references.
type IMessageingClient interface {
	// ConnectToBroker takes a connection string and reports an error on
	// failure. Implementation not visible here — presumably dials the broker.
	ConnectToBroker(connectionString string) error
	// DeclareExchange declares a durable "direct" exchange named moduleName.
	DeclareExchange(moduleName string) error
	// DeclareQueue declares a durable queue named queueName with
	// "x-max-priority" set to 10.
	DeclareQueue(queueName string) error
	// DeclareRequestQueue declares the durable queue queueName+".req" and
	// binds it, using its own name as routing key, to the exchange named by
	// the part of queueName before the first ".".
	DeclareRequestQueue(queueName string) error
	// PublishOnQueue sends msg to queueName. Implementation not visible here;
	// maxPriority presumably sets the message priority — TODO confirm.
	PublishOnQueue(msg []byte, queueName string, maxPriority uint8) error
	// SubscribeQueueByMethod registers handlerFunc for queueName.
	// Implementation not visible here — presumably invoked per delivered
	// request message; verify against the implementation.
	SubscribeQueueByMethod(queueName string, handlerFunc func(*interfaces.RequestMessage)) error
	// Close releases the client. Implementation not visible here.
	Close()
}
  20.  
  21. func (m *MessagingClient) DeclareExchange(moduleName string) error {
  22. ch, err := m.conn.Channel()
  23. failOnError(err, "Failed to open a channel")
  24. defer ch.Close()
  25.  
  26. priority := make(map[string]interface{})
  27. priority["x-max-priority"] = 10
  28.  
  29. err = ch.ExchangeDeclare(
  30. moduleName, // name of the exchange
  31. "direct", // type
  32. true, // durable
  33. false, // delete when complete
  34. false, // internal
  35. false, // noWait
  36. amqp.Table(priority), // arguments
  37. )
  38.  
  39. failOnError(err, "Failed to register an Exchange")
  40. return nil
  41. }
  42.  
  43. func (m *MessagingClient) DeclareQueue(queueName string) error {
  44. if m.conn == nil {
  45. return brokerIsNotConnected
  46. }
  47. // connect to rabbit channel
  48. ch, err := m.conn.Channel()
  49. failOnError(err, "Failed to open a channel")
  50. defer ch.Close()
  51.  
  52. priority := make(map[string]interface{})
  53. priority["x-max-priority"] = 10
  54.  
  55. _, err = ch.QueueDeclare(
  56. queueName, // name of the queue
  57. true, // durable
  58. false, // delete when usused
  59. false, // exclusive
  60. false, // noWait
  61. amqp.Table(priority), // arguments
  62. )
  63. failOnError(err, "Failed to register an Request Queue")
  64.  
  65. return nil
  66. }
  67.  
  68. func (m *MessagingClient) DeclareRequestQueue(queueName string) error {
  69. if m.conn == nil {
  70. return brokerIsNotConnected
  71. }
  72. // connect to rabbit channel
  73. ch, err := m.conn.Channel()
  74. failOnError(err, "Failed to open a channel")
  75. defer ch.Close()
  76.  
  77. priority := make(map[string]interface{})
  78. priority["x-max-priority"] = 10
  79.  
  80. _, err = ch.QueueDeclare(
  81. queueName+".req", // name of the queue
  82. true, // durable
  83. false, // delete when usused
  84. false, // exclusive
  85. false, // noWait
  86. amqp.Table(priority), // arguments
  87. )
  88. failOnError(err, "Failed to register an Request Queue")
  89. moduleName := strings.Split(queueName, ".")[0]
  90.  
  91. err = ch.QueueBind(
  92. queueName+".req", // queue name
  93. queueName+".req", // routing key
  94. moduleName, // exchange
  95. false,
  96. nil,
  97. )
  98. failOnError(err, "Failed to bind a Request queue")
  99.  
  100. return nil
  101. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement