Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

Untitled

By: a guest on Mar 28th, 2010  |  syntax: C++  |  size: 13.50 KB  |  views: 58  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
This paste has a previous version, view the difference. Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. // lump/list.H
  2.  
  3. #ifndef INCLUDED_LUMP_LIST_H
  4. #define INCLUDED_LUMP_LIST_H
  5.  
  6. #include <cstddef>
  7. #include <bitset>
  8.  
  9. #include <boost/interprocess/offset_ptr.hpp>
  10. #include <boost/interprocess/managed_shared_memory.hpp>
  11. #include <boost/tuple/tuple.hpp>
  12. #include <boost/tuple/tuple_comparison.hpp>
  13.  
  14. #include <sys/time.h>
  15.  
  16. // The __sync functions are GCC atomic builtins, documented here: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
  17. // method_one() and method_two() contain commented-out condition waits. You can try uncommenting them, but they only make latency worse. The same goes for
  18. // the condition code in method_three(); it is only left in because that is how I was testing last.
  19.  
  // Memory-barrier helpers.
  //
  //   FULL_MEMORY_BARRIER     full barrier (GCC __sync builtin)
  //   RELEASE_MEMORY_BARRIER  issue before publishing a flag/pointer that guards earlier writes
  //   ACQUIRE_MEMORY_BARRIER  issue after observing such a flag, before reading the guarded data
  //
  // Each macro expands to a single expression and declares no hidden local
  // variable, so it can appear several times on one line or as the sole body of
  // an unbraced `if`.
  //
  // Define MEMORY_BARRIER_DEBUG to turn every barrier into a full barrier.
  // If you set this and things start working, you have a memory barrier bug.

  #define FULL_MEMORY_BARRIER __sync_synchronize()

  #if defined(MEMORY_BARRIER_DEBUG)
  #define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
  #define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
  #elif defined(__ATOMIC_RELEASE) && defined(__ATOMIC_ACQUIRE)
  // Compilers that provide the __atomic builtins: use real stand-alone fences.
  #define RELEASE_MEMORY_BARRIER __atomic_thread_fence(__ATOMIC_RELEASE)
  #define ACQUIRE_MEMORY_BARRIER __atomic_thread_fence(__ATOMIC_ACQUIRE)
  #else
  // Older compilers with only the __sync builtins: a full barrier is stronger
  // than required but always correct.
  #define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
  #define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
  #endif
  37.  
  38. // Meant to represent an offset like offset_ptr, but doesn't
  39. // do anything automagically. Nice if you want to have pointers
  40. // from one segment into another segment.
  41. template<class T>
  42. class diff_ptr
  43. {
  44. public:
  45.         inline diff_ptr() {}
  46.  
  47.         inline diff_ptr(diff_ptr const& other)
  48.                 : offset_(other.offset_)
  49.         {}
  50.        
  51.         inline diff_ptr(boost::interprocess::offset_ptr<T> const& init,
  52.                                         boost::interprocess::managed_shared_memory const& segment)
  53.         {
  54.                 set(init.get(), segment);
  55.         }
  56.  
  57.         inline diff_ptr(T* init,
  58.                                         boost::interprocess::managed_shared_memory const& segment)
  59.         {
  60.                 set(init, segment);
  61.         }
  62.  
  63.         inline void set(T* init,
  64.                                         boost::interprocess::managed_shared_memory const& segment)
  65.         {
  66.                 set_offset((char*)(init) - (char*)(segment.get_address()));
  67.         }
  68.  
  69.         inline void set(T volatile* init,
  70.                                         boost::interprocess::managed_shared_memory const& segment) volatile
  71.         {
  72.                 set_offset((char*)(init) - (char*)(segment.get_address()));
  73.         }
  74.  
  75.         inline void set_offset(ptrdiff_t offset) volatile
  76.         {
  77.                 offset_ = offset;
  78.         }
  79.  
  80.         inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile
  81.         {
  82.                 return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_);
  83.         }
  84.        
  85. private:
  86.         std::ptrdiff_t offset_;
  87. };
  88.  
  // Fill state carried by every list node (see Node::state_).
  // The numeric values are spelled out because the enum is stored in memory
  // that is shared between processes.
  enum NodeState {
      HEAD           = 0,  // first node of a channel, as set up by init_channel()
      NOT_FILLED_YET = 1,  // node is linked in but its payload has not been published
      FILLED         = 2   // payload has been written and published
  };
  94.  
  95. struct Node {
  96.         volatile NodeState state_;
  97.         volatile diff_ptr<Node> next_;
  98. };
  99.  
  100. struct UpdateID : public Node {
  101.         ::uint32_t list_id_;
  102. };
  103.  
  104. #define DUMMY_DATA_SIZE 512
  105.  
  106. struct Data : public Node {
  107.         volatile long tv_sec_;
  108.         volatile long tv_usec_;
  109.  
  110.         char dummy_data_[DUMMY_DATA_SIZE];
  111. };
  112.  
  113. struct Channel {
  114.         diff_ptr<Node> head_;
  115.         volatile diff_ptr<Node> tail_;
  116.         volatile std::size_t size_;
  117.         ::uint32_t id_;
  118. };
  119.  
  120. #endif
  121.  
  122.  
  123. // lump/child/main.C
  124.  
  125. #include <iostream>
  126. #include <vector>
  127. #include <string>
  128. #include <set>
  129.  
  130. #include <boost/thread.hpp>
  131. #include <boost/tokenizer.hpp>
  132.  
  133. #include <boost/interprocess/sync/scoped_lock.hpp>
  134. #include <boost/interprocess/sync/named_mutex.hpp>
  135. #include <boost/interprocess/sync/named_condition.hpp>
  136. #include <boost/interprocess/allocators/allocator.hpp>
  137. #include <boost/interprocess/containers/vector.hpp>
  138. #include <boost/interprocess/managed_shared_memory.hpp>
  139.  
  140. #include <lump/list.H>
  141.  
  142. using namespace boost;
  143. using namespace boost::interprocess;
  144. using namespace std;
  145.  
  146. managed_shared_memory segment(open_only, "SharedMemoryArea");
  147.  
  148. typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
  149. typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
  150.  
  151. ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
  152. Channel& update_channel = *(segment.find<Channel>("update_channel").first);
  153.  
  154. named_mutex update_mutex(open_only, "update_mutex");
  155. named_condition update_condition(open_only, "update_condition");
  156.  
  // Prints the time elapsed between a send timestamp and "now" to std::cout,
  // expressed in milliseconds, microseconds and nanoseconds.
  //
  //   send_tv_sec   seconds part of the send timestamp (timeval style)
  //   send_tv_usec  microseconds part of the send timestamp
  //
  // A warning goes to std::cerr when either timestamp has a zero seconds field.
  void log_latency(long send_tv_sec, long send_tv_usec)
  {
      timeval arrive_timestamp;
      gettimeofday(&arrive_timestamp, NULL);

      if(send_tv_sec == 0)
          std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
      if(arrive_timestamp.tv_sec == 0)
          std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints

      // Plain long long instead of the platform-specific hrtime_t typedef, and
      // integer constants instead of converting 1e9 / 1e3 from double.
      typedef long long nanos_t;
      const nanos_t ns_per_sec  = 1000000000LL;
      const nanos_t ns_per_usec = 1000LL;

      const nanos_t send_time   = nanos_t(send_tv_sec) * ns_per_sec
                                + nanos_t(send_tv_usec) * ns_per_usec;
      const nanos_t arrive_time = nanos_t(arrive_timestamp.tv_sec) * ns_per_sec
                                + nanos_t(arrive_timestamp.tv_usec) * ns_per_usec;
      const nanos_t latency     = arrive_time - send_time;

      std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
  }
  173.  
  174. void method_one()
  175. {
  176.         std::vector<Data*> channel_cursors;
  177.         for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
  178.         {
  179.                 Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
  180.  
  181.                 if(current_item->state_ == HEAD)
  182.                         current_item = static_cast<Data*>(current_item->next_.get(segment));
  183.                
  184.                 channel_cursors.push_back(current_item);
  185.         }
  186.        
  187.         while(true)
  188.         {
  189.                 for(std::size_t i = 0; i < channel_list.size(); ++i)
  190.                 {
  191.                         // {
  192.                         //      scoped_lock<named_mutex> update_lock(update_mutex);
  193.                         //      update_condition.wait(update_lock);
  194.                         // }
  195.                        
  196.                         Data* current_item = channel_cursors[i];
  197.  
  198.                         ACQUIRE_MEMORY_BARRIER;
  199.                         if(current_item->state_ == NOT_FILLED_YET) {
  200.                                 continue;
  201.                         }
  202.  
  203.                         log_latency(current_item->tv_sec_, current_item->tv_usec_);
  204.  
  205.                         channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
  206.                 }
  207.         }
  208. }
  209.  
  210. void method_two()
  211. {
  212.         std::vector<Data*> channel_cursors;
  213.         for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
  214.         {
  215.                 Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
  216.  
  217.                 if(current_item->state_ == HEAD)
  218.                         current_item = static_cast<Data*>(current_item->next_.get(segment));
  219.                
  220.                 channel_cursors.push_back(current_item);
  221.         }
  222.        
  223.         UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
  224.  
  225.         while(true)
  226.         {
  227.                 // {
  228.                 //      scoped_lock<named_mutex> update_lock(update_mutex);
  229.                 //      update_condition.wait(update_lock);
  230.                 // }
  231.                
  232.                 if(update_cursor->state_ == NOT_FILLED_YET) {
  233.                         continue;
  234.                 }
  235.                
  236.                 ::uint32_t update_id = update_cursor->list_id_;
  237.  
  238.                 Data* current_item = channel_cursors[update_id];
  239.                
  240.                 if(current_item->state_ == NOT_FILLED_YET) {
  241.                         std::cerr << "This should never print." << std::endl; // it doesn't
  242.                         continue;
  243.                 }
  244.  
  245.                 log_latency(current_item->tv_sec_, current_item->tv_usec_);
  246.                
  247.                 channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
  248.                 update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
  249.         }      
  250. }
  251.  
  252. void method_three()
  253. {
  254.         std::vector<Data*> channel_cursors;
  255.         for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
  256.         {
  257.                 Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
  258.  
  259.                 if(current_item->state_ == HEAD)
  260.                         current_item = static_cast<Data*>(current_item->next_.get(segment));
  261.                
  262.                 channel_cursors.push_back(current_item);
  263.         }
  264.  
  265.         UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
  266.  
  267.         while(true)
  268.         {
  269.                 bool found_an_update = false;
  270.                 for(std::size_t i = 0; i < channel_list.size(); ++i)
  271.                 {
  272.                         std::size_t idx = i;
  273.                        
  274.                         ACQUIRE_MEMORY_BARRIER;
  275.                         if(update_cursor->state_ != NOT_FILLED_YET) {
  276.                                 //std::cerr << "Found via update" << std::endl;
  277.                                 i--;
  278.                                 idx = update_cursor->list_id_;
  279.                                 update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
  280.                         }
  281.                        
  282.                         Data* current_item = channel_cursors[idx];
  283.  
  284.                         ACQUIRE_MEMORY_BARRIER;
  285.                         if(current_item->state_ == NOT_FILLED_YET) {
  286.                                 continue;
  287.                         }
  288.  
  289.                         found_an_update = true;
  290.  
  291.                         log_latency(current_item->tv_sec_, current_item->tv_usec_);
  292.                         channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
  293.                 }
  294.  
  295.                 if(!found_an_update)
  296.                 {
  297.                         std::cerr << "Sleeping" << std::endl;
  298.                         scoped_lock<named_mutex> update_lock(update_mutex);
  299.                         update_condition.wait(update_lock);
  300.                 }
  301.         }
  302. }
  303.  
  304. int main(int argc, char** argv)
  305. {
  306.         using ::int32_t;
  307.         using ::uint32_t;
  308.  
  309.         if(argc == 1)
  310.         {
  311.                 std::cout << "Using 1st method" << std::endl;
  312.                 method_one();
  313.         }
  314.         else if(argc == 2)
  315.         {
  316.                 std::cout << "Using 2nd method" << std::endl;
  317.                 method_two();
  318.         }
  319.         else if(argc == 3)
  320.         {
  321.                 std::cout << "Using 3rd method" << std::endl;
  322.                 method_three();
  323.         }
  324.        
  325.         return 0;
  326. }
  327.  
  328.  
  329. // lump/server/main.C
  330.  
  331. #include <vector>
  332. #include <map>
  333. #include <cstring>
  334. #include <sstream>
  335. #include <iostream>
  336.  
  337. #include <boost/interprocess/sync/scoped_lock.hpp>
  338. #include <boost/interprocess/sync/named_mutex.hpp>
  339. #include <boost/interprocess/sync/named_condition.hpp>
  340. #include <boost/interprocess/allocators/allocator.hpp>
  341. #include <boost/interprocess/containers/vector.hpp>
  342. #include <boost/interprocess/managed_shared_memory.hpp>
  343.  
  344. #include <lump/list.H>
  345.  
  346. using namespace boost;
  347. using namespace boost::interprocess;
  348. using namespace std;
  349.  
  350. struct shm_remove
  351. {
  352.         shm_remove()
  353.         {
  354.                 shared_memory_object::remove("SharedMemoryArea");
  355.                 named_mutex::remove("update_mutex");
  356.                 named_condition::remove("update_condition");
  357.         }
  358.        
  359.         ~shm_remove()
  360.         {
  361.                 shared_memory_object::remove("SharedMemoryArea");
  362.                 named_mutex::remove("update_mutex");
  363.                 named_condition::remove("update_condition");
  364.         }
  365. } remover;
  366.  
  367. static int CACHE_SIZE = 512 * 1024 * 1024;
  368. managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);
  369.  
  370. typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
  371. typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
  372. ShmemAllocator alloc_instance(segment.get_segment_manager());
  373.  
  374. ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
  375. Channel& update_channel = *(segment.construct<Channel>("update_channel")());
  376.  
  377. named_mutex update_mutex(create_only, "update_mutex");
  378. named_condition update_condition(create_only, "update_condition");
  379.  
  380. timeval receive_timestamp;
  381.  
  382. static int NEXT_ID_TO_USE = 0;
  383. static int CHANNEL_TO_USE = 0;
  384.  
  385. template<class T>
  386. void init_channel(Channel& channel)
  387. {
  388.         T* new_head = static_cast<T*>(segment.allocate(sizeof(T)));
  389.         new_head->state_ = HEAD;
  390.  
  391.         T* new_empty = static_cast<T*>(segment.allocate(sizeof(T)));
  392.         new_empty->state_ = NOT_FILLED_YET;
  393.         new_head->next_.set(new_empty, segment);
  394.  
  395.         channel.id_ = NEXT_ID_TO_USE++;
  396.  
  397.         RELEASE_MEMORY_BARRIER;
  398.         channel.head_.set(new_head, segment);
  399.         channel.tail_.set(new_head, segment);
  400. }
  401.  
  402. void update(Channel& channel, char *new_data)
  403. {
  404.         gettimeofday(&receive_timestamp, NULL);
  405.                
  406.         Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
  407.         new_empty_node->state_ = NOT_FILLED_YET;
  408.  
  409.         Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));
  410.        
  411.         node_to_fill->next_.set(new_empty_node, segment);
  412.  
  413.         node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
  414.         node_to_fill->tv_usec_ = receive_timestamp.tv_usec;
  415.  
  416.         strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);
  417.  
  418.         __sync_fetch_and_add(&(channel.size_), 1);
  419.  
  420.         //
  421.         // Separate out UpdateID work
  422.         //
  423.  
  424.         UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
  425.         new_empty_update->state_ = NOT_FILLED_YET;
  426.  
  427.         UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));
  428.        
  429.         update_node_to_fill->next_.set(new_empty_update, segment);
  430.  
  431.         update_node_to_fill->list_id_ = channel.id_;
  432.  
  433.         __sync_fetch_and_add(&(update_channel.size_), 1);
  434.  
  435.         RELEASE_MEMORY_BARRIER;
  436.         node_to_fill->state_ = FILLED;
  437.         RELEASE_MEMORY_BARRIER;
  438.         channel.tail_.set(node_to_fill, segment);
  439.  
  440.         RELEASE_MEMORY_BARRIER;
  441.         update_node_to_fill->state_ = FILLED;
  442.         RELEASE_MEMORY_BARRIER;
  443.         update_channel.tail_.set(update_node_to_fill, segment);
  444.  
  445.         //scoped_lock<named_mutex> update_lock(update_mutex);
  446.         update_condition.notify_all();
  447. }
  448.  
  449. int main(int argc, char** argv)
  450. {
  451.         for(int i = 0; i < 250000; ++i)
  452.         {
  453.                 Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel)));
  454.                 init_channel<Data>(*data_channel);
  455.                 diff_ptr<Channel> ptr(data_channel, segment);
  456.                 channel_list.push_back(ptr);
  457.         }
  458.  
  459.         init_channel<UpdateID>(update_channel);
  460.  
  461.         std::cout << "Cache Size: " << CACHE_SIZE << std::endl;
  462.  
  463.         srand ( time(NULL) );
  464.         while(true)
  465.         {
  466.                 char dummy_data_to_transmit[DUMMY_DATA_SIZE];
  467.                 for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i)
  468.                 {
  469.                         dummy_data_to_transmit[i] = rand() % (90 - 65) + 65;
  470.                 }
  471.                 dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0';
  472.  
  473.                 //std::cout << "Update: " << dummy_data_to_transmit << std::endl;
  474.                 Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment));
  475.                 update(channel, &(dummy_data_to_transmit[0]));
  476.  
  477.                 timespec interval;
  478.                 interval.tv_sec = 0;
  479.                 //interval.tv_sec = rand() % 2;
  480.                 //interval.tv_nsec = rand() % 5000;
  481.                 interval.tv_nsec = rand() % 5000000;
  482.                 nanosleep(&interval, NULL);
  483.         }
  484.  
  485.         return 0;
  486. }
clone this paste RAW Paste Data