// lump/list.H
#ifndef INCLUDED_LUMP_LIST_H
#define INCLUDED_LUMP_LIST_H
#include <cstddef>
#include <bitset>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_comparison.hpp>
#include <sys/time.h>
// The __sync functions are GCC atomic builtins. They're documented here: http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html
// method_one() and method_two() have commented out condition waits. You can try uncommenting them, they only make latency worse. Same for the condition
// code in method_three(), it's only left in since that's how I was testing last.
#define FULL_MEMORY_BARRIER __sync_synchronize()
#ifndef MEMORY_BARRIER_DEBUG // If you set this and things start working, you have a memory barrier bug
#define RELEASE_MEMORY_BARRIER_IMPL2(line) int release_barrier##line; __sync_lock_release(&release_barrier##line)
#define RELEASE_MEMORY_BARRIER_IMPL(line) RELEASE_MEMORY_BARRIER_IMPL2(line)
#define RELEASE_MEMORY_BARRIER RELEASE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define RELEASE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif
#ifndef MEMORY_BARRIER_DEBUG
#define ACQUIRE_MEMORY_BARRIER_IMPL2(line) int acquire_barrier##line; __sync_lock_test_and_set(&acquire_barrier##line, 1)
#define ACQUIRE_MEMORY_BARRIER_IMPL(line) ACQUIRE_MEMORY_BARRIER_IMPL2(line)
#define ACQUIRE_MEMORY_BARRIER ACQUIRE_MEMORY_BARRIER_IMPL(__LINE__)
#else
#define ACQUIRE_MEMORY_BARRIER FULL_MEMORY_BARRIER
#endif
// Meant to represent an offset like offset_ptr, but doesn't
// do anything automagically. Nice if you want to have pointers
// from one segment into another segment.
// Stores the pointee's byte offset from the base address of a
// managed_shared_memory segment, so the stored value is meaningful in
// every process that maps the segment, regardless of where the segment
// is mapped. Unlike boost::interprocess::offset_ptr, nothing happens
// automatically: callers must pass the segment to set()/get().
template<class T>
class diff_ptr
{
public:
    // Fix: the original default constructor left offset_ uninitialized,
    // making get() on a default-constructed diff_ptr undefined
    // behaviour. Point at the segment base (offset 0) instead.
    inline diff_ptr() : offset_(0) {}
    inline diff_ptr(diff_ptr const& other)
        : offset_(other.offset_)
    {}
    // Construct from an offset_ptr that points into `segment`.
    inline diff_ptr(boost::interprocess::offset_ptr<T> const& init,
                    boost::interprocess::managed_shared_memory const& segment)
    {
        set(init.get(), segment);
    }
    // Construct from a raw pointer that points into `segment`.
    inline diff_ptr(T* init,
                    boost::interprocess::managed_shared_memory const& segment)
    {
        set(init, segment);
    }
    // Record the offset of `init` relative to the segment base.
    inline void set(T* init,
                    boost::interprocess::managed_shared_memory const& segment)
    {
        set_offset((char*)(init) - (char*)(segment.get_address()));
    }
    // volatile overload: used on nodes published across processes.
    inline void set(T volatile* init,
                    boost::interprocess::managed_shared_memory const& segment) volatile
    {
        set_offset((char*)(init) - (char*)(segment.get_address()));
    }
    inline void set_offset(ptrdiff_t offset) volatile
    {
        offset_ = offset;
    }
    // Translate the stored offset back into a pointer valid in the
    // calling process's mapping of `segment`.
    inline T* get(boost::interprocess::managed_shared_memory const& segment) const volatile
    {
        return reinterpret_cast<T*>((char*)(segment.get_address()) + offset_);
    }
private:
    std::ptrdiff_t offset_;
};
// Lifecycle of a list node. The server allocates nodes as
// NOT_FILLED_YET and flips them to FILLED (after a release barrier) to
// publish the payload; readers spin until state_ != NOT_FILLED_YET.
// HEAD marks a channel's initial sentinel node.
enum NodeState {
HEAD,
NOT_FILLED_YET,
FILLED
};
// Base of every list node. Lives inside the shared segment, so links
// are stored as segment-relative diff_ptr offsets, not raw pointers.
struct Node {
volatile NodeState state_; // publication flag; readers poll this field
volatile diff_ptr<Node> next_; // offset of the next node in the same segment
};
// Node of the global update channel: records WHICH data channel was
// just updated, so a reader can jump straight to it (methods 2 and 3).
struct UpdateID : public Node {
::uint32_t list_id_; // Channel::id_, used as an index into channel_list
};
#define DUMMY_DATA_SIZE 512
// Node of a data channel: the send timestamp (for latency measurement)
// plus a fixed-size dummy payload.
struct Data : public Node {
volatile long tv_sec_; // gettimeofday seconds at send time
volatile long tv_usec_; // gettimeofday microseconds at send time
char dummy_data_[DUMMY_DATA_SIZE]; // NUL-terminated payload written by the server
};
// A singly-linked list in shared memory, written by one server process
// and read by child processes. The writer appends at the tail; nodes
// are never removed, so the list only grows.
struct Channel {
diff_ptr<Node> head_; // sentinel node created by init_channel
volatile diff_ptr<Node> tail_; // last FILLED node; advanced by the writer after publishing
volatile std::size_t size_; // number of updates pushed (bumped with __sync_fetch_and_add)
::uint32_t id_; // position of this channel in channel_list
};
#endif
// lump/child/main.C
#include <iostream>
#include <vector>
#include <string>
#include <set>
#include <boost/thread.hpp>
#include <boost/tokenizer.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <lump/list.H>
// File-scope shared state used by all three polling methods. These
// globals construct before main(), so the server must already be
// running: open_only attaches will throw if the named objects do not
// exist yet. (NOTE(review): using-directives at file scope are kept
// for compatibility with the rest of this benchmark code.)
using namespace boost;
using namespace boost::interprocess;
using namespace std;
// Attach to the segment the server created (does NOT create it).
managed_shared_memory segment(open_only, "SharedMemoryArea");
typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
// Shared vector of all data channels, indexed by Channel::id_.
ChannelList& channel_list = *(segment.find<ChannelList>("channel_list").first);
// Global channel carrying UpdateID notifications (methods 2 and 3).
Channel& update_channel = *(segment.find<Channel>("update_channel").first);
named_mutex update_mutex(open_only, "update_mutex");
named_condition update_condition(open_only, "update_condition");
// Print the wall-clock latency between the server's send timestamp
// (seconds/microseconds from gettimeofday) and "now" in this process.
// Resolution is limited to microseconds by gettimeofday, so the ns
// figure is exact only to the nearest 1000.
void log_latency(long send_tv_sec, long send_tv_usec)
{
    timeval arrive_timestamp;
    gettimeofday(&arrive_timestamp, NULL);
    if(send_tv_sec == 0)
        std::cerr << "Send timestamp was 0... something is wrong." << std::endl; // never prints
    if(arrive_timestamp.tv_sec == 0)
        std::cerr << "Arrive timestamp was 0... something is wrong." << std::endl; // never prints
    // Fix: the original used hrtime_t, a Solaris-only type that is not
    // declared anywhere in this codebase. A plain 64-bit integer holds
    // the same nanosecond arithmetic portably without overflow.
    typedef long long ns_t;
    const ns_t send_time = ns_t(send_tv_sec) * 1000000000LL + ns_t(send_tv_usec) * 1000LL;
    const ns_t arrive_time = ns_t(arrive_timestamp.tv_sec) * 1000000000LL + ns_t(arrive_timestamp.tv_usec) * 1000LL;
    const ns_t latency = arrive_time - send_time;
    std::cout << "Latency: " << double(latency) * 1.0e-6 << "ms " << double(latency) * 1.0e-3 << "us " << latency << "ns " << std::endl;
}
// Method 1: busy-poll every channel in round-robin.
// Keeps one cursor per channel pointing at the next node expected to
// become FILLED. Lowest latency of the three methods, but CPU cost
// grows linearly with the number of channels since every channel is
// re-checked on every pass.
void method_one()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
// Start each cursor at the tail; if the tail is still the HEAD
// sentinel (no update yet), step past it to the first real node.
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
if(current_item->state_ == HEAD)
current_item = static_cast<Data*>(current_item->next_.get(segment));
channel_cursors.push_back(current_item);
}
while(true)
{
for(std::size_t i = 0; i < channel_list.size(); ++i)
{
// {
// scoped_lock<named_mutex> update_lock(update_mutex);
// update_condition.wait(update_lock);
// }
Data* current_item = channel_cursors[i];
// Pairs with the server's RELEASE barrier before state_ = FILLED:
// once we observe FILLED, the timestamp/payload writes are visible.
ACQUIRE_MEMORY_BARRIER;
if(current_item->state_ == NOT_FILLED_YET) {
continue;
}
log_latency(current_item->tv_sec_, current_item->tv_usec_);
// Consume the node: advance this channel's cursor to the next
// (still-empty) node the server linked in.
channel_cursors[i] = static_cast<Data*>(current_item->next_.get(segment));
}
}
}
// Method 2: poll only the global update channel.
// The server appends an UpdateID for every data update, so spinning on
// a single cursor here is enough; the list_id_ tells us which data
// channel to consume next. One poll location instead of N.
// NOTE(review): unlike methods 1 and 3, this loop has no explicit
// ACQUIRE_MEMORY_BARRIER before reading state_ — confirm intentional.
void method_two()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
// Start each data cursor at the tail, skipping the HEAD sentinel.
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
if(current_item->state_ == HEAD)
current_item = static_cast<Data*>(current_item->next_.get(segment));
channel_cursors.push_back(current_item);
}
UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
while(true)
{
// {
// scoped_lock<named_mutex> update_lock(update_mutex);
// update_condition.wait(update_lock);
// }
if(update_cursor->state_ == NOT_FILLED_YET) {
continue;
}
::uint32_t update_id = update_cursor->list_id_;
Data* current_item = channel_cursors[update_id];
// The server publishes the Data node BEFORE the UpdateID node, so a
// FILLED update implies the data node is already FILLED.
if(current_item->state_ == NOT_FILLED_YET) {
std::cerr << "This should never print." << std::endl; // it doesn't
continue;
}
log_latency(current_item->tv_sec_, current_item->tv_usec_);
// Advance both the per-channel data cursor and the update cursor.
channel_cursors[update_id] = static_cast<Data*>(current_item->next_.get(segment));
update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
}
}
// Method 3: hybrid of 1 and 2, with a condition-variable sleep.
// Each pass scans all channels like method 1, but if the update
// channel has a pending UpdateID it is serviced first (without
// consuming a scan slot). When a full pass finds nothing, the reader
// blocks on update_condition instead of spinning.
void method_three()
{
std::vector<Data*> channel_cursors;
for(ChannelList::iterator i = channel_list.begin(); i != channel_list.end(); ++i)
{
// Start each data cursor at the tail, skipping the HEAD sentinel.
Data* current_item = static_cast<Data*>(i->get(segment)->tail_.get(segment));
if(current_item->state_ == HEAD)
current_item = static_cast<Data*>(current_item->next_.get(segment));
channel_cursors.push_back(current_item);
}
UpdateID* update_cursor = static_cast<UpdateID*>(update_channel.tail_.get(segment));
while(true)
{
bool found_an_update = false;
for(std::size_t i = 0; i < channel_list.size(); ++i)
{
std::size_t idx = i;
ACQUIRE_MEMORY_BARRIER;
if(update_cursor->state_ != NOT_FILLED_YET) {
//std::cerr << "Found via update" << std::endl;
// Service the notified channel instead of channel i, and
// decrement i so channel i is still scanned next iteration.
// When i == 0 this wraps the unsigned value, but the loop's
// ++i wraps it straight back to 0, so the net effect holds.
i--;
idx = update_cursor->list_id_;
update_cursor = static_cast<UpdateID*>(update_cursor->next_.get(segment));
}
Data* current_item = channel_cursors[idx];
// Pairs with the server's release barrier before state_ = FILLED.
ACQUIRE_MEMORY_BARRIER;
if(current_item->state_ == NOT_FILLED_YET) {
continue;
}
found_an_update = true;
log_latency(current_item->tv_sec_, current_item->tv_usec_);
channel_cursors[idx] = static_cast<Data*>(current_item->next_.get(segment));
}
if(!found_an_update)
{
// Nothing in a full pass: block until the server notifies.
// NOTE(review): the server calls notify_all() without holding
// update_mutex (its scoped_lock is commented out), so an update
// arriving between this pass and the wait() can be a lost
// wakeup; the next notify unblocks us. Confirm acceptable.
std::cerr << "Sleeping" << std::endl;
scoped_lock<named_mutex> update_lock(update_mutex);
update_condition.wait(update_lock);
}
}
}
// Entry point: the NUMBER of command-line arguments selects the
// polling strategy (the argument values themselves are ignored):
//   no extra args -> method_one   (busy-poll every channel)
//   one extra arg -> method_two   (follow the update channel)
//   two extra args -> method_three (hybrid with condvar sleep)
// The chosen method never returns; it reads updates forever.
int main(int argc, char** argv)
{
    using ::int32_t;
    using ::uint32_t;
    if(argc == 1)
    {
        std::cout << "Using 1st method" << std::endl;
        method_one();
    }
    else if(argc == 2)
    {
        std::cout << "Using 2nd method" << std::endl;
        method_two();
    }
    else if(argc == 3)
    {
        std::cout << "Using 3rd method" << std::endl;
        method_three();
    }
    else
    {
        // Fix: previously any other argc fell through and exited 0
        // silently; report the misuse instead.
        std::cerr << "Usage: " << argv[0]
                  << " [ignored [ignored]] -- 0, 1 or 2 extra arguments select method 1, 2 or 3"
                  << std::endl;
        return 1;
    }
    return 0;
}
// lump/server/main.C
#include <vector>
#include <map>
#include <cstring>
#include <sstream>
#include <iostream>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/vector.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <lump/list.H>
using namespace boost;
using namespace boost::interprocess;
using namespace std;
// RAII guard instantiated as a global ("remover"): the constructor
// removes any stale shared objects left over from a previous run
// BEFORE the global `segment`/mutex/condition below are constructed
// (global init order within this translation unit is top-to-bottom),
// and the destructor cleans up on normal process exit. The destructor
// does not run if the process is killed, hence the constructor-side
// removal as well.
struct shm_remove
{
shm_remove()
{
shared_memory_object::remove("SharedMemoryArea");
named_mutex::remove("update_mutex");
named_condition::remove("update_condition");
}
~shm_remove()
{
shared_memory_object::remove("SharedMemoryArea");
named_mutex::remove("update_mutex");
named_condition::remove("update_condition");
}
} remover;
// File-scope state, constructed before main(). Order matters: the
// `remover` global above has already cleared stale objects, so these
// create_only constructions can succeed.
// 512 MB segment; nodes are allocated from it and never deallocated,
// so a long-running server will presumably exhaust it eventually —
// acceptable for a latency benchmark, verify for real use.
static int CACHE_SIZE = 512 * 1024 * 1024;
managed_shared_memory segment(create_only, "SharedMemoryArea", CACHE_SIZE);
typedef boost::interprocess::allocator< diff_ptr<Channel>, managed_shared_memory::segment_manager> ShmemAllocator;
typedef boost::interprocess::vector< diff_ptr<Channel>, ShmemAllocator > ChannelList;
ShmemAllocator alloc_instance(segment.get_segment_manager());
// Shared vector of all data channels, looked up by name in the child.
ChannelList& channel_list = *(segment.construct<ChannelList>("channel_list")(alloc_instance));
// Global channel carrying UpdateID notifications.
Channel& update_channel = *(segment.construct<Channel>("update_channel")());
named_mutex update_mutex(create_only, "update_mutex");
named_condition update_condition(create_only, "update_condition");
timeval receive_timestamp; // scratch timestamp reused by every update() call
static int NEXT_ID_TO_USE = 0; // next Channel::id_ handed out by init_channel
static int CHANNEL_TO_USE = 0; // unused (left over from an earlier revision?)
// Initialize a channel with a HEAD sentinel plus one empty node.
// T must derive from Node (Data for data channels, UpdateID for the
// update channel). tail_ initially points at the sentinel, so the
// first update() fills sentinel->next_. The empty node's own next_ is
// deliberately left unset: update() links it when it gets filled.
template<class T>
void init_channel(Channel& channel)
{
T* new_head = static_cast<T*>(segment.allocate(sizeof(T)));
new_head->state_ = HEAD;
T* new_empty = static_cast<T*>(segment.allocate(sizeof(T)));
new_empty->state_ = NOT_FILLED_YET;
new_head->next_.set(new_empty, segment);
channel.id_ = NEXT_ID_TO_USE++;
// Release barrier: make the nodes' contents visible to readers
// before head_/tail_ publish the list structure.
RELEASE_MEMORY_BARRIER;
channel.head_.set(new_head, segment);
channel.tail_.set(new_head, segment);
// Publish one update to `channel` and announce it on update_channel.
// Ordering is the whole point here: each payload is fully written,
// then a release barrier, then the state flip to FILLED, then another
// release barrier, then the tail advance — so a reader that observes
// FILLED is guaranteed to see the timestamp/payload, and a reader that
// observes the new tail is guaranteed to see FILLED. The Data node is
// published BEFORE its UpdateID, which is what lets method_two skip
// the data-side state check. Do not reorder these statements.
void update(Channel& channel, char *new_data)
{
gettimeofday(&receive_timestamp, NULL);
// Pre-link the NEXT empty node so readers always have a valid
// next_ to advance onto after consuming node_to_fill.
Data* new_empty_node = static_cast<Data*>(segment.allocate(sizeof(Data)));
new_empty_node->state_ = NOT_FILLED_YET;
Data* node_to_fill = static_cast<Data*>(channel.tail_.get(segment)->next_.get(segment));
node_to_fill->next_.set(new_empty_node, segment);
node_to_fill->tv_sec_ = receive_timestamp.tv_sec;
node_to_fill->tv_usec_ = receive_timestamp.tv_usec;
strncpy(node_to_fill->dummy_data_, new_data, DUMMY_DATA_SIZE);
__sync_fetch_and_add(&(channel.size_), 1);
//
// Separate out UpdateID work
//
UpdateID* new_empty_update = static_cast<UpdateID*>(segment.allocate(sizeof(UpdateID)));
new_empty_update->state_ = NOT_FILLED_YET;
UpdateID* update_node_to_fill = static_cast<UpdateID*>(update_channel.tail_.get(segment)->next_.get(segment));
update_node_to_fill->next_.set(new_empty_update, segment);
update_node_to_fill->list_id_ = channel.id_;
__sync_fetch_and_add(&(update_channel.size_), 1);
// Payload written -> publish data node -> advance data tail.
RELEASE_MEMORY_BARRIER;
node_to_fill->state_ = FILLED;
RELEASE_MEMORY_BARRIER;
channel.tail_.set(node_to_fill, segment);
// Data node visible -> publish update node -> advance update tail.
RELEASE_MEMORY_BARRIER;
update_node_to_fill->state_ = FILLED;
RELEASE_MEMORY_BARRIER;
update_channel.tail_.set(update_node_to_fill, segment);
// NOTE(review): notify_all without holding update_mutex (the lock is
// commented out) can race with method_three's wait — lost wakeup is
// possible until the next update; confirm this is acceptable.
//scoped_lock<named_mutex> update_lock(update_mutex);
update_condition.notify_all();
}
int main(int argc, char** argv)
{
for(int i = 0; i < 250000; ++i)
{
Channel* data_channel = static_cast<Channel*>(segment.allocate(sizeof(Channel)));
init_channel<Data>(*data_channel);
diff_ptr<Channel> ptr(data_channel, segment);
channel_list.push_back(ptr);
}
init_channel<UpdateID>(update_channel);
std::cout << "Cache Size: " << CACHE_SIZE << std::endl;
srand ( time(NULL) );
while(true)
{
char dummy_data_to_transmit[DUMMY_DATA_SIZE];
for(int i = 0; i < DUMMY_DATA_SIZE - 1; ++i)
{
dummy_data_to_transmit[i] = rand() % (90 - 65) + 65;
}
dummy_data_to_transmit[DUMMY_DATA_SIZE - 1] = '\0';
//std::cout << "Update: " << dummy_data_to_transmit << std::endl;
Channel& channel = *(channel_list[rand() % channel_list.size()].get(segment));
update(channel, &(dummy_data_to_transmit[0]));
timespec interval;
interval.tv_sec = 0;
//interval.tv_sec = rand() % 2;
//interval.tv_nsec = rand() % 5000;
interval.tv_nsec = rand() % 5000000;
nanosleep(&interval, NULL);
}
return 0;
}