Guest User

Untitled

a guest
Nov 18th, 2017
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.22 KB | None | 0 0
  #include <algorithm>
  #include <array>
  #include <atomic>
  #include <cassert>
  #include <chrono>
  #include <condition_variable>
  #include <cstdint>
  #include <cstdio>
  #include <fstream>
  #include <iostream>
  #include <mutex>
  #include <queue>
  #include <stack>
  #include <stdexcept>
  #include <string>
  #include <thread>
  #include <vector>
  15.  
  // Process exit codes returned from main().
  // The enumerators are plain (unscoped) so they convert implicitly to int.
  struct exit_code
  {
      enum
      {
          success,            // 0: everything went fine
          invalid_usage,      // 1: reserved for command-line errors
          file_error,         // 2: returned by main() when a std::runtime_error is caught
          not_enoght_memory,  // 3: original (misspelled) name, kept so existing callers compile
          unexpected_error,   // 4: any other failure

          // Correctly spelled alias for not_enoght_memory; same numeric value.
          not_enough_memory = not_enoght_memory
      };
  };
  27.  
  // External (out-of-core) sorter for binary files that hold a flat array of T.
  //
  // Values are read and written as their raw object representation
  // (reinterpret_cast + read/write), so T must be trivially copyable; the merge
  // step additionally needs operator<, operator> and operator!= on T.
  //
  // Algorithm:
  //   1. The source file is read in pieces of chunk_size_ values.
  //   2. Every piece is handed to a worker thread that sorts it in memory and
  //      stores it in a temporary file "sorted_<n>.tmp" in the current directory.
  //   3. All temporary files are k-way merged into the destination file.
  //   4. The temporary files are removed (also when sort() throws).
  //
  // Trailing bytes of the source that do not form a complete T are ignored.
  // Because the temporary file names are fixed, two sorters must not run in the
  // same working directory at the same time.
  template <typename T>
  class file_sorter final
  {
  public:
      // working_memory_size: approximate memory budget in bytes; it is divided
      //                      between one buffer per worker plus one read buffer.
      // used_cores:          number of worker slots. 0 (which
      //                      std::thread::hardware_concurrency() may return) is
      //                      treated as 1.
      // A budget too small for a single value is rounded up to one value per
      // buffer, so reading always makes progress.
      explicit file_sorter(
          size_t working_memory_size,
          size_t used_cores = std::thread::hardware_concurrency())
          : chunks_count_(used_cores != 0 ? used_cores : 1)
          , chunk_size_(std::max<size_t>(
                1, working_memory_size / (chunks_count_ + 1) / sizeof(value_t))) // +1 is file buffer
          , total_chunks_(0)
          , error_(false)
      {
          chunks_ = make_chunks(chunks_count_, chunk_size_);
          workers_.resize(chunks_count_);
          // Reserved up front so that workers returning a chunk never allocate.
          free_chunks_.reserve(chunks_count_);
      }

      // Workers hold a pointer to *this, so they must be finished before the
      // object goes away. sort() already joins them on every path; this is a
      // safety net.
      ~file_sorter()
      {
          join_workers();
      }

      file_sorter(const file_sorter&) = delete;
      file_sorter& operator=(const file_sorter&) = delete;

      file_sorter(file_sorter&&) = delete;
      file_sorter& operator=(file_sorter&&) = delete;

      // Sorts the values stored in source_file_name into destination_file_name
      // (ascending order).
      //
      // Throws std::runtime_error when the source, the destination or a
      // temporary file cannot be opened, read or written. Whatever the outcome,
      // all workers are joined and the temporary files are removed before the
      // function returns or throws.
      void sort(const std::string& source_file_name, const std::string& destination_file_name)
      {
          reset();

          std::ifstream source(source_file_name, std::ios::binary);
          if (!source)
          {
              throw std::runtime_error("can't open '" + source_file_name + "'");
          }

          try
          {
              split_into_sorted_chunks(source, source_file_name);

              // waiting for the async operations completion
              join_workers();

              // The last workers may have failed after the final check in the read loop.
              if (error_)
              {
                  throw std::runtime_error("unable to write temporary file");
              }

              std::ofstream destination(destination_file_name, std::ios::binary);
              if (!destination)
              {
                  throw std::runtime_error("can't open '" + destination_file_name + "'");
              }

              merge_and_save(destination);

              // Surface write errors that only show up when the buffer is flushed.
              if (!destination.flush())
              {
                  throw std::runtime_error("unable to write destination file");
              }
          }
          catch (...)
          {
              // Workers reference this object and are still writing temporary
              // files: let them finish before cleaning up and propagating.
              join_workers();
              cleanup();
              throw;
          }

          cleanup();
      }

  private:
      using value_t = T;
      using chunk_t = std::vector<value_t>;
      using chunk_list_t = std::vector<chunk_t>;

      // Reads the source piece by piece and starts one worker per non-empty piece.
      void split_into_sorted_chunks(std::ifstream& source, const std::string& source_file_name)
      {
          while (source)
          {
              source.read(
                  reinterpret_cast<char*>(file_buf_.data()),
                  static_cast<std::streamsize>(file_buf_.size() * sizeof(value_t)));

              // A short read at end of file sets failbit together with eofbit;
              // only a failure without eofbit is a real read error.
              if (!source && !source.eof())
              {
                  throw std::runtime_error("can't read '" + source_file_name + "'");
              }

              if (error_)
              {
                  throw std::runtime_error("unable to write temporary file");
              }

              const size_t values_read = static_cast<size_t>(source.gcount()) / sizeof(value_t);
              if (values_read == 0)
              {
                  // Nothing left (or only an incomplete trailing value): do not
                  // create an empty temporary file.
                  break;
              }
              file_buf_.resize(values_read);

              const size_t chunk_index = acquire_free_chunk();

              // Hand the freshly read data to the worker slot and take its old
              // buffer back as the next read buffer.
              chunks_[chunk_index].swap(file_buf_);
              file_buf_.resize(chunk_size_);

              workers_[chunk_index] = std::thread(
                  &file_sorter::sort_and_save,
                  this,
                  get_sorted_chunk_file_name(total_chunks_),
                  chunk_index);

              ++total_chunks_;
          }
      }

      // Blocks until some worker slot is free and returns its index. The
      // previous thread of that slot (if any) has already signalled completion,
      // so joining it here is quick and makes the chunk safe to reuse.
      size_t acquire_free_chunk()
      {
          size_t chunk_index = 0;
          {
              std::unique_lock<std::mutex> lock(mutex_);
              chunk_processed_.wait(lock, [this] { return !free_chunks_.empty(); });

              chunk_index = free_chunks_.back();
              free_chunks_.pop_back();
          }

          if (workers_[chunk_index].joinable())
          {
              workers_[chunk_index].join();
          }
          return chunk_index;
      }

      // Joins every worker thread that is still joinable.
      void join_workers()
      {
          for (auto& worker : workers_)
          {
              if (worker.joinable())
              {
                  worker.join();
              }
          }
      }

      // Removes the temporary files created by the current sort() call.
      // Failures of std::remove (e.g. file never created) are ignored on purpose.
      void cleanup() const
      {
          for (int i = 0; i < total_chunks_; ++i)
          {
              std::remove(get_sorted_chunk_file_name(i).c_str());
          }
      }

      // K-way merge of all sorted temporary files into destination.
      // Note: every temporary file is kept open at the same time.
      void merge_and_save(std::ofstream& destination) const
      {
          std::vector<std::ifstream> sorted_files;
          sorted_files.reserve(static_cast<size_t>(total_chunks_));
          for (int i = 0; i < total_chunks_; ++i)
          {
              sorted_files.emplace_back(get_sorted_chunk_file_name(i), std::ios::binary);
              if (!sorted_files.back())
              {
                  throw std::runtime_error("can't open temporary file");
              }
          }

          // Min-heap of (current head value, index of the file it came from).
          using file_by_value_pair_t = std::pair<value_t, size_t>;
          auto compare_by_value = [](const file_by_value_pair_t& x, const file_by_value_pair_t& y)
          {
              return x.first > y.first;
          };

          std::priority_queue<
              file_by_value_pair_t,
              std::vector<file_by_value_pair_t>,
              decltype(compare_by_value)> file_by_value(compare_by_value);

          for (size_t i = 0, size = sorted_files.size(); i < size; ++i)
          {
              value_t value;
              // Only files that actually delivered a value take part in the merge.
              if (read_value(sorted_files[i], value))
              {
                  file_by_value.push(std::make_pair(value, i));
              }
          }

          while (!file_by_value.empty())
          {
              const auto min_file_pair = file_by_value.top();
              file_by_value.pop();

              const auto min = min_file_pair.first;
              const auto file_index = min_file_pair.second;
              auto& file = sorted_files[file_index];

              // Runs of equal values from the same file are copied directly,
              // without going through the heap again.
              while (true)
              {
                  write_value(destination, min);

                  value_t value;
                  if (!read_value(file, value))
                  {
                      break; // this file is exhausted
                  }

                  if (value != min)
                  {
                      file_by_value.push(std::make_pair(value, file_index));
                      break;
                  }
              }
          }
      }

      // Reads one value from file into value.
      // Returns false at end of file (value is then left untouched);
      // throws std::runtime_error on any other read failure.
      bool read_value(std::ifstream& file, value_t& value) const
      {
          if (file.read(reinterpret_cast<char*>(&value), sizeof(value_t)))
          {
              return true;
          }
          if (!file.eof())
          {
              throw std::runtime_error("unable to read temporary file");
          }
          return false;
      }

      // Writes one value to file; throws std::runtime_error on failure.
      void write_value(std::ofstream& file, const value_t& value) const
      {
          if (!file.write(reinterpret_cast<const char*>(&value), sizeof(value_t)))
          {
              throw std::runtime_error("unable to write destination file");
          }
      }

      // Brings the object back to its initial state so sort() can be called again.
      void reset()
      {
          total_chunks_ = 0;
          error_ = false;

          free_chunks_.clear();
          for (size_t i = 0, size = chunks_count_; i < size; ++i)
          {
              free_chunks_.push_back(i);
          }

          file_buf_.resize(chunk_size_);
      }

      // Creates count empty chunks, each with capacity for size values.
      chunk_list_t make_chunks(size_t count, size_t size) const
      {
          chunk_list_t chunks;
          chunks.reserve(count);

          for (size_t i = 0; i < count; ++i)
          {
              chunk_t new_chunk;
              new_chunk.reserve(size);
              chunks.push_back(std::move(new_chunk));
          }

          return chunks;
      }

      // Name of the temporary file that stores sorted chunk number n.
      std::string get_sorted_chunk_file_name(int n) const
      {
          return "sorted_" + std::to_string(n) + ".tmp";
      }

      // Worker thread body: sorts chunks_[chunk_index], stores it in file_name
      // and then marks the chunk as free again. Never lets an exception escape
      // (that would terminate the process); failures are reported via error_.
      void sort_and_save(const std::string& file_name, size_t chunk_index)
      {
          try
          {
              auto& chunk = chunks_[chunk_index];

              std::sort(std::begin(chunk), std::end(chunk));

              std::ofstream sorted(file_name, std::ios::binary);
              if (!sorted
                  || !sorted.write(
                         reinterpret_cast<const char*>(chunk.data()),
                         static_cast<std::streamsize>(chunk.size() * sizeof(value_t))))
              {
                  error_ = true;
              }
              else
              {
                  // Close before signalling: the file must be complete on disk
                  // by the time the merge step is allowed to open it.
                  sorted.close();
                  if (!sorted)
                  {
                      error_ = true;
                  }
              }
          }
          catch (...)
          {
              error_ = true;
          }

          std::lock_guard<std::mutex> lock(mutex_);
          free_chunks_.push_back(chunk_index);
          chunk_processed_.notify_one();
      }

      const size_t chunks_count_; // number of worker slots / in-memory chunks
      const size_t chunk_size_;   // values per chunk (always >= 1)

      chunk_list_t chunks_;              // one buffer per worker slot
      std::vector<size_t> free_chunks_;  // indices of idle slots, used as a stack; guarded by mutex_
      std::vector<std::thread> workers_; // workers_[i] is the thread working on chunks_[i]

      chunk_t file_buf_; // buffer the source file is read into

      int total_chunks_;        // number of temporary files started so far
      std::atomic<bool> error_; // set by a worker that failed to write its file

      std::mutex mutex_;
      std::condition_variable chunk_processed_; // signalled whenever a slot becomes free
  };
  265.  
  266. int main(int argc, char* argv[])
  267. {
  268. const auto source_file_name = "input";
  269. const auto destination_file_name = "output";
  270.  
  271. const size_t preffered_mem_size = 128 * 1024 * 1024;
  272.  
  273. try
  274. {
  275. const auto start = std::chrono::steady_clock().now();
  276.  
  277. file_sorter<int32_t> sorter(preffered_mem_size);
  278. sorter.sort(source_file_name, destination_file_name);
  279.  
  280. const auto finish = std::chrono::steady_clock().now();
  281. std::cout << "completed in " << std::chrono::duration_cast<std::chrono::milliseconds>(finish - start).count() << " ms\n";
  282. }
  283. catch (const std::runtime_error& error)
  284. {
  285. std::cerr << error.what() << '\n';
  286. return exit_code::file_error;
  287. }
  288. catch (const std::bad_alloc&)
  289. {
  290. std::cerr << "not enough memory! need at least " << preffered_mem_size << " bytes of free memory\n";
  291. return exit_code::not_enoght_memory;
  292. }
  293. catch (const std::exception& e)
  294. {
  295. std::cerr << e.what() << '\n';
  296. return exit_code::unexpected_error;
  297. }
  298. catch (...)
  299. {
  300. std::cerr << "something wrong\n";
  301. return exit_code::unexpected_error;
  302. }
  303.  
  304. return exit_code::success;
  305. }
Add Comment
Please, Sign In to add comment