Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Apr 30th, 2012  |  syntax: None  |  size: 2.85 KB  |  hits: 19  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. Memory limit hit with appengine-mapreduce
  2. def time_count_map(data):
  3.   """Time count map function."""
  4.   (entry, text_fn) = data
  5.   text = text_fn()
  6.  
  7.   try:
  8.     q = text.split('n')
  9.     for m in q:
  10.         reader = csv.reader([m.replace('', '')], skipinitialspace=True)
  11.         for s in reader:
  12.             """Calculate time elapsed"""
  13.             sdw = s[1]
  14.             start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
  15.             edw = s[2]
  16.             end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
  17.             time_difference = time.mktime(end_date) - time.mktime(start_date)
  18.             yield (s[0], time_difference)
  19.   except IndexError, e:
  20.     logging.debug(e)
  21.  
  22.  
  23. def time_count_reduce(key, values):
  24.   """Time count reduce function."""
  25.   time = 0.0
  26.   for subtime in values:
  27.     time += float(subtime)
  28.     realtime = int(time)
  29.   yield "%s: %dn" % (key, realtime)
  30.        
  31. class TimeCountPipeline(base_handler.PipelineBase):
  32.   """A pipeline to run Time count demo.
  33.  
  34.   Args:
  35.     blobkey: blobkey to process as string. Should be a zip archive with
  36.       text files inside.
  37.   """
  38.  
  39.   def run(self, filekey, blobkey):
  40.     logging.debug("filename is %s" % filekey)
  41.     output = yield mapreduce_pipeline.MapreducePipeline(
  42.         "time_count",
  43.         "main.time_count_map",
  44.         "main.time_count_reduce",
  45.         "mapreduce.input_readers.BlobstoreZipInputReader",
  46.         "mapreduce.output_writers.BlobstoreOutputWriter",
  47.         mapper_params={
  48.             "blob_key": blobkey,
  49.         },
  50.         reducer_params={
  51.             "mime_type": "text/plain",
  52.         },
  53.         shards=32)
  54.     yield StoreOutput("TimeCount", filekey, output)
  55.        
# Two job definitions, each with only a mapper (no reducer) over main.Post
# entities read by DatastoreInputReader. They differ only in name and
# handler.
mapreduce:
- name: Make messages lowercase
  params:
  # Job-level parameter; presumably a URL requested when the job completes
  # (confirm against the mapreduce framework docs).
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    # Mapper parameters. Each "default" is an initial value, presumably
    # overridable when the job is launched (confirm).
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    # Same parameter set as the lowercase job above.
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
  85.        
  86. def time_count_map(data):
  87.     """Time count map function."""
  88.     text = data[1]
  89.  
  90.     try:
  91.         reader = csv.reader([text.replace('', '')], skipinitialspace=True)
  92.         for s in reader:
  93.             """Calculate time elapsed"""
  94.             sdw = s[1]
  95.             start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
  96.             edw = s[2]
  97.             end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
  98.             time_difference = time.mktime(end_date) - time.mktime(start_date)
  99.             yield (s[0], time_difference)
  100.     except IndexError, e:
  101.         logging.debug(e)