Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- local oracle = require('oracle')
- local checks = require('checks')
- local fiber = require('fiber')
- local fun = require('fun')
- local const = require('oracle-replicator.const')
- local log = require('log')
- local OracleConnector = {}
- -- https://docs.oracle.com/database/121/ERRMG
- local ORA_CONNECTION_ERR = {
- 3113,
- 3114,
- }
- local function validate_config(_)
- checks({
- connect = {
- db = 'string',
- username = 'string',
- password = 'string'
- },
- connect_opts = {
- reconnect_try = '?number',
- timeout = '?number'
- },
- queries = {
- first_batch = 'string',
- next_batch = 'string',
- check_notif_id = 'string'
- }
- })
- end
- local function new(cfg)
- local _, err = pcall(validate_config, cfg)
- if err ~= nil then
- return nil, err
- end
- local connector = {}
- setmetatable(connector, { __index = OracleConnector })
- connector.cfg = cfg
- if not connector.cfg.connect_opts then
- connector.cfg.connect_opts = {
- timeout = 0,
- reconnect_try = 1
- }
- end
- log.info('[oracle-connector] Connecting to data channel...')
- local c, err = oracle.connect(connector.cfg.connect) -- connect to Oracle
- if not c then
- log.error(const.ERROR.FAIL_ORACLE_CONN, err or 'Unknown error')
- return nil, err or 'Unknown error'
- end
- log.info('[oracle-connector] Connecting to metrics channel...')
- local metr_c, err = oracle.connect(connector.cfg.connect)
- if not metr_c then
- log.error(const.ERROR.FAIL_ORACLE_CONN, err or 'Unknown error')
- return nil, err or 'Unknown error'
- end
- connector.conn = c
- connector.conn_metrics = metr_c
- return connector
- end
- local function query_repeater(self, conn, query)
- local try_count = self.cfg.connect_opts.reconnect_try or 1
- local timeout = self.cfg.connect_opts.timeout or 0
- local function is_connection_error(code)
- return fun.any(function(x) return code == x end, ORA_CONNECTION_ERR)
- end
- local status, rc, result, row_count
- for try = 1, try_count do
- status, rc, result, row_count = pcall(conn.exec, conn, query)
- if rc and rc ~= 0 and result.type == oracle.ERROR_TYPES.ORACLE then
- if is_connection_error(result.code) then
- log.info('[oracle-connector] Problems with connection! Try to reconnect #%d...', try)
- fiber.sleep(timeout)
- else
- break
- end
- else
- break
- end
- end
- return status, rc, result, row_count
- end
- local function send_query(self, conn, query)
- -- oracle-connector return result = nil if procedure return nothing
- -- return result = {} if select query return nothing
- local status, rc, result, row_count = query_repeater(self, conn, query)
- if not status then
- log.error(const.ERROR.FAIL_DB_CONNECTOR, rc or 'Unknown error')
- return nil, { msg = rc or 'Unknown error' }
- elseif rc ~= 0 then
- log.error(const.ERROR.ORACLE_QUERY_ERROR, result.msg or 'Unknown error')
- return nil, { msg = result.msg or 'Unknown error', rc = rc }
- end
- return { data = result or {}, rc = rc, row_count = row_count or 0 }
- end
- function OracleConnector:validate_notif_id(unvalidated_notif_id)
- local query = self.cfg.queries.check_notif_id:gsub(':NOTIF_ID', tostring(unvalidated_notif_id))
- local result, err = send_query(self, self.conn, query)
- if not result then
- return nil, err.msg
- end
- if result.row_count ~= 1 then
- return false
- end
- return true
- end
- function OracleConnector:next_batch(notif_id, batch_size)
- local first_batch_query = self.cfg.queries.first_batch:gsub(':BATCH_SIZE', tostring(batch_size))
- local next_batch_query = self.cfg.queries.next_batch:gsub(':BATCH_SIZE', tostring(batch_size))
- -- ...
- -- Waits that oracle-connector return at least empty table
- local result, err = send_query(self, self.conn, query)
- if not result then
- return nil, err.msg
- end
- return {
- records = result.data,
- size = #result.data
- }
- end
- function OracleConnector:ack_batch(batch)
- return true
- end
- function OracleConnector:get_statistics()
- return nil, "Unsupported"
- end
- return {
- new = new
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement