Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- 'use strict'
- const reekoh = require('reekoh')
- const plugin = new reekoh.plugins.Storage()
- const async = require('async')
- const Connection = require('tedious').Connection
- const Request = require('tedious').Request
- const TYPES = require('tedious').TYPES
- const isPlainObject = require('lodash.isplainobject')
- const isEmpty = require('lodash.isempty')
- let _query = null // INSERT statement text; assigned in the 'ready' handler and passed to each tedious Request in the 'data' handler
- let _parseFields = null // field mapping obtained via JSON.parse(plugin.config.fields) in the 'ready' handler
- let _connection = null // tedious Connection instance created in the 'ready' handler and used by the 'data' handler
/**
 * Handles one incoming record: validates it, binds its values to the
 * prepared INSERT statement and executes it on the shared connection.
 *
 * Success ('processed') and failure are both reported from the Request
 * completion callback. Listeners are deliberately NOT attached to the shared
 * connection here: doing so per message would accumulate listeners and make
 * every earlier message re-report itself on each later request. The
 * connection-level 'error' listener is registered once, where the connection
 * is created in the 'ready' handler.
 *
 * @param {Object} data - Record whose properties are looked up by each
 *   mapping entry's `source_field`.
 */
plugin.on('data', (data) => {
  if (!isPlainObject(data)) {
    return plugin.logException(new Error(`Invalid data received. Must be a valid JSON Object. Data: ${data}`))
  }

  if (isEmpty(data)) {
    return plugin.logException(new Error('Invalid data received. Data must not be empty.'))
  }

  // 'data' may fire before the 'ready' handler has built the statement and
  // created the connection; without this guard the code below would throw.
  if (_connection === null || _query === null || _parseFields === null) {
    return plugin.logException(new Error('Data received before the Azure SQL connection was initialized.'))
  }

  // The callback runs once when the request completes (or fails), which makes
  // it the only reliable place to decide whether the insert succeeded.
  const request = new Request(_query, (error) => {
    if (error) {
      console.error(error)
      return plugin.logException(error)
    }

    // NOTE(review): the title says 'Table Storage' although this plugin writes
    // to Azure SQL; left unchanged in case log consumers match on it.
    plugin.log(JSON.stringify({
      title: 'Data has been sent to Azure Table Storage',
      data: data
    }))
    plugin.emit('processed')
  })

  // Bind one named parameter per mapping entry; the parameter name matches the
  // @placeholder that the 'ready' handler put into the statement.
  Object.keys(_parseFields).forEach((column) => {
    const field = _parseFields[column]
    request.addParameter(field.source_field, TYPES[field.data_type], data[field.source_field])
  })

  // NOTE(review): tedious appears to allow only one in-flight request per
  // connection; overlapping 'data' events would surface as request errors in
  // the callback above. Confirm whether the platform serialises deliveries.
  _connection.execSql(request)
})
/**
 * Reports a fatal start-up error and terminates the process.
 * The exit is delayed by two seconds so the exception report has a chance to
 * be delivered before the process goes away.
 *
 * @param {Error} error - The error to report through plugin.logException.
 */
function exitAfterLogging (error) {
  plugin.logException(error)
  setTimeout(() => {
    process.exit(1)
  }, 2000)
}

/**
 * Validates the field mapping and builds the parameterised INSERT statement.
 *
 * @param {string} table - Target table name, interpolated as-is.
 * @param {Object} fields - Mapping of column name to
 *   `{ source_field, data_type }`; `data_type` must be a key of tedious TYPES.
 * @returns {string} `INSERT <table> (<columns>) VALUES (<@placeholders>)`.
 * @throws {Error} When an entry has no source field or an unknown data type.
 */
function buildInsertStatement (table, fields) {
  const columns = []
  const placeholders = []

  Object.keys(fields).forEach((column) => {
    const field = fields[column]

    // A null / non-object entry is reported the same way as a missing source
    // field instead of crashing with a TypeError on property access.
    if (!isPlainObject(field) || field.source_field === undefined || field.source_field === null) {
      throw new Error(`Source field is missing for ${column} in the fields configuration parameter.`)
    }

    if (!Object.prototype.hasOwnProperty.call(TYPES, field.data_type)) {
      throw new Error(`Invalid Data Type for ${column}, please refer to http://pekim.github.io/tedious/api-datatypes.html for valid values.`)
    }

    columns.push(column)
    placeholders.push(`@${field.source_field}`)
  })

  return `INSERT ${table} (${columns.join(', ')}) VALUES (${placeholders.join(', ')})`
}

/**
 * One-time initialisation: parses and validates the field mapping, prepares
 * the INSERT statement, then opens the Azure SQL connection. Emits 'init'
 * once the connection is established; any start-up failure is logged and the
 * process exits.
 */
plugin.once('ready', () => {
  console.log('ready')
  const options = plugin.config
  let fields = null

  try {
    fields = JSON.parse(options.fields)
  } catch (ex) {
    console.log(ex)
    return exitAfterLogging(new Error('Error parsing Field Mapping configuration parameter. Must be a valid JSON String.'))
  }

  // Valid JSON is not enough: null, arrays, scalars or {} would otherwise
  // produce a statement with no usable columns.
  if (!isPlainObject(fields) || isEmpty(fields)) {
    return exitAfterLogging(new Error('Invalid Field Mapping configuration parameter. Must be a non-empty JSON Object.'))
  }

  let insertStatement = null
  try {
    insertStatement = buildInsertStatement(options.table, fields)
  } catch (error) {
    console.error('Error parsing field mapping configuration for Azure SQL Plugin.', error)
    return exitAfterLogging(new Error('Error parsing field mapping configuration for Azure SQL Plugin.'))
  }

  // Publish module state only after validation so the 'data' handler never
  // observes a half-checked mapping.
  _parseFields = fields
  _query = insertStatement

  const config = {
    userName: options.user,
    password: options.password,
    server: options.server,
    options: {
      port: options.port || 1433,
      database: options.database,
      encrypt: true
    }
  }

  _connection = new Connection(config)

  _connection.on('connect', (error) => {
    if (error) {
      console.error('Error connecting to Azure SQL DB Connection.', error)
      // A plugin that never connected can never process data; fail the same
      // way as the other start-up errors instead of idling forever.
      return exitAfterLogging(error)
    }

    plugin.log('Azure SQL DB Connection initialized.')
    plugin.emit('init')
  })

  _connection.on('error', (error) => {
    console.error('Error on Azure SQL DB Connection.', error)
    plugin.logException(error)
  })

  _connection.on('end', () => {
    plugin.log('Azure SQL connection has ended.')
  })
})
- module.exports = plugin
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement