Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Aug 10th, 2012  |  syntax: None  |  size: 6.09 KB  |  hits: 11  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. import java.io.IOException;
  2. import java.util.*;
  3.  
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.conf.*;
  6. import org.apache.hadoop.io.*;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.mapred.*;
  9. import org.apache.hadoop.util.*;
  10. import java.io.DataInput;
  11. import java.io.DataOutput;
  12. import java.lang.Thread;
  13. import java.sql.PreparedStatement;
  14. import java.sql.ResultSet;
  15. import java.sql.SQLException;
  16. import org.apache.hadoop.mapred.lib.db.DBWritable;
  17. import org.apache.hadoop.mapred.lib.IdentityReducer;
  18. import org.apache.hadoop.mapred.lib.db.DBConfiguration;
  19. import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
  20. import org.apache.hadoop.mapred.lib.db.DBInputFormat;
  21. /*
  22. MySQL DB Schema:
  23.  
  24. DROP TABLE IF EXISTS `WordCount`.`Counting`;
  25. CREATE TABLE  `WordCount`.`Counting` (
  26. `name` char(48) default NULL,
  27. `count` int(11) default NULL
  28. ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
  29.  
  30. */
  31.  
  32. public class DBWordCount extends Thread
  33. {
  34.     public void run() //throws Exception
  35.     {
  36.  /* Start ! */
  37.  try
  38.  {
  39.      JobClient.runJob(conf);
  40.  }
  41.  catch(Exception e)
  42.  {
  43.      // do nothing
  44.  }
  45.     }
  46.  
  47.     public void fnSetJob1(String[] args) throws Exception
  48.     {
  49.  conf.setJobName("MySQL DB Wordcount Job1");
  50.  
  51.  
  52.  Class.forName("com.mysql.jdbc.Driver");
  53.  
  54.  conf.setInputFormat(TextInputFormat.class);
  55.  conf.setOutputFormat(DBOutputFormat.class);
  56.  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  57.  
  58.  // Set up your host name and account
  59.  String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"};
  60.  DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]);
  61.  
  62.  // Setup Output MySQL Format
  63.  DBOutputFormat.setOutput(conf, "Counting","name", "count");
  64.  
  65.  // Set Mapper and Reducer Class
  66.  conf.setMapperClass(Map.class);
  67.  //conf.setCombinerClass(Reduce.class);
  68.  conf.setReducerClass(Reduce.class);
  69.  
  70.  conf.setMapOutputKeyClass(Text.class);
  71.  conf.setMapOutputValueClass(IntWritable.class);
  72.  conf.setOutputKeyClass(WordCountInfoRecord.class);
  73.  conf.setOutputValueClass(NullWritable.class);
  74.     }
  75.  
  76.     public void fnSetJob2(String[] args) throws Exception
  77.     {
  78.      //JobConf conf = new JobConf(DBWordCount.class);
  79.      conf.setJobName("MySQL DB Wordcount Job2");
  80.  
  81.  
  82.      Class.forName("com.mysql.jdbc.Driver");
  83.      // Set up your host name and account
  84.      String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"};
  85.  
  86.      conf.setInputFormat(TextInputFormat.class);
  87.      conf.setOutputFormat(DBOutputFormat.class);
  88.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
  89.  
  90.      // Setup MySQL Connection , default account:root , no password
  91.      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]);
  92.  
  93.      // Setup Output MySQL Format
  94.      DBOutputFormat.setOutput(conf, "Counting","name", "count");
  95.  
  96.      // Set Mapper and Reducer Class
  97.      conf.setMapperClass(Map.class);
  98.      //conf.setCombinerClass(Reduce.class);
  99.      conf.setReducerClass(Reduce2.class);
  100.  
  101.      // I've tried all combinations , but the bug still happen.
  102.      conf.setMapOutputKeyClass(Text.class);
  103.      conf.setMapOutputValueClass(IntWritable.class);
  104.      conf.setOutputKeyClass(WordCountInfoRecord.class);
  105.      conf.setOutputValueClass(NullWritable.class);
  106.     }
  107.  
  108.     JobConf conf = new JobConf(DBWordCount.class);
  109.     //JobConf conf2 = new JobConf(DBWordCount.class);
  110.  
  111.     // Output Record Object
  112.     static class WordCountInfoRecord implements Writable,  DBWritable
  113.     {
  114.  public String name;
  115.  public int count;
  116.  public WordCountInfoRecord() {
  117.  
  118.  }
  119.  
  120.  public WordCountInfoRecord(String str, int c)
  121.  {
  122.      this.name = str;
  123.      this.count = c;
  124.  }
  125.  
  126.  public void readFields(DataInput in) throws IOException {
  127.      this.name = Text.readString(in);
  128.      this.count = in.readInt();
  129.  }
  130.  public void write(DataOutput out) throws IOException {
  131.      Text.writeString(out, this.name);
  132.      out.writeInt(this.count);
  133.  }
  134.  
  135.  public void readFields(ResultSet result) throws SQLException {
  136.      this.name = result.getString(1);
  137.      this.count = result.getInt(2);
  138.  }
  139.  public void write(PreparedStatement stmt) throws SQLException {
  140.      stmt.setString(1, this.name);
  141.      stmt.setInt(2, this.count);
  142.  }
  143.  public String toString() {
  144.      return new String(this.name + " " + this.count);
  145.  }
  146.     }
  147.  
  148.     public static class Map extends MapReduceBase implements Mapper
  149.     {
  150.  
  151.       private final static IntWritable one = new IntWritable(1);
  152.       private Text word = new Text();
  153.  
  154.       public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException
  155.       {
  156.         String line = value.toString();
  157.         StringTokenizer tokenizer = new StringTokenizer(line);
  158.         while (tokenizer.hasMoreTokens()) {
  159.           word.set(tokenizer.nextToken());
  160.           output.collect(word, one);
  161.         }
  162.       }
  163.     }
  164.  
  165.  
  166.     public static class Reduce extends MapReduceBase implements Reducer
  167.     {
  168.       public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
  169.       {
  170.         int sum = 0;
  171.         while (values.hasNext()) {
  172.           sum += values.next().get();
  173.         }
  174.  // Output Data into MySQL
  175.  output.collect(new WordCountInfoRecord(key.toString(),sum), NullWritable.get());
  176.       }
  177.     }
  178.  
  179.     public static class Reduce2 extends MapReduceBase implements Reducer
  180.     {
  181.       public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
  182.       {
  183.         int sum = 0;
  184.         while (values.hasNext()) {
  185.           sum += values.next().get();
  186.         }
  187.  // Output Data into MySQL
  188.  output.collect(new WordCountInfoRecord("Job2_"+key.toString(),sum), NullWritable.get());
  189.       }
  190.     }
  191.  
  192.     public static void main(String[] args) throws Exception
  193.     {
  194.  DBWordCount thread1=new DBWordCount();
  195.  // Set Thread1
  196.  thread1.fnSetJob1(args);
  197.  DBWordCount thread2=new DBWordCount();
  198.  // Set Thread2
  199.  thread2.fnSetJob2(args);
  200.  // Thread 1 Start
  201.  thread1.start();
  202.  // Thread 2 Start
  203.  thread2.start();
  204.  
  205.     }
  206. }