Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Какой мне принцип объяснили *** и что потом потвердил ***:
- mapper должно последовательно обрабатывать вход и не выполнять агрегации. Например, выводить из binlog'ов некоторые данные. Желательно - в минималистично виде. Например в рамках своей задачи я запускаю из mapper'а statfilter + awk - с одной единственной целью, чтобы на выходе mapper'а была одна колонка - user_id (а не как statfilter выводит это "user_id: X")
- reducer'ам на вход данные идут в отсортированном по ключам виде. На это нужно полагаться, это поможет не делать лишнюю агрегацию и позволит экономить память.
- Про reducer'ы хочу немного пояснить. Позвольте на примере SQL.
- Пусть у нас есть запрос такого вида:
- select id, sum(value), avg(another value) from some_table group by id.
- Есть два пути выполнить этот запрос - Hash Group и Sort Group.
- Hash Group на вход идёт несортированный выход.
- Внутри он строит hash_map вида id => (value_sum, another_value_sum, another_value_count) (id => аккумуляторы)
- На каждую новую пришедшую на вход строку (current_id, current_value, current_another_value) он ищет в hash_map по current_id аккумуляторы и делает
- value_sum += current_value
- another_value_sum += current_another_value
- another_value_count +=1
- Когда вход закончился - Hash Group обходит hash_map и на каждый ключ (id) делает с аккумулятора (value_sum, another_value_sum, another_value_sum_count) следующее:
- result_tuple = (id, value_sum, another_value_sum / another_value_count)
- возвращает result_tuple наружу
- Псевдокод:
- # hash map
- data = {}
- # идём по входу
- for curr_tuple in input:
- # достаём аккумуляторы либо default значения
- (current_value_sum, current_another_value_sum, current_another_value_count) = data.get(curr_tuple.id, (0, 0, 0))
- сurrent_value_sum = += curr_tuple.value
- current_another_value_sum += curr_tuple.another_value
- сurrent_another_value_count += 1
- data[curr_tuple.id] = (current_value_sum, current_another_value_sum, current_another_value_count)
- # выдаём результаты:
- for id in data:
- (current_value_sum, current_another_value_sum, current_another_value_count) = data[id]
- # выдаём результат
- yield (id, current_value_sum, current_another_value_sum / current_another_value_count)
- Плюсы:
- Вход Hash Group может быть произвольным, порядок не требуется
- Минусы:
- Требуется память (sizeof(id) + sizeof(value_sum) + sizeof(another_value_sum) + sizeof(another_value_count)) * unique key count
- Реализация в стиле Hash Group - плохой путь реализации reducer в случае hadoop. Нужно идти другим путем...
- Sort Group на выход получается данные отсортированные по id.
- Если посмотреть внимательно на такой вход, то мы увидим, что одинаковые id идут рядышком, один за другим. Т.е. id уже сгруппированы сортировкой
- Именно в таком виде hadoop отдаёт данные на вход reducer.
- Как агрегировать такие данные?
- Sort Group работает так:
- # ID предыдущей записи
- prev_id = None
- # аккумуляторы
- current_value_sum = 0
- current_another_value_sum = 0
- current_another_value_count = 0
- # идём по входу
- for curr_tuple in input:
- if prev_id = None:
- prev_id = curr_tuple.id
- if prev_id != curr_tuple.id:
- # предыдущая группа закончилась, финализируем аккумуляторы и отдаём наружу
- yield (prev_id, current_value_sum, current_another_value_sum / current_another_value_count)
- current_value_sum = 0
- current_another_value_sum = 0
- current_another_value_count = 0
- prev_id=curr_tuple.id
- else:
- # та же самая группа, что была до этого, обновляем аккумулятор
- current_value_sum += curr_tuple.value
- current_another_value_sum += curr_tuple.another_value
- current_another_value_count += 1
- else:
- # обрабатываем последнюю группу
- ....
- Плюсы Sort Group:
- потребление оперативной памяти не зависит от объёма входных данных или количества уникальных ключей Memory = O(1)
- выдаёт результаты по мере вычисления
- Минусы Sort Group:
- требует сортировки входа по id Но Hadoop всегда сортирует ключи что идут на вход reducer
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement