Advertisement
Guest User

Untitled

a guest
Dec 7th, 2019
126
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.10 KB | None | 0 0
  1. #include <ev.h>
  2. #include <amqpcpp.h>
  3. #include <amqpcpp/libev.h>
  4.  
  5.  
  6. class MyHandler : public AMQP::LibEvHandler
  7. {
  8. public:
  9.     MyHandler(struct ev_loop* loop) : AMQP::LibEvHandler(loop) {}
  10.  
  11.     virtual ~MyHandler() = default;
  12.  
  13. private:
  14.     virtual void onError(AMQP::TcpConnection *connection, const char *message) override
  15.     {
  16.         std::cout << "error: " << message << std::endl;
  17.     }
  18.  
  19.     virtual void onConnected(AMQP::TcpConnection *connection) override
  20.     {
  21.         std::cout << "connected" << std::endl;
  22.     }
  23.  
  24.     virtual void onReady(AMQP::TcpConnection *connection) override
  25.     {
  26.         std::cout << "ready" << std::endl;
  27.     }
  28.  
  29.     virtual void onClosed(AMQP::TcpConnection *connection) override
  30.     {
  31.         std::cout << "closed" << std::endl;
  32.     }
  33.  
  34.     virtual void onDetached(AMQP::TcpConnection *connection) override
  35.     {
  36.         std::cout << "detached" << std::endl;
  37.     }
  38. };
  39.  
  40. class MyTimer
  41. {
  42. private:
  43.     struct ev_timer _timer;
  44.  
  45.     AMQP::TcpChannel* _channel;
  46.  
  47.     std::string _queue;
  48.  
  49.     static void callback(struct ev_loop *loop, struct ev_timer *timer, int revents)
  50.     {
  51.         MyTimer *self = static_cast<MyTimer*>(timer->data);
  52.  
  53.         static int i = 2130;
  54.         i++;
  55.  
  56.         std::string msg(std::string(
  57.             "{"
  58.                 "\"creationDate\": null,"
  59.                 "\"value\": \"") + std::to_string(i) + std::string("\","
  60.                 "\"sensorKey\" : \"boguc\","
  61.                 "\"time\": \"2019-12-07T19:31:59.370Z\""
  62.             "}")
  63.         );
  64.         AMQP::Envelope envelope(msg.c_str(), sizeof(char)*msg.length());
  65.         envelope.setDeliveryMode(uint8_t(1));
  66.         envelope.setExpiration("5000");
  67.         envelope.setContentType("application/json");
  68.         envelope.setContentEncoding("UTF8");
  69.  
  70.         self->_channel->publish("scorpio.direct", "SaveSensorDataEvent",
  71.             envelope);
  72.     }
  73.  
  74. public:
  75.     MyTimer(struct ev_loop *loop, AMQP::TcpChannel *channel, std::string queue) :
  76.         _channel(channel), _queue(std::move(queue))
  77.     {
  78.         ev_timer_init(&_timer, callback, 0.005, 1.005);
  79.         _timer.data = this;
  80.         ev_timer_start(loop, &_timer);
  81.     }
  82.  
  83.     virtual ~MyTimer() = default;
  84. };
  85.  
  86.  
  87. auto main(int argc, char** argv) -> int
  88. {
  89.     auto *loop = EV_DEFAULT;
  90.  
  91.     MyHandler handler(loop);
  92.  
  93.     AMQP::Address address("amqp://guest:guest@192.168.1.94:5672/");
  94.     AMQP::TcpConnection connection(&handler, address);
  95.  
  96.     AMQP::TcpChannel channel(&connection);
  97.  
  98.     channel.declareExchange("scorpio.direct", AMQP::direct);
  99.     channel.declareQueue("scorpio.rover.jetson").onSuccess([&connection, &channel, loop](
  100.         const std::string &name, uint32_t messagecount, uint32_t consumercount)
  101.         {
  102.             std::cout << "declared queue " << name << std::endl;
  103.        
  104.             // construct a timer that is going to publish stuff
  105.             auto *timer = new MyTimer(loop, &channel, name);
  106.             //connection.close();
  107.         });
  108.     channel.bindQueue("scorpio.direct", "scorpio.rover.jetson", "FiluToSzmata");
  109.  
  110.     ev_run(loop, 0);
  111.  
  112.     return 0;
  113. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement