Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <exception>
#include <filesystem>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "../../../.build/_deps/libmdbx-src/mdbx.h"
// Brings the chrono literal suffixes (e.g. 100ms) into scope for main().
using namespace std::literals;

// Directory that holds the database environment files.
const static std::string lmdb_dir = "lmdb_test";

// Requested map size before rounding: 300 * 10^6 bytes.
// (Formerly named `__tmp`; identifiers containing a double underscore are
// reserved for the implementation.)
constexpr size_t requested_cache_size = 300ull * 1000ull * 1000ull;

// Granularity the map size is rounded up to.
constexpr size_t cache_page_size = 4096ull;

// Map size handed to mdbx_env_set_mapsize: the requested size rounded up to
// a whole number of 4096-byte pages (~300 MB).
constexpr size_t cache_size =
    (requested_cache_size / cache_page_size +
     (requested_cache_size % cache_page_size != 0 ? 1u : 0u)) *
    cache_page_size;
static_assert(cache_size % cache_page_size == 0,
              "cache_size must be a whole number of pages");
static_assert(cache_size >= requested_cache_size,
              "rounding must never shrink the requested size");

// Fraction of cache_size that main() sets aside as bookkeeping overhead when
// it computes how much room is still available.
constexpr float lmdb_overhead_multiplier = 0.2f;

// Inclusive bounds, in bytes, for the random buffer length: 7 MiB to 15 MiB.
// Written with int arithmetic so no unsigned-to-int conversion is involved.
constexpr std::pair<int, int> buf_distrib = {7 * 1024 * 1024,
                                             15 * 1024 * 1024};

