Guest User

Untitled

a guest
Nov 15th, 2014
188
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import java.io.*;
  2. import java.text.DecimalFormat;
  3. import java.util.ArrayList;
  4. import java.util.Iterator;
  5. import java.util.PriorityQueue;
  6.  
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.conf.Configured;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.DoubleWritable;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.NullWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapred.JobConf;
  15. import org.apache.hadoop.mapred.TextOutputFormat;
  16. import org.apache.hadoop.mapreduce.Job;
  17. import org.apache.hadoop.mapreduce.Mapper;
  18. import org.apache.hadoop.mapreduce.Reducer;
  19. import org.apache.hadoop.mapreduce.OutputFormat;
  20. import org.apache.hadoop.mapreduce.Reducer.Context;
  21. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  23. import org.apache.hadoop.util.GenericOptionsParser;
  24. import org.apache.hadoop.util.Tool;
  25. import org.apache.hadoop.util.ToolRunner;
  26.  
  27. public class BitcoinMapReduceTax {
  28.  
  29. public static final double CURRENT_PRICE = 429.91D;
  30. public static final int ONE_YEAR = (int)(60 * 60 * 24 * 365.25);
  31. public static final double LONG_TERM_TAX_RATE = 0.15D;
  32. public static final double SHORT_TERM_TAX_RATE = 0.35D;
  33. public static final DecimalFormat df = new DecimalFormat("$#,##0.00");
  34.    
  35.     public static void main(String[] args) throws Exception{
  36.        
  37.         JobConf conf = new JobConf();
  38.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  39.        
  40.         if (otherArgs.length != 2) {
  41.             System.err.println("Usage: <in> <out>");
  42.             System.exit(2);
  43.         }
  44.        
  45.         Job job = new Job(conf, "BitcoinMapReduceTax");
  46.         job.setJarByClass(BitcoinMapReduceTax.class);
  47.         job.setMapperClass(Map.class);
  48.         job.setReducerClass(Reduce.class);
  49.         job.setOutputKeyClass(Text.class); //This corresponds to mapper's output key
  50.         job.setOutputValueClass(Text.class); //This corresponds to mapper's output value
  51.        
  52.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  53.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  54.         job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
  55.         job.waitForCompletion(true);
  56.         System.out.println("*******************Number of Lines skipped: " + job.getCounters().findCounter(Counter.LINESKIP).getValue() + "******************");
  57.         System.exit(job.isSuccessful() ? 0 : 1);
  58.     }
  59.    
  60.     //Record the number of illegal input lines
  61.     enum Counter {
  62.        
  63.         LINESKIP;
  64.     }
  65.    
  66.     public static class Map extends Mapper<Object, Text, Text, Text> {
  67.        
  68.         public void map(Object key, Text Value, Context context) throws IOException, InterruptedException{
  69.            
  70.             String line = Value.toString();
  71.        
  72.             try {
  73.                 String[] lineSplit = line.split(";|\\s|,");
  74.                
  75.                 String address = lineSplit[0];
  76.                 String timeStamp = lineSplit[1];
  77.                 String amount = lineSplit[3];
  78.                 String price = lineSplit[5];
  79.                 String total = "";
  80.                
  81.                 if (lineSplit.length == 6) {
  82.                     total = timeStamp + "@" + amount + "@" + price; //Pass 3 parameters to reducer, '@' is used to split
  83.                        
  84.                     context.write(new Text(address), new Text(total));
  85.                 } else {
  86.                    
  87.                     System.err.println("The number of columns in this line is wrong");
  88.                     throw new NumberFormatException();
  89.                 }
  90.                
  91.             } catch (ArrayIndexOutOfBoundsException e) {
  92.                
  93.                 System.err.println("This line's Column Number exceeded the limit.");
  94.                 context.getCounter(Counter.LINESKIP).increment(1);
  95.                 return;
  96.                
  97.             } catch (NumberFormatException e) {
  98.                
  99.                 System.err.println("This line's input format or number of columns is incorrect.");
  100.                 context.getCounter(Counter.LINESKIP).increment(1);
  101.                 return;
  102.             }
  103.         }
  104.     }
  105.    
  106.     public static class Reduce extends Reducer<Text, Text, Text, Text> {
  107.        
  108.         public void reduce(Text key, Iterable<Text> values, Context context) throws InterruptedException, IOException{
  109.            
  110.             double shortTermProfit = 0;
  111.             double longTermProfit = 0;
  112.            
  113.             PriorityQueue<Record> boughtRecord = new PriorityQueue<Record>();
  114.             PriorityQueue<Record> soldRecord = new PriorityQueue<Record>();
  115.            
  116.             try {
  117.                 for (Text value : values) {
  118.                    
  119.                     String[] amountAndPrice = value.toString().split("@");
  120.    
  121.                     int timestamp = Integer.parseInt(amountAndPrice[0]); //4 bytes
  122.                     float amount = Float.parseFloat(amountAndPrice[1]); //4 bytes
  123.                     float price = Float.parseFloat(amountAndPrice[2]); // 4 bytes
  124.                    
  125.                     if (amount > 0) {
  126.                         Record r = new Record(timestamp, amount, price);
  127.                         boughtRecord.add(r);
  128.                     }
  129.                     else if (amount < 0) {
  130.                         Record r = new Record(timestamp, amount, price);
  131.                         soldRecord.add(r);
  132.                     }
  133.                     else if(amount == 0)
  134.                         throw new NumberFormatException();
  135.  
  136.                 }
  137.                
  138.             } catch(NumberFormatException e) {
  139.                
  140.                 e.printStackTrace();
  141.                 context.getCounter(Counter.LINESKIP).increment(1);
  142.                 return;
  143.             }
  144.            
  145.             while(soldRecord.size() > 0 && boughtRecord.size() > 0) {
  146.                
  147.                 Record soldTransaction = soldRecord.remove();
  148.                 Record boughtTransaction = boughtRecord.remove();
  149.                            
  150.                 Float soldAmount = soldTransaction.amount;
  151.                 Float boughtAmount = boughtTransaction.amount;         
  152.                 Float soldPrice = soldTransaction.price;
  153.                 Float boughtPrice = boughtTransaction.price;
  154.                                
  155.                 long ownershipPeriod = soldTransaction.timestamp - boughtTransaction.timestamp;
  156.  
  157.                 if(boughtAmount.compareTo(Math.abs(soldAmount)) > 0) {
  158.                    
  159.                     if(ownershipPeriod <= ONE_YEAR)
  160.                         shortTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
  161.                     else
  162.                         longTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
  163.                    
  164.                     boughtAmount += soldAmount;
  165.                     Record r = new Record(boughtTransaction.timestamp,boughtAmount,boughtPrice);
  166.                     boughtRecord.add(r);
  167.                 }
  168.                
  169.                 else if(boughtAmount.compareTo(Math.abs(soldAmount)) < 0) {
  170.                    
  171.                     if(ownershipPeriod <= ONE_YEAR)
  172.                         shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
  173.                     else
  174.                         longTermProfit += boughtAmount * (soldPrice - boughtPrice);
  175.                    
  176.                     soldAmount += boughtAmount;
  177.                     Record r = new Record(soldTransaction.timestamp,soldAmount,soldPrice);
  178.                     soldRecord.add(r);
  179.                 }
  180.                
  181.                 else if(boughtAmount.compareTo(Math.abs(soldAmount)) == 0) {
  182.                    
  183.                     if (ownershipPeriod <= ONE_YEAR)
  184.                         shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
  185.                     else
  186.                         longTermProfit += boughtAmount * (soldPrice - boughtPrice);
  187.                 }
  188.                
  189.             }
  190.            
  191.             double shortTermTax = (shortTermProfit <= 0)? 0 : shortTermProfit * SHORT_TERM_TAX_RATE;
  192.             double longTermTax = (longTermProfit <= 0)? 0 : longTermProfit * LONG_TERM_TAX_RATE;
  193.             double totalProfit = longTermProfit + shortTermProfit - shortTermTax - longTermTax;
  194.            
  195.             if (shortTermProfit >= 0.01D || longTermProfit >= 0.01D)
  196.             context.write(new Text(key), new Text(       df.format(shortTermProfit) +
  197.                                                   "\t" + df.format(shortTermTax) +
  198.                                                   "\t" + df.format(longTermProfit) +
  199.                                                   "\t" + df.format(longTermTax) +
  200.                                                   "\t" + df.format(totalProfit)));
  201.         }
  202.     }
  203. }
  204.  
  205. class Record implements Comparable<Record>{
  206.     //String address;
  207.     Integer timestamp;
  208.     //Long timeStamp;
  209.     Float amount;
  210.     Float price;
  211.    
  212.     public Record() {
  213.        
  214.     }
  215.    
  216.     public Record(Integer timestamp, Float amount, Float price) {
  217.         this.timestamp = timestamp;
  218.         this.amount = amount;
  219.         this.price = price;
  220.     }
  221.    
  222.     public int compareTo(Record r) {
  223.         if (this.timestamp < r.timestamp)
  224.             return -1;
  225.         if (this.timestamp > r.timestamp)
  226.             return 1;
  227.        
  228.         return 0;
  229.     }
  230.    
  231.     public String toString(){
  232.         return timestamp + " " + amount + " " + price;
  233.     }
  234. }
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sound, or popup ads, we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×