Advertisement
Guest User

Untitled

a guest
Oct 21st, 2016
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.55 KB | None | 0 0
  1. #include <cstring>
  2. #include <fstream>
  3. #include <iostream>
  4.  
  5. #include <cstdlib>
  6. #include "airlink/event.h"
  7. #include "airlink/basic_io.h"
  8. #include "airlink/server_socket.h"
  9. #include "airlink/messaging.h"
  10. #include "airlink/logging.h"
  11.  
  12. using namespace Airlink;
  13. using namespace Airlink::Net;
  14.  
  15.  
  16. namespace {
  17. struct StandardInputReader : public BasicIO
  18. {
  19. StandardInputReader() : BasicIO(STDIN_FILENO) { }
  20. };
  21.  
  22.  
  23. struct MyPublisher
  24. {
  25. ServerSocket server;
  26. Messaging::Publisher pub;
  27. StandardInputReader in;
  28. Messaging::Subject subject;
  29.  
  30. MyPublisher(const IPv4SocketAddress&, const Messaging::Subject&);
  31. void read_and_send_event();
  32. void accept_connection();
  33. };
  34. }
  35.  
  36. MyPublisher::
  37. MyPublisher(const IPv4SocketAddress& bind_to, const Messaging::Subject& sub)
  38. : server(bind_to), subject(sub)
  39. {
  40. IPv4SocketAddress bind_addr = server.get_local_address();
  41.  
  42. server.add_to_event_loop<MyPublisher,
  43. &MyPublisher::accept_connection>(this);
  44. std::clog << LOG_DEBUG << "Listening on " << bind_addr << "." << std::endl;
  45.  
  46. // Register publisher.
  47. std::ofstream os;
  48. Messaging::registry_out(os, subject, "publisher");
  49. os << "127.0.0.1:" << bind_addr.port << std::endl;
  50.  
  51. in.add_to_event_loop<MyPublisher,
  52. &MyPublisher::read_and_send_event>(this);
  53. std::clog << LOG_DEBUG << "Reading messages from stdin." << std::endl;
  54. }
  55.  
  56.  
  57. void MyPublisher::read_and_send_event()
  58. {
  59. std::string body;
  60. std::getline(std::cin, body);
  61. if (std::cin.eof())
  62. {
  63. std::clog << LOG_DEBUG << "End-of-file on stdin; exiting." << std::endl;
  64. std::exit(0);
  65. }
  66.  
  67. Messaging::Message msg;
  68. msg.set_subject(subject);
  69. msg.put(body);
  70. pub.publish(msg);
  71. }
  72.  
  73.  
  74. void MyPublisher::accept_connection()
  75. {
  76. std::clog << LOG_DEBUG << "Accepting connection." << std::endl;
  77.  
  78. StreamSocket* socket = server.accept();
  79. Messaging::Engine* engine = Messaging::create_engine(socket);
  80. Messaging::Link* link = Messaging::create_link(engine);
  81.  
  82. pub.add_subscriber(link, engine);
  83. }
  84.  
  85. int main(int argc, char const* argv[])
  86. {
  87. try
  88. {
  89. char const *prog = basename(argv[0]);
  90. if (argc != 3)
  91. {
  92. std::clog << LOG_ERR << "Usage: " << prog << " HOST:PORT SUBJECT" << std::endl;
  93. return 1;
  94. }
  95. IPv4SocketAddress addr(argv[1]);
  96. Messaging::Subject subject = argv[2];
  97.  
  98. Messaging::container_name(prog);
  99. MyPublisher node(addr, subject);
  100. Event::run();
  101. }
  102. catch (const std::exception &e)
  103. {
  104. std::clog << LOG_ERR << e.what() << "." << std::endl;
  105. return 1;
  106. }
  107. }
  108.  
  109. #include <cerrno>
  110. #include <cstring>
  111. #include <iostream>
  112.  
  113. #include "airlink/event.h"
  114. #include "airlink/inet.h"
  115. #include "airlink/stream_socket.h"
  116. #include "airlink/messaging.h"
  117. #include "airlink/logging.h"
  118.  
  119. using namespace Airlink;
  120. using Airlink::Net::StreamSocket;
  121.  
  122. int main(int argc, char* argv[])
  123. {
  124. char const *prog = basename(argv[0]);
  125. if (argc < 2)
  126. {
  127. std::clog << LOG_ERR << "Usage: " << prog << " HOST:PORT" << std::endl;
  128. return 1;
  129. }
  130.  
  131. StreamSocket* socket = new StreamSocket;
  132. socket->connect(IPv4SocketAddress(argv[1]));
  133.  
  134. Messaging::container_name(prog);
  135. Messaging::Engine* engine = Messaging::create_engine(socket);
  136. Messaging::Link* link = Messaging::create_receiver(engine, "r", "", "");
  137.  
  138. while(not link->incoming())
  139. Event::run(Airlink::Event::ONCE);
  140.  
  141. Messaging::Message msg = link->receive_message(engine);
  142. std::cout << msg.get_string() << std::endl;
  143.  
  144. return 0;
  145. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement