Flume agent configuration (flume.conf): a keedio SQL source polls an Oracle table into a memory channel, which a Spark sink exposes on port 41412 for a Spark Streaming job to pull from.

agent1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 10000

agent1.sources = sqlSource
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = org.keedio.flume.source.SQLSource

# Database connection properties
agent1.sources.sqlSource.connection.url = jdbc:oracle:thin:@10.91.35.21:1521/userdb
agent1.sources.sqlSource.user = XXXXXXX
agent1.sources.sqlSource.password = XXXXXXX
agent1.sources.sqlSource.table = MV_USR_TXNO
agent1.sources.sqlSource.database = userdb

# Query delay: the query is re-run every run.query.delay milliseconds
agent1.sources.sqlSource.run.query.delay = 10000

# The status file is used to save the last row read
agent1.sources.sqlSource.status.file.path = c:/opt/temp/data/input/txt/sql
agent1.sources.sqlSource.status.file.name = sqlSource.status

agent1.sinks = spark
agent1.sinks.spark.channel = ch1
agent1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark.hostname = localhost
agent1.sinks.spark.port = 41412
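For reference, the keedio flume-ng-sql-source of this era exposes a few optional tuning properties beyond the ones used above. The sketch below is illustrative only: the property names are taken from the project's README, not from this config, and should be verified against the exact source version deployed (the TXN_ID column and the $@$ last-index placeholder are assumptions):

# Hypothetical extras for the same source -- verify against your flume-ng-sql-source version
agent1.sources.sqlSource.columns.to.select = *
# Replace the full table scan with an explicit incremental query;
# $@$ is substituted with the last index stored in the status file
agent1.sources.sqlSource.custom.query = SELECT * FROM MV_USR_TXNO WHERE TXN_ID > $@$
agent1.sources.sqlSource.batch.size = 1000
agent1.sources.sqlSource.max.rows = 10000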
The agent is launched with (the -property flag belongs to the Windows flume-ng launcher; on Linux, pass -Dflume.root.logger=INFO,console instead):

flume-ng agent -name agent1 -f ../conf/flume.conf -property "flume.root.logger=INFO,LOGFILE,console;"
After the first poll the source persists the index of the last row it read to the status file (sqlSource.status), so a restarted agent resumes where ingestion left off:

{"LastIndex":"392"}
The Spark Streaming job below pulls events from the SparkSink, prints each event body, and reports a per-batch event count.

import java.nio.charset.StandardCharsets;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumeEventTest {

    public static void main(String[] args) {

        // Silence Spark's and Akka's internal logging so the event output is readable
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        // 5-second micro-batches; "local[2]" (not "local") is needed because the
        // polling receiver occupies one thread, leaving none for batch processing
        Duration batchInterval = new Duration(5000);
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaFlumeEventCount")
                .setMaster("local[2]");
        // Left over from debugging; normally a single SparkContext is enough
        sparkConf.set("spark.driver.allowMultipleContexts", "true");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

        // Pull-based stream: Spark polls the Flume SparkSink on localhost:41412
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, "localhost", 41412);

        // Decode each Flume event body into a String
        JavaDStream<String> dataList = flumeStream.map(sfe -> {
            byte[] buffer = sfe.event().getBody().array();
            String data = new String(buffer, StandardCharsets.UTF_8);
            System.out.println("!!!!!!! Data Received...." + data);
            return data;
        });

        // Report how many events arrived in each batch
        dataList.count()
                .map(in -> "Received " + in + " flume events.")
                .print();

        ssc.start();
        ssc.awaitTermination();
    }
}
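To run this, the driver application needs the spark-streaming-flume integration on its classpath, and the Flume agent needs the spark-streaming-flume-sink jar (plus its scala-library and commons-lang3 dependencies) in its lib directory, per the Spark Flume integration guide. A hedged launch example, assuming Spark 2.0 with Scala 2.11 and a hypothetical application jar name:

spark-submit --class FlumeEventTest --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 flume-event-test.jar

Start the Flume agent first so the SparkSink is already listening on port 41412 when the streaming job begins polling.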