Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # Python 3
- # μτf±8
- # ___________________________________________________________________________
- #
- # Name: crpg_update
- # Description: -
- # Version: 1.00
- # Project: cRPG
- #
- # Version Last to First: yyyy/mm/dd x.xx Author: Description
- # NB: Change __version__
- #
- # 2016/04/10 1.00 Élio: Creation
- # ___________________________________________________________________________
- """cRPG update"""
- import argparse # Parser for command-line options, arguments and sub-commands
- import gzip # Support for gzip files
- import hashlib # Secure hashes and message digests
- import os.path # Common pathname manipulations
- import sys # System-specific parameters and functions
- import urllib.error # Exception classes raised by urllib.request
- import urllib.parse # Parse URLs into components
- import urllib.request # Extensible library for opening URLs
- __version__ = '1.00'
- APPLICATION_NAME = "cRPG update {}".format(__version__)
def binary_prefix_format(value: int) -> str:
    """Format *value* with the largest applicable IEC binary prefix.

    Values of at least 1 KiB are rendered with two decimals and a unit
    (e.g. "1.50 Ki"); smaller values are rendered as a plain integer.

    IEC (SI) names: Yobi (Yotta), Zebi (Zetta), Exbi (Exa), Pebi (Peta),
    Tebi (Tera), Gibi (Giga), Mebi (Mega), Kibi (kilo).
    """
    prefixes = ("Yi", "Zi", "Ei", "Pi", "Ti", "Gi", "Mi", "Ki")
    # Thresholds 2**80 down to 2**10, paired largest-first with prefixes.
    for exponent, prefix in zip(range(80, 9, -10), prefixes):
        threshold = 1 << exponent
        if value >= threshold:
            return "{:0.2f} {}".format(value / threshold, prefix)
    # Below 1 KiB: no prefix, no decimals.
    return "{:0.0f}".format(value)
def url_read(url: str, verbose=True) -> bytes:
    """Download *url* and return the raw response payload as bytes.

    Prints progress messages in >=10% increments when *verbose* is true.
    On an unreadable response or an HTTP error the message is printed and
    the process exits via sys.exit (TODO: replace with real error handling).
    """
    url_o = urllib.parse.urlparse(url)
    try:
        with urllib.request.urlopen(url) as f:
            if f.reason != "OK" or not f.readable():  # Validation: PASSED 2016/04/10
                exc_value = Exception("Unable to retrieve file: {}".format(
                    os.path.basename(url_o.path),
                ))
                print(exc_value, url)
                sys.exit(exc_value)  # TODO: Handle error
            meta = f.info()
            # Content-Length may be absent (e.g. chunked transfer encoding);
            # in that case fall back to reading until EOF instead of
            # crashing on the None returned by get_all().
            content_length = meta.get_all('Content-Length')
            if content_length is None:
                return f.read()
            data_length = int(content_length[0])
            index = 0
            progress_old = -1
            data = bytes()
            while index < data_length:
                data_chunk = f.read(100 << 10)  # 100 KiB per read
                if not data_chunk:
                    # Server closed the connection before Content-Length
                    # bytes arrived: stop instead of looping forever on
                    # empty reads (bug in the original implementation).
                    break
                data += data_chunk
                index += len(data_chunk)
                progress = index / data_length
                if ((progress - progress_old) >= 0.1) or progress == 1:
                    msg = "\tDownloading file... {}: {} / {} bytes".format(
                        "{:0.0%}".format(progress).rjust(4),
                        binary_prefix_format(index),
                        binary_prefix_format(data_length),
                    )
                    if verbose:
                        print(msg)
                    progress_old = progress
            if verbose:
                print("")
            return data
    except urllib.error.HTTPError as exc_value:
        print(exc_value, url)
        sys.exit(exc_value)  # TODO: Handle error
def url_unparse(scheme: str, netloc: str, path: str, params: str="", query: str="", fragment: str="") -> str:
    """Assemble a URL string from its components (inverse of urlparse)."""
    components = (scheme, netloc, path, params, query, fragment)
    return urllib.parse.urlunparse(components)
def file_open(file: str) -> bytes:
    """Read the file at *file* in binary mode and return its contents.

    On any I/O error the process exits via sys.exit with the exception
    (TODO: replace with real error handling).
    """
    try:
        with open(file, 'rb') as f:
            # TODO: Segment packets
            return f.read()
    except OSError as exc_value:  # IOError is an alias of OSError in Python 3
        sys.exit(exc_value)  # TODO: Handle error
