Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- from hadoop import *
# Job configuration.
# NOTE(review): the ##NAME## tokens below are template placeholders. As written
# they start Python comments, so this file only parses after they have been
# substituted (presumably by whatever tool submits the job) -- confirm.

# setDir comes from the star-imported hadoop module; presumably it sets the
# job's working/output directory -- confirm its semantics there.
setDir('/data/target/spd_games/')
# Presumably read by the hadoop helper: max task attempts, map/reduce task
# memory (units not visible here, likely MB -- TODO confirm) and task timeout
# (-1 presumably means "no timeout" -- confirm).
MAXATTEMPTS = 8
MAPMEMORY = 128
REDUCEMEMORY = 256
TIMEOUT = -1
# Either the string 'all' or an iterable of game ids: mapper() compares it to
# 'all' and otherwise iterates it to build the binlogreader filter.
GAMES = ##GAMES##
QUEUE = ##QUEUE##
JOB= ##JOB##
TARGET= ##TARGET##
REDUCES= ##REDUCES##
# Presumably tells the helper to feed mappers the raw (binary) log -- confirm.
RAW_INPUT = True
# Files shipped with the job. mapper() runs ./binlogreader from the task's
# current working directory; hadoop.py is presumably the helper imported above.
EXTRA_FILES = ['/usr/local/target/bin/binlogreader', 'hadoop.py']
# Input:
#   input = raw spd log
# Output:
#   key   = game
#   value = user_id
def mapper(input, output):
    """Emit one ``game<TAB>user_id`` line per matching record of a raw spd log.

    The raw log on *input* is piped through ``./binlogreader``. The filter
    keeps ``user_id_type = 2`` and, unless ``GAMES == 'all'``, only the listed
    games. The result then goes through awk, which turns each record into
    ``user_id=game``. Malformed awk lines are reported on stderr and skipped.

    *input* is passed straight to ``subprocess.Popen`` as stdin, so it must be
    a real file object (one backed by a file descriptor).

    Raises ``subprocess.CalledProcessError`` if awk or binlogreader exits with
    a non-zero status. A failed decode therefore fails the task instead of
    silently producing truncated output. On any exception, child processes
    that are still running are killed and reaped before it propagates.
    """
    import subprocess
    import sys

    # Build the binlogreader filter condition.
    condition = 'user_id_type = 2'
    if GAMES != 'all':
        condition += '& (%s)' % ' | '.join('game=%s' % game for game in GAMES)

    blr_cmd = ['./binlogreader', '-t', 'game', '-f', '%{user_id}\t%{game}', '-c', condition]
    # NOTE(review): awk keeps whitespace-separated fields 2 and 4. That implies
    # binlogreader prints more than the two -f fields -- confirm its format.
    awk_cmd = ['awk', '{print $2"="$4}']

    blr = subprocess.Popen(blr_cmd, stdin=input, stdout=subprocess.PIPE)
    try:
        # Post-process binlogreader's output with awk.
        awk = subprocess.Popen(awk_cmd, stdin=blr.stdout, stdout=subprocess.PIPE)
        # Drop the parent's copy of the pipe. This lets binlogreader receive
        # SIGPIPE if awk exits early, instead of blocking on a full pipe.
        blr.stdout.close()
        try:
            # Each awk line looks like 'user_id=game'.
            # NOTE(review): lines are str on Python 2; on Python 3 they are bytes.
            for line in awk.stdout:
                columns = line.replace('\n', '').replace('\t', '').replace('"', '').split('=')
                if len(columns) == 2:
                    user_id, game = columns
                    # Write 'key\tvalue' with the game as the key.
                    output.write('%s\t%s\n' % (game, user_id))
                else:
                    sys.stderr.write("binlogreader: strange line '%s'\n" % line.rstrip('\n'))
            awk.stdout.close()
            if awk.wait() != 0:
                raise subprocess.CalledProcessError(awk.returncode, awk_cmd)
        except BaseException:
            # Only kill a child that is still running, then reap it.
            if awk.poll() is None:
                awk.kill()
                awk.wait()
            raise
        if blr.wait() != 0:
            raise subprocess.CalledProcessError(blr.returncode, blr_cmd)
    except BaseException:
        if blr.poll() is None:
            blr.kill()
            blr.wait()
        raise
# Parse one grouped input record.
def read(game, users):
    """Parse a ``game`` key and its comma-separated ``users`` value.

    Returns ``(game, ids)``, where *ids* is the set of comma-separated pieces
    of *users*. Duplicates collapse, and an empty string yields ``{''}``.
    """
    ids = {user_id for user_id in users.split(',')}
    return game, ids
# Merge two parsed records that share the same key.
def merge(prev_pair, curr_pair):
    """Return the union of the user-id sets of two records with the same key.

    Both arguments are ``(key, set_of_user_ids)`` pairs, as produced by
    ``read``. Neither input set is modified; a new set is returned.

    Uses explicit unpacking instead of tuple parameters, which Python 3
    removed (PEP 3113). Positional calls behave exactly as before.

    Raises AssertionError if the two keys differ (only when assertions are
    enabled).

    NOTE(review): only the set is returned, not a ``(key, set)`` pair.
    Presumably that is what ``grouping`` expects from ``merge`` -- confirm.
    """
    prev_key, prev = prev_pair
    curr_key, curr = curr_pair
    assert prev_key == curr_key, 'merge called with different keys: %r != %r' % (prev_key, curr_key)
    return prev.union(curr)
# Build the callback that writes a key and its merged value to output.
def write(output):
    """Return a callback ``(game, users) -> None`` bound to *output*.

    Each call writes one line ``game<TAB>u1,u2,...`` to *output*. The users
    are joined with commas in the iteration order of *users*.
    """
    def emit(game, users):
        line = '%s\t%s\n' % (game, ','.join(users))
        output.write(line)
    return emit
# Input:
#   key   = game
#   value = user_id[,user_id...]
# Output:
#   key   = game
#   value = user_id[,user_id...]
def combiner(input, output):
    """Pre-aggregate mapper output by combining the user ids seen per game.

    Delegates to ``grouping`` (presumably from the star-imported hadoop
    module), using this file's ``read``/``merge``/``write`` callbacks.
    """
    emit = write(output)
    grouping(input, read=read, merge=merge, write=emit)
# Input:
#   key   = game
#   value = user_id[,user_id...]
# Output:
#   key   = game
#   value = user_id[,user_id...]
def reducer(input, output):
    """Produce the final comma-separated user-id list for each game.

    Uses the same ``read``/``merge``/``write`` callbacks as the combiner,
    passed to ``grouping`` (presumably from the star-imported hadoop module).
    """
    callbacks = dict(read=read, merge=merge, write=write(output))
    grouping(input, **callbacks)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement