import datetime
import logging


def incremental_map_reduce(
        map_f,
        reduce_f,
        db,
        source_table_name,
        target_table_name,
        source_queued_date_field_name,
        counter_table_name="IncrementalMRCounters",
        counter_key=None,
        max_datetime=None,
        reset=False,
        force=False):
    """Perform an incremental map-reduce on any new data in 'source_table_name',
    merging the results into 'target_table_name'. It can be run from a cron job,
    for instance, and on each execution will process only the new, unprocessed
    records.

    The set of data to process incrementally is determined non-invasively
    (meaning the source table is never written to) using the queued-date field
    named by 'source_queued_date_field_name'. When a record is ready to be
    processed, simply set its queued date (the field should be indexed for
    efficiency). When incremental_map_reduce() runs, any documents with queued
    dates between the counter stored under 'counter_key' and 'max_datetime' are
    map-reduced.

    If 'reset' is True, 'target_table_name' is dropped first and the map-reduce
    runs across all records older than 'max_datetime'.

    If unspecified/None, 'counter_key' defaults to
    target_table_name + ":LastMaxDatetime".
    """

    now = datetime.datetime.now()
    if max_datetime is None:
        max_datetime = now

    if reset:
        logging.debug("Resetting, dropping table " + target_table_name)
        db.drop_collection(target_table_name)
    time_limits = {"$lt": max_datetime}
    if counter_key is None:
        counter_key = target_table_name + ":LastMaxDatetime"

    # If we've run before, filter out anything that we've processed already.
    last_max_datetime = None

    last_max_datetime_record = db[counter_table_name].find_and_modify(
        {'_id': counter_key},
        {'$set': {'inprogress': True}, '$push': {'m': now}},
        upsert=True
    )

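    # find_and_modify (without new=True) returns the document as it was before
    # the update, so the record tells us whether a previous run still holds the
    # lock and what its last high-water mark ('value') was.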
    if force or last_max_datetime_record is None or 'inprogress' not in last_max_datetime_record:
        # First time ever run, or forced to go ahead anyway.
        pass
    elif last_max_datetime_record['inprogress']:
        if last_max_datetime_record['m'][0] < now - datetime.timedelta(hours=2):
            # The lock timed out, so go ahead anyway...
            logging.error(target_table_name + " lock is old. Ignoring it, but something was broken that caused it to not be unlocked...")
        else:
            logging.warning(target_table_name + " mapreduce already in progress, skipping...")
            raise RuntimeError(target_table_name + " locked since %s. Skipping..." % last_max_datetime_record['m'][0])

    if not reset:
        if last_max_datetime_record is not None:
            try:
                last_max_datetime = last_max_datetime_record['value']
                time_limits['$gt'] = last_max_datetime
                logging.debug('~FR limit last_max_datetime = %s' % (last_max_datetime,))
            except KeyError:
                # This happened on staging. I guess it crashed somehow
                # between the find_and_modify and the final update?
                logging.error("~FR no value on message!")
    query = {source_queued_date_field_name: time_limits}
    ret = db[source_table_name].map_reduce(
        map_f,
        reduce_f,
        out={'reduce': target_table_name},
        query=query,
        full_response=True
    )
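    # With out={'reduce': ...}, MongoDB re-reduces each new batch of results
    # against whatever is already stored in the target collection; that merge
    # is what makes the incremental runs add up correctly.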

    num_processed = ret['counts']['input']

    # Update our counter so we remember where we got to for the next pass,
    # and release the lock.
    db[counter_table_name].update(
        {'_id': counter_key},
        {'$set': {'inprogress': False, 'value': max_datetime}, '$unset': {'m': 1}},
        upsert=False,
        multi=False,
        safe=True)

    logging.debug("Processed %d source documents from %s through %s.\nmap_reduce details: %s"
                  % (num_processed, last_max_datetime, max_datetime, ret))

    return ret
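

# ---------------------------------------------------------------------------
# A minimal usage sketch, assuming pymongo 2.x (whose Collection API provides
# the find_and_modify / update / map_reduce calls used above). The 'surveys'
# collection, its 'queued_date' field, the 'survey_counts' target, and the
# map/reduce bodies below are hypothetical placeholders, not part of the
# function itself.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import pymongo
    from bson.code import Code

    map_f = Code("""
        function () {
            // Emit one count per user; adapt the key to your schema.
            emit(this.user_id, { count: 1 });
        }
    """)

    reduce_f = Code("""
        function (key, values) {
            var result = { count: 0 };
            values.forEach(function (v) { result.count += v.count; });
            return result;
        }
    """)

    db = pymongo.MongoClient()['mydb']

    # To queue a record for the next pass, just stamp its queued-date field
    # (which should be indexed), e.g.:
    #   db.surveys.update({'_id': some_id},
    #                     {'$set': {'queued_date': datetime.datetime.now()}})

    # Each call map-reduces only the documents queued since the previous run.
    ret = incremental_map_reduce(
        map_f,
        reduce_f,
        db,
        source_table_name="surveys",
        target_table_name="survey_counts",
        source_queued_date_field_name="queued_date",
    )
    print(ret['counts'])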