前提:文中使用的数据是本人下载下来以后自己处理过的,就是把文件中的所有分隔符都换成了空格。所以load_data方法只能加载我自己的数据,想要加载原生数据的话需要自己写load_data方法。
两个算法的关键地方都需要判断当前的w在某个样本点x上是否犯错,而对于犯错的判断有两个版本,第一个版本就是直接使用 wx*y <= 0 就表示犯错;第二个版本是用 sign(wx) != y 就表示犯错。这两个版本对于训练的结果基本没有影响,个人看出来的 唯一区别在于初始化w为0的时候,第一个版本对于任意的样本都是犯错的;第二个版本,根据林老师的sign版本,sign(0)是-1,所以仅仅是可能犯错误。我选择了第二个版本,因为个人认为第二个版本更加符合林老师的sign定义。
代码部分:
util.py内容
# -*- coding:utf-8 -*-
# Author: Evan Mi
import numpy as npdef load_data(file_name):x = []y = []with open(file_name, 'r+') as f:for line in f:line = line.rstrip("\n")temp = line.split(" ")temp.insert(0, '1')x_temp = [float(val) for val in temp[:-1]]y_tem = [int(val) for val in temp[-1:]]x.append(x_temp)y.append(y_tem)nx = np.array(x)ny = np.array(y)return nx, nydef sign(value):if value > 0:return 1else:return -1
pla.py内容(这里的main中只是作业要求的随机打乱的测试)
# -*- coding:utf-8 -*-
# Author: Evan Mi
import numpy as np
from pla_and_pocket_pla import utildef pla(nx, ny, rate=1):""":param nx: 属性矩阵,格式是[[...],[...],[...]]:param ny: 值,格式是 [[.],[.],[.]]:param rate: 学习率,默认是1:return: 迭代次数"""total_update_nums = 0total_train_example_nums = np.size(nx, 0)continue_right_nums = 0 # 连续不犯错的次数,当continue_right_nums==total_train_example_nums的时候,程序结束w = np.zeros((1, 5)) # 初始化参数为0loop_index = 0while True:this_x = nx[loop_index]result = util.sign(np.dot(this_x, w.T)[0])this_y = ny[loop_index, 0]if result == this_y:continue_right_nums += 1else:continue_right_nums = 0w = w + rate * (this_x * this_y)total_update_nums += 1loop_index = (loop_index + 1) % total_train_example_numsif continue_right_nums == total_train_example_nums:breakreturn total_update_numsif __name__ == '__main__':"""这里展示的是随机打乱样本,以0.5的学习率运行1000次的结果"""out_nx, out_ny = util.load_data("data/data.txt")avg = 0for i in range(1000):shuffle_index = np.arange(0, np.size(out_nx, 0))np.random.shuffle(shuffle_index)shuffled_x = out_nx[shuffle_index]shuffled_y = out_ny[shuffle_index]result_out = pla(shuffled_x, shuffled_y, 0.5)print("第%d次的更新次数为:%d" % ((i + 1), result_out))avg = avg + (1.0 / (i + 1)) * (result_out - avg)print("平均迭代次数为:%d" % avg)
pocket_pla.py内容(同样,这里的mian方法中也只有部分的题目要求的测试)
# -*- coding:utf-8 -*-
# Author: Evan Mi
import numpy as np
from pla_and_pocket_pla import utildef error_counter(x, y, w):result = np.where(x.dot(w[0].T) > 0, 1, -1)compare_result = np.where(result == y.T[0], 0, 1)return (1.0 * np.sum(compare_result)) / np.size(y, 0)def pocket_pla(nx, ny, rate=1, max_iter=50):""":param nx:属性矩阵,格式是[[...],[...],[...]]:param ny: 值,格式是 [[.],[.],[.]]:param rate: 学习率,默认是1:param max_iter: 最大迭代次数,默认50:return: w_pocket和w"""total_update_nums = 0total_train_example_nums = np.size(nx, 0)w_pocket = np.zeros((1, 5)) # w_pocket 就是一个口袋里的桃子,观察着w的变化,一旦比自己好,立马把w放进口袋里w = np.zeros((1, 5)) # 初始化参数为0while True:rand_index = np.random.randint(0, total_train_example_nums)this_x = nx[rand_index]result = util.sign(np.dot(this_x, w.T)[0])this_y = ny[rand_index, 0]if int(result) != int(this_y):w = w + rate * (this_x * this_y)total_update_nums += 1if error_counter(nx, ny, w) < error_counter(nx, ny, w_pocket):w_pocket = wif total_update_nums == max_iter:breakreturn w_pocket, wif __name__ == '__main__':x_train, y_train = util.load_data("data/train.txt")x_test, y_test = util.load_data("data/test.txt")avg_pocket = 0avg = 0for index in range(2000):w_out_pocket, w_out = pocket_pla(x_train, y_train, max_iter=100)error_out_pocket = error_counter(x_test, y_test, w_out_pocket)error_out = error_counter(x_test, y_test, w_out)avg_pocket = avg_pocket + (1.0 / (index + 1)) * (error_out_pocket - avg_pocket)avg = avg + (1.0 / (index + 1)) * (error_out - avg)print(avg_pocket)print(avg)
详细项目代码及代码使用的数据见:PLA和POCKET_PLA