算法流程参考了《统计学习方法》
"""ID3-style decision tree built on information gain, following Li Hang's
"Statistical Learning Methods".

The fitted tree is a nested dict: inner keys are feature (column) names,
leaf keys are stringified class labels mapping to empty dicts.  Rendering
with graphviz is optional and only imported when actually requested.
"""
import numpy as np
import pandas as pd


class BaseTree(object):
    def __init__(self, feature, label, title=None):
        """Build the tree eagerly from the training data.

        :param feature: 2-D array of discrete feature values,
                        shape (n_samples, n_columns)
        :param label: 1-D array of class labels, length n_samples
        :param title: sequence of column names, parallel to feature columns
        """
        self.root = {}
        self.feature = feature
        self.label = label
        self.title = title
        self.n_classes = tuple(set(label))
        # NOTE: despite the name this is the number of SAMPLES (rows);
        # kept as-is for interface compatibility.
        self.n_features = len(feature)
        self._build_tree_(self.root, self.label, self.feature, self.title)

    def _build_tree_(self, n, label, feature, title):
        """Recursively grow the dict tree.

        On each call the feature with maximal information gain is chosen,
        its column is removed, and one branch per observed value is built.

        :param n: current node (dict) being filled in
        :param label: labels of the samples reaching this node
        :param feature: remaining feature columns for those samples
        :param title: names of the remaining columns
        """
        # Guard: nothing to split on (also avoids np.argmax on an empty
        # gain vector when columns are exhausted but labels are impure).
        if feature is None or feature.shape[1] == 0:
            return
        gain = self._solve_gain_(label, feature, tuple(set(label)), len(feature))
        best = int(np.argmax(gain))
        column = feature[:, best]
        n[title[best]] = {}
        # Drop the chosen column for the subtrees below this node.
        sub_feature = np.delete(feature, best, axis=1)
        sub_title = np.delete(title, best)
        for state in set(column):
            inx = np.where(column == state)[0]
            sub_label = label[inx]
            if len(set(sub_label)) == 1:
                # Pure branch: record a leaf keyed by the class label.
                n[title[best]][str(sub_label[0])] = {}
                continue
            self._build_tree_(n[title[best]], sub_label, sub_feature[inx], sub_title)

    @staticmethod
    def _solve_entropy_(label, n_classes, n_features):
        """Empirical entropy H(D) of the label distribution.

        :param label: 1-D array of labels
        :param n_classes: iterable of distinct class values
        :param n_features: number of samples in ``label``
        :return: entropy in bits (float)
        """
        sigma = 0.
        for cls in n_classes:
            p = len(np.where(label == cls)[0]) / n_features
            # 0 * log2(0) is taken as 0 by convention.
            sigma += (p * np.log2(p) if p != 0 else 0)
        return -sigma

    def _solve_conditional_entropy_(self, label, feature, n_classes):
        """Conditional entropy H(D|A) for every remaining feature column.

        :param label: 1-D array of labels
        :param feature: 2-D feature array, one column per candidate split
        :param n_classes: distinct class values
        :return: 1-D np.ndarray, one conditional entropy per column
        """
        group_states = [tuple(set(col)) for col in feature.transpose()]
        n_samples = len(feature)
        ans = []
        for i, branch in enumerate(group_states):
            res = 0.
            for state in branch:
                inx = np.where(feature[:, i] == state)[0]
                n_f = len(inx)
                # Weight each value's entropy by its empirical probability.
                res += n_f / n_samples * self._solve_entropy_(label[inx], n_classes, n_f)
            ans.append(res)
        return np.array(ans)

    def _solve_gain_(self, label, feature, n_classes, n_features):
        """Information gain g(D, A) = H(D) - H(D|A) for every column.

        :return: 1-D np.ndarray of gains, one per feature column
        """
        return (self._solve_entropy_(label, n_classes, n_features)
                - self._solve_conditional_entropy_(label, feature, n_classes))

    def get_root(self):
        """Return the fitted tree as a nested dict."""
        return self.root


class DecisionTree(object):
    def __init__(self, title=None):
        """:param title: column names used as node labels in the tree."""
        self.tree = None
        self.title = title
        self.que = []    # subtrees queued for level-order traversal
        self.nums = []    # pending graphviz node ids, parallel to self.que
        self.count = 1    # next node id used to disambiguate graph nodes

    def fit(self, X, y):
        """Fit the tree.

        :param X: 2-D array of discrete features
        :param y: 1-D array of labels
        """
        self.tree = BaseTree(X, y, self.title)

    def predict(self, X):
        # Not implemented in the original source; kept as a stub.
        pass

    def decision_tree_struct(self):
        """Return the fitted tree as a nested dict."""
        return self.tree.get_root()

    def _travl_dict_(self, dot, node):
        """Level-order traversal of the dict tree, emitting graphviz edges.

        Node names are suffixed with a running id so repeated labels stay
        distinct in the graph.
        """
        for key in node.keys():
            node_id = str(self.nums.pop(0) if self.nums else '0')
            self.que.append(node[key])
            for i, child in enumerate(node[key].keys()):
                # NOTE(review): edge labels simply alternate No/Yes by child
                # order — presumably matching the binary 是/否 features;
                # confirm against the rendered graph.
                dot.edge(key + '_' + node_id, child + '_' + str(self.count),
                         label='Yes' if i & 1 else 'No')
                self.nums.append(self.count)
                self.count += 1
        if self.que:
            self._travl_dict_(dot, self.que.pop(0))

    def generate_graph(self):
        """Render the fitted tree to decision.gv / decision.gv.png.

        graphviz is imported lazily so fitting does not require the
        third-party package (or the system graphviz binaries).
        """
        from graphviz import Digraph
        dot = Digraph(name='tree', node_attr={'shape': 'circle'}, format='png')
        self._travl_dict_(dot, self.tree.get_root())
        dot.render('decision.gv', view=True)


def main():
    data = pd.read_csv('data.csv', encoding='gbk')
    # np.int was removed in NumPy 1.24; builtin int is the replacement.
    y = data['index'].astype(int)
    train_x = data.drop(['index', 'name'], axis=1)
    for col in train_x.keys():
        train_x[col] = train_x[col].map({'是': 1, '否': 0})
    title = list(train_x.keys())
    dt = DecisionTree(title)
    X = train_x.values.reshape((-1, len(title)))
    dt.fit(X, y.values)
    print(dt.decision_tree_struct())
    dt.generate_graph()


# Guard the script so importing this module has no side effects.
if __name__ == '__main__':
    main()
因为没有设置 graphviz 中文字体,所以无法正常显示,使用 index 代替了。
测试数据:
graphviz 查看生成的决策树结构:
原文:https://www.cnblogs.com/darkchii/p/13207599.html