Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import hashlib
- import platform
- import concurrent.futures
- from concurrent.futures import ProcessPoolExecutor, as_completed
- from tqdm import tqdm
- def compute_hash(filepath, algorithm):
- """Вычисляет хеш файла с использованием указанного алгоритма."""
- try:
- hash_obj = hashlib.new(algorithm)
- with open(filepath, 'rb') as f:
- while chunk := f.read(4096 * 16):
- hash_obj.update(chunk)
- return {'status': 'success', 'hash': hash_obj.hexdigest(), 'path': filepath}
- except Exception as e:
- return {'status': 'error', 'path': filepath, 'message': str(e)}
- def normalize_path(path):
- """Обработка длинных путей для Windows."""
- if platform.system() == 'Windows':
- path = os.path.abspath(path)
- if len(path) > 260 and not path.startswith('\\\\?\\'):
- return '\\\\?\\' + path
- return os.path.abspath(path)
- def get_files(path):
- """Рекурсивный поиск файлов с обработкой специальных символов."""
- try:
- path = normalize_path(path)
- if os.path.isfile(path):
- return [path]
- file_list = []
- for root, _, files in os.walk(path):
- for file in files:
- full_path = os.path.join(root, file)
- file_list.append(normalize_path(full_path))
- return file_list
- except Exception as e:
- raise ValueError(f"Ошибка обхода каталога: {str(e)}")
- def process_batch(files, algorithm):
- """Обработка пакета файлов в отдельном процессе."""
- return [compute_hash(f, algorithm) for f in files]
- def main():
- # Ввод параметров
- scan_path = input("1) Укажите путь для сканирования: ").strip()
- if not os.path.exists(scan_path):
- print("Ошибка: Указанный путь не существует")
- return
- algorithms = ['sha256', 'sha3-256', 'blake2s', 'md5']
- print("\n2) Выберите алгоритм хеширования:")
- for i, alg in enumerate(algorithms, 1):
- print(f"{i}. {alg}")
- try:
- choice = int(input("Номер алгоритма: ")) - 1
- algorithm = algorithms[choice].replace('-', '_')
- except (ValueError, IndexError):
- print("Ошибка: Некорректный выбор алгоритма")
- return
- save_dir = input("\n3) Путь для сохранения результатов: ").strip()
- if not os.path.isdir(save_dir):
- print("Ошибка: Каталог для сохранения не существует")
- return
- # Получение списка файлов
- try:
- files = get_files(scan_path)
- if not files:
- print("Нет файлов для обработки")
- return
- except Exception as e:
- print(f"Ошибка: {str(e)}")
- return
- # Настройка многопроцессорности
- cpu_count = os.cpu_count() or 4
- batch_size = max(len(files) // (cpu_count * 4), 1)
- batches = [files[i:i+batch_size] for i in range(0, len(files), batch_size)]
- # Инициализация файлов результатов
- save_path = os.path.join(save_dir, f"checksums.{algorithms[choice]}")
- error_log_path = os.path.join(save_dir, "hash_errors.log")
- stats = {
- 'total': len(files),
- 'processed': 0,
- 'errors': 0,
- 'error_list': []
- }
- with ProcessPoolExecutor(max_workers=cpu_count) as executor, \
- open(save_path, 'w', encoding='utf-8') as output_file, \
- tqdm(total=stats['total'], desc="Обработка файлов", unit="file") as pbar:
- # Запуск задач обработки
- futures = [executor.submit(process_batch, batch, algorithm) for batch in batches]
- # Обработка результатов
- for future in as_completed(futures):
- try:
- results = future.result()
- for result in results:
- if result['status'] == 'success':
- output_file.write(f"{result['hash']} {result['path']}\n")
- stats['processed'] += 1
- else:
- stats['error_list'].append(result)
- stats['errors'] += 1
- pbar.update(1)
- except Exception as e:
- print(f"\nОшибка обработки пакета: {str(e)}")
- # Запись лога ошибок
- if stats['error_list']:
- with open(error_log_path, 'w', encoding='utf-8') as err_file:
- err_file.write("Список ошибок при обработке файлов:\n\n")
- for error in stats['error_list']:
- err_file.write(f"Файл: {error['path']}\nОшибка: {error['message']}\n\n")
- # Формирование отчета
- print("\n" + "=" * 50)
- print(f"Итоговый отчет:")
- print(f"Всего файлов: {stats['total']}")
- print(f"Успешно обработано: {stats['processed']}")
- print(f"Файлов с ошибками: {stats['errors']}")
- if stats['error_list']:
- print(f"\nСписок ошибок сохранен в: {error_log_path}")
- print("Первые 5 ошибок:")
- for error in stats['error_list'][:5]:
- print(f"• {error['path']}\n → {error['message']}")
- print(f"\nФайл с хеш-суммами: {save_path}")
- print("=" * 50)
- # Пауза перед завершением
- if platform.system() == 'Windows':
- os.system('pause')
- else:
- input("\nНажмите Enter для выхода...")
- if __name__ == "__main__":
- main()
Add Comment
Please, Sign In to add comment