Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <chrono>
#include <cstdlib>
#include <fstream>
#include <functional> // bind, function
#include <iostream>
#include <map> // map
#include <sstream>
#include <string>
#include <vector>

#include <caf/all.hpp>
- namespace message
- {
- using namespace std;
- struct msg
- {
- virtual string type() const = 0;
- };
- struct greeting : msg
- {
- string message;
- unsigned seqNo;
- static const string type_code;
- string type() const override { return type_code; }
- };
- const string greeting::type_code = "greeting";
- template <class Inspector>
- typename Inspector::result_type inspect(Inspector& fn, greeting& g)
- {
- return fn(caf::meta::type_name(greeting::type_code.c_str()),g.message, g.seqNo);
- }
- struct salutation : msg
- {
- string message;
- unsigned seqNo;
- static const string type_code;
- string type() const override { return type_code; }
- };
- const string salutation::type_code = "salutation";
- template <class Inspector>
- typename Inspector::result_type inspect(Inspector& fn, salutation& s)
- {
- return fn(caf::meta::type_name(salutation::type_code.c_str()),s.message, s.seqNo);
- }
- }
- namespace
- {
- class msg_target : public caf::event_based_actor
- {
- typedef caf::event_based_actor base;
- unsigned _greetings;
- unsigned _salutations;
- public:
- msg_target(caf::actor_config& cfg)
- : base(cfg)
- {}
- protected:
- caf::behavior make_behavior() override
- {
- return caf::message_handler({
- [=](const message::greeting& g)
- {
- ++_greetings;
- //caf::aout(this) << g.message << std::endl;
- },
- [=](const message::salutation& s)
- {
- ++_salutations;
- //caf::aout(this) << s.message << std::endl;
- }
- });
- }
- };
- typedef std::vector<char> buffer;
- struct msg_serializer
- {
- msg_serializer(caf::actor_system& context)
- : _context(context)
- {}
- template <class T, class... Ts>
- buffer serialize(const T& x, const Ts&... xs)
- {
- buffer buf;
- caf::binary_serializer bs{_context, buf};
- bs(x, xs...);
- return buf;
- }
- template <class T, class... Ts>
- void deserialize(const buffer& buf, T& x, Ts&... xs)
- {
- caf::binary_deserializer bd{_context, buf};
- bd(x, xs...);
- }
- private:
- caf::actor_system& _context;
- msg_serializer(const msg_serializer&) = delete;
- msg_serializer& operator=(const msg_serializer&) = delete;
- };
// Byte written between the decimal length prefix and the raw payload.
// Without it, a payload whose first byte is an ASCII digit would be read
// back as part of the length.
const char buffer_length_terminator = ':';

/// Writes one frame: the payload size in decimal, the terminator byte, then
/// the payload bytes verbatim.
///
/// The size is emitted with unformatted writes, so the stream's width, fill
/// and base flags neither affect the frame nor get modified.
/// `std::vector<char>` is the same type the file aliases as `buffer`.
///
/// NOTE: the frame layout differs from a bare "<size><payload>" layout;
/// data must be read back with the matching operator>> below.
std::ostream& operator<<(std::ostream& os, const std::vector<char>& buf)
{
    const std::string header = std::to_string(buf.size());
    os.write(header.data(), static_cast<std::streamsize>(header.size()));
    os.put(buffer_length_terminator);
    // data() is valid for an empty vector, unlike &buf[0].
    os.write(buf.data(), static_cast<std::streamsize>(buf.size()));
    return os;
}

/// Reads one frame written by the operator<< above into `buf`.
///
/// On any failure (end of input, malformed header, short payload) the
/// stream is left in a failed state and `buf` is empty. Sets `noskipws` on
/// the stream so that payload bytes that look like whitespace are never
/// skipped.
std::istream& operator>>(std::istream& is, std::vector<char>& buf)
{
    buf.clear();
    is >> std::noskipws;

    std::vector<char>::size_type size = 0;
    if (!(is >> size))
    {
        return is; // no further frame (or unreadable length)
    }
    if (is.get() != std::istream::traits_type::to_int_type(buffer_length_terminator))
    {
        is.setstate(std::ios_base::failbit); // header is not "<digits>:"
        return is;
    }

    buf.resize(size);
    if (!is.read(buf.data(), static_cast<std::streamsize>(size)))
    {
        buf.clear(); // truncated payload: do not hand back partial data
    }
    return is;
}
- }
/// Prints a debug dump of `b` to std::cout: a "BufferSize:<n>" line, then
/// for every byte the raw character, a colon, and its value as upper-case
/// hex, separated by spaces, followed by a newline and a flush.
///
/// Bytes are converted through `unsigned char`, so values >= 0x80 print as
/// two hex digits instead of a sign-extended word. The formatting flags of
/// std::cout are restored before returning. `std::vector<char>` is the same
/// type the file aliases as `buffer`.
void dump_buffer(const std::vector<char>& b)
{
    const std::ios_base::fmtflags saved_flags = std::cout.flags();

    std::cout << std::dec << "BufferSize:" << b.size() << '\n';
    std::cout << std::hex << std::uppercase;
    for (const char c : b)
    {
        const unsigned value = static_cast<unsigned char>(c);
        std::cout << c << ':' << value << ' ';
    }

    std::cout.flags(saved_flags);
    std::cout << std::endl;
}
- class msg_dispatcher
- {
- caf::scoped_actor& _sender;
- caf::actor& _target;
- msg_serializer _ser;
- public:
- msg_dispatcher(caf::scoped_actor& sender, caf::actor& target)
- : _sender(sender)
- , _target(target)
- , _ser(sender->system())
- {}
- void send_greeting(const buffer& msg_buf)
- {
- message::greeting msg;
- _ser.deserialize(msg_buf, msg);
- _sender->send(_target, std::move(msg));
- }
- void send_salutation(const buffer& msg_buf)
- {
- message::salutation msg;
- _ser.deserialize(msg_buf, msg);
- _sender->send(_target, std::move(msg));
- }
- };
/// Scope guard that records a stream's formatting flags, field width and
/// precision on construction and writes them back on destruction.
///
/// Width and precision are held as std::streamsize, the type the stream
/// accessors themselves use, so no signed/unsigned conversion takes place.
/// The guard keeps a reference to the stream, which must outlive it.
template <class StreamType>
class stream_state
{
public:
    explicit stream_state(StreamType& s)
        : _s(s)
        , _flags(s.flags())
        , _width(s.width())
        , _precision(s.precision())
    {}

    ~stream_state()
    {
        _s.flags(_flags);
        _s.width(_width);
        _s.precision(_precision);
    }

    // A second guard created by copying would restore the same state twice.
    stream_state(const stream_state& rhs) = delete;
    stream_state& operator=(const stream_state& rhs) = delete;

private:
    StreamType& _s;
    std::ios_base::fmtflags _flags;
    std::streamsize _width;
    std::streamsize _precision;
};

typedef stream_state<std::ostream> ostream_state;
typedef stream_state<std::istream> istream_state;
- void create_caf_stream(const char* filename, const unsigned max_msg, caf::actor_system& system)
- {
- using namespace std;
- ofstream ss(filename, ofstream::binary);
- ostream_state os_state(ss);
- msg_serializer ser{system};
- message::greeting g;
- message::salutation s;
- for (unsigned i = 0; i < (max_msg/2); ++i)
- {
- g.seqNo = i+100;
- g.message = "greeting " + to_string(i+100);
- unsigned x(1000);
- ss << ser.serialize(g.type(),x);
- ss << ser.serialize(g);
- s.seqNo = i+100;
- s.message = "salutation " + to_string(i+100);
- ss << ser.serialize(s.type());
- ss << ser.serialize(s);
- }
- }
- int main()
- {
- using namespace std;
- using namespace caf;
- using namespace message;
- actor_system_config cfg;
- actor_system system{cfg};
- const char* filename = "greetings.caf_stream";
- const unsigned max_msg = 10;
- create_caf_stream(filename, max_msg, system);
- {
- scoped_actor self{system};
- actor target = system.spawn<msg_target>();
- msg_serializer ser{system};
- greeting g;
- ifstream inbound(filename, ifstream::binary);
- istream_state is_state(inbound);
- buffer b(2048);
- auto start = std::chrono::high_resolution_clock::now();
- msg_dispatcher d(self, target);
- using std::placeholders::_1;
- map<string, function<void(const buffer&)>> operations;
- operations[message::greeting::type_code] = bind(&msg_dispatcher::send_greeting, ref(d), _1);
- operations[message::salutation::type_code] = bind(&msg_dispatcher::send_salutation, ref(d), _1);
- while (inbound >> b)
- {
- string type;
- ser.deserialize(b, type);
- inbound >> b;
- operations[type](b);
- }
- auto finish = std::chrono::high_resolution_clock::now();
- auto durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(finish-start);
- cout << "Ingest+xmit: "<<durationMs.count() << "ms\n";
- }
- return EXIT_SUCCESS;
- }
Add Comment
Please, Sign In to add comment