NOTE: The text below was recovered from a pastebin diff view. It contains three
concatenated files: lump/list.H, lump/child/main.C, and lump/server/main.C.
| 1 | - | |
| 1 | + | // lump/list.H |
| 2 | ||
| 3 | #ifndef INCLUDED_LUMP_LIST_H | |
| 4 | #define INCLUDED_LUMP_LIST_H | |
| 5 | ||
#include <cstddef>
#include <cstdint>   // for ::uint32_t used by UpdateID and Channel
#include <bitset>

#include <boost/interprocess/offset_ptr.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_comparison.hpp>

#include <sys/time.h>
| 15 | ||
// The __sync functions are GCC atomic builtins. They're documented here:
// http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
// method_one() and method_two() have commented out condition waits. You can try
// uncommenting them, they only make latency worse. Same for the condition
// code in method_three(), it's only left in since that's how I was testing last.

// Full hardware + compiler memory fence.
#define FULL_MEMORY_BARRIER __sync_synchronize()

#ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug
// Release barrier: __sync_lock_release has release semantics per the GCC docs.
// It operates on a throwaway local; the per-__LINE__ name (two-level expansion
// so __LINE__ is stringized correctly) avoids shadowing when the macro is used
// more than once in a scope.
// NOTE(review): the builtin only guarantees release ordering on the dummy
// variable itself; whether this fences unrelated stores is
// implementation-dependent — confirm on the target compiler.
#define RELEASE_MEMORY_BARRIER_IMPL2(line) int release_barrier##line; __sync_lock_release(&release_barrier##line)
#define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line)
#define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif

#ifndef MEMORY_BARRIER_DEBUG
// Acquire barrier: __sync_lock_test_and_set has acquire semantics per the GCC
// docs; same throwaway-local trick as above.
#define ACQUIRE_MEMORY_BARRIER_IMPL2(line) int acquire_barrier##line; __sync_lock_test_and_set(&acquire_barrier##line, 1)
#define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line)
#define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif
| 37 | ||
| 38 | // Meant to represent an offset like offset_ptr, but doesn't | |
| 39 | // do anything automagically. Nice if you want to have pointers | |
| 40 | // from one segment into another segment. | |
| 41 | template<class T> | |
| 42 | class diff_ptr | |
| 43 | {
| |
| 44 | public: | |
| 45 | inline diff_ptr() {}
| |
| 46 | ||
| 47 | inline diff_ptr(diff_ptr const& other) | |
| 48 | : offset_(other.offset_) | |
| 49 | {}
| |
| 50 | ||
| 51 | inline diff_ptr(boost::interprocess::offset_ptr<T> const& init, | |
| 52 | boost::interprocess::managed_shared_memory const& segment) | |
| 53 | {
| |
| 54 | set(init.get(), segment); | |
| 55 | } | |
| 56 | ||
| 57 | inline diff_ptr(T* init, | |
| 58 | boost::interprocess::managed_shared_memory const& segment) | |
| 59 | {
| |
| 60 | set(init, segment); | |
| 61 | } | |
| 62 | ||
| 63 | inline void set(T* init, | |
| 64 | boost::interprocess::managed_shared_memory const& segment) | |
| 65 | {
| |
| 66 | set_offset((char*)(init) - (char*)(segment.get_address())); | |
| 67 | } | |
| 68 | ||
| 69 | inline void set(T volatile* init, | |
| 70 | boost::interprocess::managed_shared_memory const& segment) volatile | |
| 71 | {
| |
| 72 | set_offset((char*)(init) - (char*)(segment.get_address())); | |
| 73 | } | |
| 74 | ||
| 75 | inline void set_offset(ptrdiff_t offset) volatile | |
| 76 | {
| |
| 77 | offset_ = offset; | |
| 78 | } | |
| 79 | ||
| 80 | inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile | |
| 81 | {
| |
| 82 | return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_); | |
| 83 | } | |
| 84 | ||
| 85 | private: | |
| 86 | std::ptrdiff_t offset_; | |
| 87 | }; | |
| 88 | ||
// Lifecycle of a node in a shared-memory channel list. A writer
// publishes a node by flipping NOT_FILLED_YET -> FILLED (with a
// release barrier); readers poll for FILLED (with an acquire barrier).
enum NodeState
{
    HEAD,            // sentinel node at the front of a list
    NOT_FILLED_YET,  // allocated, linked in, but payload not yet published
    FILLED           // payload complete; safe for readers to consume
};
| 94 | ||
| 95 | struct Node {
| |
| 96 | volatile NodeState state_; | |
| 97 | volatile diff_ptr<Node> next_; | |
| 98 | }; | |
| 99 | ||
| 100 | struct UpdateID : public Node {
| |
| 101 | ::uint32_t list_id_; | |
| 102 | }; | |
| 103 | ||
| 104 | #define DUMMY_DATA_SIZE 512 | |
| 105 | ||
| 106 | struct Data : public Node {
| |
| 107 | volatile long tv_sec_; | |
| 108 | volatile long tv_usec_; | |
| 109 | ||
| 110 | char dummy_data_[DUMMY_DATA_SIZE]; | |
| 111 | }; | |
| 112 | ||
| 113 | struct Channel {
| |
| 114 | diff_ptr<Node> head_; | |
| 115 | volatile diff_ptr<Node> tail_; | |
| 116 | volatile std::size_t size_; | |
| 117 | ::uint32_t id_; | |
| 118 | }; | |
| 119 | ||
| 120 | #endif | |
| 121 | ||
| 122 | ||
| 123 | // lump/child/main.C | |
| 124 | ||
| 125 | #include <iostream> | |
| 126 | #include <vector> | |
| 127 | #include <string> | |
| 128 | #include <set> | |
| 129 | ||
| 130 | #include <boost/thread.hpp> | |
| 131 | #include <boost/tokenizer.hpp> | |
| 132 | ||
| 133 | #include <boost/interprocess/sync/scoped_lock.hpp> | |
| 134 | #include <boost/interprocess/sync/named_mutex.hpp> | |
| 135 | #include <boost/interprocess/sync/named_condition.hpp> | |
| 136 | #include <boost/interprocess/allocators/allocator.hpp> | |
| 137 | #include <boost/interprocess/containers/vector.hpp> | |
| 138 | #include <boost/interprocess/managed_shared_memory.hpp> | |
| 139 | ||
| 140 | #include <lump/list.H> | |
| 141 | ||
| 142 | using namespace boost; | |
| 143 | using namespace boost::interprocess; | |
| 144 | using namespace std; | |
| 145 | ||
| 146 | managed_shared_memory segment(open_only, "SharedMemoryArea"); | |
| 147 | ||
| 148 | typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator; | |
| 149 | typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList; | |
| 150 | ||
| 151 | ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
| |
| 152 | Channel& update_channel = *(segment.find<Channel>("update_channel").first);
| |
| 153 | ||
| 154 | named_mutex update_mutex(open_only, "update_mutex"); | |
| 155 | named_condition update_condition(open_only, "update_condition"); | |
| 156 | ||
// Compute and print the latency between the sender's timestamp
// (send_tv_sec/send_tv_usec, from gettimeofday in the server) and now.
// FIX: the original used hrtime_t, a Solaris-specific type not declared
// by any include here; plain long long nanoseconds is portable and
// exact (no double round-trip via 1e9/1e3 literals).
void log_latency(long send_tv_sec, long send_tv_usec)
{
    timeval arrive_timestamp;
    gettimeofday(&arrive_timestamp, NULL);

    if(send_tv_sec == 0)
        std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
    if(arrive_timestamp.tv_sec == 0)
        std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints

    long long send_time = (long long)(send_tv_sec) * 1000000000LL + (long long)(send_tv_usec) * 1000LL;
    long long arrive_time = (long long)(arrive_timestamp.tv_sec) * 1000000000LL + (long long)(arrive_timestamp.tv_usec) * 1000LL;
    long long latency = arrive_time - send_time;

    std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
}
| 173 | ||
| 174 | void method_one() | |
| 175 | {
| |
| 176 | std::vector<Data*> channel_cursors; | |
| 177 | for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i) | |
| 178 | {
| |
| 179 | Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment)); | |
| 180 | ||
| 181 | if(current_item->state_ == HEAD) | |
| 182 | current_item = static_cast<Data*>(current_item->next_.get(segment)); | |
| 183 | ||
| 184 | channel_cursors.push_back(current_item); | |
| 185 | } | |
| 186 | ||
| 187 | while(true) | |
| 188 | {
| |
| 189 | for(std::size_t i = 0; i < channel_list.size(); ++i) | |
| 190 | {
| |
| 191 | // {
| |
| 192 | // scoped_lock<named_mutex> update_lock(update_mutex); | |
| 193 | // update_condition.wait(update_lock); | |
| 194 | // } | |
| 195 | ||
| 196 | Data* current_item = channel_cursors[i]; | |
| 197 | ||
| 198 | ACQUIRE_MEMORY_BARRIER; | |
| 199 | if(current_item->state_ == NOT_FILLED_YET) {
| |
| 200 | continue; | |
| 201 | } | |
| 202 | ||
| 203 | log_latency(current_item->tv_sec_, current_item->tv_usec_); | |
| 204 | ||
| 205 | channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment)); | |
| 206 | } | |
| 207 | } | |
| 208 | } | |
| 209 | ||
| 210 | void method_two() | |
| 211 | {
| |
| 212 | std::vector<Data*> channel_cursors; | |
| 213 | for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i) | |
| 214 | {
| |
| 215 | Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment)); | |
| 216 | ||
| 217 | if(current_item->state_ == HEAD) | |
| 218 | current_item = static_cast<Data*>(current_item->next_.get(segment)); | |
| 219 | ||
| 220 | channel_cursors.push_back(current_item); | |
| 221 | } | |
| 222 | ||
| 223 | UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment)); | |
| 224 | ||
| 225 | while(true) | |
| 226 | {
| |
| 227 | // {
| |
| 228 | // scoped_lock<named_mutex> update_lock(update_mutex); | |
| 229 | // update_condition.wait(update_lock); | |
| 230 | // } | |
| 231 | ||
| 232 | if(update_cursor->state_ == NOT_FILLED_YET) {
| |
| 233 | continue; | |
| 234 | } | |
| 235 | ||
| 236 | ::uint32_t update_id = update_cursor->list_id_; | |
| 237 | ||
| 238 | Data* current_item = channel_cursors[update_id]; | |
| 239 | ||
| 240 | if(current_item->state_ == NOT_FILLED_YET) {
| |
| 241 | std::cerr << "This should never print." << std::endl; // it doesn't | |
| 242 | continue; | |
| 243 | } | |
| 244 | ||
| 245 | log_latency(current_item->tv_sec_, current_item->tv_usec_); | |
| 246 | ||
| 247 | channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment)); | |
| 248 | update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment)); | |
| 249 | } | |
| 250 | } | |
| 251 | ||
// Reader strategy 3: hybrid of methods 1 and 2 — scan channels
// round-robin, but when the update channel has news, jump straight to
// the named channel instead; sleep on the condition variable only when
// a full pass found nothing.
void method_three()
{
    // Position a cursor on the first unread node of every channel
    // (tail_, skipping the HEAD sentinel).
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

        if(current_item->state_ == HEAD)
            current_item = static_cast<Data*>(current_item->next_.get(segment));

        channel_cursors.push_back(current_item);
    }

    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {
        bool found_an_update = false;
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {
            std::size_t idx = i;

            // Acquire pairs with the server's release before state_ = FILLED.
            ACQUIRE_MEMORY_BARRIER;
            if(update_cursor->state_ != NOT_FILLED_YET) {
                //std::cerr << "Found via update" << std::endl;
                // Don't consume this round-robin slot: i-- here cancels the
                // loop's ++i. At i == 0 the unsigned decrement wraps to
                // SIZE_MAX and ++i wraps back to 0 — well-defined for
                // unsigned, but deliberate and fragile; do not "fix".
                i--;
                idx = update_cursor->list_id_;
                update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
            }

            Data* current_item = channel_cursors[idx];

            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            found_an_update = true;

            log_latency(current_item->tv_sec_, current_item->tv_usec_);
            channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
        }

        if(!found_an_update)
        {
            std::cerr << "Sleeping" << std::endl;
            scoped_lock<named_mutex> update_lock(update_mutex);
            // NOTE(review): waiting with no predicate re-check under the
            // lock — the server's notify_all (sent without holding this
            // mutex) can fire between the scan above and this wait, losing
            // the wakeup. Confirm whether that stall is acceptable here.
            update_condition.wait(update_lock);
        }
    }
}
| 303 | ||
| 304 | int main(int argc, char** argv) | |
| 305 | {
| |
| 306 | using ::int32_t; | |
| 307 | using ::uint32_t; | |
| 308 | ||
| 309 | if(argc == 1) | |
| 310 | {
| |
| 311 | std::cout << "Using 1st method" << std::endl; | |
| 312 | method_one(); | |
| 313 | } | |
| 314 | else if(argc == 2) | |
| 315 | {
| |
| 316 | std::cout << "Using 2nd method" << std::endl; | |
| 317 | method_two(); | |
| 318 | } | |
| 319 | else if(argc == 3) | |
| 320 | {
| |
| 321 | std::cout << "Using 3rd method" << std::endl; | |
| 322 | method_three(); | |
| 323 | } | |
| 324 | ||
| 325 | return 0; | |
| 326 | } | |
| 327 | ||
| 328 | ||
| 329 | // lump/server/main.C | |
| 330 | ||
| 331 | #include <vector> | |
| 332 | #include <map> | |
| 333 | #include <cstring> | |
| 334 | #include <sstream> | |
| 335 | #include <iostream> | |
| 336 | ||
| 337 | #include <boost/interprocess/sync/scoped_lock.hpp> | |
| 338 | #include <boost/interprocess/sync/named_mutex.hpp> | |
| 339 | #include <boost/interprocess/sync/named_condition.hpp> | |
| 340 | #include <boost/interprocess/allocators/allocator.hpp> | |
| 341 | #include <boost/interprocess/containers/vector.hpp> | |
| 342 | #include <boost/interprocess/managed_shared_memory.hpp> | |
| 343 | ||
| 344 | #include <lump/list.H> | |
| 345 | ||
| 346 | using namespace boost; | |
| 347 | using namespace boost::interprocess; | |
| 348 | using namespace std; | |
| 349 | ||
| 350 | struct shm_remove | |
| 351 | {
| |
| 352 | shm_remove() | |
| 353 | {
| |
| 354 | shared_memory_object::remove("SharedMemoryArea");
| |
| 355 | named_mutex::remove("update_mutex");
| |
| 356 | named_condition::remove("update_condition");
| |
| 357 | } | |
| 358 | ||
| 359 | ~shm_remove() | |
| 360 | {
| |
| 361 | shared_memory_object::remove("SharedMemoryArea");
| |
| 362 | named_mutex::remove("update_mutex");
| |
| 363 | named_condition::remove("update_condition");
| |
| 364 | } | |
| 365 | } remover; | |
| 366 | ||
// Total size of the shared segment (512 MiB) — must hold 250000
// channels plus every node ever allocated (nothing is ever freed).
static int CACHE_SIZE = 512 * 1024 * 1024;
managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);

typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
ShmemAllocator alloc_instance(segment.get_segment_manager());

// Named objects the child process looks up with segment.find<>().
ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
Channel& update_channel = *(segment.construct<Channel>("update_channel")());

// Named sync objects shared with the child (method_three's sleep path).
named_mutex update_mutex(create_only, "update_mutex");
named_condition update_condition(create_only, "update_condition");

// Scratch timestamp filled in by update(); global, so not thread-safe.
timeval receive_timestamp;

static int NEXT_ID_TO_USE = 0;   // next Channel::id_ handed out by init_channel
static int CHANNEL_TO_USE = 0;   // NOTE(review): appears unused in this file
| 384 | ||
| 385 | template<class T> | |
| 386 | void init_channel(Channel& channel) | |
| 387 | {
| |
| 388 | T* new_head = static_cast<T*>(segment.allocate(sizeof(T))); | |
| 389 | new_head->state_ = HEAD; | |
| 390 | ||
| 391 | T* new_empty = static_cast<T*>(segment.allocate(sizeof(T))); | |
| 392 | new_empty->state_ = NOT_FILLED_YET; | |
| 393 | new_head->next_.set(new_empty, segment); | |
| 394 | ||
| 395 | channel.id_ = NEXT_ID_TO_USE++; | |
| 396 | ||
| 397 | RELEASE_MEMORY_BARRIER; | |
| 398 | channel.head_.set(new_head, segment); | |
| 399 | channel.tail_.set(new_head, segment); | |
| 400 | } | |
| 401 | ||
// Publish one update on `channel` and mirror it on the global update
// channel. Single-producer: only this process writes; readers poll.
// Publication order matters — each node is fully written (next_ link,
// timestamps, payload) BEFORE its state_ flips to FILLED behind a
// release barrier, and tail_ advances only after that.
void update(Channel& channel, char *new_data)
{
    gettimeofday(&receive_timestamp, NULL);

    // Pre-allocate the NEXT empty node so readers chasing next_ always
    // land on an allocated NOT_FILLED_YET node, never off the end.
    Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
    new_empty_node->state_ = NOT_FILLED_YET;

    // The node to fill is the one past the current tail.
    Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));

    node_to_fill->next_.set(new_empty_node, segment);

    node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
    node_to_fill->tv_usec_ = receive_timestamp.tv_usec;

    strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);

    __sync_fetch_and_add(&(channel.size_), 1);

    //
    // Separate out UpdateID work
    //

    UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
    new_empty_update->state_ = NOT_FILLED_YET;

    UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));

    update_node_to_fill->next_.set(new_empty_update, segment);

    update_node_to_fill->list_id_ = channel.id_;

    __sync_fetch_and_add(&(update_channel.size_), 1);

    // Release barriers pair with the readers' ACQUIRE_MEMORY_BARRIER:
    // payload before FILLED, FILLED before tail_ advance.
    RELEASE_MEMORY_BARRIER;
    node_to_fill->state_ = FILLED;
    RELEASE_MEMORY_BARRIER;
    channel.tail_.set(node_to_fill, segment);

    RELEASE_MEMORY_BARRIER;
    update_node_to_fill->state_ = FILLED;
    RELEASE_MEMORY_BARRIER;
    update_channel.tail_.set(update_node_to_fill, segment);

    // NOTE(review): notifying without holding update_mutex (the lock is
    // deliberately commented out) can lose the wakeup for a reader that
    // is between its scan and its wait — see method_three.
    //scoped_lock<named_mutex> update_lock(update_mutex);
    update_condition.notify_all();
}
| 448 | ||
| 449 | int main(int argc, char** argv) | |
| 450 | {
| |
| 451 | for(int i = 0; i < 250000; ++i) | |
| 452 | {
| |
| 453 | Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel))); | |
| 454 | init_channel<Data>(*data_channel); | |
| 455 | diff_ptr<Channel> ptr(data_channel, segment); | |
| 456 | channel_list.push_back(ptr); | |
| 457 | } | |
| 458 | ||
| 459 | init_channel<UpdateID>(update_channel); | |
| 460 | ||
| 461 | std::cout << "Cache Size: " << CACHE_SIZE << std::endl; | |
| 462 | ||
| 463 | srand ( time(NULL) ); | |
| 464 | while(true) | |
| 465 | {
| |
| 466 | char dummy_data_to_transmit[DUMMY_DATA_SIZE]; | |
| 467 | for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i) | |
| 468 | {
| |
| 469 | dummy_data_to_transmit[i] = rand() % (90 - 65) + 65; | |
| 470 | } | |
| 471 | dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0'; | |
| 472 | ||
| 473 | //std::cout << "Update: " << dummy_data_to_transmit << std::endl; | |
| 474 | Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment)); | |
| 475 | update(channel, &(dummy_data_to_transmit[0])); | |
| 476 | ||
| 477 | timespec interval; | |
| 478 | interval.tv_sec = 0; | |
| 479 | //interval.tv_sec = rand() % 2; | |
| 480 | //interval.tv_nsec = rand() % 5000; | |
| 481 | interval.tv_nsec = rand() % 5000000; | |
| 482 | nanosleep(&interval, NULL); | |
| 483 | } | |
| 484 | ||
| 485 | return 0; | |
| 486 | } |