Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Kyle Andrus
- # CS 4375 PS3
- # 10.17.2019
from csv import reader
import math
import os
import random
import sys
# Load a CSV file into list 'dataset'
def load_csv(filename):
    """Load a CSV file into a list of rows.

    filename: path to the CSV file, resolved relative to the script's
    directory (absolute paths are used as-is by os.path.join).
    Returns a list of rows, each a list of strings; blank rows are
    skipped.

    Bug fix: the original ignored the 'filename' parameter and always
    opened the hard-coded 'heart_train.data'.
    """
    dataset = list()
    path = os.path.join(os.path.dirname(sys.argv[0]), filename)
    with open(path) as file:
        csv_reader = reader(file)
        for row in csv_reader:
            if not row:  # blank line -> empty row from csv.reader
                continue
            dataset.append(row)
    return dataset
# Choose 3 attributes from data to split on randomly
def choose_attributes(data):
    """Randomly pick 3 distinct attribute column indices to split on.

    Column 0 of each row holds the class label (see majority_vote), so
    the candidate attributes are columns 1 .. len(row)-1.

    Bug fixes: the original randrange(0, num_of_attributes) could
    (a) return 0 and split on the label column itself, (b) never
    return the last attribute column, and (c) pick the same attribute
    more than once.  random.sample fixes all three.
    """
    return random.sample(range(1, len(data[0])), 3)
# Return the majority class label in a bucket
def majority_vote(bucket):
    """Return the class label (0 or 1) held by most rows in 'bucket'.

    The label is read from column 0 of each row; ties (and an empty
    bucket) resolve to 0.
    """
    ones = sum(1 for row in bucket if int(row[0]) == 1)
    zeros = sum(1 for row in bucket if int(row[0]) == 0)
    return 1 if ones > zeros else 0
# Chose next split based on most uncertain branch
def choose_next_split(left_bucket, right_bucket):
    """Placeholder for picking the more uncertain branch (unimplemented)."""
    print("to do (^:")
def calculate_error(bucket):
    """Return [majority_label, error] for a bucket.

    'error' is the fraction of rows whose class label (column 0)
    differs from the bucket's majority label.

    Bug fixes: the original loop started at index 1 and so never
    compared the bucket's first row, and an empty bucket raised
    ZeroDivisionError; an empty bucket now contributes zero error.
    """
    if not bucket:
        return [0, 0.0]
    # First majority vote the bucket to determine its overall class
    result = majority_vote(bucket)
    # Now, compare the result with each class label in the bucket
    error_sum = 0
    for row in bucket:
        if int(row[0]) != result:
            error_sum += 1
    true_error = error_sum / len(bucket)  # normalized by bucket size
    return [result, true_error]
def average_errors(errors):
    """Average the per-bucket error values in 'errors'.

    Each entry is a [label, error] pair.  Only buckets with a strictly
    positive error count toward the average; returns 0.0 when no
    bucket has any error (or the list is empty).
    """
    positive = [float(entry[1]) for entry in errors if float(entry[1]) > 0]
    if not positive:
        return 0.0
    return sum(positive) / len(positive)
- def calculate_alpha(epsilon):
- alpha = .5 * ((1 - epsilon)/epsilon)
- return alpha
# Give every data point an initial weight (appended as a new column)
def init_weights(data):
    """Append an initial weight of 1 to every row, in place."""
    for row in data:
        row.append(1)
def update_weights(alpha, data):
    """Set every row's weight to 'alpha', in place.

    The weight is the last column of each row, appended by
    init_weights.

    Bug fix: the original wrote to the hard-coded column 23, which is
    only correct when every row has exactly 24 entries; the weight is
    always the last column.
    """
    for row in data:
        row[-1] = alpha
# Split the data once on a single binary attribute and report the result
def build_tree_with_one_split(data, attribute):
    """Build a depth-1 split on 'attribute' and print its quality.

    Partitions the rows by the attribute's 0/1 value, then prints the
    bucket sizes, per-bucket [label, error] pairs, the averaged error
    epsilon, and the resulting alpha weight.

    Bug fix: the partition loop originally ran range(1, len(data)-1),
    silently dropping the last row; it now runs through len(data).
    NOTE(review): the loop still starts at index 1, treating row 0 as
    a header -- confirm the data file really has a header row.
    """
    left = list()
    right = list()
    for i in range(1, len(data)):
        if int(data[i][attribute]) == 0:
            left.append(data[i])
        if int(data[i][attribute]) == 1:
            right.append(data[i])
    print("------Nodes------")
    print("split on attribute {}".format(attribute))
    print("top left")
    print(len(left))
    print("top right")
    print(len(right))
    print("------------------")
    errors = list()
    errors.append(calculate_error(left))
    errors.append(calculate_error(right))
    print("Split: ")
    print(errors)
    epsilon = average_errors(errors)
    print("Epsilon = {}".format(epsilon))
    alpha = calculate_alpha(epsilon)
    print("Alpha = {}".format(alpha))
def _partition_on_attribute(rows, attribute):
    # Split rows into (left, right) buckets on a binary attribute column.
    left = [row for row in rows if int(row[attribute]) == 0]
    right = [row for row in rows if int(row[attribute]) == 1]
    return left, right


# Construct a decision tree with three attribute splits, and
# calculate the alpha weight and epsilon error for that tree
def build_tree_with_three_splits(data, attributes):
    """Greedily build a 3-split tree and report its quality.

    At each of the three levels the branch currently holding the most
    rows is split next (a stand-in for splitting the most uncertain
    branch).  The six leaf buckets are error-scored, averaged into
    epsilon, converted to an AdaBoost-style alpha, and the rows of the
    last surviving bucket get their weight set to alpha.

    Bug fixes: every split originally iterated range(1, len(data)-1),
    dropping the last row of each bucket -- and, for the second and
    third splits (whose buckets contain no header), the first row too;
    an unused copy of the input ('original_data') was removed.
    NOTE(review): the first split still skips row 0 as a presumed
    header -- confirm the data file actually has one.
    NOTE(review): update_weights only touches the final bucket, not
    the whole data set -- confirm this is intended.
    """
    # First split (row 0 skipped as a presumed header).
    top_left, top_right = _partition_on_attribute(data[1:], attributes[0])
    # Figure out which branch to split on next based on uncertainty
    # For now, just split the branch with the most data
    if len(top_left) >= len(top_right):
        data = top_left
        top_left = []  # its contents are split into 2 new buckets below
    else:
        data = top_right
        top_right = []
    # Second split
    middle_left, middle_right = _partition_on_attribute(data, attributes[1])
    if len(middle_left) >= len(middle_right):
        data = middle_left
        middle_left = []
    else:
        data = middle_right
        middle_right = []
    # Third and final split
    bottom_left, bottom_right = _partition_on_attribute(data, attributes[2])
    print("------Nodes------")
    print("split on attribute {}".format(attributes[0]))
    print("top left")
    print(len(top_left))
    print("top right")
    print(len(top_right))
    print("split on attribute {}".format(attributes[1]))
    print("middle left")
    print(len(middle_left))
    print("middle right")
    print(len(middle_right))
    print("split on attribute {}".format(attributes[2]))
    print("bottom left")
    print(len(bottom_left))
    print("bottom right")
    print(len(bottom_right))
    print("------------------")
    # The data is now separated into six buckets; score every non-empty
    # one.  calculate_error returns [predicted label, bucket error].
    errors = list()
    for bucket in (top_left, top_right, middle_left, middle_right,
                   bottom_left, bottom_right):
        if len(bucket) > 0:
            errors.append(calculate_error(bucket))
    print("Resulting 4 data buckets: (format [class label, bucket error]")
    print(errors)
    epsilon = average_errors(errors)
    print("Epsilon = {}".format(epsilon))
    alpha = calculate_alpha(epsilon)
    print("Alpha = {}".format(alpha))
    update_weights(alpha, data)
# Run 'iterations' rounds of (simplified) boosting over the data
def ada_boost(data, iterations):
    """Run 'iterations' boosting rounds, each splitting on 3 random
    attributes via build_tree_with_three_splits.

    Bug fix: init_weights was called inside the loop, appending an
    extra weight column to every row on every iteration; the weights
    are now initialized exactly once, before the first round.
    """
    init_weights(data)
    for i in range(iterations):
        attributes = choose_attributes(data)
        print("\n\nIteration {}\n".format(i + 1))
        build_tree_with_three_splits(data, attributes)
# Try a one-split tree on each attribute index in turn
def coordinate_descent(data, iterations):
    """Build a depth-1 tree for every attribute index from 1 to
    iterations - 1, printing each split's quality.

    NOTE(review): attribute 0 is never tried and the printed iteration
    label starts at 2 -- confirm range(1, iterations) is intended.
    """
    for attribute in range(1, iterations):
        print("\n\nIteration {}\n".format(attribute + 1))
        build_tree_with_one_split(data, attribute)
# Script entry point: load the training data and sweep one-split trees
# over each attribute.  The main-guard keeps imports side-effect free.
if __name__ == "__main__":
    data = load_csv("heart_train.data")
    # ada_boost(data, 5)  # fixed name: the function is ada_boost, not adaboost
    coordinate_descent(data, 22)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement