Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // lump/list.H
- #ifndef INCLUDED_LUMP_LIST_H
- #define INCLUDED_LUMP_LIST_H
- #include <cstddef>
- #include <bitset>
- #include <boost/interprocess/offset_ptr.hpp>
- #include <boost/interprocess/managed_shared_memory.hpp>
- #include <boost/tuple/tuple.hpp>
- #include <boost/tuple/tuple_comparison.hpp>
- #include <sys/time.h>
- // The __sync functions are GCC atomic builtins. They're documented here: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
- // method_one() and method_two() have commented out condition waits. You can try uncommenting them, they only make latency worse. Same for the condition
- // code in method_three(), it's only left in since that's how I was testing last.
// Memory-barrier helpers built on the GCC __sync atomic builtins.
//
// FULL_MEMORY_BARRIER issues __sync_synchronize(). Unless MEMORY_BARRIER_DEBUG
// is defined, RELEASE_MEMORY_BARRIER and ACQUIRE_MEMORY_BARRIER perform
// __sync_lock_release() / __sync_lock_test_and_set() on a throw-away int,
// which the GCC manual documents as a release barrier and an acquire barrier
// respectively. With MEMORY_BARRIER_DEBUG defined both become full barriers.
//
// Each macro expands to exactly one do { ... } while (0) statement, so it can
// be the sole body of an unbraced if/else or loop and can appear more than
// once on the same source line. Invoke it with a trailing semicolon.
//
// NOTE(review): a release/acquire operation on a dummy variable is a one-way
// barrier and not necessarily a stand-alone fence -- confirm that it is
// sufficient on weakly ordered targets.
#define FULL_MEMORY_BARRIER __sync_synchronize()
#ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug
#define RELEASE_MEMORY_BARRIER_IMPL2(line) \
    do { \
        int release_barrier##line = 0; \
        __sync_lock_release(&release_barrier##line); \
    } while (0)
#define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line)
#define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__)
#define ACQUIRE_MEMORY_BARRIER_IMPL2(line) \
    do { \
        int acquire_barrier##line = 0; \
        static_cast<void>(__sync_lock_test_and_set(&acquire_barrier##line, 1)); \
    } while (0)
#define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line)
#define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif
- // Meant to represent an offset like offset_ptr, but doesn't
- // do anything automagically. Nice if you want to have pointers
- // from one segment into another segment.
- template<class T>
- class diff_ptr
- {
- public:
- inline diff_ptr() {}
- inline diff_ptr(diff_ptr const& other)
- : offset_(other.offset_)
- {}
- inline diff_ptr(boost::interprocess::offset_ptr<T> const& init,
- boost::interprocess::managed_shared_memory const& segment)
- {
- set(init.get(), segment);
- }
- inline diff_ptr(T* init,
- boost::interprocess::managed_shared_memory const& segment)
- {
- set(init, segment);
- }
- inline void set(T* init,
- boost::interprocess::managed_shared_memory const& segment)
- {
- set_offset((char*)(init) - (char*)(segment.get_address()));
- }
- inline void set(T volatile* init,
- boost::interprocess::managed_shared_memory const& segment) volatile
- {
- set_offset((char*)(init) - (char*)(segment.get_address()));
- }
- inline void set_offset(ptrdiff_t offset) volatile
- {
- offset_ = offset;
- }
- inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile
- {
- return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_);
- }
- private:
- std::ptrdiff_t offset_;
- };
// Lifecycle tag carried by every list node.
enum NodeState {
    HEAD = 0,            // first node of a list, as set up by init_channel()
    NOT_FILLED_YET = 1,  // allocated node whose contents are not published yet
    FILLED = 2           // node whose contents have been published by update()
};
- struct Node {
- volatile NodeState state_;
- volatile diff_ptr<Node> next_;
- };
- struct UpdateID : public Node {
- ::uint32_t list_id_;
- };
- #define DUMMY_DATA_SIZE 512
- struct Data : public Node {
- volatile long tv_sec_;
- volatile long tv_usec_;
- char dummy_data_[DUMMY_DATA_SIZE];
- };
- struct Channel {
- diff_ptr<Node> head_;
- volatile diff_ptr<Node> tail_;
- volatile std::size_t size_;
- ::uint32_t id_;
- };
- #endif
- // lump/child/main.C
- #include <iostream>
- #include <vector>
- #include <string>
- #include <set>
- #include <boost/thread.hpp>
- #include <boost/tokenizer.hpp>
- #include <boost/interprocess/sync/scoped_lock.hpp>
- #include <boost/interprocess/sync/named_mutex.hpp>
- #include <boost/interprocess/sync/named_condition.hpp>
- #include <boost/interprocess/allocators/allocator.hpp>
- #include <boost/interprocess/containers/vector.hpp>
- #include <boost/interprocess/managed_shared_memory.hpp>
- #include <lump/list.H>
- using namespace boost;
- using namespace boost::interprocess;
- using namespace std;
- managed_shared_memory segment(open_only, "SharedMemoryArea");
- typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
- typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
- ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
- Channel& update_channel = *(segment.find<Channel>("update_channel").first);
- named_mutex update_mutex(open_only, "update_mutex");
- named_condition update_condition(open_only, "update_condition");
// Prints the time elapsed between a send timestamp and "now" to std::cout,
// expressed in milliseconds, microseconds and nanoseconds.
//
// send_tv_sec  - seconds part of the send timestamp
// send_tv_usec - microseconds part of the send timestamp
//
// A zero seconds value on either side is reported on std::cerr as a
// diagnostic; the latency line is printed regardless.
void log_latency(long send_tv_sec, long send_tv_usec)
{
    // hrtime_t is not declared on every platform; long long is wide enough
    // for nanosecond counts and prints identically.
    typedef long long nanoseconds_t;
    const nanoseconds_t NS_PER_SEC = 1000000000LL;
    const nanoseconds_t NS_PER_USEC = 1000LL;

    timeval arrive_timestamp;
    gettimeofday(&arrive_timestamp, NULL);
    if(send_tv_sec == 0)
        std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
    if(arrive_timestamp.tv_sec == 0)
        std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints

    const nanoseconds_t send_time =
        nanoseconds_t(send_tv_sec) * NS_PER_SEC + nanoseconds_t(send_tv_usec) * NS_PER_USEC;
    const nanoseconds_t arrive_time =
        nanoseconds_t(arrive_timestamp.tv_sec) * NS_PER_SEC + nanoseconds_t(arrive_timestamp.tv_usec) * NS_PER_USEC;
    const nanoseconds_t latency = arrive_time - send_time;
    std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
}
- void method_one()
- {
- std::vector<Data*> channel_cursors;
- for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
- {
- Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
- if(current_item->state_ == HEAD)
- current_item = static_cast<Data*>(current_item->next_.get(segment));
- channel_cursors.push_back(current_item);
- }
- while(true)
- {
- for(std::size_t i = 0; i < channel_list.size(); ++i)
- {
- // {
- // scoped_lock<named_mutex> update_lock(update_mutex);
- // update_condition.wait(update_lock);
- // }
- Data* current_item = channel_cursors[i];
- ACQUIRE_MEMORY_BARRIER;
- if(current_item->state_ == NOT_FILLED_YET) {
- continue;
- }
- log_latency(current_item->tv_sec_, current_item->tv_usec_);
- channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
- }
- }
- }
- void method_two()
- {
- std::vector<Data*> channel_cursors;
- for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
- {
- Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
- if(current_item->state_ == HEAD)
- current_item = static_cast<Data*>(current_item->next_.get(segment));
- channel_cursors.push_back(current_item);
- }
- UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
- while(true)
- {
- // {
- // scoped_lock<named_mutex> update_lock(update_mutex);
- // update_condition.wait(update_lock);
- // }
- if(update_cursor->state_ == NOT_FILLED_YET) {
- continue;
- }
- ::uint32_t update_id = update_cursor->list_id_;
- Data* current_item = channel_cursors[update_id];
- if(current_item->state_ == NOT_FILLED_YET) {
- std::cerr << "This should never print." << std::endl; // it doesn't
- continue;
- }
- log_latency(current_item->tv_sec_, current_item->tv_usec_);
- channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
- update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
- }
- }
- void method_three()
- {
- std::vector<Data*> channel_cursors;
- for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
- {
- Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
- if(current_item->state_ == HEAD)
- current_item = static_cast<Data*>(current_item->next_.get(segment));
- channel_cursors.push_back(current_item);
- }
- UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
- while(true)
- {
- bool found_an_update = false;
- for(std::size_t i = 0; i < channel_list.size(); ++i)
- {
- std::size_t idx = i;
- ACQUIRE_MEMORY_BARRIER;
- if(update_cursor->state_ != NOT_FILLED_YET) {
- //std::cerr << "Found via update" << std::endl;
- i--;
- idx = update_cursor->list_id_;
- update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
- }
- Data* current_item = channel_cursors[idx];
- ACQUIRE_MEMORY_BARRIER;
- if(current_item->state_ == NOT_FILLED_YET) {
- continue;
- }
- found_an_update = true;
- log_latency(current_item->tv_sec_, current_item->tv_usec_);
- channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
- }
- if(!found_an_update)
- {
- std::cerr << "Sleeping" << std::endl;
- scoped_lock<named_mutex> update_lock(update_mutex);
- update_condition.wait(update_lock);
- }
- }
- }
- int main(int argc, char** argv)
- {
- using ::int32_t;
- using ::uint32_t;
- if(argc == 1)
- {
- std::cout << "Using 1st method" << std::endl;
- method_one();
- }
- else if(argc == 2)
- {
- std::cout << "Using 2nd method" << std::endl;
- method_two();
- }
- else if(argc == 3)
- {
- std::cout << "Using 3rd method" << std::endl;
- method_three();
- }
- return 0;
- }
- // lump/server/main.C
- #include <vector>
- #include <map>
- #include <cstring>
- #include <sstream>
- #include <iostream>
- #include <boost/interprocess/sync/scoped_lock.hpp>
- #include <boost/interprocess/sync/named_mutex.hpp>
- #include <boost/interprocess/sync/named_condition.hpp>
- #include <boost/interprocess/allocators/allocator.hpp>
- #include <boost/interprocess/containers/vector.hpp>
- #include <boost/interprocess/managed_shared_memory.hpp>
- #include <lump/list.H>
- using namespace boost;
- using namespace boost::interprocess;
- using namespace std;
- struct shm_remove
- {
- shm_remove()
- {
- shared_memory_object::remove("SharedMemoryArea");
- named_mutex::remove("update_mutex");
- named_condition::remove("update_condition");
- }
- ~shm_remove()
- {
- shared_memory_object::remove("SharedMemoryArea");
- named_mutex::remove("update_mutex");
- named_condition::remove("update_condition");
- }
- } remover;
- static int CACHE_SIZE = 512 * 1024 * 1024;
- managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);
- typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
- typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
- ShmemAllocator alloc_instance(segment.get_segment_manager());
- ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
- Channel& update_channel = *(segment.construct<Channel>("update_channel")());
- named_mutex update_mutex(create_only, "update_mutex");
- named_condition update_condition(create_only, "update_condition");
- timeval receive_timestamp;
- static int NEXT_ID_TO_USE = 0;
- static int CHANNEL_TO_USE = 0;
- template<class T>
- void init_channel(Channel& channel)
- {
- T* new_head = static_cast<T*>(segment.allocate(sizeof(T)));
- new_head->state_ = HEAD;
- T* new_empty = static_cast<T*>(segment.allocate(sizeof(T)));
- new_empty->state_ = NOT_FILLED_YET;
- new_head->next_.set(new_empty, segment);
- channel.id_ = NEXT_ID_TO_USE++;
- RELEASE_MEMORY_BARRIER;
- channel.head_.set(new_head, segment);
- channel.tail_.set(new_head, segment);
- }
- void update(Channel& channel, char *new_data)
- {
- gettimeofday(&receive_timestamp, NULL);
- Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
- new_empty_node->state_ = NOT_FILLED_YET;
- Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));
- node_to_fill->next_.set(new_empty_node, segment);
- node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
- node_to_fill->tv_usec_ = receive_timestamp.tv_usec;
- strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);
- __sync_fetch_and_add(&(channel.size_), 1);
- //
- // Separate out UpdateID work
- //
- UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
- new_empty_update->state_ = NOT_FILLED_YET;
- UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));
- update_node_to_fill->next_.set(new_empty_update, segment);
- update_node_to_fill->list_id_ = channel.id_;
- __sync_fetch_and_add(&(update_channel.size_), 1);
- RELEASE_MEMORY_BARRIER;
- node_to_fill->state_ = FILLED;
- RELEASE_MEMORY_BARRIER;
- channel.tail_.set(node_to_fill, segment);
- RELEASE_MEMORY_BARRIER;
- update_node_to_fill->state_ = FILLED;
- RELEASE_MEMORY_BARRIER;
- update_channel.tail_.set(update_node_to_fill, segment);
- //scoped_lock<named_mutex> update_lock(update_mutex);
- update_condition.notify_all();
- }
- int main(int argc, char** argv)
- {
- for(int i = 0; i < 250000; ++i)
- {
- Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel)));
- init_channel<Data>(*data_channel);
- diff_ptr<Channel> ptr(data_channel, segment);
- channel_list.push_back(ptr);
- }
- init_channel<UpdateID>(update_channel);
- std::cout << "Cache Size: " << CACHE_SIZE << std::endl;
- srand ( time(NULL) );
- while(true)
- {
- char dummy_data_to_transmit[DUMMY_DATA_SIZE];
- for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i)
- {
- dummy_data_to_transmit[i] = rand() % (90 - 65) + 65;
- }
- dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0';
- //std::cout << "Update: " << dummy_data_to_transmit << std::endl;
- Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment));
- update(channel, &(dummy_data_to_transmit[0]));
- timespec interval;
- interval.tv_sec = 0;
- //interval.tv_sec = rand() % 2;
- //interval.tv_nsec = rand() % 5000;
- interval.tv_nsec = rand() % 5000000;
- nanosleep(&interval, NULL);
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement