Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.IOException;
- import java.util.*;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.conf.*;
- import org.apache.hadoop.io.*;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.util.*;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.lang.Thread;
- import java.sql.PreparedStatement;
- import java.sql.ResultSet;
- import java.sql.SQLException;
- import org.apache.hadoop.mapred.lib.db.DBWritable;
- import org.apache.hadoop.mapred.lib.IdentityReducer;
- import org.apache.hadoop.mapred.lib.db.DBConfiguration;
- import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
- import org.apache.hadoop.mapred.lib.db.DBInputFormat;
- /*
- MySQL DB Schema:
- DROP TABLE IF EXISTS `WordCount`.`Counting`;
- CREATE TABLE `WordCount`.`Counting` (
- `name` char(48) default NULL,
- `count` int(11) default NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
- */
- public class DBWordCount extends Thread
- {
- public void run() //throws Exception
- {
- /* Start ! */
- try
- {
- JobClient.runJob(conf);
- }
- catch(Exception e)
- {
- // do nothing
- }
- }
- public void fnSetJob1(String[] args) throws Exception
- {
- conf.setJobName("MySQL DB Wordcount Job1");
- Class.forName("com.mysql.jdbc.Driver");
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(DBOutputFormat.class);
- FileInputFormat.setInputPaths(conf, new Path(args[0]));
- // Set up your host name and account
- String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"};
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]);
- // Setup Output MySQL Format
- DBOutputFormat.setOutput(conf, "Counting","name", "count");
- // Set Mapper and Reducer Class
- conf.setMapperClass(Map.class);
- //conf.setCombinerClass(Reduce.class);
- conf.setReducerClass(Reduce.class);
- conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(IntWritable.class);
- conf.setOutputKeyClass(WordCountInfoRecord.class);
- conf.setOutputValueClass(NullWritable.class);
- }
- public void fnSetJob2(String[] args) throws Exception
- {
- //JobConf conf = new JobConf(DBWordCount.class);
- conf.setJobName("MySQL DB Wordcount Job2");
- Class.forName("com.mysql.jdbc.Driver");
- // Set up your host name and account
- String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"};
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(DBOutputFormat.class);
- FileInputFormat.setInputPaths(conf, new Path(args[0]));
- // Setup MySQL Connection , default account:root , no password
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]);
- // Setup Output MySQL Format
- DBOutputFormat.setOutput(conf, "Counting","name", "count");
- // Set Mapper and Reducer Class
- conf.setMapperClass(Map.class);
- //conf.setCombinerClass(Reduce.class);
- conf.setReducerClass(Reduce2.class);
- // I've tried all combinations , but the bug still happen.
- conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(IntWritable.class);
- conf.setOutputKeyClass(WordCountInfoRecord.class);
- conf.setOutputValueClass(NullWritable.class);
- }
- JobConf conf = new JobConf(DBWordCount.class);
- //JobConf conf2 = new JobConf(DBWordCount.class);
- // Output Record Object
- static class WordCountInfoRecord implements Writable, DBWritable
- {
- public String name;
- public int count;
- public WordCountInfoRecord() {
- }
- public WordCountInfoRecord(String str, int c)
- {
- this.name = str;
- this.count = c;
- }
- public void readFields(DataInput in) throws IOException {
- this.name = Text.readString(in);
- this.count = in.readInt();
- }
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, this.name);
- out.writeInt(this.count);
- }
- public void readFields(ResultSet result) throws SQLException {
- this.name = result.getString(1);
- this.count = result.getInt(2);
- }
- public void write(PreparedStatement stmt) throws SQLException {
- stmt.setString(1, this.name);
- stmt.setInt(2, this.count);
- }
- public String toString() {
- return new String(this.name + " " + this.count);
- }
- }
- public static class Map extends MapReduceBase implements Mapper
- {
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException
- {
- String line = value.toString();
- StringTokenizer tokenizer = new StringTokenizer(line);
- while (tokenizer.hasMoreTokens()) {
- word.set(tokenizer.nextToken());
- output.collect(word, one);
- }
- }
- }
- public static class Reduce extends MapReduceBase implements Reducer
- {
- public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
- {
- int sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
- }
- // Output Data into MySQL
- output.collect(new WordCountInfoRecord(key.toString(),sum), NullWritable.get());
- }
- }
- public static class Reduce2 extends MapReduceBase implements Reducer
- {
- public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
- {
- int sum = 0;
- while (values.hasNext()) {
- sum += values.next().get();
- }
- // Output Data into MySQL
- output.collect(new WordCountInfoRecord("Job2_"+key.toString(),sum), NullWritable.get());
- }
- }
- public static void main(String[] args) throws Exception
- {
- DBWordCount thread1=new DBWordCount();
- // Set Thread1
- thread1.fnSetJob1(args);
- DBWordCount thread2=new DBWordCount();
- // Set Thread2
- thread2.fnSetJob2(args);
- // Thread 1 Start
- thread1.start();
- // Thread 2 Start
- thread2.start();
- }
- }
Add Comment
Please, Sign In to add comment