def compute_file_md5_checksum(file_path) -> str:
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    Reads the file in fixed-size chunks so arbitrarily large files do not
    have to fit in memory at once (the original read the whole file).
    """
    m = hashlib.md5()
    with open(file_path, 'rb') as f:
        # iter(callable, sentinel): yields 1 MiB chunks until b"" at EOF.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            m.update(chunk)
    return m.hexdigest()
def retrieve_file(url_o, filename, output_path, iscompressed=True, verbose=True):
    """Download *filename* from the server described by *url_o* (a
    urllib.parse.ParseResult), optionally gunzip it, store it under
    *output_path* (creating intermediate directories) and return the
    decompressed bytes.
    """
    # The server stores compressed files with a ".gz" suffix appended.
    url_path_file = url_unparse(
        url_o.scheme,
        url_o.netloc,
        os.path.normpath(os.path.join(url_o.path, filename + ["", ".gz"][iscompressed])).replace("\\", "/"),
    )
    if verbose:
        print("Downloading: {}".format(filename))
    data = url_read(url_path_file, verbose)
    if iscompressed:
        # Decompress file
        data = gzip.decompress(data)
    file_path = os.path.normpath(os.path.join(output_path, filename))
    # Create the destination directory; exist_ok=True avoids the
    # check-then-create race of the original exists()/makedirs() pair,
    # and the dirname guard handles files directly in output_path.
    dir_path = os.path.dirname(file_path)
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)
    # Store file
    with open(file_path, "wb") as f:
        f.write(data)
    return data
if __name__ == '__main__': # {
    # Entry point: compare the local cRPG version against the server's and
    # download every file whose MD5 checksum differs from the manifest.
    input_path_name = "cRPG"
    parser = argparse.ArgumentParser(description=APPLICATION_NAME)
    parser.add_argument( # Positional argument
        action='store',
        # nargs,
        # const,
        # default,
        type=str,
        # choices,
        help="{} directory".format(input_path_name),
        # metavar,
        dest='input_directory'
    )
    arguments = parser.parse_args()
    input_path = arguments.input_directory
    input_url = "" ## Removed
    #
    input_path = os.path.normpath(input_path)
    # Check input path
    if not os.path.isdir(input_path) or not os.path.exists(input_path): # {
        exc_value = Exception("Invalid {} path: {}".format(
            input_path_name,
            input_path,
        ))
        sys.exit(exc_value)
    # }
    # First line of version.txt (local and remote) carries the version string.
    input_path_version = os.path.join(input_path, "version.txt")
    data = file_open(input_path_version)
    local_version = data.splitlines()[0].decode('utf-8', errors='ignore')
    print("Local version: {}".format(local_version))
    input_url_o = urllib.parse.urlparse(input_url)
    url_path_version = url_unparse(
        input_url_o.scheme,
        input_url_o.netloc,
        os.path.join(input_url_o.path, "version.txt"),
    )
    data = url_read(url_path_version, False)
    server_version = data.splitlines()[0].decode('utf-8', errors='ignore')
    print("Server version: {}\n".format(server_version))
    update_required = (local_version != server_version)
    # NOTE(review): this message prints even when the versions match
    # (update_required False); confirm whether it should be conditional.
    print("{} update is required\n".format(input_path_name))
    # Manifest: one "relative/path, md5hex" entry per line.
    filelist = retrieve_file(input_url_o, "filelist.txt", input_path, iscompressed=False, verbose=False)
    filelist = filelist.decode('utf-8', errors='ignore').splitlines()
    index = 0
    progress = 0
    progress_old = -1
    for line in filelist: # { for each line of filelist
        file_path, file_md5_checksum = line.split(", ")
        if "version.txt" in file_path: index += 1; continue # Skip troll
        if "filelist.txt" in file_path: index += 1; continue # Skip troll
        # When no version bump is needed, only "std_banners" files are
        # still checksum-verified; everything else is skipped.
        if not update_required and "std_banners" not in file_path: index += 1; continue
        local_file_path = os.path.normpath(os.path.join(input_path, file_path))
        local_file_md5_checksum = None
        if os.path.exists(local_file_path): # {
            local_file_md5_checksum = compute_file_md5_checksum(local_file_path)
        # }
        # Download only missing files or files whose checksum differs.
        if local_file_md5_checksum != file_md5_checksum: # {
            retrieve_file(input_url_o, file_path, input_path, iscompressed=True, verbose=True)
        # }
        index += 1
        progress = index / len(filelist)
        # Report progress in >=10% increments (and at 100%).
        if ((progress - progress_old) >= 0.1) or progress == 1: # {
            msg = "Updating files... {}: {} / {}".format(
                "{:0.0%}".format(progress).rjust(4),
                index,
                len(filelist),
            )
            print(msg)
            progress_old = progress
        # } if
    # } for each line of filelist
    # Refresh the local version/manifest last, so an interrupted update is
    # detected and re-run in full on the next invocation.
    retrieve_file(input_url_o, "version.txt", input_path, iscompressed=False, verbose=False)
    retrieve_file(input_url_o, "filelist.txt", input_path, iscompressed=False, verbose=False)
# } __main__
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement