Advertisement
Guest User

Untitled

a guest
Mar 28th, 2010
389
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 13.50 KB | None | 0 0
  1. // lump/list.H
  2.  
  3. #ifndef INCLUDED_LUMP_LIST_H
  4. #define INCLUDED_LUMP_LIST_H
  5.  
  6. #include <cstddef>
  7. #include <bitset>
  8.  
  9. #include <boost/interprocess/offset_ptr.hpp>
  10. #include <boost/interprocess/managed_shared_memory.hpp>
  11. #include <boost/tuple/tuple.hpp>
  12. #include <boost/tuple/tuple_comparison.hpp>
  13.  
  14. #include <sys/time.h>
  15.  
  16. // The __sync functions are GCC atomic builtins. They're documented here: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
  17. // method_one() and method_two() have commented out condition waits. You can try uncommenting them, they only make latency worse. Same for the condition
  18. // code in method_three(), it's only left in since that's how I was testing last.
  19.  
  20. #define FULL_MEMORY_BARRIER __sync_synchronize()
  21.  
  22. #ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug
  23. #define RELEASE_MEMORY_BARRIER_IMPL2(line) int release_barrier##line; __sync_lock_release(&release_barrier##line)
  24. #define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line)
  25. #define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__)
  26. #else
  27. #define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
  28. #endif
  29.  
  30. #ifndef MEMORY_BARRIER_DEBUG
  31. #define ACQUIRE_MEMORY_BARRIER_IMPL2(line) int acquire_barrier##line; __sync_lock_test_and_set(&acquire_barrier##line, 1)
  32. #define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line)
  33. #define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__)
  34. #else
  35. #define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
  36. #endif
  37.  
  38. // Meant to represent an offset like offset_ptr, but doesn't
  39. // do anything automagically. Nice if you want to have pointers
  40. // from one segment into another segment.
  41. template<class T>
  42. class diff_ptr
  43. {
  44. public:
  45.     inline diff_ptr() {}
  46.  
  47.     inline diff_ptr(diff_ptr const& other)
  48.         : offset_(other.offset_)
  49.     {}
  50.    
  51.     inline diff_ptr(boost::interprocess::offset_ptr<T> const& init,
  52.                     boost::interprocess::managed_shared_memory const& segment)
  53.     {
  54.         set(init.get(), segment);
  55.     }
  56.  
  57.     inline diff_ptr(T* init,
  58.                     boost::interprocess::managed_shared_memory const& segment)
  59.     {
  60.         set(init, segment);
  61.     }
  62.  
  63.     inline void set(T* init,
  64.                     boost::interprocess::managed_shared_memory const& segment)
  65.     {
  66.         set_offset((char*)(init) - (char*)(segment.get_address()));
  67.     }
  68.  
  69.     inline void set(T volatile* init,
  70.                     boost::interprocess::managed_shared_memory const& segment) volatile
  71.     {
  72.         set_offset((char*)(init) - (char*)(segment.get_address()));
  73.     }
  74.  
  75.     inline void set_offset(ptrdiff_t offset) volatile
  76.     {
  77.         offset_ = offset;
  78.     }
  79.  
  80.     inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile
  81.     {
  82.         return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_);
  83.     }
  84.    
  85. private:
  86.     std::ptrdiff_t offset_;
  87. };
  88.  
  89. enum NodeState {
  90.     HEAD,
  91.     NOT_FILLED_YET,
  92.     FILLED
  93. };
  94.  
  95. struct Node {
  96.     volatile NodeState state_;
  97.     volatile diff_ptr<Node> next_;
  98. };
  99.  
  100. struct UpdateID : public Node {
  101.     ::uint32_t list_id_;
  102. };
  103.  
  104. #define DUMMY_DATA_SIZE 512
  105.  
  106. struct Data : public Node {
  107.     volatile long tv_sec_;
  108.     volatile long tv_usec_;
  109.  
  110.     char dummy_data_[DUMMY_DATA_SIZE];
  111. };
  112.  
  113. struct Channel {
  114.     diff_ptr<Node> head_;
  115.     volatile diff_ptr<Node> tail_;
  116.     volatile std::size_t size_;
  117.     ::uint32_t id_;
  118. };
  119.  
  120. #endif
  121.  
  122.  
  123. // lump/child/main.C
  124.  
  125. #include <iostream>
  126. #include <vector>
  127. #include <string>
  128. #include <set>
  129.  
  130. #include <boost/thread.hpp>
  131. #include <boost/tokenizer.hpp>
  132.  
  133. #include <boost/interprocess/sync/scoped_lock.hpp>
  134. #include <boost/interprocess/sync/named_mutex.hpp>
  135. #include <boost/interprocess/sync/named_condition.hpp>
  136. #include <boost/interprocess/allocators/allocator.hpp>
  137. #include <boost/interprocess/containers/vector.hpp>
  138. #include <boost/interprocess/managed_shared_memory.hpp>
  139.  
  140. #include <lump/list.H>
  141.  
  142. using namespace boost;
  143. using namespace boost::interprocess;
  144. using namespace std;
  145.  
  146. managed_shared_memory segment(open_only, "SharedMemoryArea");
  147.  
  148. typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
  149. typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
  150.  
  151. ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
  152. Channel& update_channel = *(segment.find<Channel>("update_channel").first);
  153.  
  154. named_mutex update_mutex(open_only, "update_mutex");
  155. named_condition update_condition(open_only, "update_condition");
  156.  
  157. void log_latency(long send_tv_sec, long send_tv_usec)
  158. {
  159.     timeval arrive_timestamp;
  160.     gettimeofday(&arrive_timestamp, NULL);
  161.  
  162.     if(send_tv_sec == 0)
  163.         std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
  164.     if(arrive_timestamp.tv_sec == 0)
  165.         std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints
  166.  
  167.     hrtime_t send_time = hrtime_t(send_tv_sec) * hrtime_t(1e9) + hrtime_t(send_tv_usec) * hrtime_t(1e3);
  168.     hrtime_t arrive_time = hrtime_t(arrive_timestamp.tv_sec) * hrtime_t(1e9) + hrtime_t(arrive_timestamp.tv_usec) * hrtime_t(1e3);
  169.     hrtime_t latency = arrive_time - send_time;
  170.  
  171.     std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
  172. }
  173.  
  174. void method_one()
  175. {
  176.     std::vector<Data*> channel_cursors;
  177.     for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
  178.     {
  179.         Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
  180.  
  181.         if(current_item->state_ == HEAD)
  182.             current_item = static_cast<Data*>(current_item->next_.get(segment));
  183.        
  184.         channel_cursors.push_back(current_item);
  185.     }
  186.    
  187.     while(true)
  188.     {
  189.         for(std::size_t i = 0; i < channel_list.size(); ++i)
  190.         {
  191.             // {
  192.             //  scoped_lock<named_mutex> update_lock(update_mutex);
  193.             //  update_condition.wait(update_lock);
  194.             // }
  195.            
  196.             Data* current_item = channel_cursors[i];
  197.  
  198.             ACQUIRE_MEMORY_BARRIER;
  199.             if(current_item->state_ == NOT_FILLED_YET) {
  200.                 continue;
  201.             }
  202.  
  203.             log_latency(current_item->tv_sec_, current_item->tv_usec_);
  204.  
  205.             channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
  206.         }
  207.     }
  208. }
  209.  
  210. void method_two()
  211. {
  212.     std::vector<Data*> channel_cursors;
  213.     for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
  214.     {
  215.         Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
  216.  
  217.         if(current_item->state_ == HEAD)
  218.             current_item = static_cast<Data*>(current_item->next_.get(segment));
  219.        
  220.         channel_cursors.push_back(current_item);
  221.     }
  222.    
  223.     UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
  224.  
  225.     while(true)
  226.     {
  227.         // {
  228.         //  scoped_lock<named_mutex> update_lock(update_mutex);
  229.         //  update_condition.wait(update_lock);
  230.         // }
  231.        
  232.         if(update_cursor->state_ == NOT_FILLED_YET) {
  233.             continue;
  234.         }
  235.        
  236.         ::uint32_t update_id = update_cursor->list_id_;
  237.  
  238.         Data* current_item = channel_cursors[update_id];
  239.        
  240.         if(current_item->state_ == NOT_FILLED_YET) {
  241.             std::cerr << "This should never print." << std::endl; // it doesn't
  242.             continue;
  243.         }
  244.  
  245.         log_latency(current_item->tv_sec_, current_item->tv_usec_);
  246.        
  247.         channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
  248.         update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
  249.     }  
  250. }
  251.  
  252. void method_three()
  253. {
  254.     std::vector<Data*> channel_cursors;
  255.     for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
  256.     {
  257.         Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
  258.  
  259.         if(current_item->state_ == HEAD)
  260.             current_item = static_cast<Data*>(current_item->next_.get(segment));
  261.        
  262.         channel_cursors.push_back(current_item);
  263.     }
  264.  
  265.     UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
  266.  
  267.     while(true)
  268.     {
  269.         bool found_an_update = false;
  270.         for(std::size_t i = 0; i < channel_list.size(); ++i)
  271.         {
  272.             std::size_t idx = i;
  273.            
  274.             ACQUIRE_MEMORY_BARRIER;
  275.             if(update_cursor->state_ != NOT_FILLED_YET) {
  276.                 //std::cerr << "Found via update" << std::endl;
  277.                 i--;
  278.                 idx = update_cursor->list_id_;
  279.                 update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
  280.             }
  281.            
  282.             Data* current_item = channel_cursors[idx];
  283.  
  284.             ACQUIRE_MEMORY_BARRIER;
  285.             if(current_item->state_ == NOT_FILLED_YET) {
  286.                 continue;
  287.             }
  288.  
  289.             found_an_update = true;
  290.  
  291.             log_latency(current_item->tv_sec_, current_item->tv_usec_);
  292.             channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
  293.         }
  294.  
  295.         if(!found_an_update)
  296.         {
  297.             std::cerr << "Sleeping" << std::endl;
  298.             scoped_lock<named_mutex> update_lock(update_mutex);
  299.             update_condition.wait(update_lock);
  300.         }
  301.     }
  302. }
  303.  
  304. int main(int argc, char** argv)
  305. {
  306.     using ::int32_t;
  307.     using ::uint32_t;
  308.  
  309.     if(argc == 1)
  310.     {
  311.         std::cout << "Using 1st method" << std::endl;
  312.         method_one();
  313.     }
  314.     else if(argc == 2)
  315.     {
  316.         std::cout << "Using 2nd method" << std::endl;
  317.         method_two();
  318.     }
  319.     else if(argc == 3)
  320.     {
  321.         std::cout << "Using 3rd method" << std::endl;
  322.         method_three();
  323.     }
  324.    
  325.     return 0;
  326. }
  327.  
  328.  
  329. // lump/server/main.C
  330.  
  331. #include <vector>
  332. #include <map>
  333. #include <cstring>
  334. #include <sstream>
  335. #include <iostream>
  336.  
  337. #include <boost/interprocess/sync/scoped_lock.hpp>
  338. #include <boost/interprocess/sync/named_mutex.hpp>
  339. #include <boost/interprocess/sync/named_condition.hpp>
  340. #include <boost/interprocess/allocators/allocator.hpp>
  341. #include <boost/interprocess/containers/vector.hpp>
  342. #include <boost/interprocess/managed_shared_memory.hpp>
  343.  
  344. #include <lump/list.H>
  345.  
  346. using namespace boost;
  347. using namespace boost::interprocess;
  348. using namespace std;
  349.  
  350. struct shm_remove
  351. {
  352.     shm_remove()
  353.     {
  354.         shared_memory_object::remove("SharedMemoryArea");
  355.         named_mutex::remove("update_mutex");
  356.         named_condition::remove("update_condition");
  357.     }
  358.    
  359.     ~shm_remove()
  360.     {
  361.         shared_memory_object::remove("SharedMemoryArea");
  362.         named_mutex::remove("update_mutex");
  363.         named_condition::remove("update_condition");
  364.     }
  365. } remover;
  366.  
  367. static int CACHE_SIZE = 512 * 1024 * 1024;
  368. managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);
  369.  
  370. typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
  371. typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
  372. ShmemAllocator alloc_instance(segment.get_segment_manager());
  373.  
  374. ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
  375. Channel& update_channel = *(segment.construct<Channel>("update_channel")());
  376.  
  377. named_mutex update_mutex(create_only, "update_mutex");
  378. named_condition update_condition(create_only, "update_condition");
  379.  
  380. timeval receive_timestamp;
  381.  
  382. static int NEXT_ID_TO_USE = 0;
  383. static int CHANNEL_TO_USE = 0;
  384.  
  385. template<class T>
  386. void init_channel(Channel& channel)
  387. {
  388.     T* new_head = static_cast<T*>(segment.allocate(sizeof(T)));
  389.     new_head->state_ = HEAD;
  390.  
  391.     T* new_empty = static_cast<T*>(segment.allocate(sizeof(T)));
  392.     new_empty->state_ = NOT_FILLED_YET;
  393.     new_head->next_.set(new_empty, segment);
  394.  
  395.     channel.id_ = NEXT_ID_TO_USE++;
  396.  
  397.     RELEASE_MEMORY_BARRIER;
  398.     channel.head_.set(new_head, segment);
  399.     channel.tail_.set(new_head, segment);
  400. }
  401.  
  402. void update(Channel& channel, char *new_data)
  403. {
  404.     gettimeofday(&receive_timestamp, NULL);
  405.        
  406.     Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
  407.     new_empty_node->state_ = NOT_FILLED_YET;
  408.  
  409.     Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));
  410.    
  411.     node_to_fill->next_.set(new_empty_node, segment);
  412.  
  413.     node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
  414.     node_to_fill->tv_usec_ = receive_timestamp.tv_usec;
  415.  
  416.     strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);
  417.  
  418.     __sync_fetch_and_add(&(channel.size_), 1);
  419.  
  420.     //
  421.     // Separate out UpdateID work
  422.     //
  423.  
  424.     UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
  425.     new_empty_update->state_ = NOT_FILLED_YET;
  426.  
  427.     UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));
  428.    
  429.     update_node_to_fill->next_.set(new_empty_update, segment);
  430.  
  431.     update_node_to_fill->list_id_ = channel.id_;
  432.  
  433.     __sync_fetch_and_add(&(update_channel.size_), 1);
  434.  
  435.     RELEASE_MEMORY_BARRIER;
  436.     node_to_fill->state_ = FILLED;
  437.     RELEASE_MEMORY_BARRIER;
  438.     channel.tail_.set(node_to_fill, segment);
  439.  
  440.     RELEASE_MEMORY_BARRIER;
  441.     update_node_to_fill->state_ = FILLED;
  442.     RELEASE_MEMORY_BARRIER;
  443.     update_channel.tail_.set(update_node_to_fill, segment);
  444.  
  445.     //scoped_lock<named_mutex> update_lock(update_mutex);
  446.     update_condition.notify_all();
  447. }
  448.  
  449. int main(int argc, char** argv)
  450. {
  451.     for(int i = 0; i < 250000; ++i)
  452.     {
  453.         Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel)));
  454.         init_channel<Data>(*data_channel);
  455.         diff_ptr<Channel> ptr(data_channel, segment);
  456.         channel_list.push_back(ptr);
  457.     }
  458.  
  459.     init_channel<UpdateID>(update_channel);
  460.  
  461.     std::cout << "Cache Size: " << CACHE_SIZE << std::endl;
  462.  
  463.     srand ( time(NULL) );
  464.     while(true)
  465.     {
  466.         char dummy_data_to_transmit[DUMMY_DATA_SIZE];
  467.         for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i)
  468.         {
  469.             dummy_data_to_transmit[i] = rand() % (90 - 65) + 65;
  470.         }
  471.         dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0';
  472.  
  473.         //std::cout << "Update: " << dummy_data_to_transmit << std::endl;
  474.         Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment));
  475.         update(channel, &(dummy_data_to_transmit[0]));
  476.  
  477.         timespec interval;
  478.         interval.tv_sec = 0;
  479.         //interval.tv_sec = rand() % 2;
  480.         //interval.tv_nsec = rand() % 5000;
  481.         interval.tv_nsec = rand() % 5000000;
  482.         nanosleep(&interval, NULL);
  483.     }
  484.  
  485.     return 0;
  486. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement