Advertisement
Guest User

Untitled

a guest
Feb 22nd, 2016
84
0
Never
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
text 1.37 KB | None | 0 0
  1. **$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test**
  2.  
  3. public static void main(String[] args) throws Exception{
  4.  
  5. Map map = Maps.newHashMap();
  6. map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
  7. map.put("dataSource.url","jdbc:postgresql://localhost:5432/test?user=postgres");
  8.  
  9. ConnectionProvider cp = new MyConnectionProvider(map);
  10.  
  11. String argument = args[0];
  12. Config conf = new Config();
  13. conf.put(JDBC_CONF, map);
  14. conf.setDebug(true);
  15. conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  16. //set the number of workers
  17. conf.setNumWorkers(3);
  18.  
  19. TopologyBuilder builder = new TopologyBuilder();
  20.  
  21. //Setup Kafka spout
  22. BrokerHosts hosts = new ZkHosts("localhost:2181");
  23. String topic = "twitter_test2"; //twitter
  24. String zkRoot = "";
  25. String consumerGroupId = "group1";
  26. SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
  27. spoutConfig.scheme = new RawMultiScheme();
  28. spoutConfig.scheme = new SchemeAsMultiScheme(new Search_Parser());
  29. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
  30. builder.setSpout("KafkaSpout", kafkaSpout);
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement