// lump/list.H #ifndef INCLUDED_LUMP_LIST_H #define INCLUDED_LUMP_LIST_H #include #include #include #include #include #include #include // The __sync functions are GCC atomic builtins. They're documented here: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html // method_one() and method_two() have commented out condition waits. You can try uncommenting them, they only make latency worse. Same for the condition // code in method_three(), it's only left in since that's how I was testing last. #define FULL_MEMORY_BARRIER __sync_synchronize() #ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug #define RELEASE_MEMORY_BARRIER_IMPL2(line) int release_barrier##line; __sync_lock_release(&release_barrier##line) #define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line) #define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__) #else #define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER #endif #ifndef MEMORY_BARRIER_DEBUG #define ACQUIRE_MEMORY_BARRIER_IMPL2(line) int acquire_barrier##line; __sync_lock_test_and_set(&acquire_barrier##line, 1) #define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line) #define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__) #else #define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER #endif // Meant to represent an offset like offset_ptr, but doesn't // do anything automagically. Nice if you want to have pointers // from one segment into another segment. template class diff_ptr { public: inline diff_ptr() {} inline diff_ptr(diff_ptr const& other) : offset_(other.offset_) {} inline diff_ptr(boost::interprocess::offset_ptr const& init, boost::interprocess::managed_shared_memory const& segment) { set(init.get(), segment); } inline diff_ptr(T* init, boost::interprocess::managed_shared_memory const& segment) { set(init, segment); } inline void set(T* init, boost::interprocess::managed_shared_memory const& segment) { set_offset((char*)(init) - (char*)(segment.get_address())); } inline void set(T volatile* init, boost::interprocess::managed_shared_memory const& segment) volatile { set_offset((char*)(init) - (char*)(segment.get_address())); } inline void set_offset(ptrdiff_t offset) volatile { offset_ = offset; } inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile { return reinterpret_cast((char*)(segment.get_address()) + offset_); } private: std::ptrdiff_t offset_; }; enum NodeState { HEAD, NOT_FILLED_YET, FILLED }; struct Node { volatile NodeState state_; volatile diff_ptr next_; }; struct UpdateID : public Node { ::uint32_t list_id_; }; #define DUMMY_DATA_SIZE 512 struct Data : public Node { volatile long tv_sec_; volatile long tv_usec_; char dummy_data_[DUMMY_DATA_SIZE]; }; struct Channel { diff_ptr head_; volatile diff_ptr tail_; volatile std::size_t size_; ::uint32_t id_; }; #endif // lump/child/main.C #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace boost; using namespace boost::interprocess; using namespace std; managed_shared_memory segment(open_only, "SharedMemoryArea"); typedef boost::interprocess::allocator< diff_ptr, managed_shared_memory::segment_manager> ShmemAllocator; typedef boost::interprocess::vector< diff_ptr, ShmemAllocator > ChannelList; ChannelList& channel_list = *(segment.find("channel_list").first); Channel& update_channel = *(segment.find("update_channel").first); named_mutex update_mutex(open_only, "update_mutex"); named_condition update_condition(open_only, "update_condition"); void log_latency(long send_tv_sec, long send_tv_usec) { timeval arrive_timestamp; gettimeofday(&arrive_timestamp, NULL); if(send_tv_sec == 0) std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints if(arrive_timestamp.tv_sec == 0) std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints hrtime_t send_time = hrtime_t(send_tv_sec) * hrtime_t(1e9) + hrtime_t(send_tv_usec) * hrtime_t(1e3); hrtime_t arrive_time = hrtime_t(arrive_timestamp.tv_sec) * hrtime_t(1e9) + hrtime_t(arrive_timestamp.tv_usec) * hrtime_t(1e3); hrtime_t latency = arrive_time - send_time; std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl; } void method_one() { std::vector channel_cursors; for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i) { Data* current_item = static_cast(i->get(segment)->tail_.get(segment)); if(current_item->state_ == HEAD) current_item = static_cast(current_item->next_.get(segment)); channel_cursors.push_back(current_item); } while(true) { for(std::size_t i = 0; i < channel_list.size(); ++i) { // { // scoped_lock update_lock(update_mutex); // update_condition.wait(update_lock); // } Data* current_item = channel_cursors[i]; ACQUIRE_MEMORY_BARRIER; if(current_item->state_ == NOT_FILLED_YET) { continue; } log_latency(current_item->tv_sec_, current_item->tv_usec_); channel_cursors[i] = static_cast(current_item->next_.get(segment)); } } } void method_two() { std::vector channel_cursors; for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i) { Data* current_item = static_cast(i->get(segment)->tail_.get(segment)); if(current_item->state_ == HEAD) current_item = static_cast(current_item->next_.get(segment)); channel_cursors.push_back(current_item); } UpdateID* update_cursor = static_cast(update_channel.tail_.get(segment)); while(true) { // { // scoped_lock update_lock(update_mutex); // update_condition.wait(update_lock); // } if(update_cursor->state_ == NOT_FILLED_YET) { continue; } ::uint32_t update_id = update_cursor->list_id_; Data* current_item = channel_cursors[update_id]; if(current_item->state_ == NOT_FILLED_YET) { std::cerr << "This should never print." << std::endl; // it doesn't continue; } log_latency(current_item->tv_sec_, current_item->tv_usec_); channel_cursors[update_id] = static_cast(current_item->next_.get(segment)); update_cursor = static_cast(update_cursor->next_.get(segment)); } } void method_three() { std::vector channel_cursors; for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i) { Data* current_item = static_cast(i->get(segment)->tail_.get(segment)); if(current_item->state_ == HEAD) current_item = static_cast(current_item->next_.get(segment)); channel_cursors.push_back(current_item); } UpdateID* update_cursor = static_cast(update_channel.tail_.get(segment)); while(true) { bool found_an_update = false; for(std::size_t i = 0; i < channel_list.size(); ++i) { std::size_t idx = i; ACQUIRE_MEMORY_BARRIER; if(update_cursor->state_ != NOT_FILLED_YET) { //std::cerr << "Found via update" << std::endl; i--; idx = update_cursor->list_id_; update_cursor = static_cast(update_cursor->next_.get(segment)); } Data* current_item = channel_cursors[idx]; ACQUIRE_MEMORY_BARRIER; if(current_item->state_ == NOT_FILLED_YET) { continue; } found_an_update = true; log_latency(current_item->tv_sec_, current_item->tv_usec_); channel_cursors[idx] = static_cast(current_item->next_.get(segment)); } if(!found_an_update) { std::cerr << "Sleeping" << std::endl; scoped_lock update_lock(update_mutex); update_condition.wait(update_lock); } } } int main(int argc, char** argv) { using ::int32_t; using ::uint32_t; if(argc == 1) { std::cout << "Using 1st method" << std::endl; method_one(); } else if(argc == 2) { std::cout << "Using 2nd method" << std::endl; method_two(); } else if(argc == 3) { std::cout << "Using 3rd method" << std::endl; method_three(); } return 0; } // lump/server/main.C #include #include #include #include #include #include #include #include #include #include #include #include using namespace boost; using namespace boost::interprocess; using namespace std; struct shm_remove { shm_remove() { shared_memory_object::remove("SharedMemoryArea"); named_mutex::remove("update_mutex"); named_condition::remove("update_condition"); } ~shm_remove() { shared_memory_object::remove("SharedMemoryArea"); named_mutex::remove("update_mutex"); named_condition::remove("update_condition"); } } remover; static int CACHE_SIZE = 512 * 1024 * 1024; managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE); typedef boost::interprocess::allocator< diff_ptr, managed_shared_memory::segment_manager> ShmemAllocator; typedef boost::interprocess::vector< diff_ptr, ShmemAllocator > ChannelList; ShmemAllocator alloc_instance(segment.get_segment_manager()); ChannelList& channel_list = *(segment.construct("channel_list")(alloc_instance)); Channel& update_channel = *(segment.construct("update_channel")()); named_mutex update_mutex(create_only, "update_mutex"); named_condition update_condition(create_only, "update_condition"); timeval receive_timestamp; static int NEXT_ID_TO_USE = 0; static int CHANNEL_TO_USE = 0; template void init_channel(Channel& channel) { T* new_head = static_cast(segment.allocate(sizeof(T))); new_head->state_ = HEAD; T* new_empty = static_cast(segment.allocate(sizeof(T))); new_empty->state_ = NOT_FILLED_YET; new_head->next_.set(new_empty, segment); channel.id_ = NEXT_ID_TO_USE++; RELEASE_MEMORY_BARRIER; channel.head_.set(new_head, segment); channel.tail_.set(new_head, segment); } void update(Channel& channel, char *new_data) { gettimeofday(&receive_timestamp, NULL); Data* new_empty_node = static_cast(segment.allocate(sizeof(Data))); new_empty_node->state_ = NOT_FILLED_YET; Data* node_to_fill = static_cast(channel.tail_.get(segment)->next_.get(segment)); node_to_fill->next_.set(new_empty_node, segment); node_to_fill->tv_sec_ = receive_timestamp.tv_sec; node_to_fill->tv_usec_ = receive_timestamp.tv_usec; strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE); __sync_fetch_and_add(&(channel.size_), 1); // // Separate out UpdateID work // UpdateID* new_empty_update = static_cast(segment.allocate(sizeof(UpdateID))); new_empty_update->state_ = NOT_FILLED_YET; UpdateID* update_node_to_fill = static_cast(update_channel.tail_.get(segment)->next_.get(segment)); update_node_to_fill->next_.set(new_empty_update, segment); update_node_to_fill->list_id_ = channel.id_; __sync_fetch_and_add(&(update_channel.size_), 1); RELEASE_MEMORY_BARRIER; node_to_fill->state_ = FILLED; RELEASE_MEMORY_BARRIER; channel.tail_.set(node_to_fill, segment); RELEASE_MEMORY_BARRIER; update_node_to_fill->state_ = FILLED; RELEASE_MEMORY_BARRIER; update_channel.tail_.set(update_node_to_fill, segment); //scoped_lock update_lock(update_mutex); update_condition.notify_all(); } int main(int argc, char** argv) { for(int i = 0; i < 250000; ++i) { Channel* data_channel = static_cast(segment.allocate(sizeof(Channel))); init_channel(*data_channel); diff_ptr ptr(data_channel, segment); channel_list.push_back(ptr); } init_channel(update_channel); std::cout << "Cache Size: " << CACHE_SIZE << std::endl; srand ( time(NULL) ); while(true) { char dummy_data_to_transmit[DUMMY_DATA_SIZE]; for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i) { dummy_data_to_transmit[i] = rand() % (90 - 65) + 65; } dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0'; //std::cout << "Update: " << dummy_data_to_transmit << std::endl; Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment)); update(channel, &(dummy_data_to_transmit[0])); timespec interval; interval.tv_sec = 0; //interval.tv_sec = rand() % 2; //interval.tv_nsec = rand() % 5000; interval.tv_nsec = rand() % 5000000; nanosleep(&interval, NULL); } return 0; }