<!-- producer pom.xml -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <!-- note: a released train (e.g. Finchley.RELEASE) is preferable to a BUILD-SNAPSHOT -->
    <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-test-support</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

# application.properties

# Kafka binder connection details (zkNodes is only consulted by older binder versions)
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9092

# publish RSVP messages to meetupTopic, using a single partition
spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=1

# send the payload as plain text, with no Spring headers embedded in the message
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw

server.port=8081
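
The binder can usually create meetupTopic on its own, but if auto-creation is disabled on the broker, the topic can be created up front. A sketch using the standard Kafka CLI script of that era; the ZooKeeper address matches the zkNodes setting above:

    # assumes Kafka's bin/ directory is on the PATH and ZooKeeper runs on localhost:2181
    kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 1 --topic meetupTopic
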
===== Producer
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

@SpringBootApplication
public class RsvpApplication {

    private static final String MEETUP_RSVPS_ENDPOINT = "ws://stream.meetup.com/2/rsvps";

    public static void main(String[] args) {
        SpringApplication.run(RsvpApplication.class, args);
    }

    // open the WebSocket connection to Meetup's RSVP stream once the application starts
    @Bean
    public ApplicationRunner initializeConnection(RsvpsWebSocketHandler rsvpsWebSocketHandler) {
        return args -> {
            WebSocketClient rsvpsSocketClient = new StandardWebSocketClient();
            rsvpsSocketClient.doHandshake(rsvpsWebSocketHandler, MEETUP_RSVPS_ENDPOINT);
        };
    }
}

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;

@Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {

    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    // publish the raw WebSocket payload to the binder's output channel (meetupTopic)
    public void sendRsvpMessage(WebSocketMessage<?> message) {
        source.output()
              .send(MessageBuilder.withPayload(message.getPayload()).build(),
                    SENDING_MESSAGE_TIMEOUT_MS);
    }
}

import java.util.logging.Level;
import java.util.logging.Logger;

import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

@Component
class RsvpsWebSocketHandler extends AbstractWebSocketHandler {

    private static final Logger logger =
            Logger.getLogger(RsvpsWebSocketHandler.class.getName());

    private final RsvpsKafkaProducer rsvpsKafkaProducer;

    public RsvpsWebSocketHandler(RsvpsKafkaProducer rsvpsKafkaProducer) {
        this.rsvpsKafkaProducer = rsvpsKafkaProducer;
    }

    // forward every incoming RSVP message to Kafka
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        logger.log(Level.INFO, "New RSVP:\n {0}", message.getPayload());
        rsvpsKafkaProducer.sendRsvpMessage(message);
    }
}

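Before wiring up the Spark consumer, it is worth confirming that RSVPs actually reach the broker; Kafka's console consumer can tail the topic directly:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic meetupTopic
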
===== Consumer
<!-- consumer pom.xml -->
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.1</version>
    </dependency>
</dependencies>

<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
            </plugin>
        </plugins>
    </pluginManagement>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.11</scalaCompatVersion>
                <scalaVersion>2.11.8</scalaVersion>
            </configuration>
            <!-- other settings -->
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

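With the plugins above in place, the consumer module builds like any other Maven project; a typical invocation from the directory containing the pom:

    mvn clean package
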
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.mongodb.spark.MongoSpark;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import org.bson.Document;

public class StreamingRsvpsDStream {

    private static final String APPLICATION_NAME = "Streaming Rsvps DStream";
    private static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
    private static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
    private static final int BATCH_DURATION_INTERVAL_MS = 5000;

    private static final Map<String, Object> KAFKA_CONSUMER_PROPERTIES;

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String KAFKA_OFFSET_RESET_TYPE = "latest";
    private static final String KAFKA_GROUP = "meetupGroup";
    private static final String KAFKA_TOPIC = "meetupTopic";
    private static final Collection<String> TOPICS =
            Collections.unmodifiableList(Arrays.asList(KAFKA_TOPIC));

    static {
        Map<String, Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
        kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_GROUP);
        kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_OFFSET_RESET_TYPE);
        // auto-commit is disabled so that offsets can be committed manually after each batch
        kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KAFKA_CONSUMER_PROPERTIES = Collections.unmodifiableMap(kafkaProperties);
    }

    private static final String MONGODB_OUTPUT_URI = "mongodb://localhost/meetupDB.rsvpsguests";

    public static void main(String[] args) throws InterruptedException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);

        final JavaStreamingContext streamingContext =
                new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));

        final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
                );

        // transformations, streaming algorithms, etc.
        // keep only RSVPs that bring at least one guest
        JavaDStream<ConsumerRecord<String, String>> rsvpsWithGuestsStream =
                meetupStream.filter(f -> !f.value().contains("\"guests\":0"));

        // persist each micro-batch to MongoDB as parsed BSON documents
        rsvpsWithGuestsStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> r) -> {
            MongoSpark.save(
                    r.map(e -> Document.parse(e.value()))
            );
        });

        // some time later, after outputs have completed, commit the Kafka offsets
        meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();

            ((CanCommitOffsets) meetupStream.inputDStream())
                    .commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
        });

        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

// logs the outcome of each asynchronous offset commit
final class MeetupOffsetCommitCallback implements OffsetCommitCallback {

    private static final Logger log = Logger.getLogger(MeetupOffsetCommitCallback.class.getName());

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        log.info("---------------------------------------------------");
        log.log(Level.INFO, "{0} | {1}", new Object[]{offsets, exception});
        log.info("---------------------------------------------------");
    }
}

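Once the DStream job is running, the saved RSVPs can be spot-checked from the mongo shell; the database and collection names come from MONGODB_OUTPUT_URI above:

    use meetupDB
    db.rsvpsguests.count()
    db.rsvpsguests.find().limit(5).pretty()
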
===== Structured Streaming to Console

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.window;
import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

public class SparkStructuredStreaming {

    private static final String HADOOP_HOME_DIR_VALUE = "C:/winutils";
    private static final String CHECKPOINT_LOCATION = "D://rsvpck";

    private static final String RUN_LOCAL_WITH_AVAILABLE_CORES = "local[*]";
    private static final String APPLICATION_NAME = "Spark Structured Streaming";
    private static final String CASE_SENSITIVE = "false";

    private static final String KAFKA_BROKERS = "localhost:9092";
    private static final String STREAM_FORMAT = "kafka";
    private static final String KAFKA_TOPIC = "meetupTopic";

    // * the schema can be written to disk and read back from disk
    // * the schema does not have to be complete; it can contain only the needed fields
    private static final StructType RSVP_SCHEMA = new StructType()
            .add("venue",
                    new StructType()
                            .add("venue_name", StringType, true)
                            .add("lon", DoubleType, true)
                            .add("lat", DoubleType, true)
                            .add("venue_id", LongType, true))
            .add("visibility", StringType, true)
            .add("response", StringType, true)
            .add("guests", LongType, true)
            .add("member",
                    new StructType()
                            .add("member_id", LongType, true)
                            .add("photo", StringType, true)
                            .add("member_name", StringType, true))
            .add("rsvp_id", LongType, true)
            .add("mtime", LongType, true)
            .add("event",
                    new StructType()
                            .add("event_name", StringType, true)
                            .add("event_id", StringType, true)
                            .add("time", LongType, true)
                            .add("event_url", StringType, true))
            .add("group",
                    new StructType()
                            .add("group_city", StringType, true)
                            .add("group_country", StringType, true)
                            .add("group_id", LongType, true)
                            .add("group_lat", DoubleType, true)
                            .add("group_long", DoubleType, true)
                            .add("group_name", StringType, true)
                            .add("group_state", StringType, true)
                            .add("group_topics", DataTypes.createArrayType(
                                    new StructType()
                                            .add("topicName", StringType, true)
                                            .add("urlkey", StringType, true)), true)
                            .add("group_urlname", StringType, true));

    public static void main(String[] args) throws InterruptedException, StreamingQueryException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.sql.caseSensitive", CASE_SENSITIVE);

        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        // read the raw Kafka records as a streaming DataFrame
        Dataset<Row> meetupDF = sparkSession.readStream()
                .format(STREAM_FORMAT)
                .option("kafka.bootstrap.servers", KAFKA_BROKERS)
                .option("subscribe", KAFKA_TOPIC)
                .load();

        meetupDF.printSchema();

        // parse the Kafka value as JSON against RSVP_SCHEMA, keeping the ingestion timestamp
        Dataset<Row> rsvpAndTimestampDF = meetupDF
                .select(col("timestamp"),
                        from_json(col("value").cast("string"), RSVP_SCHEMA)
                                .alias("rsvp"))
                .alias("meetup")
                .select("meetup.*");

        rsvpAndTimestampDF.printSchema();

        // count RSVPs per guest count over 4-minute windows, sliding every 2 minutes
        // (with outputMode "complete" below, the watermark does not expire any state)
        Dataset<Row> window = rsvpAndTimestampDF
                .withWatermark("timestamp", "1 minute")
                .groupBy(
                        window(col("timestamp"), "4 minutes", "2 minutes"),
                        col("rsvp.guests"))
                .count();

        StreamingQuery query = window.writeStream()
                .outputMode("complete")
                .format("console")
                .option("checkpointLocation", CHECKPOINT_LOCATION)
                .option("truncate", false)
                .start();

        query.awaitTermination();
    }
}
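
Either consumer can be launched from the IDE, since the master is set in code, or via spark-submit. A sketch for the Structured Streaming job; the jar name is hypothetical, and the --packages coordinate pulls in the Structured Streaming Kafka source (spark-sql-kafka-0-10), which is not among the dependencies listed above:

    spark-submit --class SparkStructuredStreaming \
        --master local[*] \
        --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 \
        target/meetup-consumer.jar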