Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import subprocess
- import os
- import datetime
- import re
- import sys
# Local directory whose contents are uploaded (relative to the current working
# directory, i.e. its parent).
SRC_FOLDER = "../"

if len(sys.argv) < 5:
    # Usage goes to stderr and the process exits non-zero. sys.exit() is used
    # rather than the exit() builtin, which is only injected by the `site`
    # module and is absent e.g. under `python -S`.
    sys.stderr.write("""
Usage: %s <username> <password> <webhdfs_endpoint> <target dir>
This script copies all files from the parent dir to HDFS
using the WebHDFS API
Developed by: Yehuda Korotkin <yehuda@alefbt.com>
Date: 18-01-01
""" % sys.argv[0])
    sys.exit(1)

HDFS_USER = sys.argv[1]
HDFS_PASS = sys.argv[2]
WEBHDFS_ENDPOINT = sys.argv[3]
HDFS_TARGET = sys.argv[4]
# Join endpoint and target with exactly one "/" and drop any trailing "/":
# WebHdfsApi appends "/<object path>" to this URL itself, so a trailing slash
# (or a missing/duplicated one between the two parts) would corrupt the path.
WEBHDFS_URL = ("%s/%s" % (WEBHDFS_ENDPOINT.rstrip("/"),
                          HDFS_TARGET.lstrip("/"))).rstrip("/")
- #
- # Global functions
- #
def mask_secret(text, secret):
    """Return *text* with every occurrence of *secret* replaced by ``****``.

    An empty or None *secret* leaves *text* unchanged; without this guard
    ``text.replace("", "****")`` would insert the mask between every character.

    :param text: string to sanitise
    :param secret: value to hide (e.g. a password)
    :return: the masked string
    """
    if not secret:
        return text
    return text.replace(secret, "****")


def log(s):
    """Print *s* to stdout with a local-time ``[YYYYmmdd HH:MM]`` prefix.

    The HDFS password (module global ``HDFS_PASS``) is masked so credentials
    embedded in logged command lines are not echoed.

    :param s: message string to print
    """
    stamp = datetime.datetime.now().strftime('%Y%m%d %H:%M')
    # Single parenthesised argument: prints identically on Python 2 and 3.
    print("[%s] %s" % (stamp, mask_secret(s, HDFS_PASS)))
def exec_cmd(cmd):
    """Run *cmd* through the shell and return its standard output.

    The command is executed with ``shell=True``, so callers must shell-quote
    any untrusted values they interpolate into it.

    :param cmd: complete shell command line
    :return: the command's stdout as text, or None if the command could not be
        run or exited with a non-zero status (the error is logged)
    """
    log("Command executing [%s]" % (cmd,))
    try:
        data = subprocess.check_output(cmd, shell=True)
    except Exception as e:
        # Deliberately best-effort: failures are logged and signalled with
        # None so one bad command does not abort the whole upload run.
        log("exec_command ERROR - Command [%s] ERROR: %s" % (cmd, e))
        return None
    if not isinstance(data, str):
        # Python 3 returns bytes; decode so callers can apply str regexes.
        # (On Python 2 the output is already `str` and is left untouched.)
        data = data.decode("utf-8", "replace")
    log("Command [%s] result: %s" % (cmd, data))
    return data
- #
- # WebHdfs api
- #
class WebHdfsApi(object):
    """Minimal WebHDFS client that shells out to ``curl`` via ``exec_cmd``.

    ``exec_cmd`` runs commands with ``shell=True``, so every value interpolated
    into the curl command line (credentials, URLs, local file names found by
    walking the disk) is shell-quoted here to prevent breakage or injection.
    """

    # Matches the ``Location`` header in ``curl -i`` output. HTTP header
    # names are case-insensitive, hence re.IGNORECASE.
    _LOCATION_RE = re.compile(r"^Location:\s*(.*)\s*$", re.MULTILINE | re.IGNORECASE)

    def __init__(self, webhdfsUrl, username, password):
        """
        :param webhdfsUrl: base URL that object paths are appended to
        :param username: user name passed to ``curl -u``
        :param password: password passed to ``curl -u``
        """
        self.webhdfsUrl = webhdfsUrl
        self.username = username
        self.password = password

    @staticmethod
    def _shell_quote(value):
        """Return *value* quoted so a POSIX shell treats it as one literal word."""
        try:
            from shlex import quote  # Python 3.3+
        except ImportError:
            from pipes import quote  # Python 2
        return quote(value)

    @staticmethod
    def _url_quote_path(path):
        """Percent-encode *path* for use in a URL, keeping "/" separators."""
        try:
            from urllib.parse import quote  # Python 3
        except ImportError:
            from urllib import quote  # Python 2
        return quote(path, safe="/")

    def _exec_call(self, params):
        """Run ``curl -i`` with this client's credentials followed by *params*.

        :param params: curl arguments, already shell-quoted by the caller
        :return: curl output (headers included because of ``-i``), or None
            if the command failed
        """
        # NOTE(review): exec_cmd logs this command line. log() masks the raw
        # password, but a password containing a single quote is rewritten by
        # the quoting and would then be logged unmasked.
        credentials = self._shell_quote("%s:%s" % (self.username, self.password))
        return exec_cmd("curl -i -u %s %s" % (credentials, params))

    def create_object(self, object_path):
        """Request creation of *object_path* and return the upload URL.

        Issues ``PUT <webhdfsUrl>/<object_path>?op=CREATE&overwrite=true``
        without a body and extracts the ``Location`` response header.

        :param object_path: path relative to ``webhdfsUrl``
        :return: the URL from the ``Location`` header, or None if the request
            failed or no such header was returned
        """
        url = "%s/%s?op=CREATE&overwrite=true" % (
            self.webhdfsUrl, self._url_quote_path(object_path))
        out = self._exec_call("-X PUT %s" % self._shell_quote(url))
        if out is None:
            return None
        if not isinstance(out, str):
            # Tolerate a bytes result (Python 3 subprocess output).
            out = out.decode("utf-8", "replace")
        match = self._LOCATION_RE.search(out)
        return match.group(1).strip() if match else None

    def upload_file_to_object_url(self, object_url, filename):
        """PUT the contents of local *filename* to *object_url* (``curl -T``)."""
        self._exec_call("-X PUT -T %s %s" % (self._shell_quote(filename),
                                             self._shell_quote(object_url)))

    def upload_file(self, local_file, remote_path):
        """Upload *local_file* to *remote_path* (relative to ``webhdfsUrl``).

        First obtains an upload URL with create_object, then PUTs the file
        content to it. A missing local file or a failed create request is
        logged and the file is skipped.
        """
        if not os.path.isfile(local_file):
            log("File not exists %s" % local_file)
            return
        object_url = self.create_object(remote_path)
        if object_url is None:
            # No Location header: there is nowhere to send the data.
            log("Could not obtain upload URL for %s" % remote_path)
            return
        self.upload_file_to_object_url(object_url, local_file)
def iter_upload_pairs(src_folder):
    """Yield ``(local_path, remote_path)`` for each non-hidden file under *src_folder*.

    Hidden directories (names starting with ".") are pruned so they are never
    walked, and hidden files are skipped. ``remote_path`` is relative to
    *src_folder*, uses "/" separators and has no leading slash. Order is
    deterministic (directory entries are visited in sorted order).

    :param src_folder: local directory to walk
    """
    for root, dirnames, filenames in os.walk(src_folder):
        # Prune in place: os.walk (top-down) then skips hidden directories.
        dirnames[:] = sorted(d for d in dirnames if not d.startswith("."))
        relative_dir = os.path.relpath(root, src_folder)
        for filename in sorted(filenames):
            if filename.startswith("."):
                continue
            if relative_dir == os.curdir:
                remote_path = filename
            else:
                remote_path = "/".join(relative_dir.split(os.sep) + [filename])
            yield os.path.join(root, filename), remote_path


if __name__ == "__main__":
    log("Started HDFS Deploy uploader application by Yehuda Korotkin <yehudako@matrixbi.co.il>")
    api = WebHdfsApi(WEBHDFS_URL, HDFS_USER, HDFS_PASS)
    for local_path, remote_path in iter_upload_pairs(SRC_FOLDER):
        api.upload_file(local_path, remote_path)
    log("Done")
Add Comment
Please, Sign In to add comment