Advertisement
Guest User

TridentWordCount

a guest
Sep 10th, 2013
221
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.31 KB | None | 0 0
  1. package test;
  2.  
  3. import storm.trident.TridentState;
  4. import storm.trident.TridentTopology;
  5. import storm.trident.operation.builtin.Count;
  6. import storm.trident.operation.builtin.FilterNull;
  7. import storm.trident.operation.builtin.MapGet;
  8. import storm.trident.operation.builtin.Sum;
  9. import storm.trident.testing.FixedBatchSpout;
  10. import storm.trident.testing.MemoryMapState;
  11. import storm.trident.testing.Split;
  12. import backtype.storm.Config;
  13. import backtype.storm.LocalCluster;
  14. import backtype.storm.LocalDRPC;
  15. import backtype.storm.generated.StormTopology;
  16. import backtype.storm.tuple.Fields;
  17. import backtype.storm.tuple.Values;
  18.  
  19.  
  20. public class TridentWordCount {    
  21.    
  22.     public static StormTopology buildTopology(LocalDRPC drpc) {
  23.        
  24.         // Spout para el test
  25.         FixedBatchSpout spout = new FixedBatchSpout(new Fields("title", "text"), 3,
  26.                 new Values("Don Quijote de la Mancha", "Es una novela escrita por el español Miguel de Cervantes Saavedra. Publicada su primera parte con el título de El ingenioso hidalgo don Quijote de la Mancha a comienzos de 1605"),
  27.                 new Values("El nombre de la rosa", "Es una novela de misterio e histórica de Umberto Eco publicada en 1980"),
  28.                 new Values("Lazarillo de Tormes", "Es una novela española anónima, escrita en primera persona y en estilo epistolar (como una sola y larga carta), cuya edición conocida más antigua data de 1554"));
  29.        
  30.         // Topologia para el contado de palabras
  31.         TridentTopology topology = new TridentTopology();        
  32.         TridentState wordCounts =
  33.               topology.newStream("spout", spout)
  34.                 .parallelismHint(3)
  35.                 .each(new Fields("text"), new Split(), new Fields("word"))
  36.                 .groupBy(new Fields("word"))
  37.                 .persistentAggregate(new MemoryMapState.Factory(),
  38.                                      new Count(), new Fields("count"))        
  39.                 .parallelismHint(3);
  40.                
  41.         // Topologia DRPC para la consulta del numero de ocurrencias de las palabras
  42.         topology.newDRPCStream("words", drpc)
  43.                 .each(new Fields("args"), new Split(), new Fields("word"))
  44.                 .groupBy(new Fields("word"))
  45.                 .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
  46.                 .each(new Fields("count"), new FilterNull())
  47.                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
  48.        
  49.         return topology.build();
  50.     }
  51.    
  52.     public static void main(String[] args) throws Exception {
  53.         Config conf = new Config();
  54.         conf.setMaxSpoutPending(20);
  55.         LocalDRPC drpc = new LocalDRPC();
  56.         LocalCluster cluster = new LocalCluster();
  57.         cluster.submitTopology("wordCounter", conf, buildTopology(drpc));        
  58.         for(int i=0; i<2; i++) {
  59.             System.out.println("Nº de ocurrencias de Es: " + drpc.execute("words", "Es"));
  60.             System.out.println("Nº de ocurrencias de una: " + drpc.execute("words", "una"));
  61.             System.out.println("Nº de ocurrencias de novela: " + drpc.execute("words", "novela"));
  62.             System.out.println("Nº de ocurrencias de primera:" + drpc.execute("words", "primera"));                
  63.             Thread.sleep(1000);
  64.         }
  65.     }
  66. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement