SHARE
TWEET

Untitled

a guest Nov 15th, 2014 118 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import java.io.*;
  2. import java.text.DecimalFormat;
  3. import java.util.ArrayList;
  4. import java.util.Iterator;
  5. import java.util.PriorityQueue;
  6.  
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.conf.Configured;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.DoubleWritable;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.NullWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapred.JobConf;
  15. import org.apache.hadoop.mapred.TextOutputFormat;
  16. import org.apache.hadoop.mapreduce.Job;
  17. import org.apache.hadoop.mapreduce.Mapper;
  18. import org.apache.hadoop.mapreduce.Reducer;
  19. import org.apache.hadoop.mapreduce.OutputFormat;
  20. import org.apache.hadoop.mapreduce.Reducer.Context;
  21. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  23. import org.apache.hadoop.util.GenericOptionsParser;
  24. import org.apache.hadoop.util.Tool;
  25. import org.apache.hadoop.util.ToolRunner;
  26.  
  27. public class BitcoinMapReduceTax {
  28.  
  29. public static final double CURRENT_PRICE = 429.91D;
  30. public static final int ONE_YEAR = (int)(60 * 60 * 24 * 365.25);
  31. public static final double LONG_TERM_TAX_RATE = 0.15D;
  32. public static final double SHORT_TERM_TAX_RATE = 0.35D;
  33. public static final DecimalFormat df = new DecimalFormat("$#,##0.00");
  34.        
  35.         public static void main(String[] args) throws Exception{
  36.                
  37.                 JobConf conf = new JobConf();
  38.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  39.        
  40.         if (otherArgs.length != 2) {
  41.             System.err.println("Usage: <in> <out>");
  42.             System.exit(2);
  43.         }
  44.                
  45.                 Job job = new Job(conf, "BitcoinMapReduceTax");
  46.                 job.setJarByClass(BitcoinMapReduceTax.class);
  47.                 job.setMapperClass(Map.class);
  48.                 job.setReducerClass(Reduce.class);
  49.                 job.setOutputKeyClass(Text.class); //This corresponds to mapper's output key
  50.                 job.setOutputValueClass(Text.class); //This corresponds to mapper's output value
  51.                
  52.                 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  53.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  54.                 job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
  55.                 job.waitForCompletion(true);
  56.                 System.out.println("*******************Number of Lines skipped: " + job.getCounters().findCounter(Counter.LINESKIP).getValue() + "******************");
  57.                 System.exit(job.isSuccessful() ? 0 : 1);
  58.         }
  59.        
  60.         //Record the number of illegal input lines
  61.         enum Counter {
  62.                
  63.                 LINESKIP;
  64.         }
  65.        
  66.         public static class Map extends Mapper<Object, Text, Text, Text> {
  67.                
  68.                 public void map(Object key, Text Value, Context context) throws IOException, InterruptedException{
  69.                        
  70.                         String line = Value.toString();
  71.                
  72.                         try {
  73.                                 String[] lineSplit = line.split(";|\\s|,");
  74.                                
  75.                                 String address = lineSplit[0];
  76.                                 String timeStamp = lineSplit[1];
  77.                                 String amount = lineSplit[3];
  78.                                 String price = lineSplit[5];
  79.                                 String total = "";
  80.                                
  81.                                 if (lineSplit.length == 6) {
  82.                                         total = timeStamp + "@" + amount + "@" + price; //Pass 3 parameters to reducer, '@' is used to split
  83.                                                
  84.                                         context.write(new Text(address), new Text(total));
  85.                                 } else {
  86.                                        
  87.                                         System.err.println("The number of columns in this line is wrong");
  88.                                         throw new NumberFormatException();
  89.                                 }
  90.                                
  91.                         } catch (ArrayIndexOutOfBoundsException e) {
  92.                                
  93.                                 System.err.println("This line's Column Number exceeded the limit.");
  94.                                 context.getCounter(Counter.LINESKIP).increment(1);
  95.                                 return;
  96.                                
  97.                         } catch (NumberFormatException e) {
  98.                                
  99.                                 System.err.println("This line's input format or number of columns is incorrect.");
  100.                                 context.getCounter(Counter.LINESKIP).increment(1);
  101.                                 return;
  102.                         }
  103.                 }
  104.         }
  105.        
  106.         public static class Reduce extends Reducer<Text, Text, Text, Text> {
  107.                
  108.                 public void reduce(Text key, Iterable<Text> values, Context context) throws InterruptedException, IOException{
  109.                        
  110.                         double shortTermProfit = 0;
  111.                         double longTermProfit = 0;
  112.                        
  113.                         PriorityQueue<Record> boughtRecord = new PriorityQueue<Record>();
  114.                         PriorityQueue<Record> soldRecord = new PriorityQueue<Record>();
  115.                        
  116.                         try {
  117.                                 for (Text value : values) {
  118.                                        
  119.                                         String[] amountAndPrice = value.toString().split("@");
  120.        
  121.                                         int timestamp = Integer.parseInt(amountAndPrice[0]); //4 bytes
  122.                                         float amount = Float.parseFloat(amountAndPrice[1]); //4 bytes
  123.                                         float price = Float.parseFloat(amountAndPrice[2]); // 4 bytes
  124.                                        
  125.                                         if (amount > 0) {
  126.                                                 Record r = new Record(timestamp, amount, price);
  127.                                                 boughtRecord.add(r);
  128.                                         }
  129.                                         else if (amount < 0) {
  130.                                                 Record r = new Record(timestamp, amount, price);
  131.                                                 soldRecord.add(r);
  132.                                         }
  133.                                         else if(amount == 0)
  134.                                                 throw new NumberFormatException();
  135.  
  136.                                 }
  137.                                
  138.                         } catch(NumberFormatException e) {
  139.                                
  140.                                 e.printStackTrace();
  141.                                 context.getCounter(Counter.LINESKIP).increment(1);
  142.                                 return;
  143.                         }
  144.                        
  145.                         while(soldRecord.size() > 0 && boughtRecord.size() > 0) {
  146.                                
  147.                                 Record soldTransaction = soldRecord.remove();
  148.                                 Record boughtTransaction = boughtRecord.remove();
  149.                                                        
  150.                                 Float soldAmount = soldTransaction.amount;
  151.                                 Float boughtAmount = boughtTransaction.amount;                 
  152.                                 Float soldPrice = soldTransaction.price;
  153.                                 Float boughtPrice = boughtTransaction.price;
  154.                                                                
  155.                                 long ownershipPeriod = soldTransaction.timestamp - boughtTransaction.timestamp;
  156.  
  157.                                 if(boughtAmount.compareTo(Math.abs(soldAmount)) > 0) {
  158.                                        
  159.                                         if(ownershipPeriod <= ONE_YEAR)
  160.                                                 shortTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
  161.                                         else
  162.                                                 longTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
  163.                                        
  164.                                         boughtAmount += soldAmount;
  165.                                         Record r = new Record(boughtTransaction.timestamp,boughtAmount,boughtPrice);
  166.                                         boughtRecord.add(r);
  167.                                 }
  168.                                
  169.                                 else if(boughtAmount.compareTo(Math.abs(soldAmount)) < 0) {
  170.                                        
  171.                                         if(ownershipPeriod <= ONE_YEAR)
  172.                                                 shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
  173.                                         else
  174.                                                 longTermProfit += boughtAmount * (soldPrice - boughtPrice);
  175.                                        
  176.                                         soldAmount += boughtAmount;
  177.                                         Record r = new Record(soldTransaction.timestamp,soldAmount,soldPrice);
  178.                                         soldRecord.add(r);
  179.                                 }
  180.                                
  181.                                 else if(boughtAmount.compareTo(Math.abs(soldAmount)) == 0) {
  182.                                        
  183.                                         if (ownershipPeriod <= ONE_YEAR)
  184.                                                 shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
  185.                                         else
  186.                                                 longTermProfit += boughtAmount * (soldPrice - boughtPrice);
  187.                                 }
  188.                                
  189.                         }
  190.                        
  191.                         double shortTermTax = (shortTermProfit <= 0)? 0 : shortTermProfit * SHORT_TERM_TAX_RATE;
  192.                         double longTermTax = (longTermProfit <= 0)? 0 : longTermProfit * LONG_TERM_TAX_RATE;
  193.                         double totalProfit = longTermProfit + shortTermProfit - shortTermTax - longTermTax;
  194.                        
  195.                         if (shortTermProfit >= 0.01D || longTermProfit >= 0.01D)
  196.                         context.write(new Text(key), new Text(           df.format(shortTermProfit) +
  197.                                                                                                   "\t" + df.format(shortTermTax) +
  198.                                                                                                   "\t" + df.format(longTermProfit) +
  199.                                                                                                   "\t" + df.format(longTermTax) +
  200.                                                                                                   "\t" + df.format(totalProfit)));
  201.                 }
  202.         }
  203. }
  204.  
  205. class Record implements Comparable<Record>{
  206.         //String address;
  207.         Integer timestamp;
  208.         //Long timeStamp;
  209.         Float amount;
  210.         Float price;
  211.        
  212.         public Record() {
  213.                
  214.         }
  215.        
  216.         public Record(Integer timestamp, Float amount, Float price) {
  217.                 this.timestamp = timestamp;
  218.                 this.amount = amount;
  219.                 this.price = price;
  220.         }
  221.        
  222.         public int compareTo(Record r) {
  223.                 if (this.timestamp < r.timestamp)
  224.                         return -1;
  225.                 if (this.timestamp > r.timestamp)
  226.                         return 1;
  227.                
  228.                 return 0;
  229.         }
  230.        
  231.         public String toString(){
  232.                 return timestamp + " " + amount + " " + price;
  233.         }
  234. }
RAW Paste Data
Pastebin PRO Summer Special!
Get 60% OFF on Pastebin PRO accounts!
Top