// Number of worker threads main() starts.
constexpr size_t thread_count = 12;
- std::string rand_buffer() {
- static std::random_device rd;
- static std::mt19937 gen(rd());
- static std::uniform_int_distribution<> distrib(buf_distrib.first,
- buf_distrib.second);
- std::string s;
- s.resize(distrib(gen), '-');
- return s;
- }
// Limit constants: 2^20 (1,048,576) and 10 * 2^10 (10,240) respectively.
// Neither is referenced elsewhere in this file; main() passes literal values
// to mdbx_env_set_maxdbs / mdbx_env_set_maxreaders instead.
constexpr size_t MAX_CHILD_DB_COUNT = size_t{1} << 20;
constexpr size_t MAX_READERS_COUNT = size_t{10} << 10;
- using TxPtr = std::unique_ptr<MDBX_txn, int (*)(MDBX_txn *)>;
- void commit(TxPtr tx) {
- auto handle = tx.release();
- if (!handle) {
- throw std::runtime_error("transaction already commited");
- };
- if (auto res = mdbx_txn_commit(handle)) {
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- }
- TxPtr readtx(MDBX_env *env) {
- MDBX_txn *rtx;
- if (auto res = mdbx_txn_begin(env, nullptr, MDBX_TXN_RDONLY, &rtx)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- return TxPtr(rtx, mdbx_txn_abort);
- }
- TxPtr mutabletx(MDBX_env *env) {
- MDBX_txn *tx;
- if (auto res = mdbx_txn_begin(env, nullptr, MDBX_TXN_READWRITE, &tx)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- return TxPtr(tx, mdbx_txn_abort);
- }
// Counters updated by the insert path in main() and read by its reporting
// loop. All start at zero.

// Number of successful inserts.
std::atomic<std::size_t> inserted{0};
// Number of entries evicted to make room.
std::atomic<std::size_t> shrinked{0};
// Sum of the value sizes currently accounted as stored, in bytes.
std::atomic<std::int64_t> real_used_space{0};
// Number of keys currently accounted as stored.
std::atomic<std::int64_t> used_keys{0};
- int main() {
- // mdbx_setup_debug(MDBX_LOG_VERBOSE, MDBX_DBG_AUDIT, MDBX_LOGGER_DONTCHANGE);
- std::filesystem::path path(lmdb_dir);
- if (!std::filesystem::exists(path)) {
- std::filesystem::create_directory(path);
- }
- std::filesystem::path data_path = path / "data.mdb";
- std::filesystem::path lock_path = path / "lock.mdb";
- if (std::filesystem::exists(data_path)) {
- std::filesystem::remove(data_path);
- }
- if (std::filesystem::exists(lock_path)) {
- std::filesystem::remove(lock_path);
- }
- MDBX_env *env;
- // Init env
- if (auto res = mdbx_env_create(&env)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- if (auto res = mdbx_env_set_maxdbs(env, 2)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- if (auto res = mdbx_env_set_maxreaders(env, 20)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- if (auto res = mdbx_env_open(env, lmdb_dir.data(), MDBX_COALESCE, 0600)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- if (auto res = mdbx_env_set_mapsize(env, cache_size)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- MDBX_dbi child_dbi;
- // Init child database.
- {
- auto tx = mutabletx(env);
- if (auto res =
- mdbx_dbi_open(tx.get(), "_child_dbi", MDBX_CREATE, &child_dbi)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- commit(std::move(tx));
- }
- auto size = [child_dbi, env]() -> size_t {
- // Make transaction
- auto rtx = readtx(env);
- MDBX_stat result;
- if (auto res =
- mdbx_dbi_stat(rtx.get(), child_dbi, &result, sizeof(MDBX_stat))) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- rtx.reset();
- MDBX_stat env_stat;
- if (auto res = mdbx_env_stat(env, &env_stat, sizeof(MDBX_stat))) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- result.ms_branch_pages += env_stat.ms_branch_pages;
- result.ms_leaf_pages += env_stat.ms_leaf_pages;
- result.ms_overflow_pages += env_stat.ms_overflow_pages;
- return result.ms_psize * (result.ms_leaf_pages + result.ms_branch_pages +
- result.ms_overflow_pages + 18);
- };
- auto available_size = [size]() -> size_t {
- uint64_t lmdb_overhead = static_cast<uint64_t>(
- static_cast<float>(cache_size) * lmdb_overhead_multiplier);
- uint64_t sz = size() + lmdb_overhead;
- if (sz > cache_size) {
- return 0;
- }
- return cache_size - sz;
- };
- std::mutex mutex;
- std::queue<std::pair<std::string, size_t>> keys;
- // Insert
- auto insert = [&mutex, &keys, available_size, env, child_dbi](
- const std::string &key, const std::string &value) -> bool {
- auto lock = std::unique_lock{mutex};
- // std::cout << "available_size - " << available_size() << std::endl;
- while (available_size() < key.size() + value.size() + 2 * 4096 &&
- !keys.empty()) {
- auto [key_to_remove, size_of_remove] = keys.front();
- keys.pop();
- // Make transaction
- auto sh_tx = mutabletx(env);
- // Erase
- MDBX_val db_key, db_value;
- db_key.iov_base = const_cast<char *>(key_to_remove.data());
- db_key.iov_len = key_to_remove.size();
- db_value.iov_base = nullptr;
- db_value.iov_len = 0;
- if (auto res = mdbx_del(sh_tx.get(), child_dbi, &db_key, nullptr)) {
- std::cout << __LINE__ << std::endl;
- throw std::runtime_error(std::string("exception :") +
- mdbx_strerror(res));
- }
- // Commit
- commit(std::move(sh_tx));
- real_used_space -= size_of_remove;
- --used_keys;
- ++shrinked;
- }
- // mdb_env_sync(env, 1);
- keys.emplace(key, value.size());
- // Make transaction
- auto tx = mutabletx(env);
- // Make values
- MDBX_val db_key, db_value;
- db_key.iov_base = const_cast<char *>(key.data());
- db_key.iov_len = key.size();
- db_value.iov_base = const_cast<char *>(value.data());
- db_value.iov_len = value.size();
- // Put
- if (auto res =
- mdbx_put(tx.get(), child_dbi, &db_key, &db_value, MDBX_UPSERT)) {
- throw std::runtime_error(std::string("exception :") + mdbx_strerror(res));
- }
- // Commit
- commit(std::move(tx));
- real_used_space += value.size();
- ++inserted;
- ++used_keys;
- return true;
- };
- // Fill just with queue.
- std::cout << "start threads" << std::endl;
- std::atomic<bool> exit_token = true;
- std::vector<std::thread> threads;
- for (size_t i = 0; i < thread_count; ++i) {
- threads.emplace_back([&, i, rand_buf = rand_buffer()]() {
- size_t iter = 0;
- while (exit_token) {
- ++iter;
- std::string key = std::to_string(i) + "_" + std::to_string(iter);
- try {
- insert(key, rand_buf);
- } catch (const std::exception &e) {
- std::cout << std::string("exception :") + e.what() << std::endl;
- exit_token = false;
- }
- }
- });
- }
- while (exit_token) {
- std::cout << std::string("mem: ") + std::to_string(size()) + "/" +
- std::to_string(cache_size) +
- " bytes, inserted = " + std::to_string(inserted) +
- ", shrinked = " + std::to_string(shrinked) + ", " +
- "real_used_space = " + std::to_string(real_used_space) +
- ", used_keys = " + std::to_string(used_keys)
- << std::endl;
- std::this_thread::sleep_for(100ms);
- }
- for (auto &thread : threads) {
- thread.join();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment