Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // TwitterMySQLTopo : main class
- package storm.starter.bigdata;
- import org.apache.storm.Config;
- import org.apache.storm.LocalCluster;
- import org.apache.storm.StormSubmitter;
- import org.apache.storm.topology.TopologyBuilder;
- import org.apache.storm.utils.Utils;
- import storm.starter.bigdata.bolt.TwitterToMysqlBolt;
- import storm.starter.bigdata.spout.SQLSpout;
- import storm.starter.bigdata.util.MyProperties;
- public class TwitterMysqlTopology {
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("sqlinput", new SQLSpout("jdbc:mysql://localhost:3306/databasetopology", "root", "poiuytrewq"));
- builder.setBolt("push", new TwitterToMysqlBolt(), 2).shuffleGrouping("sqlinput");
- Config conf = new Config();
- conf.setDebug(false);
- if (args != null && args.length > 0)
- {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
- }
- else
- {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("twitterfilter", conf, builder.createTopology());
- Utils.sleep(640000);
- cluster.killTopology("twitterfilter");
- cluster.shutdown();
- }
- }
- }
- // Spout
- package storm.starter.bigdata.spout;
- import org.apache.storm.Config;
- import org.apache.storm.spout.SpoutOutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.topology.base.BaseRichSpout;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Values;
- import org.apache.storm.utils.Utils;
- import twitter4j.*;
- import twitter4j.conf.ConfigurationBuilder;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.Statement;
- import java.sql.SQLException;
- import java.util.Map;
- import java.util.concurrent.LinkedBlockingQueue;
- /**
- * Created by Knut on 01/09/2015.
- */
- public class SQLSpout extends BaseRichSpout
- {
- private final String url;
- private final String username;
- private final String password;
- private Connection connection;
- public SQLSpout(String url, String user, String pass)
- {
- this.url = url;
- this.username = user;
- this.password = pass;
- }
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
- {
- System.out.println("Connecting database...");
- try (this.connection = DriverManager.getConnection(this.url, this.username, this.password))
- {
- System.out.println("Database connected!");
- }
- catch (SQLException e)
- {
- System.out.println("Error while connecting to database.");
- }
- StatusListener listener = new StatusListener() {
- @Override
- public void onStatus(Status status)
- {
- }
- @Override
- public void onDeletionNotice(StatusDeletionNotice sdn)
- {
- }
- @Override
- public void onTrackLimitationNotice(int i)
- {
- }
- @Override
- public void onScrubGeo(long l, long l1)
- {
- }
- @Override
- public void onException(Exception ex)
- {
- }
- @Override
- public void onStallWarning(StallWarning arg0)
- {
- }
- };
- }
- @Override
- public void nextTuple()
- {
- ss
- }
- @Override
- public void close()
- {
- }
- @Override
- public Map<String, Object> getComponentConfiguration()
- {
- }
- @Override
- public void ack(Object id)
- {
- }
- @Override
- public void fail(Object id)
- {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer)
- {
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement