Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from ftplib import FTP
- from concurrent.futures import ProcessPoolExecutor
- from typing import List, Tuple, Dict, Sequence
- import emma_winston_wolfe.common_utils.config_definition as env
- class FTPConnect:
- def __init__(self, host, user_name, user_password, ftp_dir, local_dir):
- self.host = host
- self.user_name = user_name
- self.user_password = user_password
- self.ftp_dir = ftp_dir and ftp_dir or '.'
- self.local_dir = local_dir and local_dir or '.'
- async def async_get_file_names(self, executor: ProcessPoolExecutor):
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(executor, self.get_file_names)
- def get_file_names(self):
- with FTP(self.host, timeout=env.FTP_CONNECTION_TIMEOUT) as ftp:
- ftp.login(user=self.user_name, passwd=self.user_password)
- ftp.cwd(self.ftp_dir)
- ftp_files = {
- file_desc[0] for file_desc in ftp.mlsd() if self._is_file(file_desc)
- }
- return ftp_files
- @staticmethod
- def _is_file(dir_entry: Tuple[str, Dict[str, str]]) -> bool:
- """
- Check type of returning entity of FTP.mlsd()
- :param dir_entry: Tuple from FTP.mlsd()
- :return: true if entry is file, else false
- """
- return dir_entry[1].get('type') == 'file'
- def load_ftp_file(self, file_name) -> str:
- with FTP(self.host) as ftp:
- ftp.login(user=self.user_name, passwd=self.user_password)
- ftp.cwd(self.ftp_dir)
- with open(self.local_dir + file_name, "wb+") as output_f:
- ftp.retrbinary("RETR " + file_name, output_f.write)
- return file_name
- async def async_load_ftp_files(self, executor: ProcessPoolExecutor,
- file_names: Sequence[str]) -> List[str]:
- loop = asyncio.get_event_loop()
- if not len(file_names):
- return []
- load_ftp_coroutines = [
- loop.run_in_executor(executor, self.load_ftp_file, file_name)
- for file_name in file_names
- ]
- completed, pending = await asyncio.wait(load_ftp_coroutines)
- return [t.result() for t in completed]
- def get_file_size(self, file_name: str) -> Tuple[str, int]:
- with FTP(self.host) as ftp:
- ftp.login(user=self.user_name, passwd=self.user_password)
- ftp.cwd(self.ftp_dir)
- ftp.voidcmd('TYPE I')
- return file_name, ftp.size(file_name)
- async def async_get_files_size(self, executor: ProcessPoolExecutor) -> Dict[str, int]:
- loop = asyncio.get_event_loop()
- file_names = await self.async_get_file_names(executor)
- get_files_size_coroutines = [
- loop.run_in_executor(executor, self.get_file_size, file_name)
- for file_name in file_names
- ]
- result = await asyncio.gather(*get_files_size_coroutines)
- return dict(result)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement