// Pastebin diff-view residue (paste ID 0kuzm3Uf). The content below is three
// concatenated C++ sources: lump/list.H, lump/child/main.C, lump/server/main.C.
1 | + | // lump/list.H |
2 | ||
3 | #ifndef INCLUDED_LUMP_LIST_H | |
4 | #define INCLUDED_LUMP_LIST_H | |
5 | ||
6 | #include <cstddef> | |
7 | #include <bitset> | |
8 | ||
9 | #include <boost/interprocess/offset_ptr.hpp> | |
10 | #include <boost/interprocess/managed_shared_memory.hpp> | |
11 | #include <boost/tuple/tuple.hpp> | |
12 | #include <boost/tuple/tuple_comparison.hpp> | |
13 | ||
14 | #include <sys/time.h> | |
15 | ||
// The __sync functions are GCC atomic builtins. They're documented here: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
// method_one() and method_two() have commented out condition waits. You can try uncommenting them, they only make latency worse. Same for the condition
// code in method_three(), it's only left in since that's how I was testing last.

// Full two-way fence.
#define FULL_MEMORY_BARRIER __sync_synchronize()

#ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug
// Release-flavored fence built from __sync_lock_release on a throwaway local.
// The two-level IMPL/IMPL2 expansion is required so __LINE__ expands *before*
// token pasting, giving each use site a uniquely named local.
#define RELEASE_MEMORY_BARRIER_IMPL2(line) int release_barrier##line; __sync_lock_release(&release_barrier##line)
#define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line)
#define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif

#ifndef MEMORY_BARRIER_DEBUG
// Acquire-flavored fence: __sync_lock_test_and_set performs an acquire
// barrier as part of the swap on the throwaway local (see GCC builtin docs).
#define ACQUIRE_MEMORY_BARRIER_IMPL2(line) int acquire_barrier##line; __sync_lock_test_and_set(&acquire_barrier##line, 1)
#define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line)
#define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif
37 | ||
38 | // Meant to represent an offset like offset_ptr, but doesn't | |
39 | // do anything automagically. Nice if you want to have pointers | |
40 | // from one segment into another segment. | |
41 | template<class T> | |
42 | class diff_ptr | |
43 | { | |
44 | public: | |
45 | inline diff_ptr() {} | |
46 | ||
47 | inline diff_ptr(diff_ptr const& other) | |
48 | : offset_(other.offset_) | |
49 | {} | |
50 | ||
51 | inline diff_ptr(boost::interprocess::offset_ptr<T> const& init, | |
52 | boost::interprocess::managed_shared_memory const& segment) | |
53 | { | |
54 | set(init.get(), segment); | |
55 | } | |
56 | ||
57 | inline diff_ptr(T* init, | |
58 | boost::interprocess::managed_shared_memory const& segment) | |
59 | { | |
60 | set(init, segment); | |
61 | } | |
62 | ||
63 | inline void set(T* init, | |
64 | boost::interprocess::managed_shared_memory const& segment) | |
65 | { | |
66 | set_offset((char*)(init) - (char*)(segment.get_address())); | |
67 | } | |
68 | ||
69 | inline void set(T volatile* init, | |
70 | boost::interprocess::managed_shared_memory const& segment) volatile | |
71 | { | |
72 | set_offset((char*)(init) - (char*)(segment.get_address())); | |
73 | } | |
74 | ||
75 | inline void set_offset(ptrdiff_t offset) volatile | |
76 | { | |
77 | offset_ = offset; | |
78 | } | |
79 | ||
80 | inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile | |
81 | { | |
82 | return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_); | |
83 | } | |
84 | ||
85 | private: | |
86 | std::ptrdiff_t offset_; | |
87 | }; | |
88 | ||
// Lifecycle of a node in a channel's singly linked list.
enum NodeState {
    HEAD,            // sentinel at the front of a list; carries no payload
    NOT_FILLED_YET,  // allocated by the writer but not yet published
    FILLED           // payload written and published; safe for readers
};
94 | ||
// Intrusive singly-linked-list node living in shared memory.
struct Node {
    volatile NodeState state_;     // written by the server, polled by readers
    volatile diff_ptr<Node> next_; // segment-relative link to the next node
};
99 | ||
// Node for the global update channel: records which data channel
// (by Channel::id_) just received a new item.
struct UpdateID : public Node {
    ::uint32_t list_id_;
};
103 | ||
// Payload size in bytes for each Data node, including the trailing NUL
// written by the server.
#define DUMMY_DATA_SIZE 512

// Node carrying one published payload plus the sender's timestamp.
struct Data : public Node {
    volatile long tv_sec_;   // gettimeofday() seconds at send time
    volatile long tv_usec_;  // gettimeofday() microseconds at send time

    char dummy_data_[DUMMY_DATA_SIZE];
};
112 | ||
// One shared-memory message list. head_ stays on the HEAD sentinel;
// tail_ is advanced by the server to the most recently FILLED node
// (initially it also points at the sentinel).
struct Channel {
    diff_ptr<Node> head_;
    volatile diff_ptr<Node> tail_;
    volatile std::size_t size_;  // bumped via __sync_fetch_and_add per publish;
                                 // NOTE(review): no reader of size_ is visible in this paste
    ::uint32_t id_;              // index used as UpdateID::list_id_
};
119 | ||
120 | #endif | |
121 | ||
122 | ||
123 | // lump/child/main.C | |
124 | ||
125 | #include <iostream> | |
126 | #include <vector> | |
127 | #include <string> | |
128 | #include <set> | |
129 | ||
130 | #include <boost/thread.hpp> | |
131 | #include <boost/tokenizer.hpp> | |
132 | ||
133 | #include <boost/interprocess/sync/scoped_lock.hpp> | |
134 | #include <boost/interprocess/sync/named_mutex.hpp> | |
135 | #include <boost/interprocess/sync/named_condition.hpp> | |
136 | #include <boost/interprocess/allocators/allocator.hpp> | |
137 | #include <boost/interprocess/containers/vector.hpp> | |
138 | #include <boost/interprocess/managed_shared_memory.hpp> | |
139 | ||
140 | #include <lump/list.H> | |
141 | ||
using namespace boost;
using namespace boost::interprocess;
using namespace std;

// Opens the segment the server created. These are namespace-scope objects
// with dynamic initializers, so the server must already be running (and
// done constructing everything) when this process starts; otherwise
// open_only presumably throws before main() runs -- TODO confirm.
managed_shared_memory segment(open_only, "SharedMemoryArea");

typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;

// find() returns a (pointer, count) pair; the pointer is dereferenced
// unchecked here, so a missing name crashes rather than reporting an error.
ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
Channel& update_channel = *(segment.find<Channel>("update_channel").first);

named_mutex update_mutex(open_only, "update_mutex");
named_condition update_condition(open_only, "update_condition");
156 | ||
// Computes and prints the latency between the sender's gettimeofday()
// stamp (send_tv_sec / send_tv_usec) and the current wall clock, in
// ms / us / ns.
void log_latency(long send_tv_sec, long send_tv_usec)
{
    timeval arrive_timestamp;
    gettimeofday(&arrive_timestamp, NULL);

    if(send_tv_sec == 0)
        std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
    if(arrive_timestamp.tv_sec == 0)
        std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints

    // Use a portable 64-bit type instead of the Solaris-only hrtime_t
    // (which does not exist on glibc), and exact integer scale factors
    // instead of the double literals 1e9/1e3 so the sec->ns conversion
    // cannot round.
    typedef long long ns_t;
    ns_t send_time   = ns_t(send_tv_sec) * 1000000000LL + ns_t(send_tv_usec) * 1000LL;
    ns_t arrive_time = ns_t(arrive_timestamp.tv_sec) * 1000000000LL + ns_t(arrive_timestamp.tv_usec) * 1000LL;
    ns_t latency = arrive_time - send_time;

    std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
}
173 | ||
// Method one: busy-poll every channel's cursor in round-robin order.
// The update queue is not consulted; each pass over channel_list checks
// whether the node under each cursor has been FILLED by the server.
// Never returns.
void method_one()
{
    // Per-channel read cursor, starting at each channel's tail_ and
    // skipping the HEAD sentinel if the tail still points at it.
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

        if(current_item->state_ == HEAD)
            current_item = static_cast<Data*>(current_item->next_.get(segment));

        channel_cursors.push_back(current_item);
    }

    while(true)
    {
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {
            // Deliberately disabled condition wait (see file-header note:
            // it only made latency worse in testing).
            // {
            // scoped_lock<named_mutex> update_lock(update_mutex);
            // update_condition.wait(update_lock);
            // }

            Data* current_item = channel_cursors[i];

            // Intended to pair with the server's RELEASE_MEMORY_BARRIER
            // before it flips state_ to FILLED, so the payload writes are
            // visible once a non-NOT_FILLED_YET state is observed.
            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            log_latency(current_item->tv_sec_, current_item->tv_usec_);

            // Advance this channel's cursor to the next (possibly still
            // empty) node; the list always ends in a NOT_FILLED_YET node.
            channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
        }
    }
}
209 | ||
// Method two: follow the single global update queue to learn which data
// channel has a new item, instead of scanning every channel.
// NOTE(review): unlike method_one/method_three there is no
// ACQUIRE_MEMORY_BARRIER before the state_ reads here -- verify intended.
// Never returns.
void method_two()
{
    // Per-channel read cursor, starting at each channel's tail_ and
    // skipping the HEAD sentinel if the tail still points at it.
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

        if(current_item->state_ == HEAD)
            current_item = static_cast<Data*>(current_item->next_.get(segment));

        channel_cursors.push_back(current_item);
    }

    // Cursor into the global update queue.
    // NOTE(review): this cursor does NOT skip a HEAD sentinel the way the
    // data cursors above do; if no update has been published yet, list_id_
    // below would be read from the sentinel node -- confirm harmless.
    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {
        // Deliberately disabled condition wait (see file-header note).
        // {
        // scoped_lock<named_mutex> update_lock(update_mutex);
        // update_condition.wait(update_lock);
        // }

        // Spin until the server publishes the next update record.
        if(update_cursor->state_ == NOT_FILLED_YET) {
            continue;
        }

        // The update record names the data channel to read next.
        ::uint32_t update_id = update_cursor->list_id_;

        Data* current_item = channel_cursors[update_id];

        if(current_item->state_ == NOT_FILLED_YET) {
            std::cerr << "This should never print." << std::endl; // it doesn't
            continue;
        }

        log_latency(current_item->tv_sec_, current_item->tv_usec_);

        // Advance both the data-channel cursor and the update-queue cursor.
        channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
        update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
    }
}
251 | ||
// Method three: hybrid of methods one and two -- each pass polls every
// channel in order, but whenever an update record is available it
// redirects the current slot to the channel named by the update without
// consuming a polling slot. Falls back to a condition wait when a full
// pass found nothing. Never returns.
void method_three()
{
    // Per-channel read cursor, starting at each channel's tail_ and
    // skipping the HEAD sentinel if the tail still points at it.
    std::vector<Data*> channel_cursors;
    for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
    {
        Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

        if(current_item->state_ == HEAD)
            current_item = static_cast<Data*>(current_item->next_.get(segment));

        channel_cursors.push_back(current_item);
    }

    // Cursor into the global update queue.
    // NOTE(review): does not skip a HEAD sentinel; if no update has been
    // published yet, list_id_ below would be read from the sentinel node
    // -- confirm harmless.
    UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

    while(true)
    {
        bool found_an_update = false;
        for(std::size_t i = 0; i < channel_list.size(); ++i)
        {
            std::size_t idx = i;

            ACQUIRE_MEMORY_BARRIER;
            if(update_cursor->state_ != NOT_FILLED_YET) {
                //std::cerr << "Found via update" << std::endl;
                // Don't "spend" this polling slot on the update: i-- cancels
                // the loop's ++i. At i == 0 the unsigned decrement wraps to
                // SIZE_MAX, which is well-defined and wraps back on ++i.
                i--;
                idx = update_cursor->list_id_;
                update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
            }

            Data* current_item = channel_cursors[idx];

            // Pairs with the server's release barrier before state_ = FILLED.
            ACQUIRE_MEMORY_BARRIER;
            if(current_item->state_ == NOT_FILLED_YET) {
                continue;
            }

            found_an_update = true;

            log_latency(current_item->tv_sec_, current_item->tv_usec_);
            channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
        }

        if(!found_an_update)
        {
            // NOTE(review): the server calls notify_all() without holding
            // update_mutex, so a notification landing between the scan above
            // and this wait() is lost until the next publish -- confirm this
            // latency hit is acceptable (file header says this path was only
            // left in from testing).
            std::cerr << "Sleeping" << std::endl;
            scoped_lock<named_mutex> update_lock(update_mutex);
            update_condition.wait(update_lock);
        }
    }
}
303 | ||
304 | int main(int argc, char** argv) | |
305 | { | |
306 | using ::int32_t; | |
307 | using ::uint32_t; | |
308 | ||
309 | if(argc == 1) | |
310 | { | |
311 | std::cout << "Using 1st method" << std::endl; | |
312 | method_one(); | |
313 | } | |
314 | else if(argc == 2) | |
315 | { | |
316 | std::cout << "Using 2nd method" << std::endl; | |
317 | method_two(); | |
318 | } | |
319 | else if(argc == 3) | |
320 | { | |
321 | std::cout << "Using 3rd method" << std::endl; | |
322 | method_three(); | |
323 | } | |
324 | ||
325 | return 0; | |
326 | } | |
327 | ||
328 | ||
329 | // lump/server/main.C | |
330 | ||
331 | #include <vector> | |
332 | #include <map> | |
333 | #include <cstring> | |
334 | #include <sstream> | |
335 | #include <iostream> | |
336 | ||
337 | #include <boost/interprocess/sync/scoped_lock.hpp> | |
338 | #include <boost/interprocess/sync/named_mutex.hpp> | |
339 | #include <boost/interprocess/sync/named_condition.hpp> | |
340 | #include <boost/interprocess/allocators/allocator.hpp> | |
341 | #include <boost/interprocess/containers/vector.hpp> | |
342 | #include <boost/interprocess/managed_shared_memory.hpp> | |
343 | ||
344 | #include <lump/list.H> | |
345 | ||
346 | using namespace boost; | |
347 | using namespace boost::interprocess; | |
348 | using namespace std; | |
349 | ||
// Cleanup guard: removes the named shared memory and sync objects both at
// startup (clearing leftovers from a previous crashed run) and at exit.
// Declared (as `remover` below) before `segment`, so the stale segment is
// removed before the create_only construction runs.
struct shm_remove
{
    shm_remove()
    {
        shared_memory_object::remove("SharedMemoryArea");
        named_mutex::remove("update_mutex");
        named_condition::remove("update_condition");
    }

    ~shm_remove()
    {
        shared_memory_object::remove("SharedMemoryArea");
        named_mutex::remove("update_mutex");
        named_condition::remove("update_condition");
    }
} remover;
366 | ||
// 512 MiB shared segment created fresh by this process; clients open it
// by the same name.
static int CACHE_SIZE = 512 * 1024 * 1024;
managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);

typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
ShmemAllocator alloc_instance(segment.get_segment_manager());

// Named shared objects that client processes look up with segment.find().
ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
Channel& update_channel = *(segment.construct<Channel>("update_channel")());

named_mutex update_mutex(create_only, "update_mutex");
named_condition update_condition(create_only, "update_condition");

// Scratch timestamp reused by update() for every publish.
timeval receive_timestamp;

// Monotonic id handed to each channel by init_channel().
static int NEXT_ID_TO_USE = 0;
static int CHANNEL_TO_USE = 0; // NOTE(review): never read in this paste -- dead?
384 | ||
// Initializes a channel's linked list: a HEAD sentinel followed by one
// NOT_FILLED_YET node that the first update() call will fill in.
// T selects the node type (Data for data channels, UpdateID for the
// update channel). sizeof(T) bytes are allocated raw from the segment,
// so members other than the ones assigned here start uninitialized.
template<class T>
void init_channel(Channel& channel)
{
    T* new_head = static_cast<T*>(segment.allocate(sizeof(T)));
    new_head->state_ = HEAD;

    T* new_empty = static_cast<T*>(segment.allocate(sizeof(T)));
    new_empty->state_ = NOT_FILLED_YET;
    new_head->next_.set(new_empty, segment);

    channel.id_ = NEXT_ID_TO_USE++;

    // Publish head_/tail_ only after the nodes above are fully set up.
    // Both initially point at the sentinel.
    RELEASE_MEMORY_BARRIER;
    channel.head_.set(new_head, segment);
    channel.tail_.set(new_head, segment);
}
401 | ||
// Publishes one payload to `channel` and a matching record to the global
// update channel. Single-writer design: only this process appends, so the
// synchronization is release barriers ordering the payload writes before
// the state_ = FILLED flips that readers poll.
void update(Channel& channel, char *new_data)
{
    // Timestamp taken first so log_latency() in the client measures the
    // whole publish path.
    gettimeofday(&receive_timestamp, NULL);

    // Pre-allocate the *next* empty node: readers always stop on a
    // NOT_FILLED_YET node, so each list permanently ends in one.
    Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
    new_empty_node->state_ = NOT_FILLED_YET;

    // The node to fill is the empty node hanging off the current tail.
    Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));

    node_to_fill->next_.set(new_empty_node, segment);

    node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
    node_to_fill->tv_usec_ = receive_timestamp.tv_usec;

    strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);

    __sync_fetch_and_add(&(channel.size_), 1);

    //
    // Separate out UpdateID work
    //

    // Same append pattern for the global update queue.
    UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
    new_empty_update->state_ = NOT_FILLED_YET;

    UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));

    update_node_to_fill->next_.set(new_empty_update, segment);

    update_node_to_fill->list_id_ = channel.id_;

    __sync_fetch_and_add(&(update_channel.size_), 1);

    // Publish order: payload writes -> state_ = FILLED -> advance tail_,
    // with a release barrier ahead of each visible step.
    RELEASE_MEMORY_BARRIER;
    node_to_fill->state_ = FILLED;
    RELEASE_MEMORY_BARRIER;
    channel.tail_.set(node_to_fill, segment);

    RELEASE_MEMORY_BARRIER;
    update_node_to_fill->state_ = FILLED;
    RELEASE_MEMORY_BARRIER;
    update_channel.tail_.set(update_node_to_fill, segment);

    // NOTE(review): notify_all() is called without holding update_mutex
    // (the lock below is commented out), so a waiter in method_three()
    // can miss this wakeup until the next publish -- confirm intentional.
    //scoped_lock<named_mutex> update_lock(update_mutex);
    update_condition.notify_all();
}
448 | ||
449 | int main(int argc, char** argv) | |
450 | { | |
451 | for(int i = 0; i < 250000; ++i) | |
452 | { | |
453 | Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel))); | |
454 | init_channel<Data>(*data_channel); | |
455 | diff_ptr<Channel> ptr(data_channel, segment); | |
456 | channel_list.push_back(ptr); | |
457 | } | |
458 | ||
459 | init_channel<UpdateID>(update_channel); | |
460 | ||
461 | std::cout << "Cache Size: " << CACHE_SIZE << std::endl; | |
462 | ||
463 | srand ( time(NULL) ); | |
464 | while(true) | |
465 | { | |
466 | char dummy_data_to_transmit[DUMMY_DATA_SIZE]; | |
467 | for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i) | |
468 | { | |
469 | dummy_data_to_transmit[i] = rand() % (90 - 65) + 65; | |
470 | } | |
471 | dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0'; | |
472 | ||
473 | //std::cout << "Update: " << dummy_data_to_transmit << std::endl; | |
474 | Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment)); | |
475 | update(channel, &(dummy_data_to_transmit[0])); | |
476 | ||
477 | timespec interval; | |
478 | interval.tv_sec = 0; | |
479 | //interval.tv_sec = rand() % 2; | |
480 | //interval.tv_nsec = rand() % 5000; | |
481 | interval.tv_nsec = rand() % 5000000; | |
482 | nanosleep(&interval, NULL); | |
483 | } | |
484 | ||
485 | return 0; | |
486 | } |