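Flume agent configuration (flume.conf), wiring the keedio SQL source to a Spark polling sink through a memory channel: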
# Channel: in-memory buffer between the SQL source and the Spark sink
agent1.channels = ch1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 100000
agent1.channels.ch1.transactionCapacity = 10000

# Source: keedio flume-ng-sql-source, polling an Oracle table
agent1.sources = sqlSource
agent1.sources.sqlSource.channels = ch1
agent1.sources.sqlSource.type = org.keedio.flume.source.SQLSource

# Database connection properties
agent1.sources.sqlSource.connection.url = jdbc:oracle:thin:@10.91.35.21:1521/userdb
agent1.sources.sqlSource.user = XXXXXXX
agent1.sources.sqlSource.password = XXXXXXX
agent1.sources.sqlSource.table = MV_USR_TXNO
agent1.sources.sqlSource.database = userdb

# Query delay: the query is re-run every this many milliseconds
agent1.sources.sqlSource.run.query.delay = 10000

# Status file saves the last row read so the source resumes on restart
agent1.sources.sqlSource.status.file.path = c:/opt/temp/data/input/txt/sql
agent1.sources.sqlSource.status.file.name = sqlSource.status

# Sink: Spark polling sink; the Spark application pulls events from this port
agent1.sinks = spark
agent1.sinks.spark.channel = ch1
agent1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark.hostname = localhost
agent1.sinks.spark.port = 41412
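For Flume to instantiate org.apache.spark.streaming.flume.sink.SparkSink, the agent's classpath needs the Spark sink jars. Per the Spark Streaming + Flume integration guide this means roughly the following (exact versions are assumptions; match them to your Spark and Scala builds):

  spark-streaming-flume-sink_2.11 (same version as your Spark install)
  scala-library (the Scala version Spark was built against)
  commons-lang3

Then start the agent: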
flume-ng agent --name agent1 --conf-file ../conf/flume.conf -Dflume.root.logger=INFO,LOGFILE,console
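Once the agent has polled the table at least once, the status file configured above holds the index of the last row read, and the source resumes from there on restart. Sample contents of sqlSource.status: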
- {"LastIndex":"392"}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumeEventTest {

    public static void main(String[] args) throws InterruptedException {
        // Silence Spark/Akka logging so the event output stays readable
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);

        Duration batchInterval = new Duration(5000);

        // Use local[2], not plain "local": the polling receiver occupies one
        // core, so at least one more is needed to process the batches
        SparkConf sparkConf = new SparkConf()
                .setAppName("JavaFlumeEventCount")
                .setMaster("local[2]");

        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

        // Pull events from the Flume SparkSink configured on localhost:41412
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, "localhost", 41412);

        JavaDStream<String> dataList = flumeStream.map(sfe -> {
            // The event body is a ByteBuffer; decode only the valid region,
            // since array() may include unused backing-array bytes
            ByteBuffer body = sfe.event().getBody();
            byte[] buffer = new byte[body.remaining()];
            body.get(buffer);
            String data = new String(buffer, StandardCharsets.UTF_8);
            System.out.println("!!!!!!! Data Received...." + data);
            return data;
        });

        dataList.count()
                .map(in -> "Received " + in + " flume events.")
                .print();

        ssc.start();
        ssc.awaitTermination();
    }
}
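A plausible way to launch the application, assuming it is packaged as flume-event-test.jar against a Spark 2.x / Scala 2.11 build (the spark-streaming-flume artifact version below is an assumption and must match your Spark install):

  spark-submit --class FlumeEventTest \
    --packages org.apache.spark:spark-streaming-flume_2.11:2.4.8 \
    flume-event-test.jar

Since the code hardcodes setMaster("local[2]"), no --master flag is needed; remove the setMaster call if you want to submit to a cluster.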