Advertisement
Guest User

Untitled

a guest
Jul 10th, 2016
128
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.46 KB | None | 0 0
  1. import java.util.Properties;
  2. import java.sql.*;
  3. import kafka.javaapi.producer.Producer;
  4. import kafka.producer.KeyedMessage;
  5. import kafka.producer.ProducerConfig;
  6. public class ProducerTest {
  7. public static void main(String[] args) throws ClassNotFoundException, SQLException
  8. {
  9. Properties props = new Properties();
  10. props.put("zk.connect","localhost:2181");
  11. props.put("serializer.class","kafka.serializer.StringEncoder");
  12. props.put("metadata.broker.list","localhost:9092");
  13. ProducerConfig config = new ProducerConfig(props);
  14. Producer producer = new Producer(config);
  15. try
  16. {
  17. Class.forName("com.mysql.jdbc.Driver");
  18. Connection con = DriverManager.getConnection(
  19. "jdbc:mysql://172.18.67.8:3306/big_data","root","root");
  20. Statement stmt = con.createStatement();
  21. ResultSet rs = stmt.executeQuery("select * from content_log");
  22. while(rs.next())
  23. {
  24. producer.send(new KeyedMessage("lamtest",rs.getString(1) + " " + rs.getString(2)+" "+rs.getString(3)+" "+rs.getString(4)+" "+rs.getString(5)+ " "+ rs.getString(6)
  25. +" "+ rs.getString(7)
  26. +" "+ rs.getString(8)
  27. +" "+ rs.getString(9)
  28. +" "+ rs.getString(10)
  29. +" "+ rs.getString(11)
  30. +" "+ rs.getString(12)
  31. +" "+ rs.getString(13)
  32. +" "+ rs.getString(14)
  33. +" "+ rs.getString(15)
  34. +" "+ rs.getString(17)
  35. +" "+ rs.getString(18)
  36. +" "+ rs.getString(19)
  37. +" "+ rs.getString(21)
  38. +" "+ rs.getString(22)
  39. ));
  40.  
  41. }
  42. con.close();
  43. }
  44. catch(Exception e)
  45. {
  46. System.out.println(e);
  47. }
  48.  
  49. }
  50. }
  51.  
  52. import kafka.consumer.Consumer;
  53. import kafka.consumer.ConsumerConfig;
  54. import kafka.consumer.ConsumerIterator;
  55. import kafka.consumer.KafkaStream;
  56. import kafka.javaapi.consumer.ConsumerConnector;
  57. import java.util.HashMap;
  58. import java.util.List;
  59. import java.util.Map;
  60. import java.util.Properties;
  61.  
  62. public class SimpleHLConsumer {
  63.  
  64. private final ConsumerConnector consumer;
  65. private final String topic;
  66.  
  67. public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
  68. Properties props = new Properties();
  69. props.put("zookeeper.connect", zookeeper);
  70. props.put("group.id", groupId);
  71. props.put("zookeeper.session.timeout.ms", "500");
  72. props.put("zookeeper.sync.time.ms", "250");
  73. props.put("auto.commit.interval.ms", "1000");
  74.  
  75. consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
  76. this.topic = topic;
  77. }
  78.  
  79. public void testConsumer() {
  80. Map<String, Integer> topicCount = new HashMap<>();
  81. topicCount.put(topic, 1);
  82.  
  83. Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
  84. List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
  85. for (final KafkaStream stream : streams) {
  86. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  87. while (it.hasNext()) {
  88. System.out.println("Message from Single Topic: " + new String(it.next().message()));
  89. }
  90. }
  91. if (consumer != null) {
  92. consumer.shutdown();
  93. }
  94. }
  95.  
  96. public static void main(String[] args) {
  97. String topic = "lamtest";
  98. SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
  99. simpleHLConsumer.testConsumer();
  100. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement