ID3 算法, 即 Iterative Dichotomiser 3(迭代二分器第三代)。算法通过迭代地选择具有最高信息增益的特征来分割数据集,从而递归地生成决策树模型,直至数据集被完美分类或无可分割的特征为止。
ID3 构建决策树的流程:
- 找到最优特征(信息增益值最高)。
- 最优特征,特征值作为有向边,判断子数据集
- 全部是同一类:输出叶子节点。
- 多类:
- 如果子数据集中还有特征,那么再次找寻最优特征。
- 如果子数据集中只剩下最后的分类结果了,那么输出最多的一类作为叶子节点。
- 直到叶子节点全部输出,或者无数据可遍历。
ID3 算法的特点:
- 只能处理离散型数据的学习计算。不能处理连续型数据。
- 容易产生过拟合现象(overfitting),泛化能力弱。
使用到的公式
- 信息熵(Entropy) H ( X ) = − ∑ i = 1 n p i log 2 p i H(X)=-\sum\limits_{i=1}^{n} p_i\log_{2}{p_i} H(X)=−i=1∑npilog2pi 其中, X X X表示样本, p i p_i pi表示概率,并且 ∑ i = 1 n p i = 1 \sum\limits_{i=1}^{n}p_i=1 i=1∑npi=1。例如投掷硬币,正面朝上的概率为1/2,反面朝上的概率为1/2,那么 H ( X ) = − ( 1 2 log 2 1 2 + 1 2 log 2 1 2 ) = 1 H(X)=-(\frac{1}{2}\log_2\frac{1}{2}+\frac{1}{2}\log_2 \frac{1}{2})=1 H(X)=−(21log221+21log221)=1。
- 条件熵(Condition Entropy) H ( Y ∣ X ) = ∑ p i H ( Y ∣ X = x i ) H(Y|X)=\sum p_iH(Y|X=x_i) H(Y∣X)=∑piH(Y∣X=xi) 表示在 X 发生的前提下,Y 的信息熵。
- 信息增益(Information Gain) G ( D ∣ A ) = H ( D ) − H ( D ∣ A ) G(D|A)=H(D)-H(D|A) G(D∣A)=H(D)−H(D∣A) 表示 A 发生时,的信息增益。
下面将按照如下步骤实现一个简单的 ID3 决策树:
- 准备训练数据。
- 从可选的特征中选出一个最佳特征作树的节点。
- 先计算数据中的初始熵。
- 然后计算每个特征下的信息熵。
- 计算各个特征的熵增,熵增最大的将作为树的节点。
- 根据选择出的特征节点,对数据进行分组,对于每个分组,计算其信息熵,选择出信息熵最小的分组值作为决策特征值,然后获取该特征值对应的
y
值。 - 将非最小熵值的其他分组作为树的子节点,重复 2 - 3 步骤,继续构建决策树。
实例代码如下:
import numpy as np
import pandas as pd
from pandas import DataFrame# 计算 D 的信息熵
def cal_h(p_arr):return sum(-p * np.log2(p) for p in p_arr)# 计算分组的信息熵
def cal_grouped_h(groups_p):return dict((k, cal_h(p_arr)) for k, p_arr in groups_p.items())def cal_c_h(p_arr_info):"""计算条件熵:param p_arr_info:输入的概率信息,字典类型,对应的值用列表表示,列表的第一个元素表示该条件在整体样本中的概率,从第二个元素开始,表示该条件下各个分量的概率:return: 条件熵的总和"""return np.sum([p_arr[0] * cal_h(p_arr[1:]) for p_arr in p_arr_info.values()])# 根据特征值进行分组,对每个分组中 y_label 的概率进行统计
def stat_y_label_percentage(df: DataFrame, feature, y_label, is_to_dict=False):condition_stat = df.groupby(feature)[y_label].value_counts(normalize=True).to_frame(name='percentage').reset_index()if is_to_dict:return condition_stat.groupby(feature)['percentage'].apply(list).to_dict()return condition_stat# 计算各个特征的条件信息熵
def cal_features_c_h(df, features, y_label):result = {}for feature in features:# 根据特征值分组,对每个分组中 y_label 的概率进行统计grouped_p_items = stat_y_label_percentage(df, feature, y_label,is_to_dict=True)print(f"'{feature}'的概率分布:{grouped_p_items}")# 计算各个分组占总体样本的概率grouped_p = df[feature].value_counts(normalize=True).to_dict()# 将分组概率插入到各组概率数组的第一项中[grouped_p_items[k].insert(0, p) for k, p in grouped_p.items()]# 计算特征下的条件熵c_h = cal_c_h(grouped_p_items)print(f"'{feature}'的条件熵为:{c_h}")result[feature] = c_hreturn result# 计算熵增
def cal_gain_entropy(pre_entropy, cur_entropy: dict):return dict((k, pre_entropy - v) for k, v in cur_entropy.items())# 熵增最大的特征,作为节点
def select_max_gain_entropy(entropy_gain_info):return max(entropy_gain_info.items(), key=lambda x: x[1])[0]# 定义决策树
class DecisionTree:y_label = Nonedef __init__(self, df, features, y_label):self.y_label_val = Noneself.decision_condition_val = Noneself.other_y_label_val = Noneself.df = dfself.features = featuresself.children = []self.y_label = y_labeldef select_decision_node(self):# 计算初始信息熵percentages = self.df[self.y_label].value_counts(normalize=True)h_D = cal_h(percentages)# 计算信息熵info_h = cal_features_c_h(self.df, self.features, self.y_label)# 计算熵增gain_entropy = cal_gain_entropy(h_D, info_h)print(gain_entropy)# 选择熵增最大的节点作为分割点select_node = select_max_gain_entropy(gain_entropy)print(select_node)# 构造树的节点self.name = select_node# 计算决策值,即获取决策值,和对应的训练值def cal_decision_val(self):grouped_p = stat_y_label_percentage(self.df, self.name, self.y_label,True)grouped_h = cal_grouped_h(grouped_p)# 熵值最小的值作为叶子的判断条件self.decision_condition_val = min(grouped_h.items(),key=lambda x: x[1])[0]# 决策条件下的概率数据,即 yes 和 no 的占比decision_condition_data = self.df[self.df[self.name] ==self.decision_condition_val]decision_condition_data_p = stat_y_label_percentage(decision_condition_data, self.name,self.y_label)# 概率大的作为预测值self.y_label_val = self.df.loc[decision_condition_data_p["percentage"].idxmax(),self.y_label]# 临时变量,当没有子节点时,预测树的另一个分支值self.temp_other_y_label_val = "no" if self.y_label_val == 'yes' else "yes"def create_children(self):# 树的分支,需要排除已经决策的点children_nodes = [val for val in self.df[self.name].unique() ifval != self.decision_condition_val]self.features.remove(self.name)if self.features:for node in children_nodes:child_df = self.df[self.df[self.name] == node].reset_index(drop=True)child = DecisionTree(child_df, self.features, self.y_label)child.fit()self.children.append(child)# 当特征值为空时,停止添加子树if not self.features:break# 当无特征时,直接决策else:self.other_y_label_val = self.temp_other_y_label_val# 训练决策树def fit(self):# 选择决策点self.select_decision_node()# 计算当前节点的决策self.cal_decision_val()# 递归构建树self.create_children()# 决策def decide(self, row):if row[self.name] == self.decision_condition_val:row[self.y_label] = self.y_label_valelse:if not self.children:row[self.y_label] = self.other_y_label_valelse:for child in self.children:child.decide(row)if __name__ == '__main__':df = pd.DataFrame({"outlook":["overcast", "overcast", "overcast", "overcast","rainy", "rainy", "rainy", "rainy", "rainy","sunny", "sunny", "sunny", "sunny", "sunny"],"temperature": ["hot", "cool", "mild", "hot", "mild","cool", "cool", "mild","mild", "hot", "hot", "mild", "cool","mild"],"humidity": ["high", "normal", "high", "normal", "high","normal", "normal", "normal","high", "high", "high", "high", "normal","normal"],"windy": ["FALSE", "TRUE", "TRUE", "FALSE", "FALSE", "FALSE","TRUE", "FALSE", "TRUE", "FALSE","TRUE", "FALSE", "FALSE", "TRUE"],"play": ["yes", "yes", "yes", "yes", "yes", "yes", "no","yes", "no", "no", "no", "no", "yes","yes"]})tree = DecisionTree(df, ["outlook", "temperature", "humidity", "windy"],"play")# 训练tree.fit()test_df = df = pd.DataFrame({"outlook": ["rainy", "sunny"],"temperature": ["hot", "cool"],"humidity": ["high", "high"],"windy": ["TRUE", "TRUE"],"play": ["", ""]})# 决策for _, row in df.iterrows():tree.decide(row)print(test_df)
上述代码是一个简单的 ID3 决策树实现,你可以根据实际情况,对测试数据进行适当的修改,然后使用决策树进行训练和预测。