Advertisement
Guest User

Untitled

a guest
May 29th, 2015
306
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.75 KB | None | 0 0
  1. package dzq;
  2.  
  3. import com.google.common.collect.Lists;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.api.java.JavaRDD;
  6. import org.apache.spark.api.java.JavaSparkContext;
  7. import org.apache.spark.api.java.function.Function2;
  8. import org.apache.spark.api.java.function.VoidFunction;
  9. import org.apache.spark.sql.Row;
  10. import org.apache.spark.streaming.Duration;
  11. import org.apache.spark.streaming.Time;
  12. import org.apache.spark.streaming.api.java.JavaDStream;
  13. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  14. import org.json.JSONObject;
  15. import org.apache.spark.sql.DataFrame;
  16. import org.apache.spark.sql.SQLContext;
  17.  
  18. import java.util.LinkedList;
  19. import java.util.List;
  20. import java.util.Queue;
  21. import java.util.Random;
  22.  
  23. public class SqlInStream {
  24.  
  25. public static void main(String[] args) {
  26. SparkConf sparkConf = new SparkConf().setAppName("TestWindowSlide");
  27. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
  28. JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  29. final SQLContext sqlContext = new SQLContext(ctx);
  30.  
  31. // generate rdds
  32. System.out.println("Generate rdds ...");
  33.  
  34. Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
  35. Random rand = new Random();
  36. for (int i=0; i<100; i++) {
  37. List<String> list = Lists.newArrayList();
  38. JSONObject jso;
  39.  
  40. for(int j=0; j<1000; j++) {
  41. jso = new JSONObject();
  42. jso.put("id", rand.nextInt(100000));
  43. jso.put("key", rand.nextInt(100));
  44. jso.put("value", rand.nextInt(100));
  45. list.add(jso.toString());
  46. }
  47. JavaRDD<String> rdd = jssc.sparkContext().parallelize(list);
  48. queue.add(rdd);
  49. }
  50.  
  51. // generate streaming
  52. System.out.println("Generate streaming...");
  53. JavaDStream<String> stream = jssc.queueStream(queue);
  54.  
  55. // search in stream RDD
  56. stream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
  57. @Override
  58. public Void call(JavaRDD<String> rdd, Time time) throws Exception {
  59. DataFrame df = sqlContext.jsonRDD(rdd);
  60. df.registerTempTable("tmp");
  61. DataFrame searchResult = sqlContext.sql("select * from tmp where key < 7 and value > 90");
  62. searchResult.toJavaRDD().foreach(new VoidFunction<Row>() {
  63. @Override
  64. public void call(Row row) throws Exception {
  65. System.out.println(row.toString());
  66. }
  67. });
  68. System.out.println(rdd.toString());
  69. return null;
  70. }
  71. });
  72.  
  73. jssc.start();
  74. jssc.awaitTermination();
  75. }
  76. }
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement