Advertisement
AndrewSci

ConcurrentMap

Jun 26th, 2019
580
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.09 KB | None | 0 0
  #include "test_runner.h"
  #include "profile.h"

  #include <algorithm>
  #include <cmath>
  #include <cstddef>
  #include <cstdint>
  #include <future>
  #include <map>
  #include <mutex>
  #include <numeric>
  #include <random>
  #include <string>
  #include <type_traits>
  #include <vector>
  12. using namespace std;
  13.  
  /**
   * Associative container for integral keys that is split into independently
   * locked buckets.
   *
   * Each key belongs to exactly one bucket, and each bucket owns its own
   * std::map and std::mutex. Accesses to keys in different buckets do not
   * block one another.
   *
   * @tparam K integral key type (enforced by static_assert).
   * @tparam V mapped type; it is value-initialized on first access to a key.
   */
  template <typename K, typename V>
  class ConcurrentMap {
  public:
    static_assert(std::is_integral_v<K>, "ConcurrentMap supports only integer keys");

    /**
     * Handle returned by operator[]. It keeps the owning bucket locked for as
     * long as it lives.
     *
     * `guard` is declared before `ref_to_value` on purpose: members are
     * initialized in declaration order, so the bucket mutex is taken before
     * the bucket's map is touched to produce the reference.
     */
    struct Access {
      std::lock_guard<std::mutex> guard;
      V& ref_to_value;
    };

    /** One bucket: a plain ordered map and the mutex that protects it. */
    struct Base {
      std::map<K, V> Map;
      std::mutex m;
    };

    /**
     * @param bucket_count number of independently locked buckets. Zero is
     *        treated as one so that the bucket index is always well defined.
     */
    explicit ConcurrentMap(std::size_t bucket_count)
        : OrdinaryMap(bucket_count == 0 ? 1 : bucket_count) {}

    /**
     * Locks the bucket that owns `key` and returns access to its value,
     * inserting a value-initialized V if the key is absent.
     *
     * The bucket stays locked until the returned Access is destroyed. Asking
     * for a second key of the same bucket from the same thread while an
     * Access is still alive therefore blocks forever.
     */
    Access operator[](const K& key) {
      Base& item = OrdinaryMap[key_pos(key)];
      // Braced initializers are evaluated left to right, so the lock is held
      // before Map[key] runs.
      return {std::lock_guard<std::mutex>(item.m), item.Map[key]};
    }

    /**
     * Merges all buckets into one ordinary map.
     *
     * Buckets are locked one at a time while they are copied, so each
     * bucket's contribution is consistent, but the result is not an atomic
     * snapshot of the whole container.
     */
    std::map<K, V> BuildOrdinaryMap() {
      std::map<K, V> result;
      for (Base& item : OrdinaryMap) {
        std::lock_guard<std::mutex> lock(item.m);
        // Buckets hold disjoint key sets, so insert() never drops an entry.
        result.insert(item.Map.begin(), item.Map.end());
      }
      return result;
    }

  private:
    std::vector<Base> OrdinaryMap;

    /**
     * Maps a key to its bucket index.
     *
     * The key is converted to an unsigned 64-bit value first: the conversion
     * is well defined for negative keys and keeps the remainder non-negative.
     */
    std::size_t key_pos(K key) const {
      return static_cast<std::size_t>(static_cast<std::uint64_t>(key) % OrdinaryMap.size());
    }
  };
  61.  
  62. void RunConcurrentUpdates(ConcurrentMap<int, int>& cm, size_t thread_count, int key_count)
  63. {
  64. auto kernel = [&cm, key_count](int seed) {
  65. vector<int> updates(key_count);
  66. iota(begin(updates), end(updates), -key_count / 2);
  67. shuffle(begin(updates), end(updates), default_random_engine(seed));
  68.  
  69. for (int i = 0; i < 2; ++i) {
  70. for (auto key : updates) {
  71. cm[key].ref_to_value++;
  72. }
  73. }
  74. };
  75.  
  76. vector<future<void>> futures;
  77. for (size_t i = 0; i < thread_count; ++i) {
  78. futures.push_back(async(kernel, i));
  79. }
  80. }
  81.  
  82. void TestConcurrentUpdate() {
  83. const size_t thread_count = 3;
  84. const size_t key_count = 50000;
  85.  
  86. ConcurrentMap<int, int> cm(thread_count);
  87. RunConcurrentUpdates(cm, thread_count, key_count);
  88.  
  89. const auto result = cm.BuildOrdinaryMap();
  90. ASSERT_EQUAL(result.size(), key_count);
  91. for (auto&[k, v] : result) {
  92. AssertEqual(v, 6, "Key = " + to_string(k));
  93. }
  94. }
  95.  
  96. void TestReadAndWrite() {
  97. ConcurrentMap<size_t, string> cm(5);
  98.  
  99. auto updater = [&cm] {
  100. for (size_t i = 0; i < 50000; ++i) {
  101. cm[i].ref_to_value += 'a';
  102. }
  103. };
  104. auto reader = [&cm] {
  105. vector<string> result(50000);
  106. for (size_t i = 0; i < result.size(); ++i) {
  107. result[i] = cm[i].ref_to_value;
  108. }
  109. return result;
  110. };
  111.  
  112. auto u1 = async(updater);
  113. auto r1 = async(reader);
  114. auto u2 = async(updater);
  115. auto r2 = async(reader);
  116.  
  117. u1.get();
  118. u2.get();
  119.  
  120. for (auto f : { &r1, &r2 }) {
  121. auto result = f->get();
  122. ASSERT(all_of(result.begin(), result.end(), [](const string& s) {
  123. return s.empty() || s == "a" || s == "aa";
  124. }));
  125. }
  126. }
  127.  
  128. void TestSpeedup() {
  129. {
  130. ConcurrentMap<int, int> single_lock(1);
  131.  
  132. LOG_DURATION("Single lock");
  133. RunConcurrentUpdates(single_lock, 4, 50000);
  134. }
  135. {
  136. ConcurrentMap<int, int> many_locks(100);
  137.  
  138. LOG_DURATION("100 locks");
  139. RunConcurrentUpdates(many_locks, 4, 50000);
  140. }
  141. }
  142.  
  143. int main() {
  144. TestRunner tr;
  145. RUN_TEST(tr, TestConcurrentUpdate);
  146. RUN_TEST(tr, TestReadAndWrite);
  147. RUN_TEST(tr, TestSpeedup);
  148. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement