Guest User

Untitled

a guest
Jan 5th, 2018
341
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.92 KB | None | 0 0
  1. import subprocess
  2. import os
  3. import datetime
  4. import re
  5. import sys
  6.  
  7. SRC_FOLDER="../"
  8.  
  9. if len(sys.argv) < 5:
  10. print """
  11. Usage: %s <username> <passwod> <webhdfs_endpoint> <target dir>
  12.  
  13. This script copy all files from parent dir to HDFS
  14. Using WebHdfs api
  15.  
  16. Developed by: Yehuda Korotkin <yehuda@alefbt.com>
  17. Date: 18-01-01
  18.  
  19. """ % sys.argv[0]
  20. exit(1)
  21.  
  22. HDFS_USER=sys.argv[1]
  23. HDFS_PASS=sys.argv[2]
  24. WEBHDFS_ENDPOINT=sys.argv[3]
  25. HDFS_TARGET=sys.argv[4]
  26.  
  27. WEBHDFS_URL="%s%s" % (WEBHDFS_ENDPOINT, HDFS_TARGET)
  28.  
  29. #
  30. # Global functions
  31. #
  32. def log(s):
  33. d = datetime.datetime.now().strftime('%Y%m%d %H:%M')
  34. print "[%s] %s" % (d,s.replace(HDFS_PASS,"****"))
  35.  
  36.  
  37. def exec_cmd(cmd):
  38. """
  39. Execute shell command
  40. :param cmd:
  41. :return:
  42. """
  43. try:
  44. log("Command executing [%s]" % (cmd))
  45. data = subprocess.check_output(cmd, shell=True)
  46. log("Command [%s] result: %s" % (cmd, data))
  47. return data
  48. except Exception, e:
  49. log("exec_command ERROR - Command [%s] ERROR: %s" % (cmd, e))
  50. return None
  51.  
  52.  
  53. #
  54. # WebHdfs api
  55. #
  56. class WebHdfsApi(object):
  57. """
  58. WebHDFS api
  59. """
  60. def __init__(self, webhdfsUrl, username, password):
  61. self.webhdfsUrl = webhdfsUrl
  62. self.username = username
  63. self.password = password
  64.  
  65. def _exec_call(self,params):
  66. return exec_cmd("""curl -i -u "%s":"%s" %s""" % (self.username,self.password,params))
  67.  
  68. def create_object(self,object_path):
  69. """
  70. Create object
  71. :param object_path:
  72. :return: object url
  73. """
  74. out = self._exec_call("""-X PUT "%s/%s?op=CREATE&overwrite=true" """ % (self.webhdfsUrl, object_path))
  75.  
  76. search_results = re.search("^Location:\s*(.*)\s*$", out, re.MULTILINE)
  77.  
  78. if search_results:
  79. return search_results.group(1).strip()
  80. else:
  81. return None
  82.  
  83. def upload_file_to_object_url(self,object_url, filename):
  84. self._exec_call("""-X PUT -T "%s" "%s" """ % (filename, object_url))
  85.  
  86. def upload_file(self,local_file,remote_path):
  87. if os.path.exists(local_file) and os.path.isfile(local_file):
  88. object_url = self.create_object(remote_path)
  89. self.upload_file_to_object_url(object_url, local_file)
  90.  
  91. else:
  92. log("File not exists %s" % local_file)
  93.  
  94.  
  95. if __name__ == "__main__":
  96. log("Started HDFS Deploy uploader application by Yehuda Korotkin <yehudako@matrixbi.co.il>")
  97. api = WebHdfsApi(WEBHDFS_URL,HDFS_USER,HDFS_PASS)
  98.  
  99. for root, dirnames, filenames in os.walk(SRC_FOLDER):
  100. # Ignore hidden folders
  101. if re.search("\/\.[^\.]",root):
  102. continue
  103.  
  104. # Ignore hidden files
  105. for filename in filenames:
  106. if "." == filename[0:1]:
  107. continue
  108.  
  109. relative_dir = root[len(SRC_FOLDER):]
  110.  
  111. api.upload_file("%s/%s" % (root,filename), "%s/%s" % (relative_dir,filename))
  112. log("Done")
Add Comment
Please, Sign In to add comment