daily pastebin goal
50%
SHARE
TWEET

Untitled

a guest Jan 24th, 2019 79 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package main
  2.  
import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/elygiux/go-nexmo"
	"github.com/joho/godotenv"
	"github.com/streadway/amqp"
)
  13.  
  14. type Message struct {
  15.     Id             int    `json:"id"`
  16.     To             string `json:"to"`
  17.     From           string `json:"from"`
  18.     Text           string `json:"text"`
  19.     CallbackString string `json:"callback"`
  20. }
  21.  
  22. type Job struct {
  23.     Id      int
  24.     Message Message
  25.     Delay   time.Duration
  26. }
  27.  
  28. type Worker struct {
  29.     id         int
  30.     jobQueue   chan Job
  31.     workerPool chan chan Job
  32.     quitChan   chan bool
  33. }
  34.  
  35. type Dispatcher struct {
  36.     workerPool chan chan Job
  37.     maxWorkers int
  38.     jobQueue   chan Job
  39. }
  40.  
  41. func NewWorker(id int, workerPool chan chan Job) Worker {
  42.     return Worker{
  43.         id:         id,
  44.         jobQueue:   make(chan Job),
  45.         workerPool: workerPool,
  46.         quitChan:   make(chan bool),
  47.     }
  48. }
  49.  
  50. func (w Worker) start() {
  51.     go func() {
  52.         for {
  53.             // Add my jobQueue to the worker pool.
  54.             w.workerPool <- w.jobQueue
  55.  
  56.             select {
  57.             case job := <-w.jobQueue:
  58.                 // Dispatcher has added a job to my jobQueue.
  59.                 //fmt.Println("worker%d: started %b", w.id, job.Id)
  60.                 //time.Sleep(job.Delay)
  61.                 fmt.Printf("worker%d: completed %b!\n", w.id, job.Id)
  62.  
  63.                 message := job.Message
  64.  
  65.                 nexmoClient, _ := go_nexmo.NewClient("41c0d9fd", "3rn54GdICsFEwIyB")
  66.  
  67.                 nexmoMessage := &go_nexmo.SMSMessage{
  68.                     From:  message.From,
  69.                     To:    message.To,
  70.                     Type:  go_nexmo.Text,
  71.                     Text:  message.Text,
  72.                     Class: go_nexmo.Standard,
  73.                     //VCal:  message.CallbackString,
  74.                 }
  75.  
  76.                 _, err := nexmoClient.SMS.Send(nexmoMessage)
  77.                 failOnError(err, "Failed sending Nexmo message")
  78.  
  79.  
  80.             case <-w.quitChan:
  81.                 // We have been asked to stop.
  82.                 //fmt.Printf("worker%b stopping\n", w.id)
  83.                 return
  84.             }
  85.         }
  86.     }()
  87. }
  88.  
  89. func (w Worker) stop() {
  90.     go func() {
  91.         w.quitChan <- true
  92.     }()
  93. }
  94.  
  95. func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
  96.     workerPool := make(chan chan Job, maxWorkers)
  97.  
  98.     return &Dispatcher{
  99.         jobQueue:   jobQueue,
  100.         maxWorkers: maxWorkers,
  101.         workerPool: workerPool,
  102.     }
  103. }
  104.  
  105. func (d *Dispatcher) run() {
  106.     for i := 0; i < d.maxWorkers; i++ {
  107.         worker := NewWorker(i+1, d.workerPool)
  108.         worker.start()
  109.     }
  110.  
  111.     go d.dispatch()
  112. }
  113.  
  114. func (d *Dispatcher) dispatch() {
  115.     for {
  116.         select {
  117.         case job := <-d.jobQueue:
  118.             go func() {
  119.                 //fmt.Printf("fetching workerJobQueue for: %b\n", job.Id)
  120.                 workerJobQueue := <-d.workerPool
  121.                 //fmt.Printf("adding %b to workerJobQueue\n", job.Id)
  122.                 workerJobQueue <- job
  123.             }()
  124.         }
  125.     }
  126. }
  127.  
  128. func requestHandler(jobs chan Job, message Message) {
  129.     duration := time.Second
  130.  
  131.     job := Job{message.Id, message, duration}
  132.     go func() {
  133.         jobs <- job
  134.     }()
  135. }
  136.  
  137. func main() {
  138.     err := godotenv.Load()
  139.     failOnError(err, "No environment file provided")
  140.  
  141.     var (
  142.         maxWorkers   = flag.Int("max_workers", 20, "The number of workers to start")
  143.         maxQueueSize = flag.Int("max_queue_size", 500, "The size of job queue")
  144.     )
  145.  
  146.     flag.Parse()
  147.  
  148.     jobQueue := make(chan Job, *maxQueueSize)
  149.  
  150.     // Start the dispatcher.
  151.     dispatcher := NewDispatcher(jobQueue, *maxWorkers)
  152.     dispatcher.run()
  153.  
  154.     conn, err := amqp.Dial("amqp://mqadmin:rmq_super_secret_17_dybala@192.99.62.45:5672/")
  155.     failOnError(err, "Failed to connect to RabbitMQ")
  156.     defer conn.Close()
  157.  
  158.     ch, err := conn.Channel()
  159.     failOnError(err, "Failed to open a channel")
  160.     defer ch.Close()
  161.  
  162.     q, err := ch.QueueDeclare(
  163.         "nexmo-messages", // name
  164.         true,             // durable
  165.         false,            // delete when unused
  166.         false,            // exclusive
  167.         false,            // no-wait
  168.         nil,              // arguments
  169.     )
  170.     failOnError(err, "Failed to declare a queue")
  171.  
  172.     err = ch.Qos(
  173.         1,     // prefetch count
  174.         0,     // prefetch size
  175.         false, // global
  176.     )
  177.     failOnError(err, "Failed to set QoS")
  178.  
  179.     msgs, err := ch.Consume(
  180.         q.Name, // queue
  181.         "",     // consumer
  182.         false,  // auto-ack
  183.         false,  // exclusive
  184.         false,  // no-local
  185.         false,  // no-wait
  186.         nil,    // args
  187.     )
  188.     failOnError(err, "Failed to register a consumer")
  189.  
  190.     forever := make(chan bool)
  191.  
  192.     /*client := redis.NewClient(&redis.Options{
  193.         Addr:     "127.0.0.1:6379",
  194.         Password: "",
  195.         DB:       0,
  196.     })*/
  197.  
  198.     for m := range msgs {
  199.         message := Message{}
  200.         err := json.Unmarshal([]byte(m.Body), &message)
  201.  
  202.         failOnError(err, "Failed parsing message json")
  203.         defer conn.Close()
  204.  
  205.         m.Ack(false)
  206.  
  207.         requestHandler(jobQueue, message)
  208.  
  209.         /*key := strconv.Itoa(message.Id)
  210.         client.Set(key, messageResponse.Messages[0].Status, 30000000000).Err()
  211.         failOnError(err, "Failed adding redis key")
  212.         */
  213.     }
  214.  
  215.     log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
  216.     <-forever
  217. }
  218.  
  219. func failOnError(err error, msg string) {
  220.     if err != nil {
  221.         log.Fatalf("Failed with error: %s: %s", msg, err)
  222.     }
  223. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top