Guest User

Untitled

a guest
Dec 7th, 2016
110
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.22 KB | None | 0 0
  1. package org.myorg.quickstart;
  2.  
  import org.apache.flink.api.common.functions.CoGroupFunction;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.api.java.operators.DataSource;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.util.Collector;

  import java.util.ArrayList;
  import java.util.List;
  10.  
  11. public class LeftOuterJoinExample {
  12.  
  13. public static class LeftOuterJoin implements CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, Integer>> {
  14.  
  15. @Override
  16. public void coGroup(Iterable<Tuple2<Integer, String>> leftElements,
  17. Iterable<Tuple2<Integer, String>> rightElements,
  18. Collector<Tuple2<Integer, Integer>> out) throws Exception {
  19.  
  20. final int NULL_ELEMENT = -1;
  21.  
  22. for (Tuple2<Integer, String> leftElem : leftElements) {
  23. boolean hadElements = false;
  24. for (Tuple2<Integer, String> rightElem : rightElements) {
  25. out.collect(new Tuple2<Integer, Integer>(leftElem.f0, rightElem.f0));
  26. hadElements = true;
  27. }
  28. if (!hadElements) {
  29. out.collect(new Tuple2<Integer, Integer>(leftElem.f0, NULL_ELEMENT));
  30. }
  31. }
  32.  
  33. }
  34. }
  35.  
  36. public static void main(String[] args) throws Exception {
  37.  
  38. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  39.  
  40.  
  41. DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5);
  42. DataSet<Tuple2<Integer, String>> leftSide2 = leftSide.map(
  43. new MapFunction<Integer, Tuple2<Integer, String>>() {
  44. @Override
  45. public Tuple2<Integer, String> map(Integer integer) throws Exception {
  46. return new Tuple2<Integer, String>(integer, "some data");
  47. }
  48. });
  49.  
  50. DataSource<Integer> rightSide = env.fromElements(4, 5, 6, 7, 8, 9, 10);
  51. DataSet<Tuple2<Integer, String>> rightSide2 = rightSide.map(
  52. new MapFunction<Integer, Tuple2<Integer, String>>() {
  53. @Override
  54. public Tuple2<Integer, String> map(Integer integer) throws Exception {
  55. return new Tuple2<Integer, String>(integer, "some other data");
  56. }
  57. });
  58.  
  59. DataSet<Tuple2<Integer, Integer>> leftOuterJoin = leftSide2.coGroup(rightSide2)
  60. .where(0)
  61. .equalTo(0)
  62. .with(new LeftOuterJoin());
  63.  
  64. leftOuterJoin.print();
  65.  
  66. env.execute();
  67. }
  68. }
Add Comment
Please, Sign In to add comment