Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import difflib
- import glob
- import json
- import logging
- import os
- from pathlib import Path
- import re
- import shutil
- import structlog
- import sys
- import time
- import torrent_parser
- '''
- requirements to install
- six==1.12.0
- structlog==19.2.0
- torrent-parser==0.3.0
- '''
# ============================== VARIABLES ==============================
# --- behaviour switches ---
# Dry-run switch: leave False to test first, set True to really move files.
move_files = False
# When True, text enclosed in [] and () is stripped from both names before
# their similarity is computed (see clean_confidence).
cleaning = False
# Similarity threshold a non-identical file name has to exceed before it is
# accepted as a match.
confidence_level = 0.9

# --- input locations ---
# Top folder with the torrent files to load -- example: "C:\\torrents\\"
torrents_path = ""
extension = ".torrent"
# Top folders holding the files to check
# -- example: ["C:\\Documents\\files\\", "C:\\myfiles\\myvids\\"]
video_path = []

# --- output location ---
# Folder matched files are moved into -- example: "C:\\Documents\\moved\\"
move_path = ""
# =============================== LOGGER ===============================
# Start-of-run timestamp (month-day-hour-minute-second); it prefixes the log
# file name and the JSON reports written at the end of the script.
runtime = time.localtime()
current_time = time.strftime('%m-%d-%H-%M-%S', runtime)
log_mode = 'standard'
if log_mode == 'standard':
    log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh = logging.FileHandler(f'{current_time} - log.log')
    handler = logging.StreamHandler(sys.stdout)
    logger = logging.getLogger()
    # The root logger gets the file handler first, then the stdout handler;
    # both share one format.
    for sink in (fh, handler):
        sink.setFormatter(log_formatter)
        logger.addHandler(sink)
    logger.setLevel(logging.INFO)
- ##########################FUNCTIONS##############################################
def remove_nested_parens(input_str, char):
    """Return 'input_str' without the text enclosed by a delimiter pair.

    'char' holds the opening delimiter at index 0 and the closing delimiter
    at index 1. Nesting is tracked by depth, so inner pairs disappear along
    with the span that encloses them. A closing delimiter met while no span
    is open is kept; an opening delimiter that is never closed drops
    everything that follows it.
    """
    depth = 0
    kept = []
    for symbol in input_str:
        if symbol == char[0]:
            depth += 1
        elif depth and symbol == char[1]:
            depth -= 1
        elif depth == 0:
            kept.append(symbol)
    return ''.join(kept)
def clean_confidence(file_name, torrent_file_name):
    """Return the difflib similarity ratio of a disk file name and a torrent entry name.

    When the module-level 'cleaning' flag is set, text enclosed in [] and ()
    is removed from both names before they are compared.
    """
    candidate, wanted = file_name, torrent_file_name
    if cleaning:
        # strip [...] first, then (...), from both names
        for pair in (['[', ']'], ['(', ')']):
            candidate = remove_nested_parens(candidate, pair)
            wanted = remove_nested_parens(wanted, pair)
    # similarity of the (possibly cleaned) file name against the torrent name
    return difflib.SequenceMatcher(None, candidate, wanted).ratio()
# ================================ CODE ================================
# Collect all torrent files inside the torrents_path folder and its subfolders.
torrent_files = list(Path(torrents_path).glob('**/*{}'.format(extension)))

# Read the metadata of every torrent.
# files: torrent file name -> {entry name: entry length}
files = dict()
for file_path in torrent_files:
    filename = os.path.basename(file_path)
    files[filename] = dict()
    try:
        data = torrent_parser.parse_torrent_file(file_path)
    except Exception:
        # Best effort: an unreadable torrent is reported and keeps its empty
        # entry, which the clean-up at the end of the script removes.
        logger.warning(f'torrent failed to load data at {file_path}')
        continue

    entries = dict()
    try:
        info = data['info']
        if 'files' in info:
            # Multi-file torrent: one record per contained file.
            for file in info['files']:
                parts = file['path']
                top = parts[0].lower()
                if top == 'screens':
                    # screenshot entries are never matched
                    continue
                if top == 'videos' and len(parts) > 1:
                    entries[parts[1]] = file['length']
                elif len(parts) == 1:
                    # file stored directly at the top level of the torrent
                    entries[parts[0]] = file['length']
        else:
            # Single-file torrent: name and length sit directly in 'info'.
            entries[info['name']] = info['length']
    except (KeyError, IndexError, TypeError, AttributeError):
        # Malformed metadata must not abort the whole run.
        logger.warning(f'torrent has an unexpected layout at {file_path}')
        continue
    files[filename] = entries
# Index every candidate file inside the video_path folders and their subfolders.
# video_files: file size -> list of full paths (as str) having that size
# Suffixes that are never candidates; compared case-insensitively so that
# e.g. '.JPG' and '.PNG' are treated like '.jpg' and '.png'.
_skipped_suffixes = ('.jpg', '.jpeg', '.zip', '.png', '.gif', '.mp3')
video_files = dict()
for path in video_path:
    for video in Path(path).glob('**/*'):
        if os.path.isdir(video):
            continue
        name = os.path.basename(video)
        if name.lower().endswith(_skipped_suffixes):
            continue
        try:
            size = os.path.getsize(video)
        except OSError:
            # e.g. a broken symlink or a file removed while scanning
            logger.warning(f'could not read size of {video}')
            continue
        # paths are stored as str so the dictionary can be dumped to JSON
        video_files.setdefault(size, []).append(str(video))
# Match every torrent entry against the files on disk that have the same size.
# torrentz      = torrent file name
# torrent_files = dictionary {entry name: size} of that torrent
# NOTE(review): the pasted source lost its indentation; it is assumed that the
# bookkeeping (removing the matched file from video_files and recording the
# match in torrent_del) also runs when move_files is False -- confirm.


def _move_into_torrent_folder(source, torrent_name):
    """Move 'source' into the folder move_path/<torrent_name>/ if move_files is set.

    A source file that no longer exists is reported with a warning and
    otherwise ignored; nothing is moved while move_files is False.
    """
    if not move_files:
        return
    new_path = os.path.join(move_path, torrent_name, os.path.basename(source))
    try:
        os.makedirs(os.path.dirname(new_path), exist_ok=True)
        shutil.move(source, new_path)
    except FileNotFoundError:
        logger.warning(f'{os.path.basename(source)} not found')


# torrent_del collects [torrent name, entry name] for every entry that was found
torrent_del = list()
for torrentz, torrent_files in files.items():
    logger.debug(f'------------> starting new torrent: {torrentz}')
    for name, size in torrent_files.items():
        if name.endswith(('.jpeg', '.gif')):
            # images inside a torrent are never matched
            continue
        candidates = video_files.get(size)
        if not candidates:
            # nothing on disk has this size -- assume the file is missing
            continue

        base_names = [os.path.basename(candidate) for candidate in candidates]
        if name in base_names:
            # identical file name and size: accept without scoring
            index = base_names.index(name)
        else:
            # no identical name: score every same-size file against the entry
            confidence_levels = [
                clean_confidence(file_name=base_name, torrent_file_name=name)
                for base_name in base_names
            ]
            max_confidence = max(confidence_levels)
            if max_confidence <= confidence_level:
                # assume wrong file; only the multi-candidate case is reported
                if len(candidates) > 1:
                    logger.warning(f'confidence level too low \n {len(confidence_levels)} files found \n {confidence_levels} \n {candidates} \n {name}')
                continue
            index = confidence_levels.index(max_confidence)
            # report the file that actually won, not the last one scored
            file_name = base_names[index]
            if len(candidates) == 1:
                logger.info(f'found 1 file for size: {size} \n {file_name} \n confidence level {max_confidence} \n name to match: {name}')
            else:
                logger.info(f'{len(confidence_levels)} files found \n {confidence_levels} \n {file_name} \n confidence level {max_confidence} \n name to match: {name}')

        _move_into_torrent_folder(candidates[index], torrentz)
        # Remove only the matched file; other files of the same size stay
        # available for later entries. The key goes once the list is empty.
        del candidates[index]
        if not candidates:
            del video_files[size]
        # add matched torrent file info to list
        torrent_del.append([torrentz, name])
# Drop every torrent entry that was matched to a file on disk.
for torrent_name, entry_name in torrent_del:
    del files[torrent_name][entry_name]

# Torrents with no unmatched entries left are removed from the report.
for torrent_name in [key for key, remaining in files.items() if not remaining]:
    del files[torrent_name]

# Report 1: per torrent, all entries that have not been found.
with open(f'{current_time} - torrent_files.json', 'w') as fp:
    json.dump(files, fp, indent=4)

# Report 2: per size, all files on disk that have not been matched.
with open(f'{current_time} - video_files.json', 'w') as fp:
    json.dump(video_files, fp, indent=4)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement