Advertisement
Guest User

Topo Java

a guest
Mar 3rd, 2017
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.92 KB | None | 0 0
  1. // TwitterMySQLTopo : main class
  2.  
  3. package storm.starter.bigdata;
  4.  
  5. import org.apache.storm.Config;
  6. import org.apache.storm.LocalCluster;
  7. import org.apache.storm.StormSubmitter;
  8. import org.apache.storm.topology.TopologyBuilder;
  9. import org.apache.storm.utils.Utils;
  10. import storm.starter.bigdata.bolt.TwitterToMysqlBolt;
  11. import storm.starter.bigdata.spout.SQLSpout;
  12. import storm.starter.bigdata.util.MyProperties;
  13.  
  14. public class TwitterMysqlTopology {
  15.  
  16.     public static void main(String[] args) throws Exception {
  17.         TopologyBuilder builder = new TopologyBuilder();
  18.         builder.setSpout("sqlinput", new SQLSpout("jdbc:mysql://localhost:3306/databasetopology", "root", "poiuytrewq"));
  19.        
  20.         builder.setBolt("push", new TwitterToMysqlBolt(), 2).shuffleGrouping("sqlinput");
  21.  
  22.         Config conf = new Config();
  23.         conf.setDebug(false);
  24.  
  25.         if (args != null && args.length > 0)
  26.         {
  27.             conf.setNumWorkers(1);
  28.  
  29.             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
  30.         }
  31.         else
  32.         {
  33.  
  34.             LocalCluster cluster = new LocalCluster();
  35.             cluster.submitTopology("twitterfilter", conf, builder.createTopology());
  36.             Utils.sleep(640000);
  37.             cluster.killTopology("twitterfilter");
  38.             cluster.shutdown();
  39.         }
  40.     }
  41. }
  42.  
  43.  
  44. // Spout
  45. package storm.starter.bigdata.spout;
  46.  
  47. import org.apache.storm.Config;
  48. import org.apache.storm.spout.SpoutOutputCollector;
  49. import org.apache.storm.task.TopologyContext;
  50. import org.apache.storm.topology.OutputFieldsDeclarer;
  51. import org.apache.storm.topology.base.BaseRichSpout;
  52. import org.apache.storm.tuple.Fields;
  53. import org.apache.storm.tuple.Values;
  54. import org.apache.storm.utils.Utils;
  55. import twitter4j.*;
  56. import twitter4j.conf.ConfigurationBuilder;
  57.  
  58. import java.sql.Connection;
  59. import java.sql.DriverManager;
  60. import java.sql.Statement;
  61. import java.sql.SQLException;
  62.  
  63. import java.util.Map;
  64. import java.util.concurrent.LinkedBlockingQueue;
  65.  
  66. /**
  67.  * Created by Knut on 01/09/2015.
  68.  */
  69. public class SQLSpout extends BaseRichSpout
  70. {
  71.     private final String url;
  72.     private final String username;
  73.     private final String password;
  74.     private Connection connection;
  75.  
  76.     public SQLSpout(String url, String user, String pass)
  77.     {
  78.         this.url = url;
  79.         this.username = user;
  80.         this.password = pass;
  81.     }
  82.  
  83.     @Override
  84.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  85.     {
  86.         System.out.println("Connecting database...");
  87.        
  88.         try (this.connection = DriverManager.getConnection(this.url, this.username, this.password))
  89.         {
  90.             System.out.println("Database connected!");
  91.         }
  92.         catch (SQLException e)
  93.         {
  94.             System.out.println("Error while connecting to database.");
  95.         }
  96.        
  97.         StatusListener listener = new StatusListener() {
  98.  
  99.             @Override
  100.             public void onStatus(Status status)
  101.             {
  102.                
  103.             }
  104.  
  105.             @Override
  106.             public void onDeletionNotice(StatusDeletionNotice sdn)
  107.             {
  108.             }
  109.  
  110.             @Override
  111.             public void onTrackLimitationNotice(int i)
  112.             {
  113.             }
  114.  
  115.             @Override
  116.             public void onScrubGeo(long l, long l1)
  117.             {
  118.             }
  119.  
  120.             @Override
  121.             public void onException(Exception ex)
  122.             {
  123.             }
  124.  
  125.             @Override
  126.             public void onStallWarning(StallWarning arg0)
  127.             {
  128.                
  129.             }
  130.  
  131.         };
  132.     }
  133.  
  134.     @Override
  135.     public void nextTuple()
  136.     {
  137. ss
  138.     }
  139.  
  140.     @Override
  141.     public void close()
  142.     {
  143.     }
  144.  
  145.     @Override
  146.     public Map<String, Object> getComponentConfiguration()
  147.     {
  148.     }
  149.  
  150.     @Override
  151.     public void ack(Object id)
  152.     {
  153.     }
  154.  
  155.     @Override
  156.     public void fail(Object id)
  157.     {
  158.     }
  159.  
  160.     @Override
  161.     public void declareOutputFields(OutputFieldsDeclarer declarer)
  162.     {
  163.  
  164.     }
  165. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement