Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from collections import Counter
- # я честно не знаю, почему код в прошлой задаче работал, а в этой задаче не работает
- # поэтому использую маску, чтобы брать векторы, которые не константные
- def find_best_split(feature_vector, target_vector):
- sorted_inds = np.argsort(feature_vector)
- feature_sorted = np.array(feature_vector)[sorted_inds]
- mask = feature_sorted[1:] != feature_sorted[:-1]
- target_sorted = np.array(target_vector)[sorted_inds]
- thresholds = ((feature_sorted[1:] + feature_sorted[:-1]) / 2)[mask]
- sz = len(target_sorted)
- l_sz = np.arange(1, sz)
- l_1 = np.cumsum(target_sorted)
- p_l_1 = l_1[:-1] / l_sz
- p_l_0 = 1 - p_l_1
- h_l = 1 - p_l_0**2 - p_l_1**2
- r_sz = sz - l_sz
- r_1 = l_1[-1] - l_1[:-1]
- p_r_1 = r_1 / (sz - l_sz)
- p_r_0 = 1 - p_r_1
- h_r = 1 - p_r_0**2 - p_r_1**2
- ginis = (-(h_l) * l_sz / sz -(h_r) * r_sz / sz)[mask]
- best = np.argmax(ginis)
- threshold_best = thresholds[best]
- gini_best = ginis[best]
- return thresholds, ginis, threshold_best, gini_best
- class DecisionTree:
- def __init__(self, feature_types, max_depth=None, min_samples_split=None, min_samples_leaf=None):
- if np.any(list(map(lambda x: x != "real" and x != "categorical", feature_types))):
- raise ValueError("There is unknown feature type")
- self._tree = {}
- self._feature_types = feature_types
- self._max_depth = max_depth
- self._min_samples_split = min_samples_split
- self._min_samples_leaf = min_samples_leaf
- def _fit_node(self, sub_X, sub_y, node):
- if np.all(sub_y == sub_y[0]):
- node["type"] = "terminal"
- node["class"] = sub_y[0]
- return
- feature_best, threshold_best, gini_best, split = None, None, None, None
- for feature in range(sub_X.shape[1]):
- if len(np.unique(sub_X[:, feature])) == 1:
- continue
- feature_type = self._feature_types[feature]
- categories_map = {}
- if feature_type == "real":
- feature_vector = sub_X[:, feature]
- elif feature_type == "categorical":
- counts = Counter(sub_X[:, feature])
- clicks = Counter(sub_X[sub_y == 1, feature])
- ratio = {}
- for key, current_count in counts.items():
- if key in clicks:
- current_click = clicks[key]
- else:
- current_click = 0
- ratio[key] = current_click / current_count
- sorted_categories = list(map(lambda x: x[0], sorted(ratio.items(), key=lambda x: x[1])))
- categories_map = dict(zip(sorted_categories, list(range(len(sorted_categories)))))
- feature_vector = np.array(list(map(lambda x: categories_map[x], sub_X[:, feature])))
- else:
- raise ValueError
- _, _, threshold, gini = find_best_split(feature_vector, sub_y)
- if gini_best is None or gini > gini_best:
- feature_best = feature
- gini_best = gini
- split = feature_vector < threshold
- if feature_type == "real":
- threshold_best = threshold
- elif feature_type == "categorical":
- threshold_best = list(map(lambda x: x[0],
- filter(lambda x: x[1] < threshold, categories_map.items())))
- else:
- raise ValueError
- if feature_best is None:
- node["type"] = "terminal"
- node["class"] = Counter(sub_y).most_common(1)[0][0]
- return
- node["type"] = "nonterminal"
- node["feature_split"] = feature_best
- if self._feature_types[feature_best] == "real":
- node["threshold"] = threshold_best
- elif self._feature_types[feature_best] == "categorical":
- node["categories_split"] = threshold_best
- else:
- raise ValueError
- node["left_child"], node["right_child"] = {}, {}
- self._fit_node(sub_X[split], sub_y[split], node["left_child"])
- self._fit_node(sub_X[np.logical_not(split)], sub_y[np.logical_not(split)], node["right_child"])
- def _predict_node(self, x, node):
- if node["type"] == "terminal":
- return node["class"]
- feature = node["feature_split"]
- feature_type = self._feature_types[feature]
- if feature_type == "real":
- if x[feature] < node["threshold"]:
- return self._predict_node(x, node["left_child"])
- return self._predict_node(x, node["right_child"])
- elif feature_type == "categorical":
- if x[feature] in node["categories_split"]:
- return self._predict_node(x, node["left_child"])
- return self._predict_node(x, node["right_child"])
- def fit(self, X, y):
- self._fit_node(X, y, self._tree)
- def predict(self, X):
- predicted = []
- for x in X:
- predicted.append(self._predict_node(x, self._tree))
- return np.array(predicted)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement