Advertisement
artyom_h31

Untitled

Mar 14th, 2017
260
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 9.48 KB | None | 0 0
  1. /*
  2.  * Программа для генерации сигнатуры указанного файла.
  3.  * Сигнатура генерируется следующим образом: исходный файл делится на блоки равной (фиксированной) длины blockSize
  4.  * (если размер файла не кратен размеру блока, последний фрагмент дополнен нулями до размера полного блока).
  5.  * Для каждого блока вычисляется значение CRC функции и дописывается в выходной файл-сигнатуру.
  6.  *
  7.  * Программа делится на THREADS_COUNT потоков. Один из них работает над загрузкой входных данных в хранилища,
  8.  * а остальные вычисляют CRC хэш и записывают его в выходной файл. На каждом этапе,поток загрузки забирает одно из
  9.  * свободных хранилищ, заполняет его, после чего освобождает и помечает как необработанный. Этот процесс выполняется до
  10.  * тех пор пока файл не кончится. Потоки обработки в это время ждут необработанное хранилище, и если оно становится доступно, то
  11.  * все данные между этими хранилищами делятся на части и обрабатываются. После обработки данных, потоки обработки объединяют данные
  12.  * и сохраняют их в сигнатурный файл. После чего, процесс потовряется до тех пор, пока в очереди есть необработанные хранилища
  13.  * или пока исходный входной файл ещё не полностью загружен.
  14.  *
  15.  * Аргументы командной строки:
  16.  * argv[1] - путь до исходного файла, для которого требуется сигнатура
  17.  * argv[2] - путь до выходного сигнатурного файла
  18.  * argv[3] - размер блока
  19.  *
  20.  *
  21.  * Для регулировки производительности используйте константы:
  22.  * В файле class_storage.h:
  23.  *      NUMBER_BLOCKS_IN_STORAGE - Количество блоков в хранилище
  24.  *      NUMBER_OF_STORAGE - Максимальное количество хранилищ
  25.  * В файле main.cpp:
  26.  *      THREADS_COUNT - Количество потоков
  27.  */
  28.  
  29. #include <stdio.h>
  30. #include <iostream>
  31. #include <fstream>
  32. #include <boost/crc.hpp>
  33. #include <queue>
  34. #include <sstream>
  35. #include <thread>
  36. #include <queue>
  37. #include <array>
  38. #include <mutex>
  39. #include <atomic>
  40. #include <map>
  41. #include <assert.h>
  42. #include <condition_variable>
  43. #include "exception.h"
  44.  
  45. const int THREADS_COUNT = 4; // Количество потоков. Должно быть больше или равно 2
  46. const int READ_BLOCK_SIZE = 1024*1024;
  47. const int CALC_BLOCK_SIZE = 1024;
  48. const int QUEUE_SIZE = 1024;
  49.  
  50. std::string inputFilePath;
  51. std::string outputFilePath;
  52.  
  53. // Флаг сигнализирует о завершении входного файла. Требуется для вычислительных потоков.
  54. bool eofFound = false;
  55. std::atomic<int> calc_finished(0);
  56.  
  57. typedef std::array<uint8_t, READ_BLOCK_SIZE> block;
  58. typedef std::unique_ptr<block> block_ptr;
  59.  
  60. std::condition_variable data_cond;
  61. std::mutex data_m;
  62. std::queue<std::pair<int, block_ptr> > data_q;
  63.  
  64. std::condition_variable hash_cond;
  65. std::mutex hash_m;
  66. std::queue<std::pair<uint64_t, uint64_t> > hash_q;
  67.  
  68. void read_thread();
  69. void calc_thread(int thread_num);
  70. void write_thread();
  71.  
  72. bool not_filled_data_queue() {return data_q.size() <= QUEUE_SIZE;}
  73. bool not_empty_data_queue() {return eofFound || data_q.size() > 0;}
  74. bool not_empty_hash_queue() {return calc_finished == THREADS_COUNT || hash_q.size() > 0;}
  75.  
  76. int main(int argc, char *argv[])
  77. {
  78.     auto start = std::chrono::steady_clock::now();
  79.  
  80.     try {
  81.         if (argc < 3) {
  82.             throw Exception("Moar arguments!!!1", 1);
  83.         }
  84.  
  85.         inputFilePath = argv[1];
  86.         outputFilePath = argv[2];
  87.  
  88.         if (THREADS_COUNT < 1) {
  89.             throw Exception("Number of threads is invalid", 1);
  90.         }
  91.  
  92.         // Разделяем на два потока. Один будет отвечать за загрузку данных из файла,
  93.         // а второй за вычисление CRC и сохранение его в файл.
  94.         std::thread reader(read_thread);
  95.         std::array<std::thread, THREADS_COUNT> pool;
  96.         for (int i = 0; i < THREADS_COUNT; i++) {
  97.             std::thread calculator(calc_thread, i);
  98.             pool[i] = std::move(calculator);
  99.         }
  100.         std::thread writer(write_thread);
  101.         reader.join();
  102.         for (int i = 0; i < THREADS_COUNT; i++) {
  103.             pool[i].join();
  104.         }
  105.         writer.join();
  106.     } catch (Exception &e) {
  107.         std::cout << "ERROR: " << e.getMessage() << std::endl;
  108.         return e.getCode();
  109.     }
  110.     auto end = std::chrono::steady_clock::now();
  111.     auto diff = end - start;
  112.  
  113.     std::cout << "Working time is " << std::chrono::duration<double, std::milli>(diff).count() << std::endl;
  114.     return 0;
  115. }
  116.  
  117. void read_thread() {
  118.     std::ifstream fsource;
  119.     uint64_t block_count = 0;
  120.  
  121.     fsource.open(inputFilePath, std::ios::binary);
  122.     if (!fsource) {
  123.         throw Exception("File not found", -1);
  124.     }
  125.  
  126.     // Читаем исходные данные пока не наступаит конец файла
  127.     while (!fsource.eof()) {
  128.         block_ptr buffer(new block);
  129.         buffer->fill(0);
  130.         fsource.read((char*) buffer->data(), READ_BLOCK_SIZE);
  131.         // std::fill(buffer->begin() + fsource.gcount(), buffer->end(), 0);
  132.  
  133.         std::unique_lock<std::mutex> locker(data_m);
  134.         data_cond.wait(locker, not_filled_data_queue);
  135.         data_q.push(std::make_pair(block_count, std::move(buffer)));
  136.         locker.unlock();
  137.         data_cond.notify_all();
  138.  
  139.         block_count++;
  140.     }
  141.     eofFound = true;
  142.     data_cond.notify_all();
  143.  
  144.     fsource.close();
  145.     std::cout << "Main thread has finished work " << block_count << std::endl;
  146. }
  147.  
  148. void calc_thread(int thread_num) {
  149.     std::cout << "Calc thread #" << thread_num << " is up" << std::endl;
  150.     // Обрабатываем данные пока не наступит конец файла или пока ещё есть необработанные хранилища
  151.     while (true) {
  152.         // Получаем хранилище для обработки
  153.         std::unique_lock<std::mutex> locker(data_m);
  154.         data_cond.wait(locker, not_empty_data_queue);
  155.         if (eofFound && data_q.empty()) {
  156.             break;
  157.         }
  158.         std::pair<uint64_t, block_ptr> payload = std::move(data_q.front());
  159.         data_q.pop();
  160.         locker.unlock();
  161.         data_cond.notify_all();
  162.  
  163.         auto buffer = std::move(payload.second);
  164.         uint64_t block_count = payload.first;
  165.         // Вычисляем CRC
  166.         int limit = READ_BLOCK_SIZE/CALC_BLOCK_SIZE;
  167.         std::vector<uint64_t> hashes;
  168.         for (int i = 0; i < limit; i++) {
  169.             const void* data = (const void*) (buffer->data() + i*CALC_BLOCK_SIZE);
  170.             uint64_t checkSumCRC = boost::crc<64, 0x42F0E1EBA9EA3693, 0xFFFFFFFFFFFFFFF, 0, false, false>(data, CALC_BLOCK_SIZE);
  171.             // cout << "CRC" << block_count*limit + i << ":" << checkSumCRC << " THREAD: " << thread_num << endl;
  172.             hashes.push_back(checkSumCRC);
  173.         }
  174.  
  175.         hash_m.lock();
  176.         for (int i = 0; i < limit; i++) {
  177.             hash_q.push(std::make_pair(block_count*limit + i, hashes[i]));
  178.         }
  179.         hash_m.unlock();
  180.         hash_cond.notify_one();
  181.     }
  182.     calc_finished++;
  183.     hash_cond.notify_one();
  184.     std::cout << "Calc thread #" << thread_num << " has finished work." << std::endl;
  185. }
  186.  
  187. void write_thread() {
  188.     std::ofstream fsignature;
  189.     fsignature.open(outputFilePath, std::ios::binary);
  190.     std::map<uint64_t, std::string> hashes;
  191.  
  192.     int last_written_block = -1;
  193.  
  194.     // Записываем данные в файл
  195.     while (true) {
  196.         // Получаем хранилище для обработки
  197.         std::unique_lock<std::mutex> locker(hash_m);
  198.         hash_cond.wait(locker, not_empty_hash_queue);
  199.         if (calc_finished == THREADS_COUNT && hash_q.empty()) {
  200.             break;
  201.         }
  202.         std::pair<uint64_t, uint64_t> payload = hash_q.front();
  203.         uint64_t hash = payload.second;
  204.         hash_q.pop();
  205.         locker.unlock();
  206.  
  207.         std::string hash_str = std::to_string(hash);
  208.         hashes[payload.first] = hash_str;
  209.  
  210.         auto it = hashes.begin();
  211.         while (it != hashes.end()) {
  212.             uint64_t number = it->first;
  213.             if (number != (uint64_t) (last_written_block + 1)) {
  214.                 break;
  215.             }
  216.             std::string hash = it->second;
  217.             fsignature << hash;
  218.             last_written_block++;
  219.             hashes.erase(it++);
  220.         }
  221.     }
  222.     assert(hashes.empty());
  223.  
  224.     std::cout << "File written" << std::endl;
  225. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement