- Memory limit hit with appengine-mapreduce
def time_count_map(data):
    """Time count map function.

    Parses CSV rows of the form ``id, start, end`` and emits, for each row,
    the id together with the elapsed time between the two timestamps.

    Args:
      data: an ``(entry, text_fn)`` pair; ``text_fn()`` returns the text to
        parse. NOTE(review): presumably the full contents of one zip member
        from BlobstoreZipInputReader, held in memory at once -- confirm.

    Yields:
      ``(id, seconds)`` tuples, where ``seconds`` is a float computed with
      ``time.mktime`` on both timestamps. Rows with fewer than three columns
      or with timestamps that do not parse are logged at DEBUG level and
      skipped; the remaining rows are still processed.
    """
    _entry, text_fn = data
    text = text_fn()
    # Spaces are stripped from every line before CSV parsing, so ids and
    # timestamps carry no spaces ("01/02/12 10:00:00AM" -> "01/02/1210:00:00AM").
    # strptime treats a space in the format as "one or more whitespace
    # characters", so the format must not contain one either.
    timestamp_format = "%m/%d/%y%I:%M:%S%p"
    # splitlines() breaks on real line endings (\n, \r\n); one CSV reader is
    # reused for all lines instead of building a new reader per line.
    lines = (line.replace(' ', '') for line in text.splitlines())
    for row in csv.reader(lines, skipinitialspace=True):
        try:
            start = time.strptime(row[1], timestamp_format)
            end = time.strptime(row[2], timestamp_format)
        except (IndexError, ValueError) as e:
            # Short row (IndexError) or unparseable timestamp such as a header
            # row (ValueError): skip just this row and keep going.
            logging.debug(e)
            continue
        yield (row[0], time.mktime(end) - time.mktime(start))
def time_count_reduce(key, values):
    """Time count reduce function.

    Sums the elapsed-time values emitted for one key.

    Args:
      key: the id the values were grouped under.
      values: iterable of elapsed times in seconds; each is passed through
        ``float()``, so numeric strings are accepted.

    Yields:
      A single newline-terminated line ``"<key>: <seconds>"``, with the total
      truncated to a whole number of seconds.
    """
    # Named ``total`` (not ``time``) so the ``time`` module is not shadowed.
    total = sum((float(subtime) for subtime in values), 0.0)
    # Terminate each record with a real newline so output has one key per line.
    yield "%s: %d\n" % (key, int(total))
class TimeCountPipeline(base_handler.PipelineBase):
    """A pipeline to run the Time count demo.

    Runs a MapreducePipeline with main.time_count_map /
    main.time_count_reduce over a blobstore zip archive, then passes the
    result to StoreOutput.

    Args:
      filekey: value logged at DEBUG level and passed through to StoreOutput.
        NOTE(review): presumably the key of the stored file record -- confirm.
      blobkey: blobkey to process as string. Should be a zip archive with
        text files inside.
    """

    def run(self, filekey, blobkey):
        """Yield the time-count mapreduce, then StoreOutput for its result.

        The value received back from the first ``yield`` is handed to
        StoreOutput as ``output``.
        """
        # Lazy %-args: the message is only formatted if DEBUG is enabled.
        logging.debug("filename is %s", filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "time_count",
            "main.time_count_map",
            "main.time_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_key": blobkey,
            },
            # Reducer output goes through BlobstoreOutputWriter as text/plain.
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=32)
        yield StoreOutput("TimeCount", filekey, output)
# Mapper job definitions. The two entries are identical except for their
# name and mapper handler; both use DatastoreInputReader and default
# entity_kind to main.Post.
mapreduce:
- name: Make messages lowercase
  params:
  # NOTE(review): presumably the URL notified when the job finishes --
  # confirm against the appengine-mapreduce mapreduce.yaml documentation.
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    # Mapper parameters given as defaults (presumably overridable when the
    # job is launched -- confirm).
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
def time_count_map(data):
    """Time count map function (single-line variant).

    Parses one CSV line of the form ``id, start, end`` and emits the id with
    the elapsed time between the two timestamps.

    NOTE(review): this redefines the ``time_count_map`` defined earlier in
    this file; if both live in the same module, this later definition is the
    one ``"main.time_count_map"`` resolves to -- confirm that is intended.

    Args:
      data: a sequence whose second element is the line text.
        NOTE(review): presumably ``(offset, line)`` from a line-oriented input
        reader -- confirm.

    Yields:
      ``(id, seconds)`` for each parsed CSV row, where ``seconds`` is a float
      computed with ``time.mktime`` on both timestamps. Rows with fewer than
      three columns or with timestamps that do not parse are logged at DEBUG
      level and skipped.
    """
    line = data[1]
    # All spaces are removed from the line before parsing, so the format must
    # not contain a space: strptime treats a format space as "one or more
    # whitespace characters", which "01/02/1210:00:00AM" would not satisfy.
    timestamp_format = "%m/%d/%y%I:%M:%S%p"
    for row in csv.reader([line.replace(' ', '')], skipinitialspace=True):
        try:
            start = time.strptime(row[1], timestamp_format)
            end = time.strptime(row[2], timestamp_format)
        except (IndexError, ValueError) as e:
            # Short row or unparseable timestamp (e.g. a header): skip it.
            logging.debug(e)
            continue
        yield (row[0], time.mktime(end) - time.mktime(start))