Guest User

Untitled

a guest
Feb 11th, 2021
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.25 KB | None | 0 0
  #include <unistd.h>

  #include <atomic>
  #include <chrono>
  #include <cstdint>
  #include <deque>
  #include <exception>
  #include <filesystem>
  #include <iostream>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <random>
  #include <stdexcept>
  #include <string>
  #include <thread>
  #include <utility>
  #include <vector>

  #include "../../../.build/_deps/libmdbx-src/mdbx.h"
  16.  
  17. using namespace std::literals;
  18.  
  // Directory that holds the database environment files.
  const static std::string lmdb_dir = "lmdb_test";

  // Map size requested before rounding: 300,000,000 bytes.
  constexpr size_t requested_cache_size = 300ull * 1000ull * 1000ull;
  // Granularity, in bytes, that the map size is rounded up to.
  constexpr size_t cache_page_size = 4096;
  // Requested size rounded up to a whole number of 4096-byte pages
  // (300,003,328 bytes, i.e. ~300 MB).
  constexpr size_t cache_size =
      (requested_cache_size + cache_page_size - 1) / cache_page_size *
      cache_page_size;
  static_assert(cache_size % cache_page_size == 0 &&
                    cache_size >= requested_cache_size,
                "cache_size must be the request rounded up to a page multiple");

  // Fraction of cache_size that is counted as reserved overhead when the
  // remaining free space is computed.
  constexpr float lmdb_overhead_multiplier = 0.2f;

  // Inclusive bounds, in bytes, of the random value-buffer length: 7 MiB to
  // 15 MiB.
  constexpr std::pair<int, int> buf_distrib = {7 * 1024 * 1024,
                                               15 * 1024 * 1024};

  // Number of worker threads.
  constexpr size_t thread_count = 12;
  29.  
  30. std::string rand_buffer() {
  31. static std::random_device rd;
  32. static std::mt19937 gen(rd());
  33. static std::uniform_int_distribution<> distrib(buf_distrib.first,
  34. buf_distrib.second);
  35. std::string s;
  36. s.resize(distrib(gen), '-');
  37. return s;
  38. }
  39.  
  40. constexpr size_t MAX_CHILD_DB_COUNT = 1024 * 1024;
  41. constexpr size_t MAX_READERS_COUNT = 10 * 1024;
  42.  
  43. using TxPtr = std::unique_ptr<MDBX_txn, int (*)(MDBX_txn *)>;
  44.  
  45. void commit(TxPtr tx) {
  46. auto handle = tx.release();
  47. if (!handle) {
  48. throw std::runtime_error("transaction already commited");
  49. };
  50. if (auto res = mdbx_txn_commit(handle)) {
  51. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  52. }
  53. }
  54.  
  55. TxPtr readtx(MDBX_env *env) {
  56. MDBX_txn *rtx;
  57. if (auto res = mdbx_txn_begin(env, nullptr, MDBX_TXN_RDONLY, &rtx)) {
  58. std::cout << __LINE__ << std::endl;
  59. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  60. }
  61. return TxPtr(rtx, mdbx_txn_abort);
  62. }
  63.  
  64. TxPtr mutabletx(MDBX_env *env) {
  65. MDBX_txn *tx;
  66. if (auto res = mdbx_txn_begin(env, nullptr, MDBX_TXN_READWRITE, &tx)) {
  67. std::cout << __LINE__ << std::endl;
  68. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  69. }
  70. return TxPtr(tx, mdbx_txn_abort);
  71. }
  72.  
  // Run statistics shared between the worker threads and the reporting loop.
  std::atomic<size_t> inserted{0};          // successful put+commit operations
  std::atomic<size_t> shrinked{0};          // keys evicted to make room
  std::atomic<int64_t> real_used_space{0};  // sum of value sizes currently stored
  std::atomic<int64_t> used_keys{0};        // keys currently stored
  77.  
  78. int main() {
  79. // mdbx_setup_debug(MDBX_LOG_VERBOSE, MDBX_DBG_AUDIT, MDBX_LOGGER_DONTCHANGE);
  80.  
  81. std::filesystem::path path(lmdb_dir);
  82. if (!std::filesystem::exists(path)) {
  83. std::filesystem::create_directory(path);
  84. }
  85.  
  86. std::filesystem::path data_path = path / "data.mdb";
  87. std::filesystem::path lock_path = path / "lock.mdb";
  88. if (std::filesystem::exists(data_path)) {
  89. std::filesystem::remove(data_path);
  90. }
  91. if (std::filesystem::exists(lock_path)) {
  92. std::filesystem::remove(lock_path);
  93. }
  94.  
  95. MDBX_env *env;
  96.  
  97. // Init env
  98. if (auto res = mdbx_env_create(&env)) {
  99. std::cout << __LINE__ << std::endl;
  100. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  101. }
  102.  
  103. if (auto res = mdbx_env_set_maxdbs(env, 2)) {
  104. std::cout << __LINE__ << std::endl;
  105. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  106. }
  107.  
  108. if (auto res = mdbx_env_set_maxreaders(env, 20)) {
  109. std::cout << __LINE__ << std::endl;
  110. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  111. }
  112.  
  113. if (auto res = mdbx_env_open(env, lmdb_dir.data(), MDBX_COALESCE, 0600)) {
  114. std::cout << __LINE__ << std::endl;
  115. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  116. }
  117.  
  118. if (auto res = mdbx_env_set_mapsize(env, cache_size)) {
  119. std::cout << __LINE__ << std::endl;
  120. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  121. }
  122.  
  123. MDBX_dbi child_dbi;
  124. // Init child database.
  125. {
  126. auto tx = mutabletx(env);
  127. if (auto res =
  128. mdbx_dbi_open(tx.get(), "_child_dbi", MDBX_CREATE, &child_dbi)) {
  129. std::cout << __LINE__ << std::endl;
  130. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  131. }
  132. commit(std::move(tx));
  133. }
  134.  
  135. auto size = [child_dbi, env]() -> size_t {
  136. // Make transaction
  137. auto rtx = readtx(env);
  138.  
  139. MDBX_stat result;
  140. if (auto res =
  141. mdbx_dbi_stat(rtx.get(), child_dbi, &result, sizeof(MDBX_stat))) {
  142. std::cout << __LINE__ << std::endl;
  143. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  144. }
  145. rtx.reset();
  146.  
  147. MDBX_stat env_stat;
  148. if (auto res = mdbx_env_stat(env, &env_stat, sizeof(MDBX_stat))) {
  149. std::cout << __LINE__ << std::endl;
  150. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  151. }
  152.  
  153. result.ms_branch_pages += env_stat.ms_branch_pages;
  154. result.ms_leaf_pages += env_stat.ms_leaf_pages;
  155. result.ms_overflow_pages += env_stat.ms_overflow_pages;
  156.  
  157. return result.ms_psize * (result.ms_leaf_pages + result.ms_branch_pages +
  158. result.ms_overflow_pages + 18);
  159. };
  160.  
  161. auto available_size = [size]() -> size_t {
  162. uint64_t lmdb_overhead = static_cast<uint64_t>(
  163. static_cast<float>(cache_size) * lmdb_overhead_multiplier);
  164. uint64_t sz = size() + lmdb_overhead;
  165. if (sz > cache_size) {
  166. return 0;
  167. }
  168. return cache_size - sz;
  169. };
  170.  
  171. std::mutex mutex;
  172. std::queue<std::pair<std::string, size_t>> keys;
  173.  
  174. // Insert
  175. auto insert = [&mutex, &keys, available_size, env, child_dbi](
  176. const std::string &key, const std::string &value) -> bool {
  177. auto lock = std::unique_lock{mutex};
  178.  
  179. // std::cout << "available_size - " << available_size() << std::endl;
  180.  
  181. while (available_size() < key.size() + value.size() + 2 * 4096 &&
  182. !keys.empty()) {
  183. auto [key_to_remove, size_of_remove] = keys.front();
  184. keys.pop();
  185.  
  186. // Make transaction
  187. auto sh_tx = mutabletx(env);
  188.  
  189. // Erase
  190. MDBX_val db_key, db_value;
  191. db_key.iov_base = const_cast<char *>(key_to_remove.data());
  192. db_key.iov_len = key_to_remove.size();
  193. db_value.iov_base = nullptr;
  194. db_value.iov_len = 0;
  195.  
  196. if (auto res = mdbx_del(sh_tx.get(), child_dbi, &db_key, nullptr)) {
  197. std::cout << __LINE__ << std::endl;
  198. throw std::runtime_error(std::string("exception :") +
  199. mdbx_strerror(res));
  200. }
  201.  
  202. // Commit
  203. commit(std::move(sh_tx));
  204. real_used_space -= size_of_remove;
  205. --used_keys;
  206. ++shrinked;
  207. }
  208. // mdb_env_sync(env, 1);
  209.  
  210. keys.emplace(key, value.size());
  211.  
  212. // Make transaction
  213. auto tx = mutabletx(env);
  214.  
  215. // Make values
  216. MDBX_val db_key, db_value;
  217. db_key.iov_base = const_cast<char *>(key.data());
  218. db_key.iov_len = key.size();
  219. db_value.iov_base = const_cast<char *>(value.data());
  220. db_value.iov_len = value.size();
  221.  
  222. // Put
  223. if (auto res =
  224. mdbx_put(tx.get(), child_dbi, &db_key, &db_value, MDBX_UPSERT)) {
  225. throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
  226. }
  227.  
  228. // Commit
  229. commit(std::move(tx));
  230. real_used_space += value.size();
  231. ++inserted;
  232. ++used_keys;
  233.  
  234. return true;
  235. };
  236.  
  237. // Fill just with queue.
  238. std::cout << "start threads" << std::endl;
  239.  
  240. std::atomic<bool> exit_token = true;
  241.  
  242. std::vector<std::thread> threads;
  243. for (size_t i = 0; i < thread_count; ++i) {
  244. threads.emplace_back([&, i, rand_buf = rand_buffer()]() {
  245. size_t iter = 0;
  246. while (exit_token) {
  247. ++iter;
  248. std::string key = std::to_string(i) + "_" + std::to_string(iter);
  249.  
  250. try {
  251. insert(key, rand_buf);
  252. } catch (const std::exception &e) {
  253. std::cout << std::string("exception :") + e.what() << std::endl;
  254. exit_token = false;
  255. }
  256. }
  257. });
  258. }
  259.  
  260. while (exit_token) {
  261. std::cout << std::string("mem: ") + std::to_string(size()) + "/" +
  262. std::to_string(cache_size) +
  263. " bytes, inserted = " + std::to_string(inserted) +
  264. ", shrinked = " + std::to_string(shrinked) + ", " +
  265. "real_used_space = " + std::to_string(real_used_space) +
  266. ", used_keys = " + std::to_string(used_keys)
  267. << std::endl;
  268. std::this_thread::sleep_for(100ms);
  269. }
  270.  
  271. for (auto &thread : threads) {
  272. thread.join();
  273. }
  274. }
  275.  
Advertisement
Add Comment
Please, Sign In to add comment