SHARE
TWEET

Untitled

a guest Jun 8th, 2017 76 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package com.loghelper;
  2.  
  3. import java.io.IOException;
  4. import java.sql.Connection;
  5. import java.sql.DriverManager;
  6. import java.sql.SQLException;
  7. import java.sql.Statement;
  8. import java.util.ArrayList;
  9.  
  10. import org.apache.hadoop.fs.FileStatus;
  11. import org.apache.hadoop.fs.FileSystem;
  12. import org.apache.hadoop.fs.Path;
  13. import org.apache.hadoop.hive.conf.HiveConf;
  14.  
  15. public class Hdfs2Hive {
  16.     private static FileSystem fileSystem = null;
  17.     private static ArrayList<String> queries = new ArrayList<String>();
  18.     static {
  19.         try {
  20.             Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
  21.         } catch (Exception e) {
  22.             e.printStackTrace();
  23.         }
  24.  
  25.         try {
  26.             fileSystem = FileSystem.get(new HiveConf());
  27.         } catch (IOException e) {
  28.             e.printStackTrace();
  29.         }
  30.         queries
  31.             .add("FROM (SELECT email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd') ts, remote_addr, COUNT(1) c FROM ping GROUP BY email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd'), remote_addr) p SELECT TRANSFORM('log_site_hits', p.email, p.site, p.ts, p.remote_addr, p.c) USING '/var/my_reducer'");
  32.         queries
  33.             .add("FROM (SELECT email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd') ts, remote_addr, referer, COUNT(1) c FROM ping GROUP BY email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd'), remote_addr, referer) p SELECT TRANSFORM('log_site_referers', p.email, p.site, p.ts, p.remote_addr, p.referer, p.c) USING '/var/my_reducer'");
  34.         queries
  35.             .add("FROM (SELECT email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd') ts, remote_addr, uri, COUNT(1) c FROM ping GROUP BY email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd'), remote_addr, uri) p SELECT TRANSFORM('log_site_uris', p.email, p.site, p.ts, p.remote_addr, p.uri, p.c) USING '/var/my_reducer'");
  36.         queries
  37.             .add("FROM (SELECT email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd') ts, remote_addr, user_agent, COUNT(1) c FROM ping GROUP BY email, site, FROM_UNIXTIME(epoch, 'yyyy-MM-dd'), remote_addr, user_agent) p SELECT TRANSFORM('log_site_user_agent', p.email, p.site, p.ts, p.remote_addr, p.user_agent, p.c) USING '/var/my_reducer'");
  38.  
  39.     }
  40.     private static final String PING_DIRECTORY = "/ping";
  41.     private static final String RAW_DIRECTORY = PING_DIRECTORY + "/raw";
  42.     private static final String PROCESSING_DIRECTORY = PING_DIRECTORY + "/processing";
  43.     private static final String PROCESSED_DIRECTORY = PING_DIRECTORY + "/processed";
  44.     private static final int RAW_PERIOD = 60000;
  45.  
  46.     // private static final int PROCESSING_PERIOD = 60000;
  47.  
  48.     public static void main(String[] args) {
  49.         Hdfs2Hive hdfs2Hive = new Hdfs2Hive();
  50.  
  51.         while (true) {
  52.             hdfs2Hive.raw2processing();
  53.             hdfs2Hive.processing2processed();
  54.             try {
  55.                 System.out.println("sleeping . . .");
  56.                 Thread.sleep(5000);
  57.             } catch (InterruptedException e) {
  58.                 e.printStackTrace();
  59.             }
  60.         }
  61.     }
  62.  
  63.     @SuppressWarnings("deprecation")
  64.     public void raw2processing() {
  65.         Path dirPath = new Path(RAW_DIRECTORY + "/*");
  66.  
  67.         try {
  68.             FileStatus[] files = fileSystem.globStatus(dirPath);
  69.             System.out.println("found " + files.length + " raw files");
  70.             if (files.length == 0) {
  71.                 System.out.println("no raw files");
  72.                 return;
  73.             }
  74.  
  75.             if (!fileSystem.exists(new Path(PROCESSING_DIRECTORY))) {
  76.                 fileSystem.mkdirs(new Path(PROCESSING_DIRECTORY));
  77.             }
  78.  
  79.             for (FileStatus file : files) {
  80.                 long age = System.currentTimeMillis() - file.getModificationTime();
  81.  
  82.                 if (age < RAW_PERIOD) {
  83.                     System.out.println("skipping " + file.getPath().getName() + ", age: " + age);
  84.                     continue;
  85.                 }
  86.                 System.out.println("path: " + file.getPath().getName() + ", age: " + age);
  87.                 long size = file.getLen();
  88.                 if (size == 0) {
  89.                     System.out.println("deleting " + file.getPath().getName());
  90.                     fileSystem.delete(file.getPath());
  91.                     continue;
  92.                 }
  93.  
  94.                 String to = PROCESSING_DIRECTORY + "/" + file.getPath().getName();
  95.                 boolean success = fileSystem.rename(file.getPath(), new Path(to));
  96.                 System.out.println(file.getPath() + " -> " + to + ", success: " + success);
  97.             }
  98.         } catch (IOException e) {
  99.             e.printStackTrace();
  100.         }
  101.     }
  102.  
  103.     private void processing2processed() {
  104.         FileStatus[] files = null;
  105.  
  106.         try {
  107.             files = fileSystem.globStatus(new Path("/ping/processing/*"));
  108.  
  109.             System.out.println("found " + files.length + " processing files");
  110.             if (files.length == 0) {
  111.                 System.out.println("no processing files");
  112.                 return;
  113.             }
  114.         } catch (IOException e) {
  115.             e.printStackTrace();
  116.         }
  117.  
  118.         try {
  119.             Connection connection = DriverManager.getConnection("jdbc:hive://", "", "");
  120.             Statement statement = connection.createStatement();
  121.             statement.executeQuery("LOAD DATA INPATH '" + PROCESSING_DIRECTORY + "/*' OVERWRITE INTO TABLE ping");
  122.  
  123.             for (String query : queries) {
  124.                 MyQuery myQuery = new MyQuery(query);
  125.                 myQuery.start();
  126.             }
  127.         } catch (SQLException e) {
  128.             e.printStackTrace();
  129.             System.exit(0);
  130.         }
  131.  
  132.         try {
  133.             files = fileSystem.globStatus(new Path("/ping/processing*"));
  134.         } catch (IOException e1) {
  135.             // TODO Auto-generated catch block
  136.             e1.printStackTrace();
  137.         }
  138.  
  139.         for (FileStatus file : files) {
  140.             System.out.println("path: " + file.getPath().getName());
  141.  
  142.             String to = PROCESSED_DIRECTORY + "/" + file.getPath().getName();
  143.             boolean success = false;
  144.             try {
  145.                 success = fileSystem.rename(file.getPath(), new Path(to));
  146.             } catch (IOException e) {
  147.                 // TODO Auto-generated catch block
  148.                 e.printStackTrace();
  149.             }
  150.             System.out.println(file.getPath() + " -> " + to + ", success: " + success);
  151.         }
  152.     }
  153.  
  154.     private class MyQuery extends Thread {
  155.         private String query;
  156.  
  157.         public MyQuery(String query) {
  158.             this.query = query;
  159.         }
  160.  
  161.         public void run() {
  162.             Statement statement;
  163.             try {
  164.                 Connection connection = DriverManager.getConnection("jdbc:hive://", "", "");
  165.                 statement = connection.createStatement();
  166.                 statement.executeQuery(query);
  167.             } catch (SQLException e) {
  168.                 // TODO Auto-generated catch block
  169.                 e.printStackTrace();
  170.             }
  171.         }
  172.     }
  173. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top