一、朴素贝叶斯算法的实现
naive_bayes_classifier.py
import numpy as np
import collections as cc # 集合的计数功能
from scipy.stats import norm # 极大似然估计样本的均值和标准方差
from data_bin_wrapper import DataBinsWrapperclass NaiveBayesClassifier:"""朴素贝叶斯分类器:对于连续属性两种方式操作,1是分箱处理,2是直接进行高斯分布的参数估计"""def __init__(self, is_binned=False, is_feature_all_R=False, feature_R_idx=None, max_bins=10):self.is_binned = is_binned # 连续特征变量数据是否进行分箱操作,离散化if is_binned:self.is_feature_all_R = is_feature_all_R # 是否所有特征变量都是连续数值,boolself.max_bins = max_bins # 最大分箱数self.dbw = DataBinsWrapper() # 分箱对象self.dbw_XrangeMap = dict() # 存储训练样本特征分箱的段点self.feature_R_idx = feature_R_idx # 混合式数据中连续特征变量的索引self.class_values, self.n_class = None, 0 # 类别取值以及类别数self.prior_prob = dict() # 先验分布,键是类别取值,键是类别取值self.classified_feature_prob = dict() # 存储每个类所对应的特征变量取值频次或者连续属性的高斯分布参数self.feature_values_num = dict() # 训练样本中每个特征不同的取值数,针对离散数据self.class_values_num = dict() # 目标集中每个类别的样本量,Dcdef _prior_probability(self, y_train):"""计算类别的先验概率:param y_train: 目标集:return:"""n_samples = len(y_train) # 总样本量self.class_values_num = cc.Counter(y_train) # Counter({'否': 9, '是': 8})# print(self.class_values_num)for key in self.class_values_num.keys():self.prior_prob[key] = (self.class_values_num[key] + 1) / (n_samples + self.n_class)# print(self.prior_prob)def _data_bin_wrapper(self, x_samples):"""针对特定的连续特征属性索引dbw_feature_idx,分别进行分箱,考虑测试样本与训练样本使用同一个XrangeMap:param x_samples: 样本:即可以是训练样本,也可以是测试样本:return:"""self.feature_R_idx = np.asarray(self.feature_R_idx)x_samples_prop = [] # 分箱之后的数据if not self.dbw_XrangeMap:# 为空,即创建决策树前所做的分箱操作for i in range(x_samples.shape[1]):if i in self.feature_R_idx: # 说明当前特征是连续数值self.dbw.fit(x_samples[:, i])self.dbw_XrangeMap[i] = self.dbw.XrangeMapx_samples_prop.append(self.dbw.transform(x_samples[:, i]))else:x_samples_prop.append(x_samples[:, i])else: # 针对测试样本的分箱操作for i in range(x_samples.shape[1]):if i in self.feature_R_idx: # 说明当前特征是连续数值x_samples_prop.append(self.dbw.transform(x_samples[:, i], self.dbw_XrangeMap[i]))else:x_samples_prop.append(x_samples[:, i])return np.asarray(x_samples_prop).Tdef fit(self, x_train, y_train):"""朴素贝叶斯分类器训练,可将朴素贝叶斯分类器涉及的所有概率估值事先计算好存储起来:param x_train: 训练集:param y_train: 目标集:return:"""x_train, y_train = np.asarray(x_train), np.asarray(y_train)self.class_values = np.unique(y_train) # 类别取值self.n_class = len(self.class_values) # 类别数if self.n_class < 2:print("仅有一个类别,不进行贝叶斯分类器估计...")exit(0)self._prior_probability(y_train) # 先验概率# 每个特征变量不同的取值数,类条件概率的分子D(x, xi)for i in range(x_train.shape[1]):self.feature_values_num[i] = len(np.unique(x_train[:, i]))if self.is_binned:self._binned_fit(x_train, y_train) # 分箱处理else:self._gaussian_fit(x_train, y_train) # 直接进行高斯分布估计def _binned_fit(self, x_train, y_train):"""对连续特征属性进行分箱操作,然后计算各概率值:param x_train::param y_train::return:"""if self.is_feature_all_R: # 全部是连续self.dbw.fit(x_train)x_train = self.dbw.transform(x_train)elif self.feature_R_idx is not None:x_train = self._data_bin_wrapper(x_train)for c in self.class_values:class_x = x_train[y_train == c] # 获取对应类别的样本feature_counter = dict() # 每个离散变量特征中特定值的出现的频次,连续特征变量存u、sigmafor i in range(x_train.shape[1]):feature_counter[i] = cc.Counter(class_x[:, i])self.classified_feature_prob[c] = feature_counterprint(self.classified_feature_prob)def _gaussian_fit(self, x_train, y_train):"""连续特征变量不进行分箱,直接进行高斯分布估计,离散特征变量取值除外:param x_train::param y_train::return:"""for c in self.class_values:class_x = x_train[y_train == c] # 获取对应类别的样本feature_counter = dict() # 每个离散变量特征中特定值的出现的频次,连续特征变量存u、sigmafor i in range(x_train.shape[1]):if self.feature_R_idx is not None and (i in self.feature_R_idx): # 连续特征# 极大似然估计均值和方差mu, sigma = norm.fit(np.asarray(class_x[:, i], dtype=np.float64))feature_counter[i] = {"mu": mu, "sigma": sigma}else: # 离散特征feature_counter[i] = cc.Counter(class_x[:, i])self.classified_feature_prob[c] = feature_counterprint(self.classified_feature_prob)def predict_proba(self, x_test):"""预测测试样本所属类别的概率:param x_test: 测试样本集:return:"""x_test = np.asarray(x_test)if self.is_binned:return self._binned_predict_proba(x_test)else:return self._gaussian_predict_proba(x_test)def _binned_predict_proba(self, x_test):"""连续特征变量进行分箱离散化,预测:param x_test: 测试样本集:return:"""if self.is_feature_all_R:x_test = self.dbw.transform(x_test)elif self.feature_R_idx is not None:x_test = self._data_bin_wrapper(x_test)y_test_hat = np.zeros((x_test.shape[0], self.n_class)) # 存储测试样本所属各个类别概率for i in range(x_test.shape[0]):test_sample = x_test[i, :] # 当前测试样本y_hat = [] # 当前测试样本所属各个类别的概率for c in self.class_values:prob_ln = np.log(self.prior_prob[c]) # 当前类别的先验概率,取对数# 当前类别下不同特征变量不同取值的频次,构成字典feature_frequency = self.classified_feature_prob[c]for j in range(x_test.shape[1]): # 针对每个特征变量value = test_sample[j] # 当前测试样本的当前特征取值cur_feature_freq = feature_frequency[j] # Counter({'浅白': 4, '青绿': 3, '乌黑': 2})# 按照拉普拉斯修正方法计算prob_ln += np.log((cur_feature_freq.get(value, 0) + 1) /(self.class_values_num[c] + self.feature_values_num[j]))y_hat.append(prob_ln) # 输入第c个类别的概率y_test_hat[i, :] = self.softmax_func(np.asarray(y_hat)) # 适合多分类,且归一化return y_test_hat@staticmethoddef softmax_func(x):"""softmax函数,为避免上溢或下溢,对参数x做限制:param x: 数组: 1 * n_classes:return:"""exps = np.exp(x - np.max(x)) # 避免溢出,每个数减去其最大值return exps / np.sum(exps)def _gaussian_predict_proba(self, x_test):"""连续特征变量不进行分箱,直接按高斯分布估计:param x_test: 测试样本集:return:"""y_test_hat = np.zeros((x_test.shape[0], self.n_class)) # 存储测试样本所属各个类别概率for i in range(x_test.shape[0]):test_sample = x_test[i, :] # 当前测试样本y_hat = [] # 当前测试样本所属各个类别的概率for c in self.class_values:prob_ln = np.log(self.prior_prob[c]) # 当前类别的先验概率,取对数# 当前类别下不同特征变量不同取值的频次,构成字典feature_frequency = self.classified_feature_prob[c]for j in range(x_test.shape[1]): # 针对每个特征变量value = test_sample[j] # 当前测试样本的当前特征取值if self.feature_R_idx is not None and (j in self.feature_R_idx): # 连续特征# 取极大似然估计的均值和方差# print(feature_frequency[j].values())mu, sigma = feature_frequency[j].values()prob_ln += np.log(norm.pdf(value, mu, sigma) + 1e-8)else:cur_feature_freq = feature_frequency[j] # Counter({'浅白': 4, '青绿': 3, '乌黑': 2})# 按照拉普拉斯修正方法计算prob_ln += np.log((cur_feature_freq.get(value, 0) + 1) /(self.class_values_num[c] + self.feature_values_num[j]))y_hat.append(prob_ln) # 输入第c个类别的概率y_test_hat[i, :] = self.softmax_func(np.asarray(y_hat)) # 适合多分类,且归一化return y_test_hatdef predict(self, x_test):"""预测测试样本所属类别:param x_test: 测试样本集:return:"""return np.argmax(self.predict_proba(x_test), axis=1)
二、可视化分类边界函数
plt_decision_function.py
import matplotlib.pyplot as plt
import numpy as npdef plot_decision_function(X, y, clf, is_show=True):"""可视化分类边界函数:param X: 测试样本:param y: 测试样本的类别:param clf: 分类模型:param is_show: 是否在当前显示图像,用于父函数绘制子图:return:"""if is_show:plt.figure(figsize=(7, 5))x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1xi, yi = np.meshgrid(np.linspace(x_min, x_max, 100),np.linspace(y_min, y_max, 100))y_pred = clf.predict(np.c_[xi.ravel(), yi.ravel()]) # 模型预测值y_pred = y_pred.reshape(xi.shape)plt.contourf(xi, yi, y_pred, cmap="winter", alpha=0.4)plt.scatter(X[:, 0], X[:, 1], c=y, edgecolors="k")plt.xlabel("Feature 1", fontdict={"fontsize": 12})plt.ylabel("Feature 2", fontdict={"fontsize": 12})plt.title("NativeBayes Model Classification Boundary", fontdict={"fontsize": 14})if is_show:plt.show()
三、朴素贝叶斯算法的测试
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from naive_bayes_classifier import NaiveBayesClassifier
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from plt_decision_function import plot_decision_function# wm = pd.read_csv("watermelon.csv").dropna()
# X, y = np.asarray(wm.iloc[:, 1:-1]), np.asarray(wm.iloc[:, -1])
# # print(X)
# # print(y)
# nbc = NaiveBayesClassifier(is_binned=True, feature_R_idx=[6, 7], max_bins=10)
# nbc.fit(X, y)
# y_proba = nbc.predict_proba(X)
# print(y_proba)
# y_hat = nbc.predict(X)
# print(y_hat)X, y = make_blobs(n_samples=500, n_features=2, centers=4, cluster_std=0.85, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0, stratify=y)nbc = NaiveBayesClassifier(is_binned=True, max_bins=20, is_feature_all_R=True)
nbc.fit(X_train, y_train)
y_pred = nbc.predict(X_test)
print(classification_report(y_test, y_pred))
plt.figure(figsize=(14, 5))
plt.subplot(121)
plot_decision_function(X_train, y_train, nbc, is_show=False)nbc = NaiveBayesClassifier(is_binned=False, feature_R_idx=[0, 1])
nbc.fit(X_train, y_train)
y_pred = nbc.predict(X_test)
print(classification_report(y_test, y_pred))
plt.subplot(122)
plot_decision_function(X_train, y_train, nbc, is_show=False)
plt.show()# al = pd.read_csv("mushroom/agaricus-lepiota.data").dropna()