Advertisement
Guest User

Untitled

a guest
May 15th, 2019
131
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Lua 4.51 KB | None | 0 0
  1. local oracle = require('oracle')
  2. local checks = require('checks')
  3. local fiber = require('fiber')
  4. local fun = require('fun')
  5. local const = require('oracle-replicator.const')
  6. local log = require('log')
  7.  
  -- Method table for connector instances; the constructor in this file
  -- installs it as the `__index` of each instance's metatable.
  local OracleConnector = {}

  -- Oracle error codes that this module treats as connection failures.
  -- Reference: https://docs.oracle.com/database/121/ERRMG
  --   ORA-03113: end-of-file on communication channel
  --   ORA-03114: not connected to ORACLE
  local ORA_CONNECTION_ERR = { 3113, 3114 }
  15.  
  --- Validate the connector configuration against the expected schema.
  -- The argument is never read by name here (hence `_`); only the schema is
  -- handed to `checks`. Presumably `checks` inspects the caller's first
  -- argument itself and raises on a mismatch -- confirm against the `checks`
  -- module. The constructor in this file calls this function under `pcall`
  -- and treats a raised error as a validation failure.
  -- NOTE: `checks` is called as a plain statement (not `return checks(...)`)
  -- so that this function's stack frame stays in place during the call.
  local function validate_config(_)
      -- Credentials passed to oracle.connect().
      local connect_schema = {
          db = 'string',
          username = 'string',
          password = 'string'
      }
      -- Retry tuning; both fields are optional ('?' prefix).
      local connect_opts_schema = {
          reconnect_try = '?number',
          timeout = '?number'
      }
      -- SQL templates used by the connector methods.
      local queries_schema = {
          first_batch = 'string',
          next_batch = 'string',
          check_notif_id = 'string'
      }

      checks({
          connect = connect_schema,
          connect_opts = connect_opts_schema,
          queries = queries_schema
      })
  end
  34.  
  --- Create a connector holding two Oracle connections (data and metrics).
  -- @param cfg configuration table; validated with validate_config(). When
  --   `cfg.connect_opts` is absent it is filled in ON THE CALLER'S TABLE with
  --   `{ timeout = 0, reconnect_try = 1 }` (cfg is stored by reference).
  -- @return connector object with fields `cfg`, `conn`, `conn_metrics` and
  --   methods looked up in OracleConnector
  -- @return[2] nil
  -- @return[2] error value (validation error or connection error,
  --   'Unknown error' when oracle.connect gave no reason)
  local function new(cfg)
      -- Rely on the pcall status rather than on the presence of an error
      -- value to decide whether validation passed.
      local ok, validation_err = pcall(validate_config, cfg)
      if not ok then
          return nil, validation_err
      end

      if not cfg.connect_opts then
          cfg.connect_opts = {
              timeout = 0,
              reconnect_try = 1
          }
      end

      log.info('[oracle-connector] Connecting to data channel...')
      local data_conn, data_err = oracle.connect(cfg.connect)
      if not data_conn then
          data_err = data_err or 'Unknown error'
          log.error(const.ERROR.FAIL_ORACLE_CONN, data_err)
          return nil, data_err
      end

      log.info('[oracle-connector] Connecting to metrics channel...')
      local metrics_conn, metrics_err = oracle.connect(cfg.connect)
      if not metrics_conn then
          metrics_err = metrics_err or 'Unknown error'
          log.error(const.ERROR.FAIL_ORACLE_CONN, metrics_err)
          -- Do not leak the already-open data connection. Best effort: the
          -- pcall guards against close() failing or being unavailable.
          pcall(function() data_conn:close() end)
          return nil, metrics_err
      end

      local connector = {
          cfg = cfg,
          conn = data_conn,
          conn_metrics = metrics_conn
      }
      return setmetatable(connector, { __index = OracleConnector })
  end
  71.  
  --- Run `conn:exec(query)` under pcall, retrying on Oracle connection errors.
  -- An attempt is retried only when exec returned a non-zero rc together with
  -- an error object of type oracle.ERROR_TYPES.ORACLE whose code is listed in
  -- ORA_CONNECTION_ERR. After each such failure (including the last attempt)
  -- the fiber sleeps for `connect_opts.timeout`.
  -- NOTE(review): no reconnect is performed here -- the same `conn` is reused
  -- for every attempt; confirm the driver re-establishes the session itself.
  -- @param self connector (reads self.cfg.connect_opts)
  -- @param conn connection object exposing `exec`
  -- @param query SQL text
  -- @return status pcall success flag
  -- @return rc exec return code, or the raised error when status is false
  -- @return result exec result / error object (nil when status is false)
  -- @return row_count as returned by exec
  local function query_repeater(self, conn, query)
      local try_count = self.cfg.connect_opts.reconnect_try or 1
      local timeout = self.cfg.connect_opts.timeout or 0

      local function is_connection_error(code)
          return fun.any(function(x) return code == x end, ORA_CONNECTION_ERR)
      end

      local status, rc, result, row_count
      for try = 1, try_count do
          status, rc, result, row_count = pcall(conn.exec, conn, query)

          -- When exec itself raised, `rc` carries the error message and
          -- `result` is nil; checking `status` (and `result`) first keeps us
          -- from indexing nil and lets the caller see the failure.
          local should_retry = status
              and rc and rc ~= 0
              and result ~= nil
              and result.type == oracle.ERROR_TYPES.ORACLE
              and is_connection_error(result.code)
          if not should_retry then
              break
          end

          log.info('[oracle-connector] Problems with connection! Try to reconnect #%d...', try)
          fiber.sleep(timeout)
      end

      return status, rc, result, row_count
  end
  97.  
  --- Execute a query through query_repeater and normalize the outcome.
  -- @param self connector (forwarded to query_repeater)
  -- @param conn connection to run the query on
  -- @param query SQL text
  -- @return table `{ data, rc, row_count }` on success, where `data` defaults
  --   to `{}` and `row_count` to 0
  -- @return[2] nil
  -- @return[2] table `{ msg }` when the call itself raised, or `{ msg, rc }`
  --   when the driver reported a non-zero return code
  local function send_query(self, conn, query)
      -- The Oracle connector returns result = nil when a procedure returns
      -- nothing, and result = {} when a select query returns no rows.
      local status, rc, result, row_count = query_repeater(self, conn, query)

      if not status then
          -- On a raised error the message is in the `rc` position.
          local msg = rc or 'Unknown error'
          log.error(const.ERROR.FAIL_DB_CONNECTOR, msg)
          return nil, { msg = msg }
      end

      if rc ~= 0 then
          -- `result` may be nil here (see above), so guard before indexing.
          local msg = (result ~= nil and result.msg) or 'Unknown error'
          log.error(const.ERROR.ORACLE_QUERY_ERROR, msg)
          return nil, { msg = msg, rc = rc }
      end

      return { data = result or {}, rc = rc, row_count = row_count or 0 }
  end
  112.  
  --- Check a notification id by running the configured `check_notif_id` query.
  -- Every `:NOTIF_ID` placeholder in the query template is replaced with
  -- `tostring(unvalidated_notif_id)`.
  -- NOTE(review): the id is interpolated into the SQL text verbatim and
  -- nothing here escapes it -- confirm callers only pass trusted/numeric ids,
  -- or move to bind parameters if the driver supports them.
  -- @param unvalidated_notif_id id to check (any value accepted by tostring)
  -- @return true when the query reports exactly one row, false otherwise
  -- @return[2] nil
  -- @return[2] error message from send_query
  function OracleConnector:validate_notif_id(unvalidated_notif_id)
      local notif_id_text = tostring(unvalidated_notif_id)

      -- A function replacement is used instead of a replacement string so that
      -- a '%' inside the id is inserted literally rather than being parsed as
      -- a gsub escape (which would raise or corrupt the query).
      local query = self.cfg.queries.check_notif_id:gsub(':NOTIF_ID', function()
          return notif_id_text
      end)

      local result, err = send_query(self, self.conn, query)
      if not result then
          return nil, err.msg
      end

      return result.row_count == 1
  end
  127.  
  --- Fetch a batch of records over the data connection.
  -- Both batch query templates get their `:BATCH_SIZE` placeholder replaced
  -- with `tostring(batch_size)`.
  -- @param notif_id not used in the visible part of this function
  -- @param batch_size value substituted for `:BATCH_SIZE`
  -- @return table `{ records = <rows>, size = <#rows> }`
  -- @return[2] nil
  -- @return[2] error message from send_query
  function OracleConnector:next_batch(notif_id, batch_size)
      local size_text = tostring(batch_size)
      local first_batch_query = self.cfg.queries.first_batch:gsub(':BATCH_SIZE', size_text)
      local next_batch_query = self.cfg.queries.next_batch:gsub(':BATCH_SIZE', size_text)

      -- ...

      -- NOTE(review): `query` is not assigned anywhere in the visible code;
      -- presumably the elided section above builds it from the two templates
      -- and `notif_id` -- confirm against the full source.
      -- send_query yields at least an empty table as `data` on success.
      local reply, failure = send_query(self, self.conn, query)
      if not reply then
          return nil, failure.msg
      end

      local records = reply.data
      return {
          records = records,
          size = #records
      }
  end
  145.  
  --- Acknowledge a batch.
  -- Currently a stub: the batch is ignored and success is always reported.
  -- @param batch unused
  -- @return true
  function OracleConnector:ack_batch(batch)
      return true
  end
  149.  
  --- Statistics are not implemented for this connector.
  -- @return nil
  -- @return the string "Unsupported"
  function OracleConnector:get_statistics()
      return nil, "Unsupported"
  end
  153.  
  -- Public module interface: only the constructor is exported; connector
  -- methods are reached through the objects it returns.
  return { new = new }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement