因为自己平时写计算的代码比较多,很少写树结构的(决策树算法的实现断断续续花了近两周),所以数据结构和代码效率上待优化的地方应该还有很多,仅提供给大家借鉴。代码建议在文本编辑器上浏览。
自己网上搜到的代码很多都没有定义类,我自己对于这样的代码很是不喜欢,一点点看算法流程是看不下去的。因为是树,所以一定存在节点类和树类。
import numpy as np
import pandas as pd
class Node():
    """A node of the binary decision tree.

    Holds the split decision for internal nodes (``divide_info``) and the
    majority class label used for prediction at this node.
    """

    def __init__(self, entropy=-1, label=None, divide_info=None):
        # Entropy of the samples that reached this node; -1 marks "unset"
        # (used by Tree to detect an empty / unfitted tree).
        self.entropy = entropy
        # Majority class label among the samples at this node.
        self.label = label
        # Split description dict {"column": ..., "value": ...}; None for a leaf.
        self.divide_info = divide_info
        self.left_child = None
        self.right_child = None
        # Pessimistic error estimate, filled in during post pruning.
        self.except_error = None

    def set_left(self, node):
        """Attach *node* as the left child."""
        self.left_child = node

    def set_right(self, node):
        """Attach *node* as the right child."""
        self.right_child = node
class Tree():
    """Binary tree wrapper offering breadth-first helpers for the decision tree.

    Fix: the original appended ``right_child`` to the traversal queue whenever
    ``left_child`` existed; a node holding only one child pushed ``None`` into
    the queue and crashed on the next level.  Both traversals now test each
    child independently.
    """

    def __init__(self):
        # An entropy of -1 on the root marks an empty / unfitted tree.
        self.root = Node()

    def breadth_traversal(self):
        """Print node labels level by level; returns None."""
        if self.root.entropy == -1:
            return
        queue = [self.root]
        result = []
        while queue:
            next_queue = []
            labels = [node.label for node in queue]
            for node in queue:
                # Check both children: a node may hold only one of them.
                if node.left_child is not None:
                    next_queue.append(node.left_child)
                if node.right_child is not None:
                    next_queue.append(node.right_child)
            result.append(labels)
            queue = next_queue
        for i in range(len(result)):
            print("第", i, "层信息", result[i])
        return

    def query_layer(self, node):
        """Return the 0-based depth of *node*, or None if it is not in the tree.

        param:
            node: the node object to search for (compared with ``==``)
        """
        if self.root.entropy == -1:
            return
        queue = [self.root]
        layer = 0
        while queue:
            next_queue = []
            for candidate in queue:
                if candidate == node:
                    return layer
                if candidate.left_child is not None:
                    next_queue.append(candidate.left_child)
                if candidate.right_child is not None:
                    next_queue.append(candidate.right_child)
            layer = layer + 1
            queue = next_queue
        return
class Decision_tree():
    """ID3-style binary decision tree with pessimistic-error post pruning.

    Fixes over the original:
    - ``predict`` wrote results via chained indexing
      (``X[mask]["predict"] = ...``), which assigns into temporary copies, so
      predictions were never stored and the true labels were returned instead.
    - ``add_error`` scaled only the root's pessimistic error by N; children
      are now scaled identically so the parent-vs-children comparison in
      ``post_pruning`` is meaningful.
    - Guards against None children (depth-capped subtrees) and empty splits.
    """

    def __init__(self):
        self.tree = Tree()

    @staticmethod
    def _branch_masks(X, divide_info):
        """Boolean (left, right) row masks for a node's stored split.

        Same heuristic as training: a column with <= 4 distinct values is
        treated as categorical (==), otherwise as continuous (<=).
        """
        column, value = divide_info["column"], divide_info["value"]
        if len(X[column].unique()) <= 4:
            left_index = (X[column] == value).values
        else:
            left_index = (X[column] <= value).values
        return left_index, ~left_index

    @staticmethod
    def _split_data(X, y, divide_column, left_index, right_index):
        """Drop the consumed column and split rows into left/right subsets."""
        X_next = X[X.columns.difference([divide_column])]
        return X_next[left_index], y[left_index], X_next[right_index], y[right_index]

    def fit(self, X, y, minimum_entropy=0, cur_node=None, maximum_layer=np.inf):
        """Build the decision tree recursively from data.

        param:
            X: features, array_like
            y: labels, array_like (binary classification)
            minimum_entropy: minimum entropy gain required to keep splitting
            cur_node: parent node of the subtree being built (None for root)
            maximum_layer: maximum depth allowed, counted from 0
        """
        X = pd.DataFrame(X)
        y = pd.DataFrame(y)
        if cur_node:
            # Stop growing below cur_node once the depth cap is reached.
            if self.tree.query_layer(cur_node) >= maximum_layer:
                return
        label = y.iloc[:, 0].value_counts().index[0]  # majority label
        current_entropy = entropy(y)
        divided_info, left_index, right_index, divide_column = find_variable(X, y, minimum_entropy)
        if self.tree.root.entropy == -1:
            # Empty tree: this call creates the root.
            if divided_info is None:
                print("无需决策分类")
                return
            self.tree.root = Node(entropy=current_entropy, label=label, divide_info=divided_info)
            X_left, y_left, X_right, y_right = self._split_data(X, y, divide_column, left_index, right_index)
            self.fit(X_left, y_left, minimum_entropy, self.tree.root, maximum_layer)
            self.fit(X_right, y_right, minimum_entropy, self.tree.root, maximum_layer)
            return
        # Growing below an existing node: the first recursive call for a
        # parent fills its left slot, the second fills its right slot.
        if divided_info is None:
            leaf = Node(entropy=None, label=label)
            if cur_node.left_child is None:
                cur_node.set_left(leaf)
            else:
                cur_node.set_right(leaf)
            return
        child = Node(current_entropy, label, divided_info)
        if cur_node.left_child is None:
            cur_node.set_left(child)
        else:
            cur_node.set_right(child)
        X_left, y_left, X_right, y_right = self._split_data(X, y, divide_column, left_index, right_index)
        self.fit(X_left, y_left, minimum_entropy, child, maximum_layer)
        self.fit(X_right, y_right, minimum_entropy, child, maximum_layer)

    def post_pruning(self, X_test, y_test):
        """Prune bottom-up using pessimistic error estimates on a test set.

        A split is collapsed into a leaf when the parent's pessimistic error
        is lower than the summed pessimistic error of its two children.
        """
        self.add_error(X_test, y_test)
        # Collect nodes level by level so layers can be walked bottom-up.
        queue = [self.tree.root]
        node_layer = [queue]
        while queue:
            next_queue = []
            for node in queue:
                if node.divide_info:
                    if node.left_child is not None:
                        next_queue.append(node.left_child)
                    if node.right_child is not None:
                        next_queue.append(node.right_child)
            node_layer.append(next_queue)
            queue = next_queue
        for i in range(len(node_layer) - 2, -1, -1):
            for node in node_layer[i]:
                if node.divide_info and node.left_child is not None and node.right_child is not None:
                    if node.except_error < node.left_child.except_error + node.right_child.except_error:
                        # Keeping the split is worse than predicting the
                        # majority label here: make this node a leaf.
                        node.left_child = None
                        node.right_child = None
                        node.divide_info = None

    def predict(self, X, y, node=None):
        """Predict a label for every row of X.

        param:
            X: features, array_like
            y: accepted for backward compatibility; not used
            node: subtree to predict from (None -> root)
        return:
            pandas Series named "predict", indexed like X
        """
        X = pd.DataFrame(X)
        cur_node = self.tree.root if node is None else node
        # Default: every sample gets this node's majority label; rows routed
        # into existing children are overwritten below.
        pred = pd.Series(cur_node.label, index=X.index, name="predict")
        if cur_node.divide_info is None or len(X) == 0:
            return pred
        left_index, right_index = self._branch_masks(X, cur_node.divide_info)
        if cur_node.left_child is not None:
            pred[left_index] = self.predict(X[left_index], y, cur_node.left_child).values
        if cur_node.right_child is not None:
            pred[right_index] = self.predict(X[right_index], y, cur_node.right_child).values
        return pred

    def add_error(self, X_test, y_test, node=None):
        """Attach a pessimistic error COUNT to every node reachable with data.

        except_error = N * (e + 1.15 * sqrt(e * (1 - e) / N)), where e is the
        misclassification rate of the node's label on the N test rows that
        reach it; 0.0 when no rows reach the node.
        """
        X = pd.DataFrame(X_test)
        y = pd.DataFrame(y_test)
        cur_node = self.tree.root if node is None else node
        N = len(X)
        if N == 0:
            # No test data reaches this subtree: it cannot contribute error.
            cur_node.except_error = 0.0
        else:
            error = error_rate(y, cur_node.label)
            cur_node.except_error = N * (error + 1.15 * np.sqrt(error * (1 - error) / N))
        if cur_node.divide_info is None:
            return
        left_index, right_index = self._branch_masks(X, cur_node.divide_info)
        if cur_node.left_child is not None:
            self.add_error(X[left_index], y[left_index], cur_node.left_child)
        if cur_node.right_child is not None:
            self.add_error(X[right_index], y[right_index], cur_node.right_child)
def entropy(y):
    """Return the Shannon entropy (base 2) of the first label column of *y*.

    param:
        y: pandas DataFrame of labels (only the first column is used)
    return:
        float entropy; 0.0 for a pure or empty column

    Replaces the deprecated ``y.apply(pd.value_counts) / len(y)`` with
    ``value_counts(normalize=True)``, which also handles empty input cleanly.
    """
    p = y.iloc[:, 0].value_counts(normalize=True)
    return float(-(p * np.log2(p)).sum())
def find_variable(X, y, minimum_entropy=0):
    """Find the split of (X, y) that minimizes the weighted child entropy.

    param:
        X: array_like, features
        y: array_like, binary labels
        minimum_entropy: minimum entropy gain required to accept a split
    return:
        (divide_variable, X_left_index, X_right_index, divide_column) where
        divide_variable is {"column": ..., "value": ...}; all four are None
        when no worthwhile split exists (pure node, no columns, or gain
        below minimum_entropy).

    Column handling heuristic (unchanged from the original):
        2 distinct values            -> binary,     split on ==
        3-4 distinct values          -> categorical, best level by ==
        otherwise                    -> continuous,  best threshold by <=

    Fix: the continuous branch originally set
    ``X_right_index = ~X_right_index`` — negating a stale mask (or raising
    TypeError on ``~None`` the first time); it must negate the LEFT mask.
    """
    X = pd.DataFrame(X)
    y = pd.DataFrame(y)
    assert len(y.iloc[:, 0].unique()) <= 2  # binary classification only
    current_entropy = entropy(y)
    if (current_entropy == 0) | (X.shape[1] == 0):
        return (None, None, None, None)

    def split_entropy(mask):
        # Weighted entropy of the two groups produced by the boolean mask.
        y1, y2 = y[mask], y[~mask]
        return len(y1) / len(X) * entropy(y1) + len(y2) / len(X) * entropy(y2)

    min_divide_entropy = np.inf  # best (lowest) weighted entropy so far
    divide_variable = None       # split description of the best split
    divide_column = None         # column of the best split
    X_left_index = None          # left-row mask of the best split
    X_right_index = None         # right-row mask of the best split
    for column in X.columns:
        levels = X.loc[:, column].unique()
        n_levels = len(levels)
        if n_levels == 2:
            # Binary variable: a single == split covers both groupings.
            categorical = True
            candidates = [levels[0]]
        elif 3 <= n_levels <= 4:
            # Unordered categorical: try each level as "one vs rest".
            categorical = True
            candidates = list(levels)
        else:
            # Continuous: try each observed value as a <= threshold.
            categorical = False
            candidates = list(np.sort(levels))
        best_value = None
        best_entropy = np.inf
        for value in candidates:
            mask = (X[column] == value).values if categorical else (X[column] <= value).values
            e = split_entropy(mask)
            if e < best_entropy:
                best_entropy = e
                best_value = value
        if best_entropy < min_divide_entropy:
            if categorical:
                X_left_index = (X[column] == best_value).values
            else:
                X_left_index = (X[column] <= best_value).values
            X_right_index = ~X_left_index
            min_divide_entropy = best_entropy
            divide_variable = {"column": column, "value": best_value}
            divide_column = column
    if (current_entropy - min_divide_entropy) < minimum_entropy:
        return (None, None, None, None)
    return (divide_variable, X_left_index, X_right_index, divide_column)
def error_rate(y_true, label):
    """Return the fraction of rows in *y_true* whose label differs from *label*.

    param:
        y_true: array_like of true labels (first column used)
        label: the predicted label to compare against
    return:
        float in [0, 1]; 0.0 for empty input

    Fix: the original ``len(y[y != label]) / len(y)`` boolean-masked a
    DataFrame, which NaN-fills non-matching cells instead of dropping rows,
    so the length never changed and the rate was always 1.0.
    """
    y = pd.DataFrame(y_true)
    if len(y) == 0:
        return 0.0
    return float((y.iloc[:, 0] != label).mean())
到这里,算法基本编完了,可以小测一下:
if __name__ == "__main__":
    # Tiny smoke test: fit, show the tree, prune, show again, predict.
    weather = ["Rainy", "Sunny", "Windy", "Sunny", "Sunny", "Windy"]
    dow = ["Saturday", "Saturday", "Tuesday", "Saturday", "Monday", "Saturday"]
    X = pd.DataFrame({"Weather": weather, "Dow": dow}, columns=["Weather", "Dow"])
    y = pd.DataFrame({"Play": ["No", "Yes", "No", "Yes", "No", "No"]})
    decision_tree = Decision_tree()
    decision_tree.fit(X, y, 0.4, maximum_layer=2)
    decision_tree.tree.breadth_traversal()
    decision_tree.post_pruning(X, y)
    decision_tree.tree.breadth_traversal()
    result = decision_tree.predict(X, y)
    print(result)
没想到吧,猛男的测试就是这么的温柔。因为实在是太懒了,所以请你们加大测试力度,如果有错误记得提醒我。
对于日后想从事数据分析甚至算法岗位的我来说,数据结构有必要补一补了,还有放在文件夹里的C++。
原文:https://www.cnblogs.com/DemonHunter/p/10820654.html