Guest User

Untitled

a guest
Sep 18th, 2018
133
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.70 KB | None | 0 0
  1. require 'bundler/setup'
  2. require 'rubygems'
  3. require 'sequel'
  4. require 'jdbc/dss'
  5. require 'logger'
  6. require 'pathname'
  7. require 'date'
  8. require 'json'
  9. require 'peach'
  10. require 'zlib'
  11.  
  # Script-wide logger; every progress message goes to standard output.
  @logger = Logger.new($stdout)

  # Instance identifier that the rest of the script appends to the JDBC URL.
  @ads_instance = 'e754dad464ce3602dd44230afb57cb85'

  # Load the DSS JDBC driver, then reference the driver class.
  # NOTE(review): presumably the bare class reference is there to force the
  # class to be loaded/registered with JDBC -- confirm before removing it.
  Jdbc::DSS.load_driver
  Java.com.gooddata.dss.jdbc.driver.DssDriver
  18.  
  # Shellwords quotes file names before they are handed to the shell.
  require 'shellwords'

  # Files discovered per entity. The key is the entity's directory name
  # (second path segment under mobile_events_v1); each entry describes one
  # decompressed file together with the COPY statement that loads it.
  files_to_load = {
    "sessions" => [],
    "custom_events" => [],
    "device_events" => [],
    "notifications" => [],
    "impressions" => []
  }

  # Returns the path of the decompressed copy of +gz_path+. Only the trailing
  # ".gz" extension is replaced, so a ".gz" elsewhere in the path is untouched.
  def json_path_for(gz_path)
    gz_path.sub(/\.gz\z/, '.json')
  end

  # Builds the COPY statement that loads +json_path+ into the table of +entity+.
  # Single quotes in the path are doubled so they cannot terminate the literal.
  def copy_command_for(entity, json_path)
    "COPY mobile_events_#{entity} FROM LOCAL '#{json_path.gsub("'", "''")}' PARSER fjsonparser();"
  end

  # Only gzip archives are picked up. The decompressed .json files are written
  # next to their archives, so without this filter a second run would treat
  # them as input, map them onto themselves and truncate them via the redirect.
  dir_content = Dir.glob('mobile_events_v1/**/*').select do |fn|
    !File.directory?(fn) && fn.end_with?('.gz')
  end

  dir_content.each do |f|
    entity = f.split('/')[1]
    entries = files_to_load[entity]
    if entries.nil?
      # A directory that is not one of the known entities has no bucket here.
      @logger.warn "Skipping #{f}: unknown entity #{entity.inspect}"
      next
    end

    @logger.info "Procesing and ungzipping #{f}"
    new_name = json_path_for(f)

    # Both paths are shell-escaped: names with spaces or metacharacters would
    # otherwise break (or alter) the command.
    unless system("gzip -c -d #{Shellwords.escape(f)} > #{Shellwords.escape(new_name)}")
      @logger.error "gzip failed for #{f}; file skipped"
      next
    end

    entries << {
      "file_name" => File.basename(f),
      "file_path" => new_name,
      "file_size_origin" => File.size(f),
      "file_size" => File.size(new_name),
      "copy_cmd" => copy_command_for(entity, new_name)
    }
  end
  45.  
  # Logs a refresh message and opens a new Sequel connection to the instance
  # named by @ads_instance.
  #
  # @param entity [String] label written to the log line only; it does not
  #   influence the connection settings
  # @return whatever Sequel.connect returns for the new connection
  def refresh_connection(entity)
    @logger.info "Refresh ##{entity}"

    jdbc_url = 'jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/' + @ads_instance
    settings = {
      username: 'ps-etl+sailthru@gooddata.com',
      password: '',
      pool_timeout: 240,
      max_connections: 4
    }
    Sequel.connect(jdbc_url, settings)
  end
  53.  
  54.  
  # Report how many files were collected for each entity.
  @logger.info "SUMMARY"
  files_to_load.each_pair do |entity, files|
    @logger.info "ENTITY #{entity} contain #{files.length} files"
  end
  59.  
  # Initial connection used by the statements that follow in the script.
  jdbc_url = 'jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/' + @ads_instance
  connection_settings = {
    username: 'ps-etl+sailthru@gooddata.com',
    password: '',
    pool_timeout: 240,
    max_connections: 4
  }
  connection = Sequel.connect(jdbc_url, connection_settings)
  65.  
  # Counts the newline characters in the file at +path+ (the same number
  # `wc -l` reports), reading in fixed-size chunks so neither a shell nor a
  # whole-file read is needed.
  def newline_count(path)
    total = 0
    File.open(path, 'rb') do |io|
      while (chunk = io.read(65_536))
        total += chunk.count("\n")
      end
    end
    total
  end

  # Renders +value+ as a single-quoted SQL literal, doubling embedded quotes.
  sql_literal = ->(value) { "'#{value.to_s.gsub("'", "''")}'" }

  # Serialises replacement of the shared connection so two workers cannot
  # interleave their swaps.
  connection_lock = Mutex.new

  # One worker thread per entity. Each worker runs the COPY statement of every
  # file of its entity and records one metadata row per loaded file.
  threads = files_to_load.keys.map do |entity_name|
    Thread.new(entity_name) do |entity|
      files = files_to_load[entity]

      files.each.with_index(1) do |file, index|
        # Every 80th file the shared connection is replaced by a fresh one.
        if index % 80 == 0
          connection_lock.synchronize do
            stale = connection
            connection = refresh_connection(entity)
            # Release the pooled connections of the replaced object so they do
            # not accumulate over a long run.
            # NOTE(review): relies on Sequel's disconnect closing only idle
            # pooled connections, leaving ones in use by other workers alone --
            # confirm against the Sequel version in the bundle.
            stale.disconnect
          end
        end

        # Use one connection object for both statements of this file.
        db = connection
        sql_stmnt = file['copy_cmd']
        @logger.info "#{Thread.current.object_id} || Executing #{sql_stmnt}. #{index}/#{files.length}"
        db.execute sql_stmnt

        db.execute "insert INTO mobile_events_metadata (entity, file_name, file_size, row_count, inserted_at, file_size_origin)
values (#{sql_literal.call(entity)}, #{sql_literal.call(file['file_name'])},#{file['file_size']},#{newline_count(file['file_path'])}, '#{Time.now}', #{file['file_size_origin']})"
      end
    end
  end

  # Wait for every worker; join re-raises an exception that ended a worker.
  threads.each(&:join)
  91.  
  # For every entity whose table holds at least one row, run
  # compute_flextable_keys_and_build_view on that table.
  files_to_load.each_key do |entity|
    count_sql = "SELECT count(*) from mobile_events_#{entity}"
    row_count = connection.fetch(count_sql).all[0][:count]
    next unless row_count > 0

    sql_stm = "SELECT compute_flextable_keys_and_build_view('mobile_events_#{entity}')"
    @logger.info sql_stm
    connection.fetch(sql_stm).all
  end
  100.  
  101.  
  102.  
  # Preparing payloads for mobile notifications: one INSERT ... SELECT per
  # payload key copies that key's values into
  # src_json_mobile_events_notifications.

  # Get all payload columns.
  # NOTE(review): the pattern 'payload.' is unanchored and '.' matches any
  # character, so keys that merely contain "payload" followed by one more
  # character are selected too -- confirm whether '^payload\.' was intended.
  sql_stmnt = "SELECT key_name AS key_name FROM mobile_events_notifications_keys WHERE REGEXP_LIKE(key_name, 'payload.')"

  payload_columns_raw = connection.fetch(sql_stmnt).all
  payload_columns = payload_columns_raw.map { |row| row[:key_name] }

  # Columns copied through unchanged; shared by the INSERT, SELECT and GROUP BY lists.
  base_columns = 'user_id, type, token, status, sid, platform, notification_id, ' \
                 'message_id, device_id, date, client_id, app_id'

  payload_columns.each do |pc|
    # The payload name is the key without its "payload." prefix. It is computed
    # once so that SELECT and GROUP BY always use the identical expression
    # (previously the SELECT side searched for 'payload. ' with a stray space
    # and therefore never stripped the prefix).
    payload_name = "'#{pc.gsub('payload.', '').gsub("'", "''")}'"
    # The key is used as a quoted identifier; embedded double quotes are doubled.
    payload_column = "\"#{pc.gsub('"', '""')}\""

    sql_stmnt = "INSERT /*+direct*/ INTO src_json_mobile_events_notifications (#{base_columns}, payload, payload_value)\n" \
                "SELECT #{base_columns}, #{payload_name}, #{payload_column}::VARCHAR(10000) FROM mobile_events_notifications_view\n" \
                "GROUP BY #{base_columns}, #{payload_name}, #{payload_column};"
    @logger.info sql_stmnt
    connection.execute(sql_stmnt)
  end

  @logger.info "Job's done."
Add Comment
Please, Sign In to add comment