Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <cstring>
- #include <fstream>
- #include <iostream>
- #include <cstdlib>
- #include "airlink/event.h"
- #include "airlink/basic_io.h"
- #include "airlink/server_socket.h"
- #include "airlink/messaging.h"
- #include "airlink/logging.h"
- using namespace Airlink;
- using namespace Airlink::Net;
- namespace {
- struct StandardInputReader : public BasicIO
- {
- StandardInputReader() : BasicIO(STDIN_FILENO) { }
- };
- struct MyPublisher
- {
- ServerSocket server;
- Messaging::Publisher pub;
- StandardInputReader in;
- Messaging::Subject subject;
- MyPublisher(const IPv4SocketAddress&, const Messaging::Subject&);
- void read_and_send_event();
- void accept_connection();
- };
- }
- MyPublisher::
- MyPublisher(const IPv4SocketAddress& bind_to, const Messaging::Subject& sub)
- : server(bind_to), subject(sub)
- {
- IPv4SocketAddress bind_addr = server.get_local_address();
- server.add_to_event_loop<MyPublisher,
- &MyPublisher::accept_connection>(this);
- std::clog << LOG_DEBUG << "Listening on " << bind_addr << "." << std::endl;
- // Register publisher.
- std::ofstream os;
- Messaging::registry_out(os, subject, "publisher");
- os << "127.0.0.1:" << bind_addr.port << std::endl;
- in.add_to_event_loop<MyPublisher,
- &MyPublisher::read_and_send_event>(this);
- std::clog << LOG_DEBUG << "Reading messages from stdin." << std::endl;
- }
- void MyPublisher::read_and_send_event()
- {
- std::string body;
- std::getline(std::cin, body);
- if (std::cin.eof())
- {
- std::clog << LOG_DEBUG << "End-of-file on stdin; exiting." << std::endl;
- std::exit(0);
- }
- Messaging::Message msg;
- msg.set_subject(subject);
- msg.put(body);
- pub.publish(msg);
- }
- void MyPublisher::accept_connection()
- {
- std::clog << LOG_DEBUG << "Accepting connection." << std::endl;
- StreamSocket* socket = server.accept();
- Messaging::Engine* engine = Messaging::create_engine(socket);
- Messaging::Link* link = Messaging::create_link(engine);
- pub.add_subscriber(link, engine);
- }
- int main(int argc, char const* argv[])
- {
- try
- {
- char const *prog = basename(argv[0]);
- if (argc != 3)
- {
- std::clog << LOG_ERR << "Usage: " << prog << " HOST:PORT SUBJECT" << std::endl;
- return 1;
- }
- IPv4SocketAddress addr(argv[1]);
- Messaging::Subject subject = argv[2];
- Messaging::container_name(prog);
- MyPublisher node(addr, subject);
- Event::run();
- }
- catch (const std::exception &e)
- {
- std::clog << LOG_ERR << e.what() << "." << std::endl;
- return 1;
- }
- }
#include <cerrno>
#include <cstring>
#include <exception>
#include <iostream>
- #include "airlink/event.h"
- #include "airlink/inet.h"
- #include "airlink/stream_socket.h"
- #include "airlink/messaging.h"
- #include "airlink/logging.h"
- using namespace Airlink;
- using Airlink::Net::StreamSocket;
- int main(int argc, char* argv[])
- {
- char const *prog = basename(argv[0]);
- if (argc < 2)
- {
- std::clog << LOG_ERR << "Usage: " << prog << " HOST:PORT" << std::endl;
- return 1;
- }
- StreamSocket* socket = new StreamSocket;
- socket->connect(IPv4SocketAddress(argv[1]));
- Messaging::container_name(prog);
- Messaging::Engine* engine = Messaging::create_engine(socket);
- Messaging::Link* link = Messaging::create_receiver(engine, "r", "", "");
- while(not link->incoming())
- Event::run(Airlink::Event::ONCE);
- Messaging::Message msg = link->receive_message(engine);
- std::cout << msg.get_string() << std::endl;
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement