Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- from flink.plan.Environment import get_environment
- from flink.plan.Constants import WriteMode
- from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
    """Sum the counts of all ``(count, word)`` records in one group.

    In this script it is applied after ``group_by(1)``, so every record of a
    group carries the same word. Its output has the same ``(count, word)``
    shape as its input, so it can also be run as a combiner
    (``combinable=True``) on partial groups.
    """

    def reduce(self, iterator, collector):
        """Collapse one group of ``(count, word)`` tuples into a single tuple.

        :param iterator: iterator over the group's ``(count, word)`` records;
            assumed non-empty (``next`` raises ``StopIteration`` otherwise).
        :param collector: sink that receives exactly one ``(total, word)``
            tuple per call.
        """
        # The first record supplies the group's word and an initial count.
        # The builtin next() (not the Python-2-only .next() method) works
        # under both Python 2 and Python 3.
        count, word = next(iterator)
        # Add the counts of the remaining records without materialising a list.
        count += sum(record[0] for record in iterator)
        collector.collect((count, word))
# Paths used when no command-line arguments are supplied (the original
# hard-coded locations).
DEFAULT_INPUT = 'hdfs://cdh01.sbgdinc.com:8020/viktor/in.txt'
DEFAULT_OUTPUT = 'hdfs://cdh01.sbgdinc.com:8020/viktor/out.txt'


def resolve_paths(argv):
    """Return ``(input_path, output_path)`` for the job.

    :param argv: argument vector laid out like ``sys.argv``. Index 0 is the
        script name, index 1 (optional) is the input path and index 2
        (optional) is the output path. Any further entries are ignored.
    :returns: a 2-tuple of path strings. Missing entries fall back to
        ``DEFAULT_INPUT`` / ``DEFAULT_OUTPUT``.
    """
    input_file = argv[1] if len(argv) > 1 else DEFAULT_INPUT
    output_file = argv[2] if len(argv) > 2 else DEFAULT_OUTPUT
    return input_file, output_file


def main(argv=None):
    """Build and run the word-count job.

    The job reads text lines and splits each lowercased line on whitespace.
    It counts occurrences per word and writes one
    ``'Count: <n> Word: <w>'`` line per word, overwriting any existing
    output.

    :param argv: argument vector (defaults to ``sys.argv``); see
        ``resolve_paths`` for its layout.
    """
    if argv is None:
        argv = sys.argv
    input_file, output_file = resolve_paths(argv)

    env = get_environment()
    env.set_parallelism(3)
    data = env.read_text(input_file)

    # Emit (1, word) per token, group on the word (field 1) and sum the
    # counts. Adder's output shape equals its input shape, so it is safe
    # to pre-aggregate with a combiner.
    (data
     .flat_map(lambda line, collector:
               [(1, word) for word in line.lower().split()])
     .group_by(1)
     .reduce_group(Adder(), combinable=True)
     .map(lambda pair: 'Count: %s Word: %s' % (pair[0], pair[1]))
     .write_text(output_file, write_mode=WriteMode.OVERWRITE))

    env.execute()


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement