Advertisement
zamotivator

Untitled

Dec 20th, 2013
197
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.54 KB | None | 0 0
  1. Какой мне принцип объяснили *** и что потом потвердил ***:
  2.  
  3. mapper должно последовательно обрабатывать вход и не выполнять агрегации. Например, выводить из binlog'ов некоторые данные. Желательно - в минималистично виде. Например в рамках своей задачи я запускаю из mapper'а statfilter + awk - с одной единственной целью, чтобы на выходе mapper'а была одна колонка - user_id (а не как statfilter выводит это "user_id: X")
  4. reducer'ам на вход данные идут в отсортированном по ключам виде. На это нужно полагаться, это поможет не делать лишнюю агрегацию и позволит экономить память.
  5.  
  6. Про reducer'ы хочу немного пояснить. Позвольте на примере SQL.
  7.  
  8. Пусть у нас есть запрос такого вида:
  9.  
  10. select id, sum(value), avg(another value) from some_table group by id.
  11.  
  12. Есть два пути выполнить этот запрос - Hash Group и Sort Group.
  13.  
  14. Hash Group на вход идёт несортированный выход.
  15. Внутри он строит hash_map вида id => (value_sum, another_value_sum, another_value_count) (id => аккумуляторы)
  16. На каждую новую пришедшую на вход строку (current_id, current_value, current_another_value) он ищет в hash_map по current_id аккумуляторы и делает
  17. value_sum += current_value
  18. another_value_sum += current_another_value
  19. another_value_count +=1
  20. Когда вход закончился - Hash Group обходит hash_map и на каждый ключ (id) делает с аккумулятора (value_sum, another_value_sum, another_value_sum_count) следующее:
  21. result_tuple = (id, value_sum, another_value_sum / another_value_count)
  22. возвращает result_tuple наружу
  23.  
  24. Псевдокод:
  25. # hash map
  26. data = {}
  27. # идём по входу
  28. for curr_tuple in input:
  29. # достаём аккумуляторы либо default значения
  30. (current_value_sum, current_another_value_sum, current_another_value_count) = data.get(curr_tuple.id, (0, 0, 0))
  31. сurrent_value_sum = += curr_tuple.value
  32. current_another_value_sum += curr_tuple.another_value
  33. сurrent_another_value_count += 1
  34. data[curr_tuple.id] = (current_value_sum, current_another_value_sum, current_another_value_count)
  35. # выдаём результаты:
  36. for id in data:
  37. (current_value_sum, current_another_value_sum, current_another_value_count) = data[id]
  38. # выдаём результат
  39. yield (id, current_value_sum, current_another_value_sum / current_another_value_count)
  40.  
  41. Плюсы:
  42.  
  43. Вход Hash Group может быть произвольным, порядок не требуется
  44.  
  45. Минусы:
  46.  
  47. Требуется память (sizeof(id) + sizeof(value_sum) + sizeof(another_value_sum) + sizeof(another_value_count)) * unique key count
  48.  
  49.  
  50. Реализация в стиле Hash Group - плохой путь реализации reducer в случае hadoop. Нужно идти другим путем...
  51.  
  52. Sort Group на выход получается данные отсортированные по id.
  53. Если посмотреть внимательно на такой вход, то мы увидим, что одинаковые id идут рядышком, один за другим. Т.е. id уже сгруппированы сортировкой
  54. Именно в таком виде hadoop отдаёт данные на вход reducer.
  55.  
  56. Как агрегировать такие данные?
  57.  
  58. Sort Group работает так:
  59.  
  60. # ID предыдущей записи
  61. prev_id = None
  62. # аккумуляторы
  63. current_value_sum = 0
  64. current_another_value_sum = 0
  65. current_another_value_count = 0
  66. # идём по входу
  67. for curr_tuple in input:
  68. if prev_id = None:
  69. prev_id = curr_tuple.id
  70. if prev_id != curr_tuple.id:
  71. # предыдущая группа закончилась, финализируем аккумуляторы и отдаём наружу
  72. yield (prev_id, current_value_sum, current_another_value_sum / current_another_value_count)
  73. current_value_sum = 0
  74. current_another_value_sum = 0
  75. current_another_value_count = 0
  76. prev_id=curr_tuple.id
  77. else:
  78. # та же самая группа, что была до этого, обновляем аккумулятор
  79. current_value_sum += curr_tuple.value
  80. current_another_value_sum += curr_tuple.another_value
  81. current_another_value_count += 1
  82. else:
  83. # обрабатываем последнюю группу
  84. ....
  85.  
  86.  
  87. Плюсы Sort Group:
  88. потребление оперативной памяти не зависит от объёма входных данных или количества уникальных ключей Memory = O(1)
  89. выдаёт результаты по мере вычисления
  90. Минусы Sort Group:
  91. требует сортировки входа по id Но Hadoop всегда сортирует ключи что идут на вход reducer
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement