Advertisement
rakslice

Test mongo conditional upsert

Oct 6th, 2013
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.19 KB | None | 0 0
  1. '''
  2. Test for mongo conditional upsert
  3. http://stackoverflow.com/a/19214951/60422
  4.  
  5. Created on 2013-10-06
  6.  
  7. @author: rakslice
  8. '''
  9. import pymongo
  10. import unittest
  11.  
  12. # Obviously this doesn't test for race conditions.
  13.  
  14. db = pymongo.MongoClient().mongo_conditional_update
  15. collection = db.test
  16.  
  17. assert isinstance(collection, pymongo.collection.Collection)
  18.  
  19.  
  20. def update_if_stale(key, new_value, new_time):
  21.     collection.update({'key': key,
  22.                        'update_time': {'$lt': new_time}
  23.                        },
  24.                       {'$set': {'value': new_value,
  25.                                 'update_time': new_time
  26.                                 }
  27.                        }
  28.                       )
  29.  
  30.  
  31. def insert_if_missing(key, new_value, new_time):
  32.     collection.update({'key': key},
  33.                       {'$setOnInsert': {'value': new_value,
  34.                                         'update_time': new_time
  35.                                         }
  36.                        },
  37.                       upsert=True
  38.                       )
  39.  
  40.  
  41. def update_key(key, new_value, new_time):
  42.     insert_if_missing(key, new_value, new_time)
  43.     update_if_stale(key, new_value, new_time)
  44.  
  45.  
  46. def get_key(key):
  47.     result = collection.find_one({'key': key})
  48.     if result is None:
  49.         return result
  50.     return result["value"]
  51.  
  52.  
  53. def key_count(key):
  54.     result = list(collection.find({'key': key}))
  55.     return len(result)
  56.  
  57.  
  58. class TestConditionalUpdate(unittest.TestCase):
  59.     def setUp(self):
  60.         collection.drop()
  61.  
  62.     def testPreviouslyDidNotExist(self):
  63.         update_key("a", "foo", 10)
  64.         self.assertEqual("foo", get_key("a"))
  65.  
  66.     def testPreviouslyHadStaleValue(self):
  67.         collection.insert({"key": "a", "value": "joe", "update_time": 5})
  68.  
  69.         assert "joe" == get_key("a")
  70.  
  71.         update_key("a", "foo", 10)
  72.  
  73.         assert "foo" == get_key("a")
  74.         assert 1 == key_count("a")
  75.  
  76.     def testPreviouslyHadNewValue(self):
  77.         collection.insert({"key": "a", "value": "joe", "update_time": 15})
  78.  
  79.         assert "joe" == get_key("a")
  80.  
  81.         update_key("a", "foo", 10)
  82.  
  83.         assert "joe" == get_key("a")
  84.         assert 1 == key_count("a")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement