// lump/list.H

#ifndef INCLUDED_LUMP_LIST_H
#define INCLUDED_LUMP_LIST_H

#include <cstddef>
#include <bitset>
#include <stdint.h> // for ::uint32_t used below

#include <boost/interprocess/offset_ptr.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_comparison.hpp>

#include <sys/time.h>

// The __sync functions are GCC atomic builtins, documented at:
// http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
// method_one() and method_two() have commented-out condition waits. You can try
// uncommenting them, but they only make latency worse. The same goes for the
// condition code in method_three(); it's only left in because that's how I was
// testing last.

#define FULL_MEMORY_BARRIER __sync_synchronize()

#ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug
#define RELEASE_MEMORY_BARRIER_IMPL2(line) int release_barrier##line; __sync_lock_release(&release_barrier##line)
#define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line)
#define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif

#ifndef MEMORY_BARRIER_DEBUG
#define ACQUIRE_MEMORY_BARRIER_IMPL2(line) int acquire_barrier##line; __sync_lock_test_and_set(&acquire_barrier##line, 1)
#define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line)
#define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif
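
// Sketch only, not part of this build: on a C++11 compiler the same pairing
// could be expressed with std::atomic instead of the macros above, e.g.
//
//   std::atomic<NodeState> state;
//   state.store(FILLED, std::memory_order_release);              // producer
//   if (state.load(std::memory_order_acquire) == FILLED) { ... } // consumer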

// Meant to represent an offset like offset_ptr, but doesn't
// do anything automagically. Nice if you want to have pointers
// from one segment into another segment.
template<class T>
class diff_ptr
{
public:
	inline diff_ptr() {}

	inline diff_ptr(diff_ptr const& other)
		: offset_(other.offset_)
	{}

	inline diff_ptr(boost::interprocess::offset_ptr<T> const& init,
					boost::interprocess::managed_shared_memory const& segment)
	{
		set(init.get(), segment);
	}

	inline diff_ptr(T* init,
					boost::interprocess::managed_shared_memory const& segment)
	{
		set(init, segment);
	}

	inline void set(T* init,
					boost::interprocess::managed_shared_memory const& segment)
	{
		set_offset((char*)(init) - (char*)(segment.get_address()));
	}

	inline void set(T volatile* init,
					boost::interprocess::managed_shared_memory const& segment) volatile
	{
		set_offset((char*)(init) - (char*)(segment.get_address()));
	}

	inline void set_offset(std::ptrdiff_t offset) volatile
	{
		offset_ = offset;
	}

	inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile
	{
		return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_);
	}

private:
	std::ptrdiff_t offset_;
};
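
// Usage sketch (hypothetical, assuming both processes have mapped the same
// managed segment): the stored offset is base-relative, so it stays valid
// even if each process maps the segment at a different address.
//
//   Node* n = ...;                 // object living inside `segment`
//   diff_ptr<Node> p(n, segment);  // store segment-relative offset
//   Node* again = p.get(segment);  // rebase against this process's mapping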

enum NodeState {
	HEAD,
	NOT_FILLED_YET,
	FILLED
};

struct Node {
	volatile NodeState state_;
	volatile diff_ptr<Node> next_;
};

struct UpdateID : public Node {
	::uint32_t list_id_;
};

#define DUMMY_DATA_SIZE 512

struct Data : public Node {
	volatile long tv_sec_;
	volatile long tv_usec_;

	char dummy_data_[DUMMY_DATA_SIZE];
};
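
// Invariant, as established by init_channel() and update() in the server:
// head_ points at a permanent HEAD sentinel, tail_ points at the most
// recently FILLED node (initially the sentinel), and tail_->next_ is always
// a pre-allocated NOT_FILLED_YET node that the producer fills next. Nodes
// are never freed, so readers can keep cursors into the list.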
struct Channel {
	diff_ptr<Node> head_;
	volatile diff_ptr<Node> tail_;
	volatile std::size_t size_;
	::uint32_t id_;
};

#endif


// lump/child/main.C

#include <iostream>
#include <vector>
#include <string>
#include <set>

#include <boost/thread.hpp>
#include <boost/tokenizer.hpp>

#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>

#include <lump/list.H>

using namespace boost;
using namespace boost::interprocess;
using namespace std;

managed_shared_memory segment(open_only, "SharedMemoryArea");

typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;

ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
Channel& update_channel = *(segment.find<Channel>("update_channel").first);

named_mutex update_mutex(open_only, "update_mutex");
named_condition update_condition(open_only, "update_condition");
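
// Note: hrtime_t (used below) is the Solaris high-resolution time type from
// <sys/time.h>; on other platforms an int64_t nanosecond count would be a
// reasonable stand-in.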

void log_latency(long send_tv_sec, long send_tv_usec)
{
	timeval arrive_timestamp;
	gettimeofday(&arrive_timestamp, NULL);

	if(send_tv_sec == 0)
		std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
	if(arrive_timestamp.tv_sec == 0)
		std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints

	// Convert both timestamps to nanoseconds before differencing.
	hrtime_t send_time = hrtime_t(send_tv_sec) * hrtime_t(1e9) + hrtime_t(send_tv_usec) * hrtime_t(1e3);
	hrtime_t arrive_time = hrtime_t(arrive_timestamp.tv_sec) * hrtime_t(1e9) + hrtime_t(arrive_timestamp.tv_usec) * hrtime_t(1e3);
	hrtime_t latency = arrive_time - send_time;

	std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
}
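
// Method one: keep a cursor per channel and busy-poll all of them round-robin,
// consuming at most one filled node per channel per pass.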
void method_one()
{
	std::vector<Data*> channel_cursors;
	for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
	{
		Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

		if(current_item->state_ == HEAD)
			current_item = static_cast<Data*>(current_item->next_.get(segment));

		channel_cursors.push_back(current_item);
	}

	while(true)
	{
		for(std::size_t i = 0; i < channel_list.size(); ++i)
		{
			// {
			// 	scoped_lock<named_mutex> update_lock(update_mutex);
			// 	update_condition.wait(update_lock);
			// }

			Data* current_item = channel_cursors[i];

			ACQUIRE_MEMORY_BARRIER;
			if(current_item->state_ == NOT_FILLED_YET) {
				continue;
			}

			log_latency(current_item->tv_sec_, current_item->tv_usec_);

			channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
		}
	}
}
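
// Method two: instead of scanning every channel, follow the single update
// queue; each UpdateID names the channel whose cursor has a new filled node.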
void method_two()
{
	std::vector<Data*> channel_cursors;
	for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
	{
		Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

		if(current_item->state_ == HEAD)
			current_item = static_cast<Data*>(current_item->next_.get(segment));

		channel_cursors.push_back(current_item);
	}

	UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

	while(true)
	{
		// {
		// 	scoped_lock<named_mutex> update_lock(update_mutex);
		// 	update_condition.wait(update_lock);
		// }

		if(update_cursor->state_ == NOT_FILLED_YET) {
			continue;
		}

		::uint32_t update_id = update_cursor->list_id_;

		Data* current_item = channel_cursors[update_id];

		if(current_item->state_ == NOT_FILLED_YET) {
			std::cerr << "This should never print." << std::endl; // it doesn't
			continue;
		}

		log_latency(current_item->tv_sec_, current_item->tv_usec_);

		channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
		update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
	}
}
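
// Method three: a hybrid of the two. Prefer the update queue when it has an
// entry, fall back to scanning the channels, and block on the condition
// variable only when a full pass finds nothing.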
void method_three()
{
	std::vector<Data*> channel_cursors;
	for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
	{
		Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));

		if(current_item->state_ == HEAD)
			current_item = static_cast<Data*>(current_item->next_.get(segment));

		channel_cursors.push_back(current_item);
	}

	UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));

	while(true)
	{
		bool found_an_update = false;
		for(std::size_t i = 0; i < channel_list.size(); ++i)
		{
			std::size_t idx = i;

			ACQUIRE_MEMORY_BARRIER;
			if(update_cursor->state_ != NOT_FILLED_YET) {
				//std::cerr << "Found via update" << std::endl;
				i--; // revisit this slot; the unsigned wraparound at i == 0 is defined and is undone by the loop's ++i
				idx = update_cursor->list_id_;
				update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
			}

			Data* current_item = channel_cursors[idx];

			ACQUIRE_MEMORY_BARRIER;
			if(current_item->state_ == NOT_FILLED_YET) {
				continue;
			}

			found_an_update = true;

			log_latency(current_item->tv_sec_, current_item->tv_usec_);
			channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
		}

		if(!found_an_update)
		{
			std::cerr << "Sleeping" << std::endl;
			scoped_lock<named_mutex> update_lock(update_mutex);
			update_condition.wait(update_lock);
		}
	}
}

int main(int argc, char** argv)
{
	using ::int32_t;
	using ::uint32_t;

	if(argc == 1)
	{
		std::cout << "Using 1st method" << std::endl;
		method_one();
	}
	else if(argc == 2)
	{
		std::cout << "Using 2nd method" << std::endl;
		method_two();
	}
	else if(argc == 3)
	{
		std::cout << "Using 3rd method" << std::endl;
		method_three();
	}

	return 0;
}


// lump/server/main.C

#include <vector>
#include <map>
#include <cstring>
#include <sstream>
#include <iostream>
#include <cstdlib> // rand(), srand()
#include <time.h>  // time(), nanosleep()

#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>

#include <lump/list.H>

using namespace boost;
using namespace boost::interprocess;
using namespace std;
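
// Standard Boost.Interprocess cleanup idiom: remove any named objects left
// over from a previous run on startup, and remove them again on exit.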
struct shm_remove
{
	shm_remove()
	{
		shared_memory_object::remove("SharedMemoryArea");
		named_mutex::remove("update_mutex");
		named_condition::remove("update_condition");
	}

	~shm_remove()
	{
		shared_memory_object::remove("SharedMemoryArea");
		named_mutex::remove("update_mutex");
		named_condition::remove("update_condition");
	}
} remover;

static int CACHE_SIZE = 512 * 1024 * 1024;
managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);

typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
ShmemAllocator alloc_instance(segment.get_segment_manager());

ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
Channel& update_channel = *(segment.construct<Channel>("update_channel")());

named_mutex update_mutex(create_only, "update_mutex");
named_condition update_condition(create_only, "update_condition");

timeval receive_timestamp;

static int NEXT_ID_TO_USE = 0;
static int CHANNEL_TO_USE = 0; // not referenced below
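
// A channel starts as a HEAD sentinel plus one pre-allocated NOT_FILLED_YET
// node; head_ and tail_ both point at the sentinel until the first update.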
template<class T>
void init_channel(Channel& channel)
{
	T* new_head = static_cast<T*>(segment.allocate(sizeof(T)));
	new_head->state_ = HEAD;

	T* new_empty = static_cast<T*>(segment.allocate(sizeof(T)));
	new_empty->state_ = NOT_FILLED_YET;
	new_head->next_.set(new_empty, segment);

	channel.id_ = NEXT_ID_TO_USE++;

	RELEASE_MEMORY_BARRIER;
	channel.head_.set(new_head, segment);
	channel.tail_.set(new_head, segment);
}
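
// Publish protocol: fill the pre-allocated node at tail_->next_, link a fresh
// empty node after it, then (behind release barriers) flip its state to FILLED
// and advance tail_. The same is done for the shared update queue, and waiting
// readers are woken via the named condition.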
void update(Channel& channel, char *new_data)
{
	gettimeofday(&receive_timestamp, NULL);

	Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
	new_empty_node->state_ = NOT_FILLED_YET;

	Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));

	node_to_fill->next_.set(new_empty_node, segment);

	node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
	node_to_fill->tv_usec_ = receive_timestamp.tv_usec;

	strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);

	__sync_fetch_and_add(&(channel.size_), 1);

	//
	// Separate out UpdateID work
	//

	UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
	new_empty_update->state_ = NOT_FILLED_YET;

	UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));

	update_node_to_fill->next_.set(new_empty_update, segment);

	update_node_to_fill->list_id_ = channel.id_;

	__sync_fetch_and_add(&(update_channel.size_), 1);

	RELEASE_MEMORY_BARRIER;
	node_to_fill->state_ = FILLED;
	RELEASE_MEMORY_BARRIER;
	channel.tail_.set(node_to_fill, segment);

	RELEASE_MEMORY_BARRIER;
	update_node_to_fill->state_ = FILLED;
	RELEASE_MEMORY_BARRIER;
	update_channel.tail_.set(update_node_to_fill, segment);

	//scoped_lock<named_mutex> update_lock(update_mutex);
	update_condition.notify_all();
}

int main(int argc, char** argv)
{
	for(int i = 0; i < 250000; ++i)
	{
		Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel)));
		init_channel<Data>(*data_channel);
		diff_ptr<Channel> ptr(data_channel, segment);
		channel_list.push_back(ptr);
	}

	init_channel<UpdateID>(update_channel);

	std::cout << "Cache Size: " << CACHE_SIZE << std::endl;

	srand(time(NULL));
	while(true)
	{
		// Fill the payload with random uppercase letters ('A'..'Y').
		char dummy_data_to_transmit[DUMMY_DATA_SIZE];
		for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i)
		{
			dummy_data_to_transmit[i] = rand() % (90 - 65) + 65;
		}
		dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0';

		//std::cout << "Update: " << dummy_data_to_transmit << std::endl;
		Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment));
		update(channel, &(dummy_data_to_transmit[0]));

		// Sleep between 0 and 5ms before the next publish.
		timespec interval;
		interval.tv_sec = 0;
		//interval.tv_sec = rand() % 2;
		//interval.tv_nsec = rand() % 5000;
		interval.tv_nsec = rand() % 5000000;
		nanosleep(&interval, NULL);
	}

	return 0;
}