Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
- </properties>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-binder-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-test-support</artifactId>
- <scope>test</scope>
- </dependency>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>${spring-cloud.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
- spring.cloud.stream.kafka.binder.brokers=localhost:9092
- spring.cloud.stream.bindings.output.destination=meetupTopic
- spring.cloud.stream.bindings.output.producer.partitionCount=1
- spring.cloud.stream.bindings.output.content-type=text/plain
- spring.cloud.stream.bindings.output.producer.headerMode=raw
- server.port=8081
- ===== Producer
- @SpringBootApplication
- public class RsvpApplication {
- private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.meetup.com/2/rsvps";
- public static void main(String[] args) {
- SpringApplication.run(RsvpApplication.class, args);
- }
- @Bean
- public ApplicationRunner initializeConnection(
- RsvpsWebSocketHandler rsvpsWebSocketHandler) {
- return args -> {
- WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();
- rsvpsSocketClient.doHandshake(
- rsvpsWebSocketHandler, MEETUP_RSVPS_ENDPOINT);
- };
- }
- }
- @Component
- @EnableBinding(Source.class)
- public class RsvpsKafkaProducer {
- private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
- private final Source source;
- public RsvpsKafkaProducer(Source source) {
- this.source = source;
- }
- public void sendRsvpMessage(WebSocketMessage<?> message) {
- source.output()
- .send(MessageBuilder.withPayload(message.getPayload())
- .build(),
- SENDING_MESSAGE_TIMEOUT_MS);
- }
- }
/**
 * Receives frames from the Meetup RSVP WebSocket stream and forwards each
 * one to Kafka via {@link RsvpsKafkaProducer}.
 */
@Component
class RsvpsWebSocketHandler extends AbstractWebSocketHandler {

    private static final Logger logger =
            Logger.getLogger(RsvpsWebSocketHandler.class.getName());

    private final RsvpsKafkaProducer rsvpsKafkaProducer;

    public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {
        this.rsvpsKafkaProducer = rsvpsKafkaProducer;
    }

    /**
     * Invoked for every inbound WebSocket message: logs the raw RSVP
     * payload, then hands the message to the Kafka producer unmodified.
     */
    @Override
    public void handleMessage(WebSocketSession session,
            WebSocketMessage<?> message) {
        logger.log(Level.INFO, "New RSVP:\n {0}", message.getPayload());
        rsvpsKafkaProducer.sendRsvpMessage(message);
    }
}
- ============= Consumer
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.11.8</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>2.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
- <version>2.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
- <version>2.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.mongodb.spark</groupId>
- <artifactId>mongo-spark-connector_2.11</artifactId>
- <version>2.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
- <version>2.2.1</version>
- </dependency>
- </dependencies>
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <version>2.15.2</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.5.1</version>
- </plugin>
- </plugins>
- </pluginManagement>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.1.6</version>
- <configuration>
- <scalaCompatVersion>2.11</scalaCompatVersion>
- <scalaVersion>2.11.8</scalaVersion>
- </configuration>
- <!-- other settings-->
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <executions>
- <execution>
- <phase>compile</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
/**
 * Spark Streaming (DStream API) consumer: reads Meetup RSVP JSON strings
 * from the "meetupTopic" Kafka topic, drops RSVPs that bring no guests,
 * saves the remainder to MongoDB, and asynchronously commits the consumed
 * Kafka offsets back to Kafka.
 */
public class StreamingRsvpsDStream {

    private static final String APPLICATION_NAME = "Streaming Rsvps DStream";
    // Windows-specific: Spark's Hadoop layer expects winutils.exe under this dir.
    private static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
    // Run Spark in-process, using all available local cores.
    private static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
    // Micro-batch interval: one RDD of records every 5 seconds.
    private static final int BATCH_DURATION_INTERVAL_MS = 5000;

    private static final Map<String, Object> KAFKA_CONSUMER_PROPERTIES;

    private static final String KAFKA_BROKERS = "localhost:9092";
    // With no committed offset for the group, start from the newest records.
    private static final String KAFKA_OFFSET_RESET_TYPE = "latest";
    private static final String KAFKA_GROUP = "meetupGroup";
    private static final String KAFKA_TOPIC = "meetupTopic";
    private static final Collection<String> TOPICS =
            Collections.unmodifiableList(Arrays.asList(KAFKA_TOPIC));

    static {
        // Auto-commit is disabled because offsets are committed manually in
        // main(), via CanCommitOffsets.commitAsync below.
        Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP);
        kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_RESET_TYPE);
        kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KAFKA_CONSUMER_PROPERTIES = Collections.unmodifiableMap(kafkaProperties);
    }

    // Target: database "meetupDB", collection "rsvpsguests" on localhost.
    private static final String MONGODB_OUTPUT_URI = "mongodb://localhost/meetupDB.rsvpsguests";

    public static void main(String[] args) throws InterruptedException {
        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);

        final JavaStreamingContext streamingContext
                = new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));

        // Direct (receiver-less) Kafka stream; each record value is the raw
        // RSVP JSON string published by the producer application.
        final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
                );

        // transformations, streaming algorithms, etc
        // Keep only RSVPs whose payload reports at least one guest.
        // NOTE(review): this is a plain substring test on the raw JSON, so it
        // assumes the field is serialized exactly as "guests":0 (no spacing) —
        // confirm against the producer's payload format.
        JavaDStream<ConsumerRecord<String, String>> rsvpsWithGuestsStream =
                meetupStream.filter(f -> !f.value().contains("\"guests\":0"));

        // Parse each surviving record value into a BSON Document and bulk-save
        // to the MongoDB collection configured via spark.mongodb.output.uri.
        rsvpsWithGuestsStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> r) -> {
            MongoSpark.save(
                    r.map(
                            e -> Document.parse(e.value())
                    )
            );
        });

        // some time later, after outputs have completed
        // Commit each batch's offset ranges back to Kafka asynchronously.
        // NOTE(review): this is a separate output action from the Mongo save
        // above, so offsets can be committed even if the save fails for a
        // batch — verify this at-most-once trade-off is intended.
        meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();
            ((CanCommitOffsets) meetupStream.inputDStream())
                    .commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
        });

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
- final class MeetupOffsetCommitCallback implements OffsetCommitCallback {
- private static final Logger log = Logger.getLogger(MeetupOffsetCommitCallback.class.getName());
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- log.info("---------------------------------------------------");
- log.log(Level.INFO, "{0} | {1}", new Object[]{offsets, exception});
- log.info("---------------------------------------------------");
- }
- }
- === To Console
/**
 * Spark Structured Streaming consumer: reads raw RSVP JSON from the
 * "meetupTopic" Kafka topic, parses it against an explicit schema, counts
 * RSVPs per guest count over sliding event-time windows, and writes the
 * running aggregation to the console.
 */
public class SparkStructuredStreaming {

    // Windows-specific: Spark's Hadoop layer expects winutils.exe under this dir.
    private static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
    // Checkpoint directory required for fault-tolerant stateful aggregation.
    private static final String CHECKPOINT_LOCATION = "D://rsvpck";
    // Run Spark in-process, using all available local cores.
    private static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
    private static final String APPLICATION_NAME = "Spark Structured Streaming";
    // Case-insensitive column resolution in Spark SQL.
    private static final String CASE_SENSITIVE = "false";

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String STREAM_FORMAT = "kafka";
    private static final String KAFKA_TOPIC = "meetupTopic";

    // Explicit schema for the Meetup RSVP JSON payload.
    // * the schema can be written on disk, and read from disk
    // * the schema is not mandatory to be complete, it can contain only the needed fields
    private static final StructType RSVP_SCHEMA = new StructType()
            .add("venue",
                    new StructType()
                            .add("venue_name", StringType, true)
                            .add("lon", DoubleType, true)
                            .add("lat", DoubleType, true)
                            .add("venue_id", LongType, true))
            .add("visibility", StringType, true)
            .add("response", StringType, true)
            .add("guests", LongType, true)
            .add("member",
                    new StructType()
                            .add("member_id", LongType, true)
                            .add("photo", StringType, true)
                            .add("member_name", StringType, true))
            .add("rsvp_id", LongType, true)
            .add("mtime", LongType, true)
            .add("event",
                    new StructType()
                            .add("event_name", StringType, true)
                            .add("event_id", StringType, true)
                            .add("time", LongType, true)
                            .add("event_url", StringType, true))
            .add("group",
                    new StructType()
                            .add("group_city", StringType, true)
                            .add("group_country", StringType, true)
                            .add("group_id", LongType, true)
                            .add("group_lat", DoubleType, true)
                            .add("group_long", DoubleType, true)
                            .add("group_name", StringType, true)
                            .add("group_state", StringType, true)
                            .add("group_topics", DataTypes.createArrayType(
                                    new StructType()
                                            .add("topicName", StringType, true)
                                            .add("urlkey", StringType, true)), true)
                            .add("group_urlname", StringType, true));

    public static void main(String[] args) throws InterruptedException, StreamingQueryException {
        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.sql.caseSensitive", CASE_SENSITIVE);

        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        // Source: Kafka topic "meetupTopic"; each row carries Kafka metadata
        // columns (key, value, topic, partition, offset, timestamp, ...).
        Dataset<Row> meetupDF = sparkSession.readStream()
                .format(STREAM_FORMAT)
                .option("kafka.bootstrap.servers", KAFKA_BROKERS)
                .option("subscribe", KAFKA_TOPIC)
                .load();

        meetupDF.printSchema();

        // Keep the Kafka ingestion timestamp and parse the binary "value"
        // column as JSON using the schema above, then flatten the result.
        Dataset<Row> rsvpAndTimestampDF = meetupDF
                .select(col("timestamp"),
                        from_json(col("value").cast("string"), RSVP_SCHEMA)
                                .alias("rsvp"))
                .alias("meetup")
                .select("meetup.*");

        rsvpAndTimestampDF.printSchema();

        // Sliding 4-minute windows every 2 minutes, keyed by guest count,
        // with a 1-minute watermark to bound late data.
        Dataset<Row> window = rsvpAndTimestampDF
                .withWatermark("timestamp", "1 minute")
                .groupBy(
                        window(col("timestamp"), "4 minutes", "2 minutes"),
                        col("rsvp.guests"))
                .count();

        // Sink: full (complete-mode) aggregation result to the console.
        StreamingQuery query = window.writeStream()
                .outputMode("complete")
                .format("console")
                .option("checkpointLocation", CHECKPOINT_LOCATION)
                .option("truncate", false)
                .start();

        query.awaitTermination();
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement