Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <mutex>
#include <queue>
#include <stack>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <vector>
/// Process exit statuses returned by main().
/// The numeric values are spelled out explicitly; they are identical to the
/// implicit 0..4 numbering an unscoped enum would assign.
struct exit_code
{
    enum
    {
        success = 0,
        invalid_usage = 1,
        file_error = 2,
        not_enoght_memory = 3, // (sic) spelling kept because main() uses this name
        unexpected_error = 4
    };
};
/// External merge sort for binary files made of fixed-size records of type T.
///
/// sort() reads the source file in chunks of at most chunk_size_ values.
/// Every chunk is sorted on a worker thread (at most chunks_count_ workers at a
/// time) and written to a temporary file "sorted_<n>.tmp" in the current
/// working directory. Once all chunks are written, the temporary files are
/// k-way merged into the destination file and then removed.
///
/// Records are copied as raw bytes in native layout, so T must be trivially
/// copyable. Trailing bytes that do not form a whole record are ignored.
/// Temporary file names are fixed, so two sorters must not run concurrently in
/// the same working directory.
template <typename T>
class file_sorter final
{
    static_assert(std::is_trivially_copyable<T>::value,
                  "file_sorter copies values as raw bytes; T must be trivially copyable");

public:
    /// @param working_memory_size memory budget in bytes, divided evenly between
    ///        used_cores chunk buffers and one read buffer.
    /// @param used_cores maximum number of concurrent sorting workers. A value of
    ///        0 (hardware_concurrency() may return it) is treated as 1.
    /// @throws std::invalid_argument if a buffer cannot hold even one value.
    explicit file_sorter(
        size_t working_memory_size,
        size_t used_cores = std::thread::hardware_concurrency())
        : chunks_count_(used_cores == 0 ? 1 : used_cores)
        , chunk_size_(working_memory_size / (chunks_count_ + 1) / sizeof(value_t)) // +1 is file buffer
        , total_chunks_(0)
        , error_(false)
    {
        if (chunk_size_ == 0)
        {
            // A zero-sized read buffer would never reach end of file.
            throw std::invalid_argument("working memory is too small for the requested number of chunks");
        }
        chunks_ = make_chunks(chunks_count_, chunk_size_);
        workers_.resize(chunks_count_);
    }
    file_sorter(const file_sorter&) = delete;
    file_sorter& operator=(const file_sorter&) = delete;
    file_sorter(file_sorter&&) = delete;
    file_sorter& operator=(file_sorter&&) = delete;

    /// Sorts the records of source_file_name in ascending order into
    /// destination_file_name, which is created or truncated.
    ///
    /// @throws std::runtime_error if a file cannot be opened, read or written.
    ///         Before any exception leaves this function, all worker threads are
    ///         joined and the temporary files are removed.
    void sort(const std::string& source_file_name, const std::string& destination_file_name)
    {
        reset();
        try
        {
            split_into_sorted_chunks(source_file_name);
            wait_for_workers();
            // Chunks that finished after the last check in the read loop may also have failed.
            if (error_)
            {
                throw std::runtime_error("unable to write temporary file");
            }
            std::ofstream destination(destination_file_name, std::ios::binary);
            if (!destination)
            {
                throw std::runtime_error("can't open '" + destination_file_name + "'");
            }
            merge_and_save(destination);
            // Close explicitly so that a failing final flush is reported rather than lost in the destructor.
            destination.close();
            if (!destination)
            {
                throw std::runtime_error("unable to write destination file");
            }
        }
        catch (...)
        {
            // Workers reference this object, so they must be finished before unwinding further.
            wait_for_workers();
            cleanup();
            throw;
        }
        cleanup();
    }

private:
    using value_t = T;
    using chunk_t = std::vector<value_t>;
    using chunk_list_t = std::vector<chunk_t>;

    /// Reads the source file buffer by buffer and hands every non-empty buffer
    /// to a worker thread.
    void split_into_sorted_chunks(const std::string& source_file_name)
    {
        std::ifstream source(source_file_name, std::ios::binary);
        if (!source)
        {
            throw std::runtime_error("can't open '" + source_file_name + "'");
        }
        while (true)
        {
            file_buf_.resize(chunk_size_);
            if (!source.read(reinterpret_cast<char*>(file_buf_.data()),
                             static_cast<std::streamsize>(file_buf_.size() * sizeof(value_t)))
                && !source.eof())
            {
                throw std::runtime_error("can't read '" + source_file_name + "'");
            }
            if (error_)
            {
                throw std::runtime_error("unable to write temporary file");
            }
            const auto values_read = static_cast<size_t>(source.gcount()) / sizeof(value_t);
            if (values_read == 0)
            {
                // Either the file was empty or its size is an exact multiple of
                // the buffer. Never emit an empty chunk: it used to inject an
                // uninitialized value into the merge.
                break;
            }
            file_buf_.resize(values_read);
            dispatch_chunk();
            if (source.eof())
            {
                break;
            }
        }
    }

    /// Moves the contents of file_buf_ into a free chunk slot and starts a
    /// worker that sorts the chunk into the next temporary file. Blocks while
    /// all slots are busy.
    void dispatch_chunk()
    {
        const auto file_name = get_sorted_chunk_file_name(total_chunks_);
        size_t chunk_index = 0;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            while (free_chunks_.empty())
                chunk_processed_.wait(lock);
            chunk_index = free_chunks_.top();
            free_chunks_.pop();
        }
        // The slot's previous worker pushed the index back as its last step, so
        // this join only waits for that thread to return.
        auto& worker = workers_[chunk_index];
        if (worker.joinable())
        {
            worker.join();
        }
        chunks_[chunk_index].swap(file_buf_);
        try
        {
            worker = std::thread([this, file_name, chunk_index]
            {
                sort_and_save(file_name, chunk_index);
            });
        }
        catch (...)
        {
            // No worker owns this slot now, so hand it back. Otherwise later dispatches would wait for it forever.
            std::lock_guard<std::mutex> lock(mutex_);
            free_chunks_.push(chunk_index);
            throw;
        }
        ++total_chunks_;
    }

    /// Joins every started worker thread. Afterwards all chunk slots are free.
    void wait_for_workers()
    {
        for (auto& worker : workers_)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }

    /// Removes the temporary files created by the current sort() call.
    void cleanup() const
    {
        for (int i = 0; i < total_chunks_; ++i)
        {
            std::remove(get_sorted_chunk_file_name(i).c_str());
        }
    }

    /// K-way merges all temporary files into destination using a min-heap
    /// holding one (value, file index) entry per file that still has data.
    void merge_and_save(std::ofstream& destination) const
    {
        std::vector<std::ifstream> sorted_files;
        sorted_files.reserve(static_cast<size_t>(total_chunks_));
        for (int i = 0; i < total_chunks_; ++i)
        {
            sorted_files.emplace_back(get_sorted_chunk_file_name(i), std::ios::binary);
            if (!sorted_files.back())
            {
                throw std::runtime_error("can't open temporary file");
            }
        }
        using file_by_value_pair_t = std::pair<value_t, size_t>;
        auto compare_by_value = [](const file_by_value_pair_t& x, const file_by_value_pair_t& y)
        {
            return y.first < x.first; // reversed so that the priority_queue yields the smallest value
        };
        std::priority_queue<
            file_by_value_pair_t,
            std::vector<file_by_value_pair_t>,
            decltype(compare_by_value)> file_by_value(compare_by_value);
        value_t value;
        for (size_t i = 0, size = sorted_files.size(); i < size; ++i)
        {
            // Only files that actually yielded a value take part in the merge.
            if (read_value(sorted_files[i], value))
            {
                file_by_value.push(std::make_pair(value, i));
            }
        }
        while (!file_by_value.empty())
        {
            const auto min_file_pair = file_by_value.top();
            file_by_value.pop();
            const auto& min = min_file_pair.first;
            const auto file_index = min_file_pair.second;
            auto& file = sorted_files[file_index];
            write_value(destination, min);
            // Each file is sorted, so a run of values equal to min can be copied
            // straight through; the first larger value goes back into the heap.
            while (read_value(file, value))
            {
                if (min < value)
                {
                    file_by_value.push(std::make_pair(value, file_index));
                    break;
                }
                write_value(destination, value);
            }
        }
    }

    /// Reads one record into value.
    /// @return false at end of file, in which case value is unspecified.
    /// @throws std::runtime_error on a read error other than end of file.
    bool read_value(std::ifstream& file, value_t& value) const
    {
        if (file.read(reinterpret_cast<char*>(&value), sizeof(value_t)))
        {
            return true;
        }
        if (!file.eof())
        {
            throw std::runtime_error("unable to read temporary file");
        }
        return false;
    }

    /// Appends one record to file.
    /// @throws std::runtime_error if the write fails.
    void write_value(std::ofstream& file, const value_t& value) const
    {
        if (!file.write(reinterpret_cast<const char*>(&value), sizeof(value_t)))
        {
            throw std::runtime_error("unable to write destination file");
        }
    }

    /// Prepares per-call state: no chunks yet, no error, and every slot free.
    /// Must only be called while no worker is running.
    void reset()
    {
        total_chunks_ = 0;
        error_ = false;
        while (!free_chunks_.empty())
            free_chunks_.pop();
        for (size_t i = 0, size = chunks_count_; i < size; ++i)
            free_chunks_.push(i);
        file_buf_.resize(chunk_size_);
    }

    /// Creates count empty chunks, each with capacity for size values.
    chunk_list_t make_chunks(size_t count, size_t size) const
    {
        chunk_list_t chunks;
        chunks.reserve(count);
        for (size_t i = 0; i < count; ++i)
        {
            chunk_t new_chunk;
            new_chunk.reserve(size);
            chunks.push_back(std::move(new_chunk));
        }
        return chunks;
    }

    /// Name of the n-th temporary file, relative to the working directory.
    std::string get_sorted_chunk_file_name(int n) const
    {
        return "sorted_" + std::to_string(n) + ".tmp";
    }

    /// Worker body: sorts chunks_[chunk_index] and writes it to file_name.
    /// Failures set error_ instead of escaping, because an exception leaving a
    /// thread function calls std::terminate. The file is closed before the slot
    /// is returned, so the merge never sees a partially flushed file.
    void sort_and_save(const std::string& file_name, size_t chunk_index)
    {
        bool ok = false;
        try
        {
            auto& chunk = chunks_[chunk_index];
            std::sort(std::begin(chunk), std::end(chunk));
            std::ofstream sorted(file_name, std::ios::binary);
            if (sorted)
            {
                sorted.write(reinterpret_cast<const char*>(chunk.data()),
                             static_cast<std::streamsize>(chunk.size() * sizeof(value_t)));
                sorted.close();
                ok = !sorted.fail();
            }
        }
        catch (...)
        {
            ok = false;
        }
        if (!ok)
        {
            error_ = true;
        }
        std::lock_guard<std::mutex> lock(mutex_);
        free_chunks_.push(chunk_index);
        chunk_processed_.notify_one();
    }

    const size_t chunks_count_;
    const size_t chunk_size_;
    chunk_list_t chunks_;
    std::vector<std::thread> workers_;   // one (possibly finished) worker per chunk slot
    std::stack<size_t> free_chunks_;     // indices of unused slots; guarded by mutex_
    chunk_t file_buf_;
    int total_chunks_;
    std::atomic<bool> error_;
    std::mutex mutex_;
    std::condition_variable chunk_processed_;
};
- int main(int argc, char* argv[])
- {
- const auto source_file_name = "input";
- const auto destination_file_name = "output";
- const size_t preffered_mem_size = 128 * 1024 * 1024;
- try
- {
- const auto start = std::chrono::steady_clock().now();
- file_sorter<int32_t> sorter(preffered_mem_size);
- sorter.sort(source_file_name, destination_file_name);
- const auto finish = std::chrono::steady_clock().now();
- std::cout << "completed in " << std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count() << " ms\n";
- }
- catch (const std::runtime_error& error)
- {
- std::cerr << error.what() << '\n';
- return exit_code::file_error;
- }
- catch (const std::bad_alloc&)
- {
- std::cerr << "not enough memory! need at least " << preffered_mem_size << " bytes of free memory\n";
- return exit_code::not_enoght_memory;
- }
- catch (const std::exception& e)
- {
- std::cerr << e.what() << '\n';
- return exit_code::unexpected_error;
- }
- catch (...)
- {
- std::cerr << "something wrong\n";
- return exit_code::unexpected_error;
- }
- return exit_code::success;
- }
Add Comment
Please, Sign In to add comment