daily pastebin goal
57%
SHARE
TWEET

Untitled

a guest Mar 22nd, 2019 77 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. <properties>
  2.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  3.         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  4.         <java.version>1.8</java.version>
  5.         <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
  6.     </properties>
  7.  
  8. <dependency>
  9.             <groupId>org.springframework.boot</groupId>
  10.             <artifactId>spring-boot-starter-websocket</artifactId>
  11.         </dependency>
  12.         <dependency>
  13.             <groupId>org.springframework.cloud</groupId>
  14.             <artifactId>spring-cloud-stream</artifactId>
  15.         </dependency>
  16.         <dependency>
  17.             <groupId>org.springframework.cloud</groupId>
  18.             <artifactId>spring-cloud-stream-binder-kafka</artifactId>
  19.         </dependency>
  20.         <dependency>
  21.             <groupId>org.springframework.boot</groupId>
  22.             <artifactId>spring-boot-starter-test</artifactId>
  23.             <scope>test</scope>
  24.         </dependency>
  25.         <dependency>
  26.             <groupId>org.springframework.cloud</groupId>
  27.             <artifactId>spring-cloud-stream-test-support</artifactId>
  28.             <scope>test</scope>
  29.         </dependency>
  30.        
  31.         <dependencyManagement>
  32.         <dependencies>
  33.             <dependency>
  34.                 <groupId>org.springframework.cloud</groupId>
  35.                 <artifactId>spring-cloud-dependencies</artifactId>
  36.                 <version>${spring-cloud.version}</version>
  37.                 <type>pom</type>
  38.                 <scope>import</scope>
  39.             </dependency>
  40.         </dependencies>
  41.     </dependencyManagement>
  42.    
  43.     spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
  44. spring.cloud.stream.kafka.binder.brokers=localhost:9092
  45.  
  46. spring.cloud.stream.bindings.output.destination=meetupTopic
  47. spring.cloud.stream.bindings.output.producer.partitionCount=1
  48.  
  49. spring.cloud.stream.bindings.output.content-type=text/plain
  50. spring.cloud.stream.bindings.output.producer.headerMode=raw
  51.  
  52. server.port=8081
  53. ===== Producer
  54. @SpringBootApplication
  55. public class RsvpApplication {
  56.  
  57.     private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.meetup.com/2/rsvps";
  58.  
  59.     public static void main(String[] args) {
  60.         SpringApplication.run(RsvpApplication.class, args);
  61.     }
  62.  
  63.     @Bean
  64.     public ApplicationRunner initializeConnection(
  65.         RsvpsWebSocketHandler rsvpsWebSocketHandler) {
  66.             return args -> {
  67.                 WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();
  68.  
  69.                 rsvpsSocketClient.doHandshake(
  70.                     rsvpsWebSocketHandler, MEETUP_RSVPS_ENDPOINT);          
  71.             };
  72.         }
  73. }
  74.  
  75. @Component
  76. @EnableBinding(Source.class)
  77. public class RsvpsKafkaProducer {
  78.  
  79.     private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
  80.  
  81.     private final Source source;
  82.  
  83.     public RsvpsKafkaProducer(Source source) {
  84.         this.source = source;
  85.     }
  86.  
  87.     public void sendRsvpMessage(WebSocketMessage<?> message) {
  88.        
  89.         source.output()
  90.                 .send(MessageBuilder.withPayload(message.getPayload())
  91.                         .build(),
  92.                         SENDING_MESSAGE_TIMEOUT_MS);  
  93.     }
  94. }
  95.  
  96. @Component
  97. class RsvpsWebSocketHandler extends AbstractWebSocketHandler {
  98.  
  99.     private static final Logger logger =
  100.             Logger.getLogger(RsvpsWebSocketHandler.class.getName());
  101.        
  102.     private final RsvpsKafkaProducer rsvpsKafkaProducer;
  103.  
  104.     public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {    
  105.         this.rsvpsKafkaProducer = rsvpsKafkaProducer;
  106.     }
  107.  
  108.     @Override
  109.     public void handleMessage(WebSocketSession session,
  110.             WebSocketMessage<?> message) {
  111.         logger.log(Level.INFO, "New RSVP:\n {0}", message.getPayload());
  112.        
  113.         rsvpsKafkaProducer.sendRsvpMessage(message);        
  114.     }
  115. }
  116.  
  117. ============= Consumer
  118.  <dependencies>          
  119.         <dependency>
  120.             <groupId>org.scala-lang</groupId>
  121.             <artifactId>scala-library</artifactId>
  122.             <version>2.11.8</version>
  123.         </dependency>        
  124.         <dependency>
  125.             <groupId>org.apache.spark</groupId>
  126.             <artifactId>spark-core_2.11</artifactId>
  127.             <version>2.2.1</version>                
  128.         </dependency>      
  129.         <dependency>
  130.             <groupId>org.apache.spark</groupId>
  131.             <artifactId>spark-streaming_2.11</artifactId>
  132.             <version>2.2.1</version>
  133.         </dependency>        
  134.         <dependency>
  135.             <groupId>org.apache.spark</groupId>
  136.             <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  137.             <version>2.2.1</version>
  138.         </dependency>
  139.         <dependency>
  140.             <groupId>org.mongodb.spark</groupId>
  141.             <artifactId>mongo-spark-connector_2.11</artifactId>
  142.             <version>2.2.1</version>
  143.         </dependency>            
  144.         <dependency>
  145.             <groupId>org.apache.spark</groupId>
  146.             <artifactId>spark-sql_2.11</artifactId>
  147.             <version>2.2.1</version>
  148.         </dependency>
  149.     </dependencies>
  150.    
  151.     <build>
  152.         <pluginManagement>
  153.             <plugins>
  154.                 <plugin>
  155.                     <groupId>org.scala-tools</groupId>
  156.                     <artifactId>maven-scala-plugin</artifactId>
  157.                     <version>2.15.2</version>
  158.                 </plugin>
  159.                 <plugin>
  160.                     <groupId>org.apache.maven.plugins</groupId>
  161.                     <artifactId>maven-compiler-plugin</artifactId>
  162.                     <version>2.5.1</version>
  163.                 </plugin>
  164.             </plugins>
  165.         </pluginManagement>
  166.         <plugins>
  167.             <plugin>
  168.                 <groupId>net.alchim31.maven</groupId>
  169.                 <artifactId>scala-maven-plugin</artifactId>
  170.                 <version>3.1.6</version>
  171.                 <configuration>
  172.                     <scalaCompatVersion>2.11</scalaCompatVersion>
  173.                     <scalaVersion>2.11.8</scalaVersion>
  174.                 </configuration>
  175.                 <!-- other settings-->
  176.             </plugin>
  177.             <plugin>
  178.                 <groupId>org.apache.maven.plugins</groupId>
  179.                 <artifactId>maven-compiler-plugin</artifactId>
  180.                 <executions>
  181.                     <execution>
  182.                         <phase>compile</phase>
  183.                         <goals>
  184.                             <goal>compile</goal>
  185.                         </goals>
  186.                     </execution>
  187.                 </executions>
  188.                 <configuration>
  189.                     <source>1.8</source>
  190.                     <target>1.8</target>
  191.                 </configuration>
  192.             </plugin>
  193.         </plugins>
  194.     </build>    
  195.    
  196.    
  197.     public class StreamingRsvpsDStream {
  198.  
  199.     private static final String APPLICATION_NAME = "Streaming Rsvps DStream";
  200.     private static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";    
  201.     private static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";    
  202.     private static final int BATCH_DURATION_INTERVAL_MS = 5000;
  203.  
  204.     private static final Map<String, Object> KAFKA_CONSUMER_PROPERTIES;    
  205.    
  206.     private static final String KAFKA_BROKERS = "localhost:9092";
  207.     private static final String KAFKA_OFFSET_RESET_TYPE = "latest";
  208.     private static final String KAFKA_GROUP = "meetupGroup";
  209.     private static final String KAFKA_TOPIC = "meetupTopic";
  210.     private static final Collection<String> TOPICS =
  211.             Collections.unmodifiableList(Arrays.asList(KAFKA_TOPIC));
  212.            
  213.     static {
  214.         Map<String, Object> kafkaProperties = new HashMap<>();
  215.         kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
  216.         kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  217.         kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  218.         kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP);
  219.         kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_RESET_TYPE);
  220.         kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  221.  
  222.         KAFKA_CONSUMER_PROPERTIES = Collections.unmodifiableMap(kafkaProperties);
  223.     }
  224.    
  225.     private static final String MONGODB_OUTPUT_URI = "mongodb://localhost/meetupDB.rsvpsguests";
  226.        
  227.     public static void main(String[] args) throws InterruptedException {
  228.  
  229.         System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
  230.  
  231.         final SparkConf conf = new SparkConf()
  232.                 .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
  233.                 .setAppName(APPLICATION_NAME)
  234.                 .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);
  235.  
  236.         final JavaStreamingContext streamingContext
  237.                 = new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));
  238.  
  239.         final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
  240.                 KafkaUtils.createDirectStream(
  241.                         streamingContext,
  242.                         LocationStrategies.PreferConsistent(),
  243.                         ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
  244.                 );
  245.                
  246.         // transformations, streaming algorithms, etc
  247.         JavaDStream<ConsumerRecord<String, String>> rsvpsWithGuestsStream =
  248.                 meetupStream.filter(f -> !f.value().contains("\"guests\":0"));
  249.  
  250.         rsvpsWithGuestsStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> r) -> {        
  251.             MongoSpark.save(
  252.                     r.map(
  253.                         e -> Document.parse(e.value())
  254.                     )
  255.             );            
  256.         });
  257.        
  258.         // some time later, after outputs have completed
  259.         meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {        
  260.             OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();            
  261.  
  262.             ((CanCommitOffsets) meetupStream.inputDStream())
  263.                 .commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
  264.         });
  265.  
  266.         streamingContext.start();
  267.         streamingContext.awaitTermination();    
  268.     }
  269. }
  270.  
  271. final class MeetupOffsetCommitCallback implements OffsetCommitCallback {
  272.  
  273.     private static final Logger log = Logger.getLogger(MeetupOffsetCommitCallback.class.getName());
  274.  
  275.     @Override
  276.     public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
  277.         log.info("---------------------------------------------------");
  278.         log.log(Level.INFO, "{0} | {1}", new Object[]{offsets, exception});
  279.         log.info("---------------------------------------------------");
  280.     }
  281. }
  282.  
  283. === To Console
  284.  
  285. public class SparkStructuredStreaming {
  286.    
  287.     private static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
  288.     private static final String CHECKPOINT_LOCATION = "D://rsvpck";
  289.  
  290.     private static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
  291.     private static final String APPLICATION_NAME = "Spark Structured Streaming";
  292.     private static final String CASE_SENSITIVE = "false";
  293.    
  294.     private static final String KAFKA_BROKERS = "localhost:9092";
  295.     private static final String STREAM_FORMAT = "kafka";    
  296.     private static final String KAFKA_TOPIC = "meetupTopic";
  297.  
  298.     // * the schema can be written on disk, and read from disk
  299.     // * the schema is not mandatory to be complete, it can contain only the needed fields
  300.     private static final StructType RSVP_SCHEMA = new StructType()
  301.         .add("venue",
  302.                 new StructType()
  303.                         .add("venue_name", StringType, true)                        
  304.                         .add("lon", DoubleType, true)
  305.                         .add("lat", DoubleType, true)
  306.                         .add("venue_id", LongType, true))        
  307.         .add("visibility", StringType, true)                                
  308.         .add("response", StringType, true)
  309.         .add("guests", LongType, true)
  310.         .add("member",
  311.                 new StructType()
  312.                         .add("member_id", LongType, true)
  313.                         .add("photo", StringType, true)
  314.                         .add("member_name", StringType, true))                            
  315.         .add("rsvp_id", LongType, true)      
  316.         .add("mtime", LongType, true)              
  317.         .add("event",
  318.                 new StructType()
  319.                         .add("event_name", StringType, true)
  320.                         .add("event_id", StringType, true)                
  321.                         .add("time", LongType, true)
  322.                         .add("event_url", StringType, true))
  323.         .add("group",
  324.                 new StructType()
  325.                         .add("group_city", StringType, true)
  326.                         .add("group_country", StringType, true)
  327.                         .add("group_id", LongType, true)
  328.                         .add("group_lat", DoubleType, true)
  329.                         .add("group_long", DoubleType, true)
  330.                         .add("group_name", StringType, true)
  331.                         .add("group_state", StringType, true)
  332.                         .add("group_topics", DataTypes.createArrayType(
  333.                                 new StructType()
  334.                                         .add("topicName", StringType, true)
  335.                                         .add("urlkey", StringType, true)), true)
  336.                         .add("group_urlname", StringType, true));
  337.  
  338.     public static void main(String[] args) throws InterruptedException, StreamingQueryException {
  339.  
  340.         System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
  341.  
  342.         final SparkConf conf = new SparkConf()
  343.                 .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
  344.                 .setAppName(APPLICATION_NAME)
  345.                 .set("spark.sql.caseSensitive", CASE_SENSITIVE);
  346.  
  347.         SparkSession sparkSession = SparkSession.builder()
  348.                 .config(conf)
  349.                 .getOrCreate();
  350.  
  351.         Dataset<Row> meetupDF = sparkSession.readStream()
  352.                 .format(STREAM_FORMAT)
  353.                 .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  354.                 .option("subscribe", KAFKA_TOPIC)                
  355.                 .load();                              
  356.        
  357.         meetupDF.printSchema();
  358.  
  359.         Dataset<Row> rsvpAndTimestampDF = meetupDF
  360.                 .select(col("timestamp"),
  361.                         from_json(col("value").cast("string"), RSVP_SCHEMA)
  362.                                  .alias("rsvp"))
  363.                 .alias("meetup")
  364.                 .select("meetup.*");
  365.        
  366.         rsvpAndTimestampDF.printSchema();
  367.        
  368.         Dataset<Row> window = rsvpAndTimestampDF
  369.                 .withWatermark("timestamp", "1 minute")
  370.                 .groupBy(
  371.                         window(col("timestamp"), "4 minutes", "2 minutes"),
  372.                         col("rsvp.guests"))
  373.                 .count();
  374.  
  375.         StreamingQuery query = window.writeStream()
  376.                 .outputMode("complete")
  377.                 .format("console")                              
  378.                 .option("checkpointLocation", CHECKPOINT_LOCATION)
  379.                 .option("truncate", false)
  380.                 .start();
  381.  
  382.         query.awaitTermination();
  383.     }
  384. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top