package com.test.anna.KafkaSpark;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerTest {

    public static void main(String[] args) {

        // System.setProperty("java.security.auth.login.config", "/Users/srikanth_kopparthy/Documents/keystore/client_jaas.conf");

        // Kafka producer configuration
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", "0");
        properties.put("batch.size", "16384");
        properties.put("linger.ms", "1");
        properties.put("request.timeout.ms", "150000");
        properties.put("buffer.memory", "33554432");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // properties.put("partitioner.class", "SimplePartitioner");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        // JDBC connection
        // JDBC driver name and database URL
        String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
        String DB_URL = "jdbc:oracle:thin:@xyz:1976:wsa";

        // Database credentials
        String USER = "xyz";
        String PASS = "abc";

        Connection conn = null;
        Statement stmt = null;
        try {
            // STEP 2: Register the JDBC driver
            Class.forName(JDBC_DRIVER);

            // STEP 3: Open a connection
            System.out.println("Connecting to database...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);

            // STEP 4: Execute a query
            System.out.println("Creating statement...");
            stmt = conn.createStatement();
            String sql = "select * from AP_SALESWS_COMP_CM WHERE ROWNUM <= 100";
            ResultSet rs = stmt.executeQuery(sql);

            // STEP 5: Extract data from the result set
            while (rs.next()) {
                // Retrieve by column name
                String bo_id = rs.getString("BO_ID");
                String profile_cm_seq = rs.getString("PROFILE_CM_SEQ");
                String bo_cm_start_dt = rs.getString("BO_CM_START_DT");
                String bo_cm_end_dt = rs.getString("BO_CM_END_DT");
                String cm_id = rs.getString("CM_ID");

                // Display values
                System.out.print("BO_ID: " + bo_id);

                // Publish the row's BO_ID to the "test7" topic and block until the broker acks
                // Future<RecordMetadata> response = producer.send(new ProducerRecord<>("test6", "BO_ID " + bo_id + " " + "PROFILE SEQ " + profile_cm_seq + " " + "START DATE " + bo_cm_start_dt + " " + "END DATE " + bo_cm_end_dt + " " + "CM ID " + cm_id));
                Future<RecordMetadata> response = producer.send(new ProducerRecord<>("test7", bo_id));
                try {
                    RecordMetadata metadata = response.get(60, TimeUnit.SECONDS);
                    System.out.println(metadata.toString());
                    System.out.println("Sent: " + bo_id);
                } catch (InterruptedException e1) {
                    System.out.println("interrupted exception..");
                    e1.printStackTrace();
                } catch (ExecutionException e1) {
                    System.out.println("execution exception..");
                    e1.printStackTrace();
                } catch (TimeoutException e1) {
                    System.out.println("TimeoutException..");
                    e1.printStackTrace();
                }
            }

            // STEP 6: Clean up the environment
            rs.close();
            stmt.close();
            conn.close();
        } catch (SQLException se) {
            // Handle errors for JDBC
            se.printStackTrace();
        } catch (Exception e) {
            // Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            // finally block: close resources even if a step above failed
            try {
                if (stmt != null)
                    stmt.close();
            } catch (SQLException se2) {
                // nothing we can do
            }
            try {
                if (conn != null)
                    conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
            // Flush buffered records and release the Kafka client
            producer.close();
        }
        System.out.println("Goodbye!");
        // JDBC connection END
    }

}

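// ---------------------------------------------------------------------------
// Aside: the producer above blocks on response.get() for every row, which
// serializes the sends. A minimal non-blocking sketch using the standard Kafka
// Callback API is shown below. The class name AsyncProducerSketch and the
// sample value are hypothetical; the topic "test7", broker address, and
// serializer settings come from the code above.
// ---------------------------------------------------------------------------

package com.test.anna.KafkaSpark;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncProducerSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer on exit
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            String bo_id = "sample-bo-id"; // stand-in for a row read via JDBC
            producer.send(new ProducerRecord<>("test7", bo_id), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // Runs on the producer's I/O thread once the send is acked or fails
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.println("Sent: " + metadata.topic()
                                + "-" + metadata.partition() + "@" + metadata.offset());
                    }
                }
            });
        }
    }

}
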
package com.test.anna.KafkaSpark;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class SparkStreamingConsumer {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "localhost"); // for Cassandra
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(20000)); // 20-second batches

        // Kafka direct-stream parameters
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("zookeeper.connect", "localhost:2181");
        Set<String> topics = Collections.singleton("test7");
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // TODO: processing pipeline; for now just log each micro-batch
        // (a sketch of a Cassandra write step follows the class)
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("Message Received " + rdd.values().take(1));
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
            System.out.println("Count by key: " + rdd.countByKey());
        });

        ssc.start();
        ssc.awaitTermination();
    }

}

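// ---------------------------------------------------------------------------
// Aside: the consumer sets spark.cassandra.connection.host but never writes to
// Cassandra; the TODO above suggests that was the intended pipeline. Below is a
// minimal sketch of that step using the spark-cassandra-connector Java API.
// The keyspace "kafka_spark", the table "messages" (bo_id text PRIMARY KEY),
// and the Message bean are hypothetical and would need to be created first.
// It could be wired in with:
//     CassandraSinkSketch.save(directKafkaStream.map(t -> t._2()));
// ---------------------------------------------------------------------------

package com.test.anna.KafkaSpark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;

import org.apache.spark.streaming.api.java.JavaDStream;

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;

public class CassandraSinkSketch {

    // Bean mapped to the hypothetical messages table; the property name must
    // line up with the bo_id column.
    public static class Message implements Serializable {
        private String bo_id;

        public Message() {
        }

        public Message(String bo_id) {
            this.bo_id = bo_id;
        }

        public String getBo_id() {
            return bo_id;
        }

        public void setBo_id(String bo_id) {
            this.bo_id = bo_id;
        }
    }

    public static void save(JavaDStream<String> values) {
        // Map each Kafka value to a bean, then write every micro-batch to Cassandra.
        JavaDStream<Message> rows = values.map(Message::new);
        CassandraStreamingJavaUtil.javaFunctions(rows)
                .writerBuilder("kafka_spark", "messages", mapToRow(Message.class))
                .saveToCassandra();
    }

}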