Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- import pandas as pd
class CART_R(object):
    """CART regression tree (least-squares splitting).

    Grows a binary tree by repeatedly choosing the feature index j and
    threshold s that minimize the summed squared error of the two child
    nodes; each leaf predicts the mean target value of its samples.
    The tree is stored as nested dicts keyed by (j, s, 'left'/'right');
    leaves are plain scalars.
    """

    def __init__(self, node_min_num, tol):
        # node_min_num: stop splitting a node once it holds <= this many samples
        self.node_min_num = node_min_num
        # tol: stop splitting as soon as any candidate split's loss is <= tol
        self.tol = tol

    def fit(self, x_train, y_train):
        """Build the tree from features x_train and targets y_train."""
        self.x_train = x_train
        self.y_train = y_train
        # DataFrame gives labelled column access during recursive splitting.
        x_train = pd.DataFrame(x_train)
        self.Tree = self.__create_tree(x_train, y_train)

    def predict(self, x_test):
        """Return a list with one predicted value per row of x_test."""
        x_test = np.array(x_test)
        y_pre = []
        for x in x_test:
            y = self.__find_y(self.Tree, x)
            y_pre.append(y)
        return y_pre

    def __find_y(self, tree, x):
        # Walk the nested-dict tree until a leaf (a plain scalar) is reached.
        if isinstance(tree, dict):
            keys = list(tree.keys())
            j = keys[0][0]  # splitting feature index
            s = keys[0][1]  # splitting threshold
            if x[j] <= s:
                return self.__find_y(tree[(j, s, 'left')], x)
            else:
                return self.__find_y(tree[(j, s, 'right')], x)
        else:
            return tree

    def __create_tree(self, x_train, y_train):
        """Recursively grow the tree; returns a dict node or a leaf scalar."""
        # Stop when the node holds too few samples: leaf = mean target.
        if self.__stop_split_node_num(x_train.shape[0], self.node_min_num):
            return y_train.mean()
        # Stop when every feature is constant: no split is possible.
        if self.__j_is_none(x_train):
            return y_train.mean()
        result = self.__find_js(x_train, y_train, self.tol)
        # 'stop_split' means some candidate loss fell below tol: make a leaf.
        if result == 'stop_split':
            return y_train.mean()
        best_j, best_s = result
        l_x, l_y, r_x, r_y = self.__split_data(x_train, y_train, best_j, best_s)
        return {(best_j, best_s, 'left'): self.__create_tree(l_x, l_y),
                (best_j, best_s, 'right'): self.__create_tree(r_x, r_y)}

    def __stop_split_node_num(self, node_num, min_num):
        # True when the node has no more than the minimum number of samples.
        return node_num <= min_num

    def __stop_split_tol(self, js_values, tol):
        # True when the split loss is already below the tolerance threshold.
        return js_values <= tol

    def __j_is_none(self, x_train):
        # True when no feature has more than one distinct value,
        # i.e. there is nothing left to split on.
        for j in x_train.columns:
            if x_train[j].unique().size != 1:
                return False
        return True

    def __find_js(self, x_train, y_train, tol):
        """Search every (feature, threshold) pair for the minimum-loss split.

        Returns (best_j, best_s), or the string 'stop_split' when a
        candidate loss is <= tol (the caller then makes a leaf).
        """
        min_loss = np.inf
        best_j = 0
        best_s = 0
        for j in x_train.columns:
            # All candidate thresholds for feature j, in ascending order.
            s_sort = x_train[j].unique()
            s_sort.sort()
            # A constant feature cannot split the node.
            if s_sort.size == 1:
                continue
            # BUGFIX: exclude the maximum value — splitting at it would put
            # every sample in the left child and leave the right child empty
            # (NaN mean and a degenerate, useless split).
            for s in s_sort[:-1]:
                loss, y_left, y_right = self.__js_loss(x_train, y_train, j, s)
                # Loss already below threshold: signal the caller to stop.
                if self.__stop_split_tol(loss, tol):
                    return 'stop_split'
                if loss < min_loss:
                    min_loss = loss
                    best_j = j
                    best_s = s
        return best_j, best_s

    def __js_loss(self, x_train, y_train, j, s):
        """Squared-error loss of splitting feature j at threshold s.

        Returns (loss, y_left, y_right) where y_left/y_right are each
        child's mean target (the prediction a leaf there would make).
        """
        left = y_train[x_train[j] <= s]
        y_left = left.mean()
        loss_left = ((left - y_left) ** 2).sum()
        right = y_train[x_train[j] > s]
        y_right = right.mean()
        loss_right = ((right - y_right) ** 2).sum()
        return loss_left + loss_right, y_left, y_right

    def __split_data(self, x_train, y_train, best_j, best_s):
        """Partition the node's samples at (best_j, best_s)."""
        left_split = x_train[best_j] <= best_s
        l_x, l_y = x_train[left_split], y_train[left_split]
        # BUGFIX: logical negation of a boolean mask is `~`; unary minus on
        # a boolean Series raises TypeError in modern pandas/NumPy.
        right_split = ~left_split
        r_x, r_y = x_train[right_split], y_train[right_split]
        return l_x, l_y, r_x, r_y
if __name__ == '__main__':
    # NOTE(review): the original demo used sklearn.datasets.load_boston,
    # which was removed in scikit-learn 1.2, so it no longer runs.
    # Use a reproducible synthetic regression dataset instead, and compute
    # the training MSE directly with numpy (no sklearn dependency needed).
    rng = np.random.RandomState(0)
    X = rng.uniform(0.0, 10.0, size=(200, 3))
    y = 2.0 * X[:, 0] + np.sin(X[:, 1]) + rng.normal(0.0, 0.1, size=200)
    mytree = CART_R(5, 0.01)
    mytree.fit(X, y)
    y_pre = mytree.predict(X)
    # Mean squared error of the in-sample predictions.
    mse = float(np.mean((y - np.asarray(y_pre)) ** 2))
    print('mse为%f' % mse)
Add Comment
Please, Sign In to add comment