Guest User

Untitled

a guest
Oct 21st, 2017
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.09 KB | None | 0 0
  1. import java.io.IOException;
  2. import java.util.*;
  3.  
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.conf.*;
  6. import org.apache.hadoop.io.*;
  7. import org.apache.hadoop.io.Writable;
  8. import org.apache.hadoop.mapred.*;
  9. import org.apache.hadoop.util.*;
  10. import java.io.DataInput;
  11. import java.io.DataOutput;
  12. import java.lang.Thread;
  13. import java.sql.PreparedStatement;
  14. import java.sql.ResultSet;
  15. import java.sql.SQLException;
  16. import org.apache.hadoop.mapred.lib.db.DBWritable;
  17. import org.apache.hadoop.mapred.lib.IdentityReducer;
  18. import org.apache.hadoop.mapred.lib.db.DBConfiguration;
  19. import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
  20. import org.apache.hadoop.mapred.lib.db.DBInputFormat;
  21. /*
  22. MySQL DB Schema:
  23.  
  24. DROP TABLE IF EXISTS `WordCount`.`Counting`;
  25. CREATE TABLE `WordCount`.`Counting` (
  26. `name` char(48) default NULL,
  27. `count` int(11) default NULL
  28. ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
  29.  
  30. */
  31.  
  32. public class DBWordCount extends Thread
  33. {
  34. public void run() //throws Exception
  35. {
  36. /* Start ! */
  37. try
  38. {
  39. JobClient.runJob(conf);
  40. }
  41. catch(Exception e)
  42. {
  43. // do nothing
  44. }
  45. }
  46.  
  47. public void fnSetJob1(String[] args) throws Exception
  48. {
  49. conf.setJobName("MySQL DB Wordcount Job1");
  50.  
  51.  
  52. Class.forName("com.mysql.jdbc.Driver");
  53.  
  54. conf.setInputFormat(TextInputFormat.class);
  55. conf.setOutputFormat(DBOutputFormat.class);
  56. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  57.  
  58. // Set up your host name and account
  59. String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"};
  60. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]);
  61.  
  62. // Setup Output MySQL Format
  63. DBOutputFormat.setOutput(conf, "Counting","name", "count");
  64.  
  65. // Set Mapper and Reducer Class
  66. conf.setMapperClass(Map.class);
  67. //conf.setCombinerClass(Reduce.class);
  68. conf.setReducerClass(Reduce.class);
  69.  
  70. conf.setMapOutputKeyClass(Text.class);
  71. conf.setMapOutputValueClass(IntWritable.class);
  72. conf.setOutputKeyClass(WordCountInfoRecord.class);
  73. conf.setOutputValueClass(NullWritable.class);
  74. }
  75.  
  76. public void fnSetJob2(String[] args) throws Exception
  77. {
  78. //JobConf conf = new JobConf(DBWordCount.class);
  79. conf.setJobName("MySQL DB Wordcount Job2");
  80.  
  81.  
  82. Class.forName("com.mysql.jdbc.Driver");
  83. // Set up your host name and account
  84. String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"};
  85.  
  86. conf.setInputFormat(TextInputFormat.class);
  87. conf.setOutputFormat(DBOutputFormat.class);
  88. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  89.  
  90. // Setup MySQL Connection , default account:root , no password
  91. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]);
  92.  
  93. // Setup Output MySQL Format
  94. DBOutputFormat.setOutput(conf, "Counting","name", "count");
  95.  
  96. // Set Mapper and Reducer Class
  97. conf.setMapperClass(Map.class);
  98. //conf.setCombinerClass(Reduce.class);
  99. conf.setReducerClass(Reduce2.class);
  100.  
  101. // I've tried all combinations , but the bug still happen.
  102. conf.setMapOutputKeyClass(Text.class);
  103. conf.setMapOutputValueClass(IntWritable.class);
  104. conf.setOutputKeyClass(WordCountInfoRecord.class);
  105. conf.setOutputValueClass(NullWritable.class);
  106. }
  107.  
  108. JobConf conf = new JobConf(DBWordCount.class);
  109. //JobConf conf2 = new JobConf(DBWordCount.class);
  110.  
  111. // Output Record Object
  112. static class WordCountInfoRecord implements Writable, DBWritable
  113. {
  114. public String name;
  115. public int count;
  116. public WordCountInfoRecord() {
  117.  
  118. }
  119.  
  120. public WordCountInfoRecord(String str, int c)
  121. {
  122. this.name = str;
  123. this.count = c;
  124. }
  125.  
  126. public void readFields(DataInput in) throws IOException {
  127. this.name = Text.readString(in);
  128. this.count = in.readInt();
  129. }
  130. public void write(DataOutput out) throws IOException {
  131. Text.writeString(out, this.name);
  132. out.writeInt(this.count);
  133. }
  134.  
  135. public void readFields(ResultSet result) throws SQLException {
  136. this.name = result.getString(1);
  137. this.count = result.getInt(2);
  138. }
  139. public void write(PreparedStatement stmt) throws SQLException {
  140. stmt.setString(1, this.name);
  141. stmt.setInt(2, this.count);
  142. }
  143. public String toString() {
  144. return new String(this.name + " " + this.count);
  145. }
  146. }
  147.  
  148. public static class Map extends MapReduceBase implements Mapper
  149. {
  150.  
  151. private final static IntWritable one = new IntWritable(1);
  152. private Text word = new Text();
  153.  
  154. public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException
  155. {
  156. String line = value.toString();
  157. StringTokenizer tokenizer = new StringTokenizer(line);
  158. while (tokenizer.hasMoreTokens()) {
  159. word.set(tokenizer.nextToken());
  160. output.collect(word, one);
  161. }
  162. }
  163. }
  164.  
  165.  
  166. public static class Reduce extends MapReduceBase implements Reducer
  167. {
  168. public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
  169. {
  170. int sum = 0;
  171. while (values.hasNext()) {
  172. sum += values.next().get();
  173. }
  174. // Output Data into MySQL
  175. output.collect(new WordCountInfoRecord(key.toString(),sum), NullWritable.get());
  176. }
  177. }
  178.  
  179. public static class Reduce2 extends MapReduceBase implements Reducer
  180. {
  181. public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
  182. {
  183. int sum = 0;
  184. while (values.hasNext()) {
  185. sum += values.next().get();
  186. }
  187. // Output Data into MySQL
  188. output.collect(new WordCountInfoRecord("Job2_"+key.toString(),sum), NullWritable.get());
  189. }
  190. }
  191.  
  192. public static void main(String[] args) throws Exception
  193. {
  194. DBWordCount thread1=new DBWordCount();
  195. // Set Thread1
  196. thread1.fnSetJob1(args);
  197. DBWordCount thread2=new DBWordCount();
  198. // Set Thread2
  199. thread2.fnSetJob2(args);
  200. // Thread 1 Start
  201. thread1.start();
  202. // Thread 2 Start
  203. thread2.start();
  204.  
  205. }
  206. }
Add Comment
Please, Sign In to add comment