Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.kingdee.finance.flume.sink;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.SQLException;
- import java.util.List;
- import org.apache.commons.compress.utils.Charsets;
- import org.apache.flume.Channel;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.Transaction;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.sink.AbstractSink;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.google.common.base.Preconditions;
- import com.google.common.collect.Lists;
- /**
- * sink for mysql
- *
- * @author jinyu
- *
- */
- public class MySQLSink extends AbstractSink implements Configurable {
- private final Logger log = LoggerFactory.getLogger(MySQLSink.class);
- private String host;
- private String port;
- private String user;
- private String password;
- private String database;
- private String table;
- private int batchSize;
- private PreparedStatement ps;
- private Connection conn;
- @Override
- public void configure(Context context) {
- host = context.getString("host");
- Preconditions.checkNotNull(host, "host must be set!!");
- port = context.getString("port");
- Preconditions.checkNotNull(port, "port must be set!!");
- database = context.getString("database");
- Preconditions.checkNotNull(database, "database must be set!!");
- table = context.getString("table");
- Preconditions.checkNotNull(table, "table must be set!!");
- user = context.getString("user");
- Preconditions.checkNotNull(user, "user must be set!!");
- password = context.getString("password");
- Preconditions.checkNotNull(password, "password must be set!!");
- batchSize = context.getInteger("batchSize", 100);
- Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
- }
- @Override
- public void start() {
- super.start();
- try {
- Class.forName("com.mysql.jdbc.Driver");
- } catch (ClassNotFoundException e) {
- ; // never
- }
- String url = String.format("jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=UTF-8", host, port,
- database);
- try {
- conn = DriverManager.getConnection(url, user, password);
- conn.setAutoCommit(false);
- ps = conn.prepareStatement("insert into " + table + " (content) values (?)");
- } catch (SQLException e) {
- e.printStackTrace();
- System.exit(1);
- }
- }
- @Override
- public Status process() throws EventDeliveryException {
- Status result = Status.READY;
- Channel channel = getChannel();
- Transaction transaction = channel.getTransaction();
- List<String> events = Lists.newArrayList();
- transaction.begin();
- try {
- Event event;
- for (int i = 0; i < batchSize; i++) {
- event = channel.take();
- if (event != null) {
- events.add(new String(event.getBody(), Charsets.UTF_8));
- } else {
- result = Status.BACKOFF;
- break;
- }
- }
- if (!events.isEmpty()) {
- ps.clearBatch();
- for (String content : events) {
- ps.setString(1, content);
- ps.addBatch();
- }
- ps.executeBatch();
- conn.commit();
- }
- transaction.commit();
- } catch (Throwable e) {
- log.error("Failed to commit transaction", e);
- transaction.rollback();
- } finally {
- transaction.close();
- }
- return result;
- }
- @Override
- public void stop() {
- super.stop();
- if (ps != null) {
- try {
- ps.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement