package com.test.anna.KafkaSpark;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerTest {

    public static void main(String[] args) {
        // Uncomment when the cluster requires SASL/JAAS authentication:
        // System.setProperty("java.security.auth.login.config", "/Users/srikanth_kopparthy/Documents/keystore/client_jaas.conf");

        // Kafka producer configuration
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");              // wait for acknowledgment from all in-sync replicas
        properties.put("retries", "0");
        properties.put("batch.size", "16384");
        properties.put("linger.ms", "1");
        properties.put("request.timeout.ms", "150000");
        properties.put("buffer.memory", "33554432");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // properties.put("partitioner.class", "SimplePartitioner"); // optional custom partitioner

        Producer<String, String> producer = new KafkaProducer<>(properties);

        // JDBC driver, database URL, and credentials
        String JDBC_DRIVER = "oracle.jdbc.driver.OracleDriver";
        String DB_URL = "jdbc:oracle:thin:@xyz:1976:wsa";
        String USER = "xyz";
        String PASS = "abc";

        Connection conn = null;
        Statement stmt = null;
        try {
            // Register the JDBC driver and open a connection
            Class.forName(JDBC_DRIVER);
            System.out.println("Connecting to database...");
            conn = DriverManager.getConnection(DB_URL, USER, PASS);

            // Execute the query
            System.out.println("Creating statement...");
            stmt = conn.createStatement();
            String sql = "SELECT * FROM AP_SALESWS_COMP_CM WHERE ROWNUM <= 100";
            ResultSet rs = stmt.executeQuery(sql);

            // Extract each row and publish it to Kafka
            while (rs.next()) {
                String bo_id = rs.getString("BO_ID");
                String profile_cm_seq = rs.getString("PROFILE_CM_SEQ");
                String bo_cm_start_dt = rs.getString("BO_CM_START_DT");
                String bo_cm_end_dt = rs.getString("BO_CM_END_DT");
                String cm_id = rs.getString("CM_ID");

                System.out.print("BO_ID: " + bo_id);

                // Only BO_ID is published; the remaining columns could be folded
                // into the message value the same way.
                Future<RecordMetadata> response = producer.send(new ProducerRecord<>("test7", bo_id));
                try {
                    // Block until the broker acknowledges the record (or 60 s elapse)
                    RecordMetadata metadata = response.get(60, TimeUnit.SECONDS);
                    System.out.println(metadata.toString());
                    System.out.println("Sent: " + bo_id);
                } catch (InterruptedException e1) {
                    System.out.println("interrupted exception..");
                    e1.printStackTrace();
                } catch (ExecutionException e1) {
                    System.out.println("execution exception..");
                    e1.printStackTrace();
                } catch (TimeoutException e1) {
                    System.out.println("TimeoutException..");
                    e1.printStackTrace();
                }
            }

            // Clean up
            rs.close();
            stmt.close();
            conn.close();
        } catch (SQLException se) {
            // Handle errors for JDBC
            se.printStackTrace();
        } catch (Exception e) {
            // Handle errors for Class.forName
            e.printStackTrace();
        } finally {
            // Close resources even if an exception was thrown
            try {
                if (stmt != null) stmt.close();
            } catch (SQLException se2) {
                // nothing we can do
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
            producer.close(); // flush pending records and release client resources
        }
        System.out.println("Goodbye!");
    }
}
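// The loop above blocks on response.get(...) for every row, so each message waits a
// full broker round trip. The producer also accepts send(record, callback), which keeps
// the JDBC loop non-blocking. A minimal sketch; the helper class and method name are
// hypothetical, not part of the original paste.
package com.test.anna.KafkaSpark;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncSendExample {

    // Fire-and-forget send; the callback runs on the producer's I/O thread once the
    // broker acknowledges (or rejects) the record.
    static void sendAsync(Producer<String, String> producer, String topic, String value) {
        producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace(); // delivery failed after client-side retries
            } else {
                System.out.println("Sent " + value + " -> partition " + metadata.partition()
                        + ", offset " + metadata.offset());
            }
        });
    }
}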
package com.test.anna.KafkaSpark;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class SparkStreamingConsumer {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "localhost"); // for the Cassandra connector

        JavaSparkContext sc = new JavaSparkContext(conf);
        // 20-second micro-batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(20000));

        // Direct (receiver-less) Kafka stream; the direct API reads broker
        // metadata itself, so only the broker list is strictly required.
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("zookeeper.connect", "localhost:2181");
        Set<String> topics = Collections.singleton("test7");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // Log the contents of each micro-batch
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("Message Received " + rdd.values().take(1));
            System.out.println("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
            System.out.println("Count by key---->>>>>" + rdd.countByKey());
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
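// The Cassandra connection host is configured above, but the stream is only printed.
// A minimal sketch of persisting each micro-batch through the spark-cassandra-connector
// Java API; the keyspace "test_ks", table "messages", and the MessageRecord bean are
// assumptions, not part of the original paste.
package com.test.anna.KafkaSpark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;

import java.io.Serializable;

import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;

public class CassandraSinkExample {

    // Bean mapped to an assumed table: CREATE TABLE test_ks.messages (bo_id text PRIMARY KEY);
    // the connector maps the camelCase property boId to the bo_id column by default.
    public static class MessageRecord implements Serializable {
        private String boId;
        public MessageRecord() {}
        public MessageRecord(String boId) { this.boId = boId; }
        public String getBoId() { return boId; }
        public void setBoId(String boId) { this.boId = boId; }
    }

    static void saveToCassandra(JavaPairInputDStream<String, String> stream) {
        // Keep only the message value and wrap it in the bean
        JavaDStream<MessageRecord> records = stream.map(tuple -> new MessageRecord(tuple._2));
        // The connector issues one Cassandra write per micro-batch
        javaFunctions(records).writerBuilder("test_ks", "messages", mapToRow(MessageRecord.class))
                .saveToCassandra();
    }
}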