Advertisement
zamotivator

Untitled

Dec 23rd, 2013
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.89 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3.  
  4. from hadoop import *
  5. setDir('/data/target/spd_games/')
  6.  
  7. MAXATTEMPTS = 8
  8. MAPMEMORY = 128
  9. REDUCEMEMORY = 256
  10. TIMEOUT = -1
  11. GAMES = ##GAMES##
  12.  
  13. QUEUE =     ##QUEUE##
  14. JOB=        ##JOB##
  15. TARGET=     ##TARGET##
  16. REDUCES=    ##REDUCES##
  17. RAW_INPUT = True
  18. EXTRA_FILES = ['/usr/local/target/bin/binlogreader', 'hadoop.py']
  19.  
  20.  
  21. # Вход:
  22. #     input = spd-лог
  23. # Выход:
  24. #     ключ = game
  25. #     значение = user_id
  26. def mapper(input, output):
  27.     import os
  28.     import sys
  29.     import subprocess
  30.     # формируем condition для фильтрации
  31.     condition = 'user_id_type = 2'
  32.     if GAMES != 'all':
  33.         condition += '& (%s)' % ' | '.join('game=%s' % game for game in GAMES)
  34.     # запускаем binlogreader
  35.     cmd = ['./binlogreader', '-t', 'game', '-f', '%{user_id}\t%{game}', '-c', condition]
  36.     blr = subprocess.Popen(cmd, stdin=input, stdout=subprocess.PIPE)
  37.     try:
  38.         # делаем пост-обработку результата при помощи awk
  39.         awk = subprocess.Popen(['awk', '{print $2"="$4}'], stdin=blr.stdout, stdout=subprocess.PIPE)
  40.         try:
  41.             # обрабатывем выход awk 'user_id=game'
  42.             for line in awk.stdout:
  43.                 columns = line.replace('\n', '').replace('\t', '').replace('"', '').split('=')
  44.                 if len(columns) == 2:
  45.                     # user_id
  46.                     user_id = columns[0]
  47.                     # game
  48.                     game = columns[1]
  49.                     # пишем 'key\tvalue'
  50.                     output.write('%s\t%s\n' % (game, user_id))
  51.                 else:
  52.                     sys.stderr.write("binlogreader: strange line '%s'" % line)
  53.             awk.wait()
  54.         except BaseException:
  55.             awk.kill()
  56.             raise
  57.         blr.wait()
  58.     except BaseException:
  59.         blr.kill()
  60.         raise
  61.  
  62. # Распарсить
  63. def read(game, users):
  64.     value = set(users.split(','))
  65.     return (game, value)
  66.  
  67. # Смержить
  68. def merge((prev_key, prev), (curr_key, curr)):
  69.     assert(prev_key == curr_key)
  70.     return prev.union(curr)
  71.  
  72. # Записать в output ключ + значение
  73. def write(output):
  74.     def result(game, users):
  75.         users = ','.join(users)
  76.         output.write('%s\t%s\n' % (game, users))
  77.     return result
  78.  
  79.  
  80. # Вход:
  81. #     ключ = game
  82. #     значение = user_id,[,user_id]
  83. # Выход:
  84. #     ключ = game
  85. #     значение = user_id,[,user_id]
  86. def combiner(input, output):
  87.     grouping(input, read=read, merge=merge, write=write(output))
  88.  
  89.  
  90. # Вход:
  91. #     ключ = game
  92. #     значение = user_id,[,user_id]
  93. # Выход:
  94. #     ключ = game
  95. #     значение = user_id,[,user_id]
  96. def reducer(input, output):
  97.     grouping(input, read=read, merge=merge, write=write(output))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement