Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- * Программа для генерации сигнатуры указанного файла.
- * Сигнатура генерируется следующим образом: исходный файл делится на блоки равной (фиксированной) длины blockSize
- * (если размер файла не кратен размеру блока, последний фрагмент дополнен нулями до размера полного блока).
- * Для каждого блока вычисляется значение CRC функции и дописывается в выходной файл-сигнатуру.
- *
- * Программа делится на THREADS_COUNT потоков. Один из них работает над загрузкой входных данных в хранилища,
- * а остальные вычисляют CRC хэш и записывают его в выходной файл. На каждом этапе,поток загрузки забирает одно из
- * свободных хранилищ, заполняет его, после чего освобождает и помечает как необработанный. Этот процесс выполняется до
- * тех пор пока файл не кончится. Потоки обработки в это время ждут необработанное хранилище, и если оно становится доступно, то
- * все данные между этими хранилищами делятся на части и обрабатываются. После обработки данных, потоки обработки объединяют данные
- * и сохраняют их в сигнатурный файл. После чего, процесс потовряется до тех пор, пока в очереди есть необработанные хранилища
- * или пока исходный входной файл ещё не полностью загружен.
- *
- * Аргументы командной строки:
- * argv[1] - путь до исходного файла, для которого требуется сигнатура
- * argv[2] - путь до выходного сигнатурного файла
- * argv[3] - размер блока
- *
- *
- * Для регулировки производительности используйте константы:
- * В файле class_storage.h:
- * NUMBER_BLOCKS_IN_STORAGE - Количество блоков в хранилище
- * NUMBER_OF_STORAGE - Максимальное количество хранилищ
- * В файле main.cpp:
- * THREADS_COUNT - Количество потоков
- */
- #include <stdio.h>
- #include <iostream>
- #include <fstream>
- #include <boost/crc.hpp>
- #include <queue>
- #include <sstream>
- #include <thread>
- #include <queue>
- #include <array>
- #include <mutex>
- #include <atomic>
- #include <map>
- #include <assert.h>
- #include <condition_variable>
- #include "exception.h"
- const int THREADS_COUNT = 4; // Количество потоков. Должно быть больше или равно 2
- const int READ_BLOCK_SIZE = 1024*1024;
- const int CALC_BLOCK_SIZE = 1024;
- const int QUEUE_SIZE = 1024;
- std::string inputFilePath;
- std::string outputFilePath;
- // Флаг сигнализирует о завершении входного файла. Требуется для вычислительных потоков.
- bool eofFound = false;
- std::atomic<int> calc_finished(0);
- typedef std::array<uint8_t, READ_BLOCK_SIZE> block;
- typedef std::unique_ptr<block> block_ptr;
- std::condition_variable data_cond;
- std::mutex data_m;
- std::queue<std::pair<int, block_ptr> > data_q;
- std::condition_variable hash_cond;
- std::mutex hash_m;
- std::queue<std::pair<uint64_t, uint64_t> > hash_q;
- void read_thread();
- void calc_thread(int thread_num);
- void write_thread();
- bool not_filled_data_queue() {return data_q.size() <= QUEUE_SIZE;}
- bool not_empty_data_queue() {return eofFound || data_q.size() > 0;}
- bool not_empty_hash_queue() {return calc_finished == THREADS_COUNT || hash_q.size() > 0;}
- int main(int argc, char *argv[])
- {
- auto start = std::chrono::steady_clock::now();
- try {
- if (argc < 3) {
- throw Exception("Moar arguments!!!1", 1);
- }
- inputFilePath = argv[1];
- outputFilePath = argv[2];
- if (THREADS_COUNT < 1) {
- throw Exception("Number of threads is invalid", 1);
- }
- // Разделяем на два потока. Один будет отвечать за загрузку данных из файла,
- // а второй за вычисление CRC и сохранение его в файл.
- std::thread reader(read_thread);
- std::array<std::thread, THREADS_COUNT> pool;
- for (int i = 0; i < THREADS_COUNT; i++) {
- std::thread calculator(calc_thread, i);
- pool[i] = std::move(calculator);
- }
- std::thread writer(write_thread);
- reader.join();
- for (int i = 0; i < THREADS_COUNT; i++) {
- pool[i].join();
- }
- writer.join();
- } catch (Exception &e) {
- std::cout << "ERROR: " << e.getMessage() << std::endl;
- return e.getCode();
- }
- auto end = std::chrono::steady_clock::now();
- auto diff = end - start;
- std::cout << "Working time is " << std::chrono::duration<double, std::milli>(diff).count() << std::endl;
- return 0;
- }
- void read_thread() {
- std::ifstream fsource;
- uint64_t block_count = 0;
- fsource.open(inputFilePath, std::ios::binary);
- if (!fsource) {
- throw Exception("File not found", -1);
- }
- // Читаем исходные данные пока не наступаит конец файла
- while (!fsource.eof()) {
- block_ptr buffer(new block);
- buffer->fill(0);
- fsource.read((char*) buffer->data(), READ_BLOCK_SIZE);
- // std::fill(buffer->begin() + fsource.gcount(), buffer->end(), 0);
- std::unique_lock<std::mutex> locker(data_m);
- data_cond.wait(locker, not_filled_data_queue);
- data_q.push(std::make_pair(block_count, std::move(buffer)));
- locker.unlock();
- data_cond.notify_all();
- block_count++;
- }
- eofFound = true;
- data_cond.notify_all();
- fsource.close();
- std::cout << "Main thread has finished work " << block_count << std::endl;
- }
- void calc_thread(int thread_num) {
- std::cout << "Calc thread #" << thread_num << " is up" << std::endl;
- // Обрабатываем данные пока не наступит конец файла или пока ещё есть необработанные хранилища
- while (true) {
- // Получаем хранилище для обработки
- std::unique_lock<std::mutex> locker(data_m);
- data_cond.wait(locker, not_empty_data_queue);
- if (eofFound && data_q.empty()) {
- break;
- }
- std::pair<uint64_t, block_ptr> payload = std::move(data_q.front());
- data_q.pop();
- locker.unlock();
- data_cond.notify_all();
- auto buffer = std::move(payload.second);
- uint64_t block_count = payload.first;
- // Вычисляем CRC
- int limit = READ_BLOCK_SIZE/CALC_BLOCK_SIZE;
- std::vector<uint64_t> hashes;
- for (int i = 0; i < limit; i++) {
- const void* data = (const void*) (buffer->data() + i*CALC_BLOCK_SIZE);
- uint64_t checkSumCRC = boost::crc<64, 0x42F0E1EBA9EA3693, 0xFFFFFFFFFFFFFFF, 0, false, false>(data, CALC_BLOCK_SIZE);
- // cout << "CRC" << block_count*limit + i << ":" << checkSumCRC << " THREAD: " << thread_num << endl;
- hashes.push_back(checkSumCRC);
- }
- hash_m.lock();
- for (int i = 0; i < limit; i++) {
- hash_q.push(std::make_pair(block_count*limit + i, hashes[i]));
- }
- hash_m.unlock();
- hash_cond.notify_one();
- }
- calc_finished++;
- hash_cond.notify_one();
- std::cout << "Calc thread #" << thread_num << " has finished work." << std::endl;
- }
- void write_thread() {
- std::ofstream fsignature;
- fsignature.open(outputFilePath, std::ios::binary);
- std::map<uint64_t, std::string> hashes;
- int last_written_block = -1;
- // Записываем данные в файл
- while (true) {
- // Получаем хранилище для обработки
- std::unique_lock<std::mutex> locker(hash_m);
- hash_cond.wait(locker, not_empty_hash_queue);
- if (calc_finished == THREADS_COUNT && hash_q.empty()) {
- break;
- }
- std::pair<uint64_t, uint64_t> payload = hash_q.front();
- uint64_t hash = payload.second;
- hash_q.pop();
- locker.unlock();
- std::string hash_str = std::to_string(hash);
- hashes[payload.first] = hash_str;
- auto it = hashes.begin();
- while (it != hashes.end()) {
- uint64_t number = it->first;
- if (number != (uint64_t) (last_written_block + 1)) {
- break;
- }
- std::string hash = it->second;
- fsignature << hash;
- last_written_block++;
- hashes.erase(it++);
- }
- }
- assert(hashes.empty());
- std::cout << "File written" << std::endl;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement