Advertisement
Guest User

Untitled

a guest
Sep 3rd, 2017
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.47 KB | None | 0 0
  1. package com.kingdee.finance.flume.sink;
  2.  
  3. import java.sql.Connection;
  4. import java.sql.DriverManager;
  5. import java.sql.PreparedStatement;
  6. import java.sql.SQLException;
  7. import java.util.List;
  8.  
  9. import org.apache.commons.compress.utils.Charsets;
  10. import org.apache.flume.Channel;
  11. import org.apache.flume.Context;
  12. import org.apache.flume.Event;
  13. import org.apache.flume.EventDeliveryException;
  14. import org.apache.flume.Transaction;
  15. import org.apache.flume.conf.Configurable;
  16. import org.apache.flume.sink.AbstractSink;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19.  
  20. import com.google.common.base.Preconditions;
  21. import com.google.common.collect.Lists;
  22.  
  23. /**
  24. * sink for mysql
  25. *
  26. * @author jinyu
  27. *
  28. */
  29. public class MySQLSink extends AbstractSink implements Configurable {
  30. private final Logger log = LoggerFactory.getLogger(MySQLSink.class);
  31.  
  32. private String host;
  33. private String port;
  34. private String user;
  35. private String password;
  36. private String database;
  37. private String table;
  38. private int batchSize;
  39.  
  40. private PreparedStatement ps;
  41. private Connection conn;
  42.  
  43. @Override
  44. public void configure(Context context) {
  45. host = context.getString("host");
  46. Preconditions.checkNotNull(host, "host must be set!!");
  47. port = context.getString("port");
  48. Preconditions.checkNotNull(port, "port must be set!!");
  49. database = context.getString("database");
  50. Preconditions.checkNotNull(database, "database must be set!!");
  51. table = context.getString("table");
  52. Preconditions.checkNotNull(table, "table must be set!!");
  53. user = context.getString("user");
  54. Preconditions.checkNotNull(user, "user must be set!!");
  55. password = context.getString("password");
  56. Preconditions.checkNotNull(password, "password must be set!!");
  57. batchSize = context.getInteger("batchSize", 100);
  58. Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
  59. }
  60.  
  61. @Override
  62. public void start() {
  63. super.start();
  64. try {
  65. Class.forName("com.mysql.jdbc.Driver");
  66. } catch (ClassNotFoundException e) {
  67. ; // never
  68. }
  69. String url = String.format("jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=UTF-8", host, port,
  70. database);
  71. try {
  72. conn = DriverManager.getConnection(url, user, password);
  73. conn.setAutoCommit(false);
  74. ps = conn.prepareStatement("insert into " + table + " (content) values (?)");
  75. } catch (SQLException e) {
  76. e.printStackTrace();
  77. System.exit(1);
  78. }
  79. }
  80.  
  81. @Override
  82. public Status process() throws EventDeliveryException {
  83. Status result = Status.READY;
  84. Channel channel = getChannel();
  85. Transaction transaction = channel.getTransaction();
  86. List<String> events = Lists.newArrayList();
  87. transaction.begin();
  88. try {
  89. Event event;
  90. for (int i = 0; i < batchSize; i++) {
  91. event = channel.take();
  92. if (event != null) {
  93. events.add(new String(event.getBody(), Charsets.UTF_8));
  94. } else {
  95. result = Status.BACKOFF;
  96. break;
  97. }
  98. }
  99. if (!events.isEmpty()) {
  100. ps.clearBatch();
  101. for (String content : events) {
  102. ps.setString(1, content);
  103. ps.addBatch();
  104. }
  105. ps.executeBatch();
  106. conn.commit();
  107. }
  108. transaction.commit();
  109. } catch (Throwable e) {
  110. log.error("Failed to commit transaction", e);
  111. transaction.rollback();
  112. } finally {
  113. transaction.close();
  114. }
  115. return result;
  116. }
  117.  
  118. @Override
  119. public void stop() {
  120. super.stop();
  121. if (ps != null) {
  122. try {
  123. ps.close();
  124. } catch (SQLException e) {
  125. e.printStackTrace();
  126. }
  127. }
  128. if (conn != null) {
  129. try {
  130. conn.close();
  131. } catch (SQLException e) {
  132. e.printStackTrace();
  133. }
  134. }
  135. }
  136. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement