Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'bundler/setup'
- require 'rubygems'
- require 'sequel'
- require 'jdbc/dss'
- require 'logger'
- require 'pathname'
- require 'date'
- require 'json'
- require 'peach'
- require 'zlib'
- Jdbc::DSS.load_driver
- Java.com.gooddata.dss.jdbc.driver.DssDriver
# Identifier of the ADS/DSS instance targeted by every JDBC URL below.
@ads_instance = 'e754dad464ce3602dd44230afb57cb85'

# Progress/diagnostic logging goes straight to stdout.
@logger = Logger.new($stdout)

# Per-entity registry of unpacked files. Each entry later receives a hash
# describing one JSON file to COPY into the matching mobile_events_* table.
files_to_load = {
  'sessions'      => [],
  'custom_events' => [],
  'device_events' => [],
  'notifications' => [],
  'impressions'   => []
}
# Walk the downloaded archive tree, gunzip every file, and register the
# resulting JSON file (plus its Vertica COPY statement) under its entity
# bucket. Layout is assumed to be mobile_events_v1/<entity>/<file>.gz.
dir_content = Dir.glob('mobile_events_v1/**/*').reject { |fn| File.directory?(fn) }
dir_content.each do |f|
  @logger.info "Processing and ungzipping #{f}"
  # Anchor the extension swap to the end of the name; a bare
  # gsub('.gz', '.json') would also rewrite a '.gz' occurring mid-name.
  new_name = f.sub(/\.gz\z/, '.json')
  # Decompress in-process with Zlib (already required) instead of shelling
  # out to `gzip` with an unquoted, interpolated path — the previous
  # system("gzip -c -d #{f} > #{new_name}") broke on spaces and shell
  # metacharacters in file names.
  Zlib::GzipReader.open(f) do |gz|
    File.open(new_name, 'wb') { |out| IO.copy_stream(gz, out) }
  end
  file_name = File.basename(f)
  entity = f.split('/')[1]
  # Guard against unexpected directory names: files_to_load[entity] would
  # be nil and `<<` would raise NoMethodError.
  unless files_to_load.key?(entity)
    @logger.warn "Skipping #{f}: unknown entity #{entity.inspect}"
    next
  end
  files_to_load[entity] << {
    'file_name' => file_name,
    'file_path' => new_name,
    'file_size_origin' => File.size(f),
    'file_size' => File.size(new_name),
    'copy_cmd' => "COPY mobile_events_#{entity} FROM LOCAL '#{new_name}' PARSER fjsonparser();"
  }
end
# Open a brand-new Sequel connection to the ADS/DSS instance. Invoked
# periodically by the load loop so long-running COPY batches do not run on
# a stale session; `entity` is used for logging only.
# NOTE(review): credentials (including an empty password) are hard-coded —
# presumably placeholders; confirm before reuse.
def refresh_connection(entity)
  @logger.info "Refresh ##{entity}"
  url = "jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/#{@ads_instance}"
  Sequel.connect(url,
                 :username => 'ps-etl+sailthru@gooddata.com',
                 :password => '',
                 :pool_timeout => 240,
                 :max_connections => 4)
end
# Log a per-entity summary of how many files were queued for loading.
@logger.info 'SUMMARY'
files_to_load.each do |entity, files|
  @logger.info "ENTITY #{entity} contain #{files.length} files"
end
# Primary connection: used by the main thread after the workers finish, and
# as each worker's starting connection.
connection = Sequel.connect('jdbc:dss://na1.analytics.sailthru.com/gdc/dss/instances/' + @ads_instance,
                            :username => 'ps-etl+sailthru@gooddata.com',
                            :password => '',
                            :pool_timeout => 240,
                            :max_connections => 4)

# One worker thread per entity: each pops an entity name off the queue and
# loads that entity's files sequentially.
queue = Queue.new
threads = []
files_to_load.keys.each { |x| queue << x }
queue.size.times do
  threads << Thread.new do
    entity = queue.pop(true) rescue nil
    # Guard the rescue-nil pop: files_to_load[nil].each would raise.
    if entity
      # Each thread tracks its OWN connection handle. Previously the shared
      # top-level local `connection` was reassigned from inside the threads
      # (connection = refresh_connection(...)), so one thread's refresh
      # silently swapped the handle out from under the others and leaked
      # the replaced connections.
      conn = connection
      files_to_load[entity].each_with_index do |file, index|
        index += 1
        # Recycle the session every 80 files to avoid long-lived-session
        # timeouts on the DSS side.
        conn = refresh_connection(entity) if index % 80 == 0
        file_name = file['file_name']
        file_size = file['file_size']
        file_size_origin = file['file_size_origin']
        sql_stmnt = file['copy_cmd']
        file_path = file['file_path']
        @logger.info "#{Thread.current.object_id} || Executing #{sql_stmnt}. #{index}/#{files_to_load[entity].length}"
        conn.execute sql_stmnt
        # Record load metadata; the row count comes from `wc -l` on the
        # unpacked JSON file (path is double-quoted for the shell).
        conn.execute "insert INTO mobile_events_metadata (entity, file_name, file_size, row_count, inserted_at, file_size_origin)
        values ('#{entity}', '#{file_name}',#{file_size},#{`wc -l "#{file_path}" `.strip.split(' ')[0].to_i}, '#{Time.now()}', #{file_size_origin})"
      end
    end
  end
end
threads.each(&:join)
# For every entity that actually received rows, compute the flex-table keys
# and (re)build its companion view.
files_to_load.keys.each do |entity|
  loaded_rows = connection.fetch("SELECT count(*) from mobile_events_#{entity}").all[0][:count]
  next unless loaded_rows > 0
  build_stmt = "SELECT compute_flextable_keys_and_build_view('mobile_events_#{entity}')"
  @logger.info build_stmt
  connection.fetch(build_stmt).all
end
# Preparing payloads for mobile notifications: explode each `payload.*` key
# of the notifications flex table into its own (payload, payload_value) row.
# NOTE(review): in REGEXP_LIKE the unescaped/unanchored 'payload.' treats
# the dot as a wildcard — presumably '^payload\.' was intended; confirm
# against the key names actually present before tightening.
sql_stmnt = "SELECT key_name AS key_name FROM mobile_events_notifications_keys WHERE REGEXP_LIKE(key_name, 'payload.')"
payload_columns = connection.fetch(sql_stmnt).all.map { |c| c[:key_name] }
payload_columns.each do |pc|
  # Strip the 'payload.' prefix once and reuse it. The original SELECT used
  # gsub('payload. ', '') — note the stray space — which never matched, so
  # the inserted `payload` column kept its prefix while the GROUP BY
  # grouped on the correctly stripped literal.
  payload_key = pc.gsub('payload.', '')
  sql_stmnt = "INSERT /*+direct*/ INTO src_json_mobile_events_notifications (user_id, type, token, status, sid, platform, notification_id, message_id, device_id, date, client_id, app_id, payload, payload_value)
  SELECT user_id, type, token, status, sid, platform, notification_id, message_id, device_id, date, client_id, app_id, '#{payload_key}', \"#{pc}\":: VARCHAR (10000) FROM mobile_events_notifications_view
  GROUP BY user_id, type, token, status, sid, platform, notification_id, message_id, device_id, date, client_id, app_id, '#{payload_key}', \"#{pc}\";"
  @logger.info sql_stmnt
  connection.execute(sql_stmnt)
end
@logger.info "Job's done."
Add Comment
Please, Sign In to add comment