Guest User

Untitled

a guest
May 25th, 2018
100
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.86 KB | None | 0 0
  1. #include <iostream>
  2. #include <string>
  3. #include <sstream>
  4. #include <vector>
  5. #include <fstream>
  6. #include <chrono>
  7. #include <map> // map
  8. #include <functional> // bind, function
  9.  
  10. #include <caf/all.hpp>
  11.  
  12. namespace message
  13. {
  14. using namespace std;
  15.  
  16. struct msg
  17. {
  18. virtual string type() const = 0;
  19. };
  20.  
  21. struct greeting : msg
  22. {
  23. string message;
  24. unsigned seqNo;
  25.  
  26. static const string type_code;
  27. string type() const override { return type_code; }
  28. };
  29.  
  30. const string greeting::type_code = "greeting";
  31.  
  32. template <class Inspector>
  33. typename Inspector::result_type inspect(Inspector& fn, greeting& g)
  34. {
  35. return fn(caf::meta::type_name(greeting::type_code.c_str()),g.message, g.seqNo);
  36. }
  37.  
  38. struct salutation : msg
  39. {
  40. string message;
  41. unsigned seqNo;
  42.  
  43. static const string type_code;
  44. string type() const override { return type_code; }
  45. };
  46.  
  47. const string salutation::type_code = "salutation";
  48.  
  49.  
  50. template <class Inspector>
  51. typename Inspector::result_type inspect(Inspector& fn, salutation& s)
  52. {
  53. return fn(caf::meta::type_name(salutation::type_code.c_str()),s.message, s.seqNo);
  54. }
  55. }
  56.  
  57. namespace
  58. {
  59. class msg_target : public caf::event_based_actor
  60. {
  61. typedef caf::event_based_actor base;
  62.  
  63. unsigned _greetings;
  64. unsigned _salutations;
  65. public:
  66. msg_target(caf::actor_config& cfg)
  67. : base(cfg)
  68. {}
  69. protected:
  70. caf::behavior make_behavior() override
  71. {
  72. return caf::message_handler({
  73. [=](const message::greeting& g)
  74. {
  75. ++_greetings;
  76. //caf::aout(this) << g.message << std::endl;
  77. },
  78. [=](const message::salutation& s)
  79. {
  80. ++_salutations;
  81. //caf::aout(this) << s.message << std::endl;
  82. }
  83. });
  84. }
  85. };
  86.  
  87. typedef std::vector<char> buffer;
  88.  
  89. struct msg_serializer
  90. {
  91. msg_serializer(caf::actor_system& context)
  92. : _context(context)
  93. {}
  94.  
  95. template <class T, class... Ts>
  96. buffer serialize(const T& x, const Ts&... xs)
  97. {
  98. buffer buf;
  99. caf::binary_serializer bs{_context, buf};
  100. bs(x, xs...);
  101. return buf;
  102. }
  103.  
  104. template <class T, class... Ts>
  105. void deserialize(const buffer& buf, T& x, Ts&... xs)
  106. {
  107. caf::binary_deserializer bd{_context, buf};
  108. bd(x, xs...);
  109. }
  110. private:
  111. caf::actor_system& _context;
  112.  
  113. msg_serializer(const msg_serializer&) = delete;
  114. msg_serializer& operator=(const msg_serializer&) = delete;
  115. };
  116.  
  117. std::ostream& operator<<(std::ostream& os, const buffer& buf)
  118. {
  119. os << std::noskipws;
  120. os << buf.size();
  121. os.write((char*) &buf[0], buf.size());
  122. return os;
  123. }
  124.  
  125. std::istream& operator>>(std::istream& is, buffer& buf)
  126. {
  127. is >> std::noskipws;
  128.  
  129. buf.clear();
  130. buffer::size_type size;
  131.  
  132. is >> size;
  133. buf.resize(size);
  134.  
  135. is.read((char*) &buf[0], size);
  136.  
  137. return is;
  138. }
  139. }
  140.  
  141. void dump_buffer(const buffer& b)
  142. {
  143. using namespace std;
  144.  
  145. cout << "BufferSize:"<<b.size()<<endl;
  146. for(char c : b)
  147. {
  148. cout<<c<<":"<<hex<<uppercase<<int(c)<<' ';
  149. }
  150. cout << dec << endl;
  151. }
  152.  
  153. class msg_dispatcher
  154. {
  155. caf::scoped_actor& _sender;
  156. caf::actor& _target;
  157. msg_serializer _ser;
  158.  
  159. public:
  160. msg_dispatcher(caf::scoped_actor& sender, caf::actor& target)
  161. : _sender(sender)
  162. , _target(target)
  163. , _ser(sender->system())
  164. {}
  165.  
  166. void send_greeting(const buffer& msg_buf)
  167. {
  168. message::greeting msg;
  169. _ser.deserialize(msg_buf, msg);
  170. _sender->send(_target, std::move(msg));
  171. }
  172.  
  173. void send_salutation(const buffer& msg_buf)
  174. {
  175. message::salutation msg;
  176. _ser.deserialize(msg_buf, msg);
  177. _sender->send(_target, std::move(msg));
  178. }
  179. };
  180.  
  181. template <class StreamType>
  182. class stream_state
  183. {
  184. stream_state(const stream_state &rhs) = delete;
  185. stream_state& operator= (const stream_state& rhs) = delete;
  186.  
  187. public:
  188. explicit stream_state(StreamType& s)
  189. : _s(s)
  190. , _flags(s.flags())
  191. , _width(s.width())
  192. {}
  193.  
  194. ~stream_state()
  195. {
  196. _s.flags(_flags);
  197. _s.width(_width);
  198. }
  199.  
  200. private:
  201. StreamType& _s;
  202. std::ios::fmtflags _flags;
  203. std::size_t _width;
  204. };
  205.  
  206. typedef stream_state<std::ostream> ostream_state;
  207. typedef stream_state<std::istream> istream_state;
  208.  
  209. void create_caf_stream(const char* filename, const unsigned max_msg, caf::actor_system& system)
  210. {
  211. using namespace std;
  212.  
  213. ofstream ss(filename, ofstream::binary);
  214. ostream_state os_state(ss);
  215.  
  216. msg_serializer ser{system};
  217.  
  218. message::greeting g;
  219. message::salutation s;
  220.  
  221. for (unsigned i = 0; i < (max_msg/2); ++i)
  222. {
  223. g.seqNo = i+100;
  224. g.message = "greeting " + to_string(i+100);
  225.  
  226. unsigned x(1000);
  227. ss << ser.serialize(g.type(),x);
  228. ss << ser.serialize(g);
  229.  
  230. s.seqNo = i+100;
  231. s.message = "salutation " + to_string(i+100);
  232.  
  233. ss << ser.serialize(s.type());
  234. ss << ser.serialize(s);
  235. }
  236. }
  237.  
  238. int main()
  239. {
  240. using namespace std;
  241. using namespace caf;
  242. using namespace message;
  243.  
  244. actor_system_config cfg;
  245. actor_system system{cfg};
  246.  
  247. const char* filename = "greetings.caf_stream";
  248. const unsigned max_msg = 10;
  249.  
  250. create_caf_stream(filename, max_msg, system);
  251.  
  252. {
  253. scoped_actor self{system};
  254. actor target = system.spawn<msg_target>();
  255.  
  256. msg_serializer ser{system};
  257.  
  258. greeting g;
  259. ifstream inbound(filename, ifstream::binary);
  260. istream_state is_state(inbound);
  261.  
  262. buffer b(2048);
  263. auto start = std::chrono::high_resolution_clock::now();
  264.  
  265. msg_dispatcher d(self, target);
  266.  
  267. using std::placeholders::_1;
  268.  
  269. map<string, function<void(const buffer&)>> operations;
  270. operations[message::greeting::type_code] = bind(&msg_dispatcher::send_greeting, ref(d), _1);
  271. operations[message::salutation::type_code] = bind(&msg_dispatcher::send_salutation, ref(d), _1);
  272.  
  273. while (inbound >> b)
  274. {
  275. string type;
  276. ser.deserialize(b, type);
  277.  
  278. inbound >> b;
  279. operations[type](b);
  280. }
  281. auto finish = std::chrono::high_resolution_clock::now();
  282.  
  283. auto durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(finish-start);
  284.  
  285. cout << "Ingest+xmit: "<<durationMs.count() << "ms\n";
  286. }
  287.  
  288. return EXIT_SUCCESS;
  289. }
Add Comment
Please, Sign In to add comment