Advertisement
Guest User

S3 locking with python / boto

a guest
Aug 8th, 2010
5,690
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.62 KB | None | 0 0
  1. import boto
  2. import time
  3. import uuid
  4. import random
  5. from threading import Thread
  6. from boto.s3.connection import S3Connection
  7. from boto.s3.key import Key
  8.  
  9. class repository:
  10.     def __init__(self, accessKey, secretKey, lockDomain, s3bucket):
  11.         self.db = boto.connect_sdb(accessKey, secretKey)
  12.         self.s3 = S3Connection(accessKey, secretKey)
  13.         self.domain = self.db.create_domain(lockDomain)
  14.         self.bucket = self.s3.create_bucket(s3bucket)
  15.  
  16.     def get(self, id):
  17.         k = Key(self.bucket)
  18.         k.key = id
  19.         try:
  20.             return k.get_contents_as_string()
  21.         except boto.exception.S3ResponseError, e:
  22.             if e.status == 404:
  23.                 return None
  24.             else:
  25.                 raise e
  26.        
  27.     def put(self, id, value):
  28.         k = Key(self.bucket)
  29.         k.key = id
  30.         k.set_contents_from_string(value)
  31.  
  32.     def delete(self, id):
  33.         k = Key(self.bucket)
  34.         k.key = id
  35.         try:
  36.             return k.delete()
  37.         except boto.exception.S3ResponseError, e:
  38.             if e.status == 404:
  39.                 return None
  40.             else:
  41.                 raise e
  42.  
  43.     def acquireLock(self, id, lockDurationSeconds, acquireTimeoutSeconds):
  44.         """ Acquires lock and returns a lockId that can be passed to releaseLock()
  45.  
  46.              id - ID of object to lock - can be any string up to 256 chars in length
  47.              lockDurationSeconds - Seconds to hold lock for.  Once this number of seconds has
  48.                                    elapsed, the lock will expire and other threads will be able to
  49.                                    acquire a lock for the given id
  50.              acquireTimeoutSeconds - Seconds to try to acquire lock.  If id is already locked, this
  51.                                      method will sleep/retry until acquireTimeoutSeconds is reached
  52.  
  53.              Returns lockId (string) if lock is acquired.  If lock cannot be acquired, throws SystemError"""
  54.         lockId         = uuid.uuid4()
  55.         acquireTimeout = time.time() + acquireTimeoutSeconds
  56.         while time.time() < acquireTimeout:
  57.             try:
  58.                 # try to create the lock if it doesn't exist
  59.                 lockTimeout    = time.time() + lockDurationSeconds
  60.                 if self.db.put_attributes(self.domain, id, { 'timeout' : lockTimeout, 'lockId' : lockId }, replace=False, expected_value=['lockId', False]):
  61.                     return lockId
  62.             except boto.exception.SDBResponseError, e:
  63.                 if e.status != 404 and e.status != 409:
  64.                     raise e
  65.             # couldn't create lock - check for stale lock
  66.             attribs = self.db.get_attributes(self.domain, id, consistent_read=True)
  67.             if attribs.has_key('timeout') and float(attribs['timeout']) < time.time():
  68.                 print "lock timed out - releasing with id: %s" % attribs['lockId']
  69.                 self.releaseLock(id, attribs['lockId'])  # lock has timed out, so delete it
  70.             time.sleep(0.05)  # sleep and retry
  71.         # couldn't acquire lock - throw error
  72.         raise SystemError("Unable to obtain lock for: %s" % id)
  73.  
  74.     def releaseLock(self, id, lockId):
  75.         """ Releases previously acquired lock. id - ID of object to lock.  lockId - Lock ID returned from acquireLock() """
  76.         print "releaseLock(%s, %s)" % (id, lockId)
  77.         try:
  78.             return self.db.delete_attributes(self.domain, id, [ 'timeout', 'lockId' ], expected_value=['lockId', lockId])
  79.         except boto.exception.SDBResponseError, e:
  80.             if e.status == 404 or e.status == 409:
  81.                 return False
  82.             else:
  83.                 raise e
  84.  
  85. class Runner(Thread):
  86.     def __init__(self, repository):
  87.         Thread.__init__(self)
  88.         self.repository = repository
  89.         self.id = "myid"
  90.  
  91.     def run(self):
  92.         r = self.repository
  93.         id = self.id
  94.         lockId = r.acquireLock(id, 2, 50)
  95.         self.updateS3Obj()
  96.         r.releaseLock(id, lockId)
  97.         lockId = r.acquireLock(id, 2, 50)
  98.         self.updateS3Obj()
  99.         r.releaseLock(id, lockId)
  100.  
  101.     def updateS3Obj(self):
  102.         r = self.repository
  103.         id = self.id
  104.         val = r.get(id)
  105.         if not val:
  106.             val = 1
  107.         val = int(val) * 2
  108.         r.put(id, val)
  109.  
  110. if __name__ == "__main__":
  111.     r = repository('xxx', 'xxx', 'myDomain', 'myBucket')
  112.     r.delete("myid")
  113.     runnerList = [ ]
  114.     for i in range(0, 10):
  115.         runner = Runner(r)
  116.         runnerList.append(runner)
  117.         runner.start()
  118.     for runner in runnerList:
  119.         runner.join()
  120.     print "Final value: %s" % r.get("myid")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement