Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import boto
- import time
- import uuid
- import random
- from threading import Thread
- from boto.s3.connection import S3Connection
- from boto.s3.key import Key
class repository:
    """S3-backed key/value store with a SimpleDB-backed advisory lock.

    Object bodies live in an S3 bucket; lock records live in a SimpleDB
    domain as items carrying a 'timeout' and a 'lockId' attribute.
    """

    def __init__(self, accessKey, secretKey, lockDomain, s3bucket):
        """Connect to SimpleDB and S3 and obtain the lock domain and bucket.

        accessKey, secretKey - AWS credentials passed to both connections
        lockDomain           - name of the SimpleDB domain holding lock items
        s3bucket             - name of the S3 bucket holding object bodies

        NOTE(review): create_domain/create_bucket are called unconditionally;
        presumably they hand back the existing resource when it is already
        there - confirm against the boto documentation.
        """
        self.db = boto.connect_sdb(accessKey, secretKey)
        self.s3 = S3Connection(accessKey, secretKey)
        self.domain = self.db.create_domain(lockDomain)
        self.bucket = self.s3.create_bucket(s3bucket)

    def get(self, id):
        """Return the contents stored under key `id`, or None on an S3 404.

        Any other S3ResponseError is re-raised unchanged.
        """
        k = Key(self.bucket)
        k.key = id
        try:
            return k.get_contents_as_string()
        except boto.exception.S3ResponseError as e:
            if e.status == 404:
                return None
            # bare raise keeps the original traceback
            raise

    def put(self, id, value):
        """Store `value` under key `id` via Key.set_contents_from_string()."""
        k = Key(self.bucket)
        k.key = id
        k.set_contents_from_string(value)

    def delete(self, id):
        """Delete key `id`; return None instead of raising on an S3 404.

        Otherwise returns whatever Key.delete() returns. Any other
        S3ResponseError is re-raised unchanged.
        """
        k = Key(self.bucket)
        k.key = id
        try:
            return k.delete()
        except boto.exception.S3ResponseError as e:
            if e.status == 404:
                return None
            raise

    def acquireLock(self, id, lockDurationSeconds, acquireTimeoutSeconds):
        """Acquire the lock for `id` and return a lockId for releaseLock().

        id                    - ID of object to lock - can be any string up
                                to 256 chars in length
        lockDurationSeconds   - Seconds to hold lock for. Once this number of
                                seconds has elapsed, the lock is considered
                                expired and another caller may remove it and
                                acquire a lock for the given id
        acquireTimeoutSeconds - Seconds to try to acquire lock. If id is
                                already locked, this method sleeps and
                                retries until acquireTimeoutSeconds is reached

        Returns lockId (string) if lock is acquired. If lock cannot be
        acquired, raises SystemError.

        Expiry is judged by comparing the stored 'timeout' with the local
        time.time() of whichever caller inspects the lock.
        """
        # Stringify up front so the value returned to the caller, the value
        # written to SimpleDB and the value later used as the conditional
        # delete's expected value are all the same plain string.
        lockId = str(uuid.uuid4())
        acquireTimeout = time.time() + acquireTimeoutSeconds
        while time.time() < acquireTimeout:
            try:
                # try to create the lock if it doesn't exist: the write is
                # conditional on the item having no 'lockId' attribute
                lockTimeout = time.time() + lockDurationSeconds
                created = self.db.put_attributes(
                    self.domain,
                    id,
                    {'timeout': lockTimeout, 'lockId': lockId},
                    replace=False,
                    expected_value=['lockId', False])
                if created:
                    return lockId
            except boto.exception.SDBResponseError as e:
                # 404/409 are treated as "lock is held by someone else";
                # anything else is a real error
                if e.status != 404 and e.status != 409:
                    raise
            # couldn't create lock - check for stale lock
            attribs = self.db.get_attributes(self.domain, id, consistent_read=True)
            if 'timeout' in attribs and float(attribs['timeout']) < time.time():
                print("lock timed out - releasing with id: %s" % attribs['lockId'])
                # lock has timed out, so delete it (conditional on its lockId,
                # so a lock re-acquired in the meantime is left alone)
                self.releaseLock(id, attribs['lockId'])
            time.sleep(0.05)  # sleep and retry
        # couldn't acquire lock - throw error
        raise SystemError("Unable to obtain lock for: %s" % id)

    def releaseLock(self, id, lockId):
        """Release a previously acquired lock.

        id     - ID of object to unlock
        lockId - Lock ID returned from acquireLock()

        The delete is conditional on the stored 'lockId' matching `lockId`.
        Returns the result of delete_attributes(), or False when SimpleDB
        answers 404 or 409. Any other SDBResponseError is re-raised.
        """
        print("releaseLock(%s, %s)" % (id, lockId))
        try:
            return self.db.delete_attributes(
                self.domain,
                id,
                ['timeout', 'lockId'],
                expected_value=['lockId', lockId])
        except boto.exception.SDBResponseError as e:
            if e.status == 404 or e.status == 409:
                return False
            raise
class Runner(Thread):
    """Worker thread that doubles the value stored under "myid", twice.

    Each doubling is a read-modify-write of the stored object, performed
    while holding the repository lock for that id.
    """

    def __init__(self, repository):
        """Remember the repository to operate on; the object id is fixed."""
        Thread.__init__(self)
        self.repository = repository
        self.id = "myid"

    def run(self):
        """Perform two locked updates of the stored value.

        Each round acquires the lock with a 2 second lock duration and a
        50 second acquire timeout. The lock is released even when the
        update raises, so a failing worker does not leave the others
        waiting for the lock to expire.
        """
        r = self.repository
        for _ in range(2):
            lockId = r.acquireLock(self.id, 2, 50)
            try:
                self.updateS3Obj()
            finally:
                r.releaseLock(self.id, lockId)

    def updateS3Obj(self):
        """Read the stored value, double it, and write it back.

        A missing or empty value is treated as 1, so the first write is 2.
        """
        r = self.repository
        val = r.get(self.id)
        if not val:
            val = 1
        val = int(val) * 2
        # put() forwards to set_contents_from_string, so hand it a string
        r.put(self.id, str(val))
if __name__ == "__main__":
    # Demo driver: reset the stored object, let ten workers update it
    # concurrently, then report what ended up in the store.
    repo = repository('xxx', 'xxx', 'myDomain', 'myBucket')
    repo.delete("myid")

    workers = [Runner(repo) for _ in range(10)]
    for worker in workers:
        worker.start()

    # wait for every worker before reading the result
    for worker in workers:
        worker.join()

    print("Final value: %s" % repo.get("myid"))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement