Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Properties;
- import java.sql.*;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- public class ProducerTest {
- public static void main(String[] args) throws ClassNotFoundException, SQLException
- {
- Properties props = new Properties();
- props.put("zk.connect","localhost:2181");
- props.put("serializer.class","kafka.serializer.StringEncoder");
- props.put("metadata.broker.list","localhost:9092");
- ProducerConfig config = new ProducerConfig(props);
- Producer producer = new Producer(config);
- try
- {
- Class.forName("com.mysql.jdbc.Driver");
- Connection con = DriverManager.getConnection(
- "jdbc:mysql://172.18.67.8:3306/big_data","root","root");
- Statement stmt = con.createStatement();
- ResultSet rs = stmt.executeQuery("select * from content_log");
- while(rs.next())
- {
- producer.send(new KeyedMessage("lamtest",rs.getString(1) + " " + rs.getString(2)+" "+rs.getString(3)+" "+rs.getString(4)+" "+rs.getString(5)+ " "+ rs.getString(6)
- +" "+ rs.getString(7)
- +" "+ rs.getString(8)
- +" "+ rs.getString(9)
- +" "+ rs.getString(10)
- +" "+ rs.getString(11)
- +" "+ rs.getString(12)
- +" "+ rs.getString(13)
- +" "+ rs.getString(14)
- +" "+ rs.getString(15)
- +" "+ rs.getString(17)
- +" "+ rs.getString(18)
- +" "+ rs.getString(19)
- +" "+ rs.getString(21)
- +" "+ rs.getString(22)
- ));
- }
- con.close();
- }
- catch(Exception e)
- {
- System.out.println(e);
- }
- }
- }
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- public class SimpleHLConsumer {
- private final ConsumerConnector consumer;
- private final String topic;
- public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
- Properties props = new Properties();
- props.put("zookeeper.connect", zookeeper);
- props.put("group.id", groupId);
- props.put("zookeeper.session.timeout.ms", "500");
- props.put("zookeeper.sync.time.ms", "250");
- props.put("auto.commit.interval.ms", "1000");
- consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
- this.topic = topic;
- }
- public void testConsumer() {
- Map<String, Integer> topicCount = new HashMap<>();
- topicCount.put(topic, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
- List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
- for (final KafkaStream stream : streams) {
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- System.out.println("Message from Single Topic: " + new String(it.next().message()));
- }
- }
- if (consumer != null) {
- consumer.shutdown();
- }
- }
- public static void main(String[] args) {
- String topic = "lamtest";
- SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
- simpleHLConsumer.testConsumer();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement