Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public static void main(String[] args) throws Exception{
- Map map = Maps.newHashMap();
- map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
- map.put("dataSource.url","jdbc:postgresql://localhost:5432/test?user=postgres");
- ConnectionProvider cp = new MyConnectionProvider(map);
- String argument = args[0];
- Config conf = new Config();
- conf.put(JDBC_CONF, map);
- conf.setDebug(true);
- conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 3);
- //set the number of workers
- conf.setNumWorkers(3);
- TopologyBuilder builder = new TopologyBuilder();
- //Setup Kafka spout
- BrokerHosts hosts = new ZkHosts("localhost:2181");
- String topic = "test";
- String zkRoot = "";
- String consumerGroupId = "group1";
- SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
- spoutConfig.scheme = new RawMultiScheme();
- spoutConfig.scheme = new SchemeAsMultiScheme(new Search_Parser());
- KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
- builder.setSpout("KafkaSpout", kafkaSpout);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement