Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import md5 as mmd5
# When true, UpyunCore.request prints a one-line summary of each HTTP call.
DEBUG = False

# Base API domain; every endpoint below is a host prefix on this domain.
api_upyun = 'api.upyun.com'

ENDPOINT_V0 = 'v0.' + api_upyun
ENDPOINT_V1 = 'v1.' + api_upyun
ENDPOINT_V2 = 'v2.' + api_upyun
ENDPOINT_V3 = 'v3.' + api_upyun

# Host used for requests until set_endpoint() replaces it.
default_endpoint = ENDPOINT_V0
def md5(e):
    """Return the hexadecimal MD5 digest of ``e``.

    Args:
        e: Data to hash. Byte strings are hashed as-is; unicode text is
            encoded as UTF-8 first.

    Returns:
        The 32-character lowercase hex digest as a string.
    """
    # hashlib supersedes the standalone ``md5`` module, which has been
    # deprecated since Python 2.5 and does not exist on Python 3.
    import hashlib

    if isinstance(e, type(u'')):
        e = e.encode('utf-8')
    return hashlib.md5(e).hexdigest()
def md5_fd(fobj):
    """Return the hexadecimal MD5 digest of a file-like object's content.

    The object is read in chunks from its current position until it is
    exhausted, and is then rewound to offset 0 so it can be read again
    (for example when it is subsequently uploaded).

    Args:
        fobj: Object providing ``read(size)`` and ``seek(offset)``.

    Returns:
        The 32-character lowercase hex digest as a string.
    """
    # hashlib supersedes the standalone ``md5`` module, which has been
    # deprecated since Python 2.5 and does not exist on Python 3.
    import hashlib

    digest = hashlib.md5()
    while True:
        chunk = fobj.read(8096)
        if not chunk:
            break
        # Text-mode streams hand back unicode; hash its UTF-8 encoding.
        if isinstance(chunk, type(u'')):
            chunk = chunk.encode('utf-8')
        digest.update(chunk)
    fobj.seek(0)
    return digest.hexdigest()
def set_endpoint(e):
    """Replace the module-wide API host returned by ``get_endpoint()``.

    Args:
        e: Host name to use, e.g. one of the ``ENDPOINT_V*`` constants.
    """
    global default_endpoint
    default_endpoint = e
def get_endpoint():
    """Return the API host currently stored in ``default_endpoint``."""
    return default_endpoint
class UpyunCore(object):
    """HTTP transport that authenticates and sends one request per call.

    Args:
        auth_type: ``'basic'`` selects HTTP Basic authentication; any other
            value (default ``'upyun'``) selects the signed ``UpYun`` scheme.
    """

    def __init__(self, auth_type='upyun'):
        self._auth_type = auth_type

    def _basic_auth(self, headers, username, password):
        """Add an HTTP Basic ``Authorization`` entry to ``headers`` in place."""
        import base64

        credentials = username + ':' + password
        if isinstance(credentials, type(u'')):
            credentials = credentials.encode('utf-8')
        # b64encode never inserts line breaks. encodestring wraps its output
        # every 76 characters, which put newlines inside the header value
        # whenever the credentials were long.
        token = base64.b64encode(credentials)
        if not isinstance(token, str):
            token = token.decode('ascii')
        headers['Authorization'] = 'Basic ' + token

    def _upyun_auth(self, headers, method, path, username, password):
        """Add ``Date`` and a signed ``Authorization`` entry in place.

        The signature is the MD5 of
        ``method&path&date&content_length&md5(password)``, where the content
        length is taken from ``headers`` and defaults to 0.
        """
        from email.utils import formatdate

        # formatdate always emits English day and month names, whereas
        # strftime('%a ... %b ... %X') follows the process locale.
        headers['Date'] = formatdate(usegmt=True)
        content_length = headers.get('Content-Length') or 0
        signature = '&'.join([method, path, headers['Date'],
                              str(content_length), md5(password)])
        headers['Authorization'] = 'UpYun %s:%s' % (username, md5(signature))

    def request(self, host, method, path, data, username, password, options):
        """Send one authenticated HTTP request and return the response.

        Args:
            host: Host name to connect to.
            method: HTTP verb.
            path: Request path, including any query string.
            data: Request body, or ``None``.
            username: Operator name used for authentication.
            password: Operator password used for authentication.
            options: Extra request headers; copied, never modified.

        Returns:
            The response object produced by the HTTP connection.
        """
        try:
            import httplib
        except ImportError:  # the module is named http.client on Python 3
            import http.client as httplib

        headers = options.copy()
        if self._auth_type == 'basic':
            self._basic_auth(headers, username, password)
        else:
            self._upyun_auth(headers, method, path, username, password)

        conn = httplib.HTTPConnection(host)
        conn.request(method, path, data, headers)
        res = conn.getresponse()
        if DEBUG:
            print('%s %s => %s %s' % (method, host + path,
                                      res.status, res.reason))
        return res
class UpyunEntry(object):
    """A single listing row holding a name, a kind, a size and a date."""

    def __init__(self, name, type, size, date):
        # A raw marker of 'F' denotes a folder; anything else is a file.
        if type == 'F':
            kind = 'folder'
        else:
            kind = 'file'
        self.name = name
        self.type = kind
        self.size = size
        self.date = date

    def __str__(self):
        """Render the entry as ``<kind: name>``."""
        return '<%s: %s>' % (self.type, self.name)
class Upyun(object):
    """Client for file and folder operations inside one bucket.

    Every request path is prefixed with ``/<bucket>`` and sent through the
    transport to the host returned by ``get_endpoint()``.

    Args:
        bucket: Bucket name used as the first path segment.
        username: Operator name passed to the transport.
        password: Operator password passed to the transport.
        transport: Object exposing ``request(host, method, path, data,
            username, password, options)``. Defaults to a new ``UpyunCore``.
    """

    def __init__(self, bucket, username, password, transport=None):
        self._bucket = bucket
        self._username = username
        self._password = password
        # Build the default transport per instance instead of once at
        # definition time, so instances never share one by accident.
        self._core = UpyunCore() if transport is None else transport

    def __str__(self):
        return '<bucket: %s, username: %s>' % (self._bucket, self._username)

    def touch(self, path, data, md5_checksum=None, auto_mkdir=False):
        """Upload ``data`` to ``path`` with a PUT request.

        Args:
            path: Destination path inside the bucket.
            data: A string of content or a file-like object.
            md5_checksum: Optional value sent as ``Content-MD5``.
            auto_mkdir: When true, send ``mkdir: true`` with the request.

        Returns:
            ``True`` when the response status is 200, else ``False``.
        """
        options = self._upload_options(data, md5_checksum, auto_mkdir)
        res = self._request('PUT', self._generate_path(path), data, options)
        return self._is_res_ok(res)

    def touch_image(self, path, data, secret_code=None,
                    md5_checksum=None, auto_mkdir=False):
        """Upload an image and report the ``x-upyun-*`` response headers.

        Args:
            path: Destination path inside the bucket.
            data: A string of content or a file-like object.
            secret_code: Optional value sent as ``Content-Secret``.
            md5_checksum: Optional value sent as ``Content-MD5``.
            auto_mkdir: When true, send ``mkdir: true`` with the request.

        Returns:
            ``(True, info)`` on status 200, where ``info`` maps the width,
            height, frames and file-type header names to their values;
            otherwise ``(False, error_message)``.
        """
        options = self._upload_options(data, md5_checksum, auto_mkdir,
                                       secret_code)
        res = self._request('PUT', self._generate_path(path), data, options)
        if not self._is_res_ok(res):
            return False, self._get_res_errmsg(res)
        header_names = ('x-upyun-width', 'x-upyun-height',
                        'x-upyun-frames', 'x-upyun-file-type')
        info = dict((name, res.getheader(name)) for name in header_names)
        return True, info

    def cat(self, path):
        """Download ``path``; return ``(True, body)`` or ``(False, error)``."""
        res = self._request('GET', self._generate_path(path))
        if self._is_res_ok(res):
            return True, res.read()
        return False, self._get_res_errmsg(res)

    def stat(self, path):
        """Issue a HEAD request for ``path``.

        Returns:
            ``(True, info)`` with ``type``, ``size`` and ``date`` taken from
            the ``x-upyun-file-*`` response headers, or ``(False, error)``.
        """
        res = self._request('HEAD', self._generate_path(path))
        if not self._is_res_ok(res):
            return False, self._get_res_errmsg(res)
        info = {
            'type': res.getheader('x-upyun-file-type'),
            'size': res.getheader('x-upyun-file-size'),
            'date': res.getheader('x-upyun-file-date'),
        }
        return True, info

    def usage(self, path='/'):
        """Request ``<path>?usage``; return ``(True, int)`` or ``(False, error)``."""
        res = self._request('GET', self._generate_path(path) + '?usage')
        if self._is_res_ok(res):
            return True, int(res.read())
        return False, self._get_res_errmsg(res)

    def ls(self, path='/'):
        """Yield an ``UpyunEntry`` for each row of the listing at ``path``.

        This is a generator, so the request is only sent once iteration
        starts. Nothing is yielded when the response status is not 200.
        Rows are newline-separated and fields tab-separated; rows with fewer
        than four fields are skipped.
        """
        res = self._request('GET', self._generate_path(path))
        if not self._is_res_ok(res):
            return
        data = res.read()
        # Python 3 responses return bytes; the parsing below needs text.
        if isinstance(data, bytes) and not isinstance(data, str):
            data = data.decode('utf-8')
        for line in data.split('\n'):
            fields = line.split('\t')
            if len(fields) >= 4:
                # Only the first four fields map onto UpyunEntry; passing
                # extra ones would raise TypeError.
                yield UpyunEntry(*fields[:4])

    def mkdir(self, path, auto_mkdir=False):
        """Create a folder at ``path`` with a POST request.

        Args:
            path: Folder path inside the bucket.
            auto_mkdir: When true, send ``mkdir: true`` with the request.

        Returns:
            ``True`` when the response status is 200, else ``False``.
        """
        options = {'folder': 'create'}
        if auto_mkdir:
            options['mkdir'] = 'true'
        res = self._request('POST', self._generate_path(path), None, options)
        return self._is_res_ok(res)

    def rmdir(self, path):
        """Delete the folder at ``path``; identical to ``rm``."""
        return self.rm(path)

    def rm(self, path):
        """Send DELETE for ``path``; return ``True`` on status 200."""
        res = self._request('DELETE', self._generate_path(path))
        return self._is_res_ok(res)

    def _upload_options(self, data, md5_checksum=None, auto_mkdir=False,
                        secret_code=None):
        """Build the request options shared by ``touch`` and ``touch_image``."""
        options = {}
        if secret_code:
            options['Content-Secret'] = secret_code
        if md5_checksum:
            options['Content-MD5'] = md5_checksum
        if auto_mkdir:
            options['mkdir'] = 'true'
        # File-like bodies have no len(); leave their length to the
        # HTTP layer.
        if not hasattr(data, 'read'):
            options['Content-Length'] = len(data)
        return options

    def _is_res_ok(self, res):
        return res.status == 200

    def _get_res_errmsg(self, res):
        return '%s %s: %s' % (res.status, res.reason, res.read())

    def _request(self, method, path, data=None, options=None):
        """Forward one request to the transport with this client's credentials."""
        if options is None:
            options = {}
        return self._core.request(get_endpoint(), method, path, data,
                                  self._username, self._password,
                                  options)

    def _generate_path(self, filename=None):
        """Return ``/<bucket>`` followed by ``filename`` when one is given."""
        path = '/' + self._bucket
        return path + filename if filename else path
if __name__ == '__main__':
    # Walk through each client operation in turn. The placeholders must be
    # replaced with a real bucket and operator before running.
    client = Upyun('<bucket-name>', '<operator-name>', '<operator-pwd>')

    # endpoint
    print('endpoint: %s' % get_endpoint())
    set_endpoint(ENDPOINT_V1)
    print('endpoint: %s' % get_endpoint())

    # usage
    ok, used = client.usage()
    if ok:
        print('usage: %d' % used)

    # mkdir
    client.mkdir('/tmp')
    client.mkdir('/tmp/1/2', True)

    # rmdir (innermost folder first)
    client.rmdir('/tmp/1/2')
    client.rmdir('/tmp/1')
    client.rmdir('/tmp')

    # touch
    client.touch('/a.txt', 'hijcak')

    # cat
    ok, content = client.cat('/a.txt')
    if ok:
        print('data of a.txt is: ' + content)

    # stat
    ok, details = client.stat('/a.txt')
    if ok:
        print(details)

    # rm
    client.rm('/a.txt')

    # ls
    for entry in client.ls():
        print(entry)
Add Comment
Please, Sign In to add comment