import random

import numpy as np


def radixsort(values):
    # LSD radix sort (base 2) over non-negative integer keys; returns the
    # permutation of indices that orders `values` ascending (like np.argsort).
    radix = 2
    maxLength = False  # becomes True once every key has been fully consumed
    temp = -1
    placement = 1
    indices = np.array([i for i in range(len(values))])
    while not maxLength:
        maxLength = True
        buckets = [[] for i in range(radix)]
        for i in indices:
            temp = int(values[i] / placement)
            buckets[temp % radix] += [i]
            if maxLength and temp > 0:
                maxLength = False
        a = 0
        for b in range(radix):
            bucket = buckets[b]
            for i in bucket:
                indices[a] = i
                a += 1
        placement *= radix
    return indices
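
# Usage sketch (illustrative; the sample values below are made up and not part
# of the original paste). For non-negative integer input, radixsort(vals)
# returns the same ordering as np.argsort(vals):
#
#     vals = np.array([170, 45, 75, 90, 2])
#     order = radixsort(vals)   # permutation such that vals[order] is ascending
#     vals[order]               # -> array([  2,  45,  75,  90, 170])
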
class DecisionTree:
    def __init__(self, attrBagging=1, prune=False, overSample=False, splitFine=False):
        self.attrBagging = attrBagging  # fraction of features considered at each split (1 = all)
        self.prune = prune              # hold out 30% of the data for reduced-error pruning
        self.overSample = overSample    # replicate minority-class samples before training
        self.splitFine = splitFine      # try every sample value as a threshold instead of a coarse grid
    def train(self, data, labels):
        # Balancing: replicate minority-class samples until every class
        # matches the largest class count.
        if self.overSample:
            ys, counts = np.unique(labels, return_counts=True)
            extraData = []
            extraLabels = []
            maxCounts = max(counts)
            for i in range(len(ys)):
                if counts[i] < maxCounts:
                    added = 0
                    index = 0
                    while counts[i] + added < maxCounts:
                        if labels[index] == ys[i]:
                            extraData += [data[index]]
                            extraLabels += [labels[index]]
                            added += 1
                        index = (index + 1) % len(labels)
            data = np.array(list(data) + extraData)
            labels = np.array(list(labels) + extraLabels)
        self.numSamples = data.shape[0]
        if self.prune:
            # Hold out 30% of the samples as a validation set for pruning.
            indices = [i for i in range(data.shape[0])]
            random.shuffle(indices)
            xtrain = data[indices[:int(.7 * data.shape[0])]]
            ytrain = labels[indices[:int(.7 * data.shape[0])]]
            xvalid = data[indices[int(.7 * data.shape[0]):]]
            yvalid = labels[indices[int(.7 * data.shape[0]):]]
            self.root = self.growTree(xtrain, ytrain, xvalid, yvalid, 0)
        else:
            self.root = self.growTree(data, labels, None, None, 0)
    def predict(self, data):
        return self.root.predict(data)

    def predictPath(self, sample, features):
        self.root.predictPath(sample, features)
    def growTree(self, xtrain, ytrain, xvalid, yvalid, depth):
        ys, yCounts = np.unique(ytrain, return_counts=True)
        # Stopping conditions: depth limit, too few samples, or a pure node.
        if self.prune:
            tooDeep = depth > 2 * np.log2(self.numSamples)
        else:
            tooDeep = depth > np.log2(self.numSamples)
        notEnoughSamples = ytrain.shape[0] < self.numSamples / 100
        allSame = max(yCounts) == ytrain.shape[0]
        if tooDeep or notEnoughSamples or allSame:
            return Node(None, None, None, ys[np.argmax(yCounts)])
        rule = self.segmenter(xtrain, ytrain)
        if rule is None:
            return Node(None, None, None, ys[np.argmax(yCounts)])
        # Partition the training (and, if pruning, the validation) samples
        # around the chosen (feature, threshold) rule.
        leftIndicesTrain = []
        rightIndicesTrain = []
        for i in range(xtrain.shape[0]):
            if xtrain[i, rule[0]] > rule[1]:
                rightIndicesTrain += [i]
            else:
                leftIndicesTrain += [i]
        leftIndicesValid = []
        rightIndicesValid = []
        if self.prune:
            for i in range(xvalid.shape[0]):
                if xvalid[i, rule[0]] > rule[1]:
                    rightIndicesValid += [i]
                else:
                    leftIndicesValid += [i]
        if self.prune:
            leftNode = self.growTree(xtrain[leftIndicesTrain], ytrain[leftIndicesTrain], xvalid[leftIndicesValid], yvalid[leftIndicesValid], depth + 1)
            rightNode = self.growTree(xtrain[rightIndicesTrain], ytrain[rightIndicesTrain], xvalid[rightIndicesValid], yvalid[rightIndicesValid], depth + 1)
        else:
            leftNode = self.growTree(xtrain[leftIndicesTrain], ytrain[leftIndicesTrain], None, None, depth + 1)
            rightNode = self.growTree(xtrain[rightIndicesTrain], ytrain[rightIndicesTrain], None, None, depth + 1)
        split = Node(rule, leftNode, rightNode, None)
        if not self.prune:
            return split

        noSplit = Node(None, None, None, ys[np.argmax(yCounts)])

        def errs(pred, actual):
            errs = 0
            for i in range(len(pred)):
                if pred[i] != actual[i]:
                    errs += 1
            return errs

        # Reduced-error pruning: if replacing this split by a majority-label
        # leaf does at least as well on the validation set, keep the leaf.
        if leftNode.label is not None or rightNode.label is not None:
            noSplitErr = errs(noSplit.predict(xvalid), yvalid)
            splitErr = errs(split.predict(xvalid), yvalid)
            if splitErr >= noSplitErr:
                return noSplit
        return split
    def segmenter(self, data, labels):
        # Find the (feature, threshold) split that most decreases entropy.
        maxEntropyDec = 0
        rule = None
        # Check splits for some or all features depending on attribute bagging.
        features = [i for i in range(data.shape[1])]
        if self.attrBagging != 1:
            random.shuffle(features)
            features = features[:int(data.shape[1] * self.attrBagging)]
        for i in features:
            sortedIndices = radixsort(data[:, i])
            ys, rightCounts = np.unique(labels, return_counts=True)
            labelToIndex = {}
            for j in range(len(ys)):
                labelToIndex[ys[j]] = j
            # Label-count histograms for each side of the candidate split;
            # the last entry holds the total count on that side.
            rightCounts = np.append(rightCounts, [sum(rightCounts)]) + 0.0
            leftCounts = np.zeros(rightCounts.shape) + 0.0
            baseEntropy = self.entropy(rightCounts)
            curEntropy = baseEntropy
            lowestEntropy = baseEntropy
            split = None
            j = 0
            end = False
            if not self.splitFine:
                fVals = np.unique(data[:, i])
                numSplitLocs = min(7, len(fVals))
                splitLoc = 0
            while j < len(sortedIndices) - 1:
                changed = set()
                changes = np.zeros(rightCounts.shape) + 0.0
                if self.splitFine:
                    fVal = data[sortedIndices[j], i]
                else:
                    fVal = min(fVals) + ((max(fVals) - min(fVals)) * splitLoc / numSplitLocs)
                # Move every sample with feature value <= fVal from the right
                # side to the left side of the split.
                while data[sortedIndices[j], i] <= fVal:
                    changed.add(labelToIndex[labels[sortedIndices[j]]])
                    changes[labelToIndex[labels[sortedIndices[j]]]] += 1
                    changes[-1] += 1
                    j += 1
                    if j >= len(sortedIndices):
                        end = True
                        break
                if end:
                    break
                if not self.splitFine:
                    splitLoc += 1
                # Incrementally update the weighted entropy of the two sides.
                curEntropy *= (leftCounts[-1] + rightCounts[-1])
                changed = np.array(list(changed) + [len(ys)])
                curEntropy -= leftCounts[-1] * self.entropy(leftCounts[changed])
                curEntropy -= rightCounts[-1] * self.entropy(rightCounts[changed])
                leftCounts += changes
                rightCounts -= changes
                curEntropy += leftCounts[-1] * self.entropy(leftCounts[changed])
                curEntropy += rightCounts[-1] * self.entropy(rightCounts[changed])
                curEntropy /= (leftCounts[-1] + rightCounts[-1])
                if curEntropy < lowestEntropy:
                    lowestEntropy = curEntropy
                    split = (i, data[sortedIndices[j - 1], i])
            if baseEntropy - lowestEntropy > maxEntropyDec:
                maxEntropyDec = baseEntropy - lowestEntropy
                rule = split
        return rule
    def impurity(self, leftLabelHist, rightLabelHist):
        leftEntropy = leftLabelHist[-1] * self.entropy(leftLabelHist)
        rightEntropy = rightLabelHist[-1] * self.entropy(rightLabelHist)
        return (leftEntropy + rightEntropy) / (leftLabelHist[-1] + rightLabelHist[-1])

    def entropy(self, counts):
        # counts[:-1] are per-label counts, counts[-1] is the total; labels
        # with zero probability are dropped (0 * log2(0) is taken as 0).
        if counts[-1] == 0:
            return 0
        p = counts[:-1] / counts[-1]
        p = p[p > 0]
        return -np.dot(p, np.log2(p))

    def printNodes(self):
        self.root.print(0)
class Node:
    def __init__(self, splitRule, left, right, label):
        self.splitRule = splitRule  # (feature index, threshold), or None for a leaf
        self.left = left
        self.right = right
        self.label = label          # predicted label at a leaf, None for internal nodes

    def predict(self, X):
        pred = np.zeros(X.shape[0])
        for i in range(X.shape[0]):
            pred[i] = self.predictSingle(X[i])
        return pred

    def predictSingle(self, x):
        if self.label is not None:
            return self.label
        if x[self.splitRule[0]] > self.splitRule[1]:
            return self.right.predictSingle(x)
        else:
            return self.left.predictSingle(x)

    def predictPath(self, x, features):
        # Print the sequence of split decisions that leads to the prediction.
        if self.label is not None:
            print("final label:", self.label)
            return
        if x[self.splitRule[0]] > self.splitRule[1]:
            print(features[self.splitRule[0]], ">", self.splitRule[1])
            self.right.predictPath(x, features)
        else:
            print(features[self.splitRule[0]], "<=", self.splitRule[1])
            self.left.predictPath(x, features)

    def print(self, depth):
        if self.splitRule is None:
            print("depth:", depth, "label:", self.label)
        else:
            print("depth:", depth, "rule:", self.splitRule)
        if self.left is not None:
            self.left.print(depth + 1)
        if self.right is not None:
            self.right.print(depth + 1)
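
# Minimal end-to-end sketch (illustrative only; the synthetic data and the
# accuracy check below are not part of the original paste): fit a pruned tree
# on non-negative integer features, which is what radixsort assumes.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    X = rng.randint(0, 10, size=(200, 3))    # 200 samples, 3 integer features
    y = (X[:, 0] + X[:, 1] > 9).astype(int)  # toy rule the tree should recover

    tree = DecisionTree(prune=True)
    tree.train(X, y)
    print("training accuracy:", np.mean(tree.predict(X) == y))
    tree.printNodes()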