Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- The MIT License (MIT)
- Copyright (c) 2014 sinfonier-project
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included in
- all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- THE SOFTWARE.
- */
- package com.sinfonier.bolts;
- import java.sql.Connection;
- import java.sql.DatabaseMetaData;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- /**
- * JDBC Bolt to insert or update row
- *
- * @author jmdiez
- */
- public class JDBCBolt extends BaseSinfonierBolt {
- private static final long serialVersionUID = 7799073747630789816L;
- private static final boolean CHECKNOTNULL = true;
- private boolean debug = true;
- private String driver; // p.e. "com.mysql.jdbc.Driver"
- private String url; // p.e. "jdbc:mysql://localhost:3306/mydb"
- private String user;
- private String password;
- private String tableName;
- private String[] fields;
- private List<String> pks;
- private String errorField;
- private transient Connection conn;
- private boolean error = false;
- private Date lastConn;
- public JDBCBolt(String xmlFile) {
- super(xmlFile);
- }
- @Override
- public void userprepare() {
- conn = null;
- driver = getParam("driver");
- url = getParam("url");
- user = getParam("user");
- password = getParam("password");
- this.tableName = this.getParam("tablename", JDBCBolt.CHECKNOTNULL);
- this.errorField = this.getParam("errorField");
- if (errorField == null) {
- errorField = "error";
- }
- List<Object> list = this.getParamList("fields");
- fields = new String[list.size()];
- list.toArray(fields);
- open();
- pks = getPks();
- }
- @Override
- public void userexecute() {
- if (!existsField(errorField)) {
- Map<String,Object> tuple = getJson();
- if (existsRow(tuple))
- {
- createAndExecuteUpdate(tuple);
- } else
- {
- createAndExecuteInsert(tuple);
- }
- }
- emit();
- }
- public void usercleanup() {
- closedb();
- }
- /**
- * Connect to a jdbc database
- *
- */
- public Connection open() {
- try {
- Class.forName(driver);
- } catch (ClassNotFoundException e) {
- debug("JDBC driver not found");
- e.printStackTrace();
- error = true;
- return null;
- }
- try {
- conn = DriverManager.getConnection(url, user, password);
- } catch (SQLException e) {
- debug("Connection Failed! Check output console");
- e.printStackTrace();
- error = true;
- return conn;
- }
- if (conn != null) {
- debug("Connected To Database");
- } else {
- error = true;
- debug("Failed to make connection!");
- }
- lastConn = new Date();
- error = false;
- return conn;
- }
- private boolean existsRow(Map<String, Object> tuple)
- {
- String firstPk = pks.get(0);
- StringBuilder query = new StringBuilder(String.format("SELECT %s FROM %s ",firstPk, tableName));
- query.append(buildWherePks());
- try {
- PreparedStatement preparedStatement = conn.prepareStatement(query.toString());
- int i=1;
- for ( String key: pks )
- {
- preparedStatement.setObject(i++, tuple.get(key));
- }
- ResultSet rs = preparedStatement.executeQuery();
- while(rs.next())
- {
- return true;
- }
- } catch (Exception ex) {
- addField(errorField, ex.getMessage());
- addField(errorField + "Query", query.toString());
- }
- return false;
- }
- private String buildWherePks() {
- String sep = " WHERE ";
- StringBuilder where = new StringBuilder(" ");
- for ( String key: pks )
- {
- where.append(sep);
- where.append(String.format("%s = ?", key));
- sep = " AND ";
- }
- return where.toString();
- }
- /**
- * Create a valid query for JDBC from a Map<String, Object> and executes
- * it
- *
- * @param tuple
- * Map with the keys and values for each field.
- */
- private void createAndExecuteInsert(Map<String, Object> tuple) {
- StringBuilder query = new StringBuilder(String.format("INSERT INTO %s (",tableName));
- List<Object> fieldValues = new ArrayList<Object>();
- List<String> questions = new ArrayList<String>(fields.length);
- if (fields != null && fields.length > 0) {
- query.append(String.join(",",fields));
- query.append(") VALUES (");
- for (int i = 0; i < fields.length; i++) {
- fieldValues.add(tuple.get(fields[i]));
- questions.add("?");
- }
- query.append(String.join(",",questions));
- query.append(") ");
- } else {
- List<String> fieldNames = new ArrayList<String>(tuple.size());
- for (Map.Entry<String, Object> entry : tuple.entrySet()) {
- fieldNames.add(entry.getKey());
- questions.add("?");
- fieldValues.add(entry.getValue());
- }
- query.append(String.join(",",fieldNames));
- query.append(") VALUES (");
- query.append(String.join(",",questions));
- query.append(") ");
- }
- try {
- PreparedStatement preparedStatement = conn.prepareStatement(query.toString());
- int i=1;
- for (Object obj: fieldValues)
- {
- preparedStatement.setObject(i++, obj);
- }
- preparedStatement.executeUpdate();
- } catch (Exception ex) {
- addField(errorField, ex.getMessage());
- addField(errorField + "Query", query.toString());
- }
- }
- /**
- * Gets the primary keys of the table;
- */
- private List<String> getPks()
- {
- List<String> lsPks = new ArrayList<String>();
- try {
- DatabaseMetaData dmd = conn.getMetaData();
- ResultSet rs = dmd.getPrimaryKeys(null, null, tableName);
- while(rs.next()){
- lsPks.add(rs.getString("COLUMN_NAME"));
- }
- }catch (SQLException e) {
- e.printStackTrace();
- }
- return lsPks;
- }
- /**
- * Create a valid query for JDBC from a Map<String, Object> and executes
- * it
- *
- * @param tuple
- * Map with the keys and values for each field.
- */
- private void createAndExecuteUpdate(Map<String, Object> tuple) {
- StringBuilder query = new StringBuilder(String.format("UPDATE %s SET ",tableName));
- List<Object> fieldValues = new ArrayList<Object>();
- String sep = "";
- if (fields != null && fields.length > 0) {
- for (int i = 0; i < fields.length; i++) {
- if (! pks.contains(fields[i]))
- {
- query.append(String.format("%s %s = ?",sep,fields[i]));
- fieldValues.add(tuple.get(fields[i]));
- sep = ",";
- }
- }
- } else {
- for (Map.Entry<String, Object> entry : tuple.entrySet()) {
- if (!pks.contains(entry.getKey()))
- {
- query.append(String.format("%s %s = ?",sep, entry.getKey()));
- fieldValues.add(entry.getValue());
- sep = ",";
- }
- }
- }
- query.append(buildWherePks());
- try {
- PreparedStatement preparedStatement = conn.prepareStatement(query.toString());
- int i=1;
- for (Object obj: fieldValues)
- {
- preparedStatement.setObject(i++, obj);
- }
- for (String pk: pks)
- {
- preparedStatement.setObject(i++, tuple.get(pk));
- }
- preparedStatement.executeUpdate();
- } catch (Exception ex) {
- addField(errorField, ex.getMessage());
- addField(errorField + "Query", query.toString());
- }
- }
- public void closedb()
- {
- if (conn != null)
- {
- try {
- conn.close();
- } catch (SQLException e) {
- error = true;
- e.printStackTrace();
- }
- }
- }
- public void reopen()
- {
- debug("Reconnecting to Database");
- closedb();
- open();
- }
- private void debug(String str)
- {
- if (debug)
- System.out.println(str);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement