Advertisement
Guest User

Untitled

a guest
Feb 2nd, 2018
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.01 KB | None | 0 0
  1. import asyncio
  2. from ftplib import FTP
  3. from concurrent.futures import ProcessPoolExecutor
  4.  
  5. from typing import List, Tuple, Dict, Sequence
  6.  
  7. import emma_winston_wolfe.common_utils.config_definition as env
  8.  
  9.  
  10. class FTPConnect:
  11. def __init__(self, host, user_name, user_password, ftp_dir, local_dir):
  12. self.host = host
  13. self.user_name = user_name
  14. self.user_password = user_password
  15. self.ftp_dir = ftp_dir and ftp_dir or '.'
  16. self.local_dir = local_dir and local_dir or '.'
  17.  
  18. async def async_get_file_names(self, executor: ProcessPoolExecutor):
  19. loop = asyncio.get_event_loop()
  20. return await loop.run_in_executor(executor, self.get_file_names)
  21.  
  22. def get_file_names(self):
  23. with FTP(self.host, timeout=env.FTP_CONNECTION_TIMEOUT) as ftp:
  24. ftp.login(user=self.user_name, passwd=self.user_password)
  25. ftp.cwd(self.ftp_dir)
  26.  
  27. ftp_files = {
  28. file_desc[0] for file_desc in ftp.mlsd() if self._is_file(file_desc)
  29. }
  30.  
  31. return ftp_files
  32.  
  33. @staticmethod
  34. def _is_file(dir_entry: Tuple[str, Dict[str, str]]) -> bool:
  35. """
  36. Check type of returning entity of FTP.mlsd()
  37. :param dir_entry: Tuple from FTP.mlsd()
  38. :return: true if entry is file, else false
  39. """
  40. return dir_entry[1].get('type') == 'file'
  41.  
  42. def load_ftp_file(self, file_name) -> str:
  43. with FTP(self.host) as ftp:
  44. ftp.login(user=self.user_name, passwd=self.user_password)
  45. ftp.cwd(self.ftp_dir)
  46.  
  47. with open(self.local_dir + file_name, "wb+") as output_f:
  48. ftp.retrbinary("RETR " + file_name, output_f.write)
  49.  
  50. return file_name
  51.  
  52. async def async_load_ftp_files(self, executor: ProcessPoolExecutor,
  53. file_names: Sequence[str]) -> List[str]:
  54. loop = asyncio.get_event_loop()
  55.  
  56. if not len(file_names):
  57. return []
  58.  
  59. load_ftp_coroutines = [
  60. loop.run_in_executor(executor, self.load_ftp_file, file_name)
  61. for file_name in file_names
  62. ]
  63.  
  64. completed, pending = await asyncio.wait(load_ftp_coroutines)
  65.  
  66. return [t.result() for t in completed]
  67.  
  68. def get_file_size(self, file_name: str) -> Tuple[str, int]:
  69. with FTP(self.host) as ftp:
  70. ftp.login(user=self.user_name, passwd=self.user_password)
  71. ftp.cwd(self.ftp_dir)
  72. ftp.voidcmd('TYPE I')
  73.  
  74. return file_name, ftp.size(file_name)
  75.  
  76. async def async_get_files_size(self, executor: ProcessPoolExecutor) -> Dict[str, int]:
  77. loop = asyncio.get_event_loop()
  78.  
  79. file_names = await self.async_get_file_names(executor)
  80.  
  81. get_files_size_coroutines = [
  82. loop.run_in_executor(executor, self.get_file_size, file_name)
  83. for file_name in file_names
  84. ]
  85.  
  86. result = await asyncio.gather(*get_files_size_coroutines)
  87.  
  88. return dict(result)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement