Advertisement
Guest User

Untitled

a guest
Feb 23rd, 2016
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.22 KB | None | 0 0
  1. public static void main(String[] args) throws Exception{
  2.  
  3. Map map = Maps.newHashMap();
  4. map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
  5. map.put("dataSource.url","jdbc:postgresql://localhost:5432/test?user=postgres");
  6.  
  7. ConnectionProvider cp = new MyConnectionProvider(map);
  8.  
  9. String argument = args[0];
  10. Config conf = new Config();
  11. conf.put(JDBC_CONF, map);
  12. conf.setDebug(true);
  13. conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 3);
  14. //set the number of workers
  15. conf.setNumWorkers(3);
  16.  
  17. TopologyBuilder builder = new TopologyBuilder();
  18.  
  19. //Setup Kafka spout
  20. BrokerHosts hosts = new ZkHosts("localhost:2181");
  21. String topic = "test";
  22. String zkRoot = "";
  23. String consumerGroupId = "group1";
  24. SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
  25. spoutConfig.scheme = new RawMultiScheme();
  26. spoutConfig.scheme = new SchemeAsMultiScheme(new Search_Parser());
  27. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  28. builder.setSpout("KafkaSpout", kafkaSpout);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement