Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package messaging
- import (
- "strings"
- "github.com/streadway/amqp"
- )
- // change in pkg/messaging/client.go
- type IMessageingClient interface {
- ConnectToBroker(connectionString string) error
- DeclareExchange(moduleName string) error
- DeclareQueue(queueName string) error
- DeclareRequestQueue(queueName string) error
- PublishOnQueue(msg []byte, queueName string, maxPriority uint8) error
- SubscribeQueueByMethod(queueName string, handlerFunc func(*interfaces.RequestMessage)) error
- Close()
- }
- func (m *MessagingClient) DeclareExchange(moduleName string) error {
- ch, err := m.conn.Channel()
- failOnError(err, "Failed to open a channel")
- defer ch.Close()
- priority := make(map[string]interface{})
- priority["x-max-priority"] = 10
- err = ch.ExchangeDeclare(
- moduleName, // name of the exchange
- "direct", // type
- true, // durable
- false, // delete when complete
- false, // internal
- false, // noWait
- amqp.Table(priority), // arguments
- )
- failOnError(err, "Failed to register an Exchange")
- return nil
- }
- func (m *MessagingClient) DeclareQueue(queueName string) error {
- if m.conn == nil {
- return brokerIsNotConnected
- }
- // connect to rabbit channel
- ch, err := m.conn.Channel()
- failOnError(err, "Failed to open a channel")
- defer ch.Close()
- priority := make(map[string]interface{})
- priority["x-max-priority"] = 10
- _, err = ch.QueueDeclare(
- queueName, // name of the queue
- true, // durable
- false, // delete when usused
- false, // exclusive
- false, // noWait
- amqp.Table(priority), // arguments
- )
- failOnError(err, "Failed to register an Request Queue")
- return nil
- }
- func (m *MessagingClient) DeclareRequestQueue(queueName string) error {
- if m.conn == nil {
- return brokerIsNotConnected
- }
- // connect to rabbit channel
- ch, err := m.conn.Channel()
- failOnError(err, "Failed to open a channel")
- defer ch.Close()
- priority := make(map[string]interface{})
- priority["x-max-priority"] = 10
- _, err = ch.QueueDeclare(
- queueName+".req", // name of the queue
- true, // durable
- false, // delete when usused
- false, // exclusive
- false, // noWait
- amqp.Table(priority), // arguments
- )
- failOnError(err, "Failed to register an Request Queue")
- moduleName := strings.Split(queueName, ".")[0]
- err = ch.QueueBind(
- queueName+".req", // queue name
- queueName+".req", // routing key
- moduleName, // exchange
- false,
- nil,
- )
- failOnError(err, "Failed to bind a Request queue")
- return nil
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement