Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.io.*;
- import java.text.DecimalFormat;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.PriorityQueue;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.DoubleWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.TextOutputFormat;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.OutputFormat;
- import org.apache.hadoop.mapreduce.Reducer.Context;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- public class BitcoinMapReduceTax {
- public static final double CURRENT_PRICE = 429.91D;
- public static final int ONE_YEAR = (int)(60 * 60 * 24 * 365.25);
- public static final double LONG_TERM_TAX_RATE = 0.15D;
- public static final double SHORT_TERM_TAX_RATE = 0.35D;
- public static final DecimalFormat df = new DecimalFormat("$#,##0.00");
- public static void main(String[] args) throws Exception{
- JobConf conf = new JobConf();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: <in> <out>");
- System.exit(2);
- }
- Job job = new Job(conf, "BitcoinMapReduceTax");
- job.setJarByClass(BitcoinMapReduceTax.class);
- job.setMapperClass(Map.class);
- job.setReducerClass(Reduce.class);
- job.setOutputKeyClass(Text.class); //This corresponds to mapper's output key
- job.setOutputValueClass(Text.class); //This corresponds to mapper's output value
- FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
- FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
- job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
- job.waitForCompletion(true);
- System.out.println("*******************Number of Lines skipped: " + job.getCounters().findCounter(Counter.LINESKIP).getValue() + "******************");
- System.exit(job.isSuccessful() ? 0 : 1);
- }
- //Record the number of illegal input lines
- enum Counter {
- LINESKIP;
- }
- public static class Map extends Mapper<Object, Text, Text, Text> {
- public void map(Object key, Text Value, Context context) throws IOException, InterruptedException{
- String line = Value.toString();
- try {
- String[] lineSplit = line.split(";|\\s|,");
- String address = lineSplit[0];
- String timeStamp = lineSplit[1];
- String amount = lineSplit[3];
- String price = lineSplit[5];
- String total = "";
- if (lineSplit.length == 6) {
- total = timeStamp + "@" + amount + "@" + price; //Pass 3 parameters to reducer, '@' is used to split
- context.write(new Text(address), new Text(total));
- } else {
- System.err.println("The number of columns in this line is wrong");
- throw new NumberFormatException();
- }
- } catch (ArrayIndexOutOfBoundsException e) {
- System.err.println("This line's Column Number exceeded the limit.");
- context.getCounter(Counter.LINESKIP).increment(1);
- return;
- } catch (NumberFormatException e) {
- System.err.println("This line's input format or number of columns is incorrect.");
- context.getCounter(Counter.LINESKIP).increment(1);
- return;
- }
- }
- }
- public static class Reduce extends Reducer<Text, Text, Text, Text> {
- public void reduce(Text key, Iterable<Text> values, Context context) throws InterruptedException, IOException{
- double shortTermProfit = 0;
- double longTermProfit = 0;
- PriorityQueue<Record> boughtRecord = new PriorityQueue<Record>();
- PriorityQueue<Record> soldRecord = new PriorityQueue<Record>();
- try {
- for (Text value : values) {
- String[] amountAndPrice = value.toString().split("@");
- int timestamp = Integer.parseInt(amountAndPrice[0]); //4 bytes
- float amount = Float.parseFloat(amountAndPrice[1]); //4 bytes
- float price = Float.parseFloat(amountAndPrice[2]); // 4 bytes
- if (amount > 0) {
- Record r = new Record(timestamp, amount, price);
- boughtRecord.add(r);
- }
- else if (amount < 0) {
- Record r = new Record(timestamp, amount, price);
- soldRecord.add(r);
- }
- else if(amount == 0)
- throw new NumberFormatException();
- }
- } catch(NumberFormatException e) {
- e.printStackTrace();
- context.getCounter(Counter.LINESKIP).increment(1);
- return;
- }
- while(soldRecord.size() > 0 && boughtRecord.size() > 0) {
- Record soldTransaction = soldRecord.remove();
- Record boughtTransaction = boughtRecord.remove();
- Float soldAmount = soldTransaction.amount;
- Float boughtAmount = boughtTransaction.amount;
- Float soldPrice = soldTransaction.price;
- Float boughtPrice = boughtTransaction.price;
- long ownershipPeriod = soldTransaction.timestamp - boughtTransaction.timestamp;
- if(boughtAmount.compareTo(Math.abs(soldAmount)) > 0) {
- if(ownershipPeriod <= ONE_YEAR)
- shortTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
- else
- longTermProfit += Math.abs(soldAmount) * (soldPrice - boughtPrice);
- boughtAmount += soldAmount;
- Record r = new Record(boughtTransaction.timestamp,boughtAmount,boughtPrice);
- boughtRecord.add(r);
- }
- else if(boughtAmount.compareTo(Math.abs(soldAmount)) < 0) {
- if(ownershipPeriod <= ONE_YEAR)
- shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
- else
- longTermProfit += boughtAmount * (soldPrice - boughtPrice);
- soldAmount += boughtAmount;
- Record r = new Record(soldTransaction.timestamp,soldAmount,soldPrice);
- soldRecord.add(r);
- }
- else if(boughtAmount.compareTo(Math.abs(soldAmount)) == 0) {
- if (ownershipPeriod <= ONE_YEAR)
- shortTermProfit += boughtAmount * (soldPrice - boughtPrice);
- else
- longTermProfit += boughtAmount * (soldPrice - boughtPrice);
- }
- }
- double shortTermTax = (shortTermProfit <= 0)? 0 : shortTermProfit * SHORT_TERM_TAX_RATE;
- double longTermTax = (longTermProfit <= 0)? 0 : longTermProfit * LONG_TERM_TAX_RATE;
- double totalProfit = longTermProfit + shortTermProfit - shortTermTax - longTermTax;
- if (shortTermProfit >= 0.01D || longTermProfit >= 0.01D)
- context.write(new Text(key), new Text( df.format(shortTermProfit) +
- "\t" + df.format(shortTermTax) +
- "\t" + df.format(longTermProfit) +
- "\t" + df.format(longTermTax) +
- "\t" + df.format(totalProfit)));
- }
- }
- }
/**
 * A single transaction: a timestamp, a signed amount and a unit price.
 *
 * <p>The natural ordering is by ascending {@code timestamp} only; amount and
 * price do not take part in the comparison, so two records with the same
 * timestamp compare as equal even when their other fields differ.
 */
class Record implements Comparable<Record> {
    /** Transaction time; the sort key for the natural ordering. */
    Integer timestamp;
    /** Transaction quantity. */
    Float amount;
    /** Unit price of the transaction. */
    Float price;

    /** Creates a record with all fields left {@code null}. */
    public Record() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param timestamp transaction time
     * @param amount    transaction quantity
     * @param price     unit price
     */
    public Record(Integer timestamp, Float amount, Float price) {
        this.timestamp = timestamp;
        this.amount = amount;
        this.price = price;
    }

    /**
     * Orders records by ascending timestamp.
     *
     * @param r the record to compare against
     * @return -1, 0 or 1 when this timestamp is lower than, equal to or higher than {@code r}'s
     * @throws NullPointerException if {@code r} or either timestamp is {@code null}
     */
    @Override
    public int compareTo(Record r) {
        return this.timestamp.compareTo(r.timestamp);
    }

    /** Returns {@code "timestamp amount price"} separated by single spaces. */
    @Override
    public String toString() {
        return timestamp + " " + amount + " " + price;
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement