Guest User

Untitled

a guest
Nov 15th, 2014
158
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import java.io.*;
  2. import java.text.DecimalFormat;
  3. import java.util.ArrayList;
  4. import java.util.Iterator;
  5. import java.util.PriorityQueue;
  6.  
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.conf.Configured;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.DoubleWritable;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.NullWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapred.JobConf;
  15. import org.apache.hadoop.mapred.TextOutputFormat;
  16. import org.apache.hadoop.mapreduce.Job;
  17. import org.apache.hadoop.mapreduce.Mapper;
  18. import org.apache.hadoop.mapreduce.Reducer;
  19. import org.apache.hadoop.mapreduce.OutputFormat;
  20. import org.apache.hadoop.mapreduce.Reducer.Context;
  21. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  23. import org.apache.hadoop.util.GenericOptionsParser;
  24. import org.apache.hadoop.util.Tool;
  25. import org.apache.hadoop.util.ToolRunner;
  26.  
  27. public class BitcoinMapReduceTax {
  28.  
  29. public static final double CURRENT_PRICE = 429.91D;
  30. public static final int ONE_YEAR = (int)(60 * 60 * 24 * 365.25);
  31. public static final double LONG_TERM_TAX_RATE = 0.15D;
  32. public static final double SHORT_TERM_TAX_RATE = 0.35D;
  33. public static final DecimalFormat df = new DecimalFormat("$#,##0.00");
  34.    
  35.     public static void main(String[] args) throws Exception{
  36.        
  37.         JobConf conf = new JobConf();
  38.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  39.        
  40.         if (otherArgs.length != 2) {
  41.             System.err.println("Usage: <in> <out>");
  42.             System.exit(2);
  43.         }
  44.        
  45.         Job job = new Job(conf, "BitcoinMapReduceTax");
  46.         job.setJarByClass(BitcoinMapReduceTax.class);
  47.         job.setMapperClass(Map.class);
  48.         job.setReducerClass(Reduce.class);
  49.         job.setOutputKeyClass(Text.class); //This corresponds to mapper's output key
  50.         job.setOutputValueClass(Text.class); //This corresponds to mapper's output value
  51.        
  52.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  53.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  54.         job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
  55.         job.waitForCompletion(true);
  56.         System.out.println("*******************Number of Lines skipped: " + job.getCounters().findCounter(Counter.LINESKIP).getValue() + "******************");
  57.         System.exit(job.isSuccessful() ? 0 : 1);
  58.     }
  59.    
  60.     //Record the number of illegal input lines
  61.     enum Counter {
  62.        
  63.         LINESKIP;
  64.     }
  65.    
  66.     public static class Map extends Mapper<Object, Text, Text, Text> {
  67.        
  68.         public void map(Object key, Text Value, Context context) throws IOException, InterruptedException{
  69.            
  70.             String line = Value.toString();
  71.        
  72.             try {
  73.                 String[] lineSplit = line.split(";|\\s|,");
  74.                
  75.                 String address = lineSplit[0];
  76.                 String timeStamp = lineSplit[1];
  77.                 String amount = lineSplit[3];
  78.                 String price = lineSplit[5];
  79.                 String total = "";
  80.                
  81.                 if (lineSplit.length == 6) {
  82.                     total = timeStamp + "@" + amount + "@" + price; //Pass 3 parameters to reducer, '@' is used to split
  83.                        
  84.                     context.write(new Text(address), new Text(total));
  85.                 } else {
  86.                    
  87.                     System.err.println("The number of columns in this line is wrong");
  88.                     throw new NumberFormatException();
  89.                 }
  90.                
  91.             } catch (ArrayIndexOutOfBoundsException e) {
  92.                
  93.                 System.err.println("This line's Column Number exceeded the limit.");
  94.                 context.getCounter(Counter.LINESKIP).increment(1);
  95.                 return;
  96.                
  97.             } catch (NumberFormatException e) {
  98.                
  99.                 System.err.println("This line's input format or number of columns is incorrect.");
  100.                 context.getCounter(Counter.LINESKIP).increment(1);
  101.                 return;
  102.             }
  103.         }
  104.     }
  105.    
  106.     public static class Reduce extends Reducer<Text, Text, Text, Text> {
  107.        
  108.         public void reduce(Text key, Iterable<Text> values, Context context) throws InterruptedException, IOException{
  109.            
  110.             double shortTermProfit = 0;
  111.             double longTermProfit = 0;
  112.            
  113.             PriorityQueue<Record> boughtRecord = new PriorityQueue<Record>();
  114.             PriorityQueue<Record> soldRecord = new PriorityQueue<Record>();
  115.            
  116.             try {
  117.                 for (Text value : values) {
  118.                    
  119.                     String[] amountAndPrice = value.toString().split("@");
  120.    
  121.                     int timestamp = Integer.parseInt(amountAndPrice[0]); //4 bytes
  122.                     float amount = Float.parseFloat(amountAndPrice[1]); //4 bytes
  123.                     float price = Float.parseFloat(amountAndPrice[2]); // 4 bytes
  124.                    
  125.                     if (amount > 0) {
  126.                         Record r = new Record(timestamp, amount, price);
  127.                         boughtRecord.add(r);
  128.                     }
  129.                     else if (amount < 0) {
  130.                         Record r = new Record(timestamp, amount, price);
  131.                         soldRecord.add(r);
  132.                     }
  133.                     else if(amount == 0)
  134.                         throw new NumberFormatException();
  135.  
  136.                 }
  137.                
  138.             } catch(NumberFormatException e) {
  139.                
  140.                 e.printStackTrace();
  141.                 context.getCounter(Counter.LINESKIP).increment(1);
  142.                 return;
  143.             }
  144.            
  145.             while(soldRecord.size() > 0 && boughtRecord.size() > 0) {
  146.                
  147.                 Record soldTransaction = soldRecord.remove();
  148.                 Record boughtTransaction = boughtRecord.remove();
  149.                            
  150.                 Float soldAmount = soldTransaction.amount;
  151.                 Float boughtAmount = boughtTransaction.amount;         
  152.                 Float soldPrice = soldTransaction.price;
  153.                 Float boughtPrice = boughtTransaction.price;
  154.                                
  155.                 long ownershipPeriod = soldTransaction.timestamp - boughtTransaction.timestamp;
  156.  
  157.                 if(boughtAmount.compareTo(Math.abs(soldAmount)) > 0) {
  158.                    
  159.                     if(ownershipPeriod <= ONE_YEAR)
  160.                         shortTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
  161.                     else
  162.                         longTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
  163.                    
  164.                     boughtAmount += soldAmount;
  165.                     Record r = new Record(boughtTransaction.timestamp,boughtAmount,boughtPrice);
  166.                     boughtRecord.add(r);
  167.                 }
  168.                
  169.                 else if(boughtAmount.compareTo(Math.abs(soldAmount)) < 0) {
  170.                    
  171.                     if(ownershipPeriod <= ONE_YEAR)
  172.                         shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
  173.                     else
  174.                         longTermProfit += boughtAmount * (soldPrice - boughtPrice);
  175.                    
  176.                     soldAmount += boughtAmount;
  177.                     Record r = new Record(soldTransaction.timestamp,soldAmount,soldPrice);
  178.                     soldRecord.add(r);
  179.                 }
  180.                
  181.                 else if(boughtAmount.compareTo(Math.abs(soldAmount)) == 0) {
  182.                    
  183.                     if (ownershipPeriod <= ONE_YEAR)
  184.                         shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
  185.                     else
  186.                         longTermProfit += boughtAmount * (soldPrice - boughtPrice);
  187.                 }
  188.                
  189.             }
  190.            
  191.             double shortTermTax = (shortTermProfit <= 0)? 0 : shortTermProfit * SHORT_TERM_TAX_RATE;
  192.             double longTermTax = (longTermProfit <= 0)? 0 : longTermProfit * LONG_TERM_TAX_RATE;
  193.             double totalProfit = longTermProfit + shortTermProfit - shortTermTax - longTermTax;
  194.            
  195.             if (shortTermProfit >= 0.01D || longTermProfit >= 0.01D)
  196.             context.write(new Text(key), new Text(       df.format(shortTermProfit) +
  197.                                                   "\t" + df.format(shortTermTax) +
  198.                                                   "\t" + df.format(longTermProfit) +
  199.                                                   "\t" + df.format(longTermTax) +
  200.                                                   "\t" + df.format(totalProfit)));
  201.         }
  202.     }
  203. }
  204.  
  205. class Record implements Comparable<Record>{
  206.     //String address;
  207.     Integer timestamp;
  208.     //Long timeStamp;
  209.     Float amount;
  210.     Float price;
  211.    
  212.     public Record() {
  213.        
  214.     }
  215.    
  216.     public Record(Integer timestamp, Float amount, Float price) {
  217.         this.timestamp = timestamp;
  218.         this.amount = amount;
  219.         this.price = price;
  220.     }
  221.    
  222.     public int compareTo(Record r) {
  223.         if (this.timestamp < r.timestamp)
  224.             return -1;
  225.         if (this.timestamp > r.timestamp)
  226.             return 1;
  227.        
  228.         return 0;
  229.     }
  230.    
  231.     public String toString(){
  232.         return timestamp + " " + amount + " " + price;
  233.     }
  234. }
RAW Paste Data