首页 > 其他 > 详细

机器学习 类条件随机标签噪声情况下的二分类问题研究 复现NIPS论文learning with noisy label(logistic & C-SVM)

时间:2020-11-27 22:23:50      阅读:47      评论:0      收藏:0      [点我收藏+]

论文百度网盘链接:https://pan.baidu.com/s/1XWj1YInZCuKldRpYnZTttg

密码:u4lb

UCI数据可以在论文中给的一个URL中找到,如找不到也可联系QQ1551904915获取。

对类随机标签噪声下的二分类问题可以通过构造修正损失函数以及设置正负偏置两种方案解决。

流程:构造二维线性可分数据->UCI高维数据测试->构造二分线性不可分数据->UCI数据测试

import numpy as np
import matplotlib.pyplot as plt
import scipy.io as scio
from sklearn.preprocessing import StandardScaler
import math
from libsvm.svmutil import *
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split


class LogisticRegression:
    def __init__(self, n_iter=200, eta=1e-3, tol=None,rho_positive = 0.2,rho_negative = 0.2):
        # 训练迭代次数
        self.n_iter = n_iter
        # 学习率
        self.eta = eta
        # 误差变化阈值
        self.tol = tol
        # 模型参数w(训练时初始化)
        self.w = None
        # 批量大小
        self.batch_size = 0.1
        self.rho_positive = rho_positive
        self.rho_negative = rho_negative

    def _z(self, X, w):
        return np.dot(X, w)

    def _sigmoid(self, z):
        return 1. / (1. + np.exp(-z))

    def _predict_proba(self, X, w):
        z = self._z(X, w)
        return self._sigmoid(z)

    # 标签噪声不存在时的梯度
    def _loss(self, y, y_proba):
        ret = 0
        for yi, ti in zip(y, y_proba):
            ret += np.log(1 + np.exp(- yi * ti))

        return ret + 1/2*np.dot(self.w,self.w)

    # 标签噪声不存在时的梯度
    def _gradient(self, X, y):
        ret = np.zeros_like(self.w)
        for xi,yi in zip(X,y):
            ti = np.dot(self.w.T,xi)
            ret += -yi*np.exp(-ti*yi)/(1+np.exp(-ti*yi)) * xi
        return ret + self.w

    # 存在标签噪声情况下的SGD
    def _gradient_descent_under_noise(self, w, X, y):

        # 若用户指定tol, 则启用早期停止法.
        if self.tol is not None:
            loss_old = np.inf

        self.loss_list = []
        self.w = w

        # 使用梯度下降至多迭代n_iter次, 更新w.
        for step_i in range(self.n_iter):
            # 预测所有点为1的概率
            y_proba = self._predict_proba(X, self.w)
            # 计算损失
            loss = self._loss_under_noise(y, y_proba)
            self.loss_list.append(loss)
            # print(‘%4i Loss: %s‘ % (step_i, loss))

            # 早期停止法
            if self.tol is not None:
                # 如果损失下降不足阈值, 则终止迭代.
                if loss_old - loss < self.tol:
                    break
                loss_old = loss

            index = np.arange(X.shape[0])
            np.random.shuffle(index)
            selected = index[:math.ceil(self.batch_size * X.shape[0])]
            # 计算梯度
            grad = self._gradient_under_noise(X[selected], y[selected])
            # 更新参数w
            self.w -= self.eta * grad

    def _gradient_descent(self, w, X, y):

        # 若用户指定tol, 则启用早期停止法.
        if self.tol is not None:
            loss_old = np.inf

        self.loss_list = []
        self.w = w

        # 使用梯度下降至多迭代n_iter次, 更新w.
        for step_i in range(self.n_iter):
            # 预测所有点为1的概率
            y_proba = self._predict_proba(X, self.w)
            # 计算损失
            loss = self._loss(y, y_proba)
            self.loss_list.append(loss)
            print(%4i Loss: %s % (step_i, loss))
            if self.tol is not None:
                # 如果损失下降不足阈值, 则终止迭代.
                if loss_old - loss < self.tol:
                    break
                loss_old = loss

            index = np.arange(X.shape[0])
            np.random.shuffle(index)
            selected = index[:math.ceil(self.batch_size * X.shape[0])]
            # 计算梯度
            grad = self._gradient(X[selected], y[selected])
            # 更新参数w
            self.w -= self.eta * grad
    # 数据预处理
    def _preprocess_data_X(self, X):
        m, n = X.shape
        X_ = np.empty((m, n + 1))
        X_[:, 0] = 1
        X_[:, 1:] = X

        return X_

    def train(self, X_train, y_train):
        ‘‘‘训练‘‘‘
        # 预处理X_train(添加x0=1)
        X_train = self._preprocess_data_X(X_train)
        _, n = X_train.shape
        self.w = np.random.random(n) * 0.05
        self._gradient_descent(self.w, X_train, y_train)


    def train_from_noise_data(self,X_train,y_train):
        X_train = self._preprocess_data_X(X_train)
        _,n = X_train.shape
        self.w = np.random.random(n) * 0.05
        self._gradient_descent_under_noise(self.w,X_train,y_train)

    #     计算在标签噪声下的损失
    def _loss_under_noise(self,y,y_proba):
        ret = 0
        for yi, ti in zip(y, y_proba):
            ret += (1-np.where(yi == 1,self.rho_negative,self.rho_positive)) * np.log(1 + np.exp(- yi * ti)) +                    np.where(yi == 1,self.rho_positive,self.rho_negative) * np.log(1 + np.exp(yi * ti))

        return (ret + 1 / 2 * np.dot(self.w, self.w)) / (1 - self.rho_negative - self.rho_positive)
        # return (ret) / (1 - rho_negative - rho_positive)
    # 在标签噪声存在情况下的梯度
    def _gradient_under_noise(self,X,y):
        ret = np.zeros_like(self.w)
        for xi, yi in zip(X, y):
            ti = np.dot(self.w.T, xi)
            ret += (1-np.where(yi == 1,self.rho_negative,self.rho_positive)) * (-yi * np.exp(-ti * yi) / (1 + np.exp(-ti * yi)) * xi)                + np.where(yi == 1,self.rho_positive,self.rho_negative) * (-yi * np.exp(ti * yi) / (1 + np.exp(ti * yi)) * xi)
        return (ret + self.w) / (1 - self.rho_negative - self.rho_positive)
        # return (ret) / (1 - rho_negative - rho_positive)

    def predict(self, X):

        X = self._preprocess_data_X(X)
        y_pred = self._predict_proba(X, self.w)
        return np.where(y_pred >= 0.5, 1, -1)

# 生成三角形区域的随机点集
def random_data(x1,y1,x2,y2,x3,y3):
    x1, y1 = x1, y1
    x3, y3 = x3, y3
    x2, y2 = x2, y2
    sample_size = 1000
    theta = np.arange(0, 1, 0.001)
    rnd1 = np.random.random(size=sample_size)
    rnd2 = np.random.random(size=sample_size)
    rnd2 = np.sqrt(rnd2)
    x = rnd2 * (rnd1 * x1 + (1 - rnd1) * x2) + (1 - rnd2) * x3
    y = rnd2 * (rnd1 * y1 + (1 - rnd1) * y2) + (1 - rnd2) * y3
    return x,y

def method_1_on_synthetic_data():
    x_positive, y_positive = random_data(-100, -100, -100, 100, 100, 100)
    x_negative, y_negative = random_data(-40, -100, 100, 20, 100, -100)
    x_data = np.concatenate([x_positive, x_negative])
    y_data = np.concatenate([y_positive, y_negative])
    label = np.concatenate([np.ones_like(x_positive), np.ones_like(x_negative) * (-1)])

    # view data from true distribution
    plt.scatter(x_data[label == 1], y_data[label == 1], c=‘‘, edgecolors=b, facecolor=none, marker=o, s=25,
                linewidths=1)
    plt.scatter(x_data[label == -1], y_data[label == -1], c=r, marker=+)
    plt.axis([-100, 100, -100, 100])
    plt.title(initial data)
    plt.show()

    print(= * 10 + 在正确数据下的logistic预测 + = * 10)
    # view regression result from data without noise
    reg = LogisticRegression(n_iter=100, eta=0.0001, tol=1e-5)


    # 划分训练集以及测试集
    X_train, X_test, y_train, y_test = train_test_split(np.column_stack([x_data, y_data]), label, test_size=0.5)
    reg.train(X_train, y_train)

    plt.plot(reg.loss_list)
    plt.title(train loss with SGD)
    plt.show()

    y_pred = reg.predict(X_test)
    accuracy_rate = accuracy_score(y_test, y_pred)
    print(测试集上的训练准确度:, accuracy_rate)
    theta = reg.w
    print(theta参数为:, theta)
    print(分割函数:0={0}+({1})*x1+({2})*x2.format(np.round(theta[0], 3), np.round(theta[1], 3), np.round(theta[2], 3)))

    rho_positive_list = [0.2,0.3,0.4]
    rho_negative_list = [0.2,0.1,0.4]
    print(= * 10 + 在类标签噪声条件下的logistic预测 + = * 10)
    for index in range(3):
        mask_1 = np.random.uniform(0, 1, size=1000) < rho_positive_list[index]
        mask_2 = np.random.uniform(0, 1, size=1000) < rho_negative_list[index]

        label_dirty = np.concatenate([np.where(mask_1, -1, 1), np.where(mask_2, 1, -1)])

        # view regression result from data with noise
        plt.scatter(x_data[label_dirty == 1], y_data[label_dirty == 1], c=‘‘, edgecolors=b, facecolor=none, marker=o,
                    s=25,
                    linewidths=1)
        plt.scatter(x_data[label_dirty == -1], y_data[label_dirty == -1], c=r, marker=+)
        plt.axis([-100, 100, -100, 100])
        plt.title(dirty data with rho_positive = {0} and rho_negative = {1}.format(rho_positive_list[index],rho_negative_list[index]))
        plt.show()


        reg = LogisticRegression(n_iter=1000, eta=0.00001, tol=1e-5, rho_positive=rho_positive_list[index], rho_negative=rho_negative_list[index])
        # 划分训练集以及测试集
        X_train = np.column_stack([x_data, y_data])
        reg.train_from_noise_data(X_train, label_dirty)

        y_pred = reg.predict(X_train)

        plt.scatter(x_data[y_pred == 1], y_data[y_pred == 1], c=‘‘, edgecolors=b, facecolor=none, marker=o, s=25,linewidths=1)
        plt.scatter(x_data[y_pred == -1], y_data[y_pred == -1], c=r, marker=+)
        plt.axis([-100, 100, -100, 100])
        plt.title(dirty data with rho_positive = {0} and rho_negative = {1}.format(rho_positive_list[index], rho_negative_list[index]))
        plt.show()

        y_pred_under_noise = reg.predict(X_train)
        accuracy_rate = accuracy_score(label, y_pred_under_noise)
        print(rho_positive:{0} rho_negative:{1} accuracy:{2}.format(rho_positive_list[index],rho_negative_list[index],accuracy_rate))
    print(=*20)

# 模拟模块
def train_mode(data,rho_negative,rho_positive,index):
    Xdata = data[x][0][0]
    label = data[t][0][0]
    sampleCapacity,_ = Xdata.shape
    dirty_label = np.zeros_like(label)
    # 构造噪声数据
    for i in range(sampleCapacity):
        if label[i] == 1:
            dirty_label[i] = np.where(np.random.uniform(0,1) < rho_positive,-1,1)
        else:
            dirty_label[i] = np.where(np.random.uniform(0,1) < rho_positive,1,-1)
    eta_list = [0.008,0.002,0.009]
    model = LogisticRegression(n_iter=1000, eta=eta_list[index], tol=1e-5, rho_positive=rho_positive, rho_negative=rho_negative)
    index = np.arange(sampleCapacity)
    np.random.shuffle(index)
    accuracy_rate = 0
    # 3折交叉验证
    for i in range(1, 4):
        # 构造索引集,将其中五分之一大小的一段作为测试集
        selected = np.array([False] * sampleCapacity)
        selected[int((i - 1) * sampleCapacity / 3):int(i * sampleCapacity / 3)] = True
        X_test = Xdata[index[selected]]
        X_train = Xdata[index[~selected]]
        # 对属性数据进行标准化,消除量纲以及数量级造成的差距
        ss = StandardScaler()
        ss.fit(X_train)
        X_train_std = ss.transform(X_train)
        X_test_std = ss.transform(X_test)
        y_train = dirty_label[index[~selected]]
        true_test_label = label[index[selected]]
        model.train_from_noise_data(X_train_std,y_train)
        pred = model.predict(X_test_std)
        accuracy_rate += accuracy_score(true_test_label,pred)

    print("rho_positive={0}  rho_negative={1} accuracy:{2}".format(rho_positive,rho_negative,round(accuracy_rate / 3,4)))

def train_mode_2(data,rho_negative,rho_positive,index):
    Xdata = data[x][0][0]
    label = data[t][0][0]
    label[label == -1] = 0
    sampleCapacity, _ = Xdata.shape
    dirty_label = np.zeros_like(label)
    # 构造噪声数据
    for i in range(sampleCapacity):
        if label[i] == 1:
            dirty_label[i] = np.where(np.random.uniform(0, 1) < rho_positive, 0, 1)
        else:
            dirty_label[i] = np.where(np.random.uniform(0, 1) < rho_positive, 1, 0)
    index = np.arange(sampleCapacity)
    np.random.shuffle(index)
    # 3折交叉验证
    for i in range(1, 4):
        # 构造索引集,将其中五分之一大小的一段作为测试集
        selected = np.array([False] * sampleCapacity)
        selected[int((i - 1) * sampleCapacity / 3):int(i * sampleCapacity / 3)] = True
        X_test = Xdata[index[selected]]
        X_train = Xdata[index[~selected]]
        # 对属性数据进行标准化,消除量纲以及数量级造成的差距
        ss = StandardScaler()
        ss.fit(X_train)
        X_train_std = ss.transform(X_train)
        X_test_std = ss.transform(X_test)
        y_train = dirty_label[index[~selected]]
        true_test_label = label[index[selected]]
        alpha = (1 - rho_positive + rho_negative) / 2
        model = svm_train(y_train.reshape(y_train.shape[0],),X_train_std,-s 0 -t 2 -c 0.01 -w0 {0} -w1 {1}.format(alpha,1 - alpha))
        svm_predict(true_test_label,X_test_std,model)


# logistic test on UCI benchmark
def method_1_on_UCI_benchmark():
    rho_positive_list = [0.2,0.3,0.4]
    rho_negative_list = [0.2,0.1,0.4]
    # 加载数据集
    benchmarks = scio.loadmat("benchmarks.mat")
    breastCancer = benchmarks[breast_cancer]
    diabetis = benchmarks[diabetis]
    thyroid = benchmarks[thyroid]
    german = benchmarks[german]
    heart = benchmarks[heart]
    image = benchmarks[image]

    print("test on breastCancer:")
    for i in range(3):
        train_mode(data=breastCancer,rho_negative=rho_negative_list[i],rho_positive=rho_positive_list[i],index = i)
    print("test on diabetis:")
    for i in range(3):
        train_mode(data=diabetis, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print("test on thyroid:")
    for i in range(3):
        train_mode(data=thyroid, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print("test on german:")
    for i in range(3):
        train_mode(data=german, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print("test on heart:")
    for i in range(3):
        train_mode(data=heart, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print("test on image:")
    for i in range(3):
        train_mode(data=image, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
# C-SVM test on banana data
def method_2_on_banana_data():
    benchmarks = scio.loadmat("benchmarks.mat")
    banana = benchmarks[banana]
    y_data = banana[t][0][0].reshape(banana[t][0][0].shape[0], )
    y_data[y_data == -1] = 0
    x_data = banana[x][0][0]
    plt.scatter(x_data[y_data == 1, 0], x_data[y_data == 1, 1], c=‘‘, edgecolors=b, facecolor=none,
                marker=o, s=25,
                linewidths=1)
    plt.scatter(x_data[y_data == 0, 0], x_data[y_data == 0, 1], c=r, marker=+)
    plt.title(initial image)
    plt.plot()
    plt.show()

    rho_positive_list = [0.2, 0.3, 0.4]
    rho_negative_list = [0.2, 0.1, 0.4]
    print(= * 10 + 在类标签噪声条件下的C-SVM对香蕉数据的预测 + = * 10)
    for index in range(3):
        label_dirty = np.zeros_like(y_data)
        for i in range(label_dirty.shape[0]):
            if y_data[i] == 1:
                label_dirty[i] = np.where(np.random.uniform(0,1) < rho_positive_list[index],0,1)
            else:
                label_dirty[i] = np.where(np.random.uniform(0, 1) < rho_negative_list[index], 1, 0)

        # view regression result from data with noise
        plt.scatter(x_data[label_dirty == 1,0],
                    x_data[label_dirty == 1,1],
                    c=‘‘,
                    edgecolors=b,
                    facecolor=none,
                    marker=o,
                    s=25,
                    linewidths=1)
        plt.scatter(x_data[label_dirty == 0,0], x_data[label_dirty == 0,1], c=r, marker=+)
        plt.title(dirty data with rho_positive = {0} and rho_negative = {1}.format(rho_positive_list[index],
                                                                                     rho_negative_list[index]))
        plt.show()

        # 划分训练集以及测试集
        X_train = np.column_stack([x_data, y_data])
        alpha = (1 - rho_positive_list[index] + rho_negative_list[index]) / 2
        model = svm_train(label_dirty,X_train,-s 0 -t 2 -c 0.05 -w0 {0} -w1 {1}.format(alpha,1 - alpha))
        # 在真实数据上进心预测
        y_pred = svm_predict(y_data,X_train,model)[0]
        y_pred = np.array(y_pred).astype(int64)
        # 绘制出预测之后的图像
        plt.scatter(x_data[y_pred == 1,0], x_data[y_pred == 1,1], c=‘‘, edgecolors=b, facecolor=none, marker=o, s=25,
                    linewidths=1)
        plt.scatter(x_data[y_pred == 0,0], x_data[y_pred == 0,1], c=r, marker=+)
        plt.title(dirty data with rho_positive = {0} and rho_negative = {1}.format(rho_positive_list[index],
                                                                                     rho_negative_list[index]))
        plt.show()


    print(= * 20)

def method_2_on_UCI_data():
    rho_positive_list = [0.2, 0.3, 0.4]
    rho_negative_list = [0.2, 0.1, 0.4]
    benchmarks = scio.loadmat("benchmarks.mat")
    breastCancer = benchmarks[breast_cancer]
    diabetis = benchmarks[diabetis]
    thyroid = benchmarks[thyroid]
    german = benchmarks[german]
    heart = benchmarks[heart]
    image = benchmarks[image]

    print(=*20+"test on breastCancer:"+=*20)
    for i in range(3):
        train_mode_2(data=breastCancer, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print(=*20+"test on diabetis:"+=*20)
    for i in range(3):
        train_mode_2(data=diabetis, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print(=*20+"test on thyroid:"+=*20)
    for i in range(3):
        train_mode_2(data=thyroid, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print(=*20+"test on german:"+=*20)
    for i in range(3):
        train_mode_2(data=german, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print(=*20+"test on heart:"+=*20)
    for i in range(3):
        train_mode_2(data=heart, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)
    print(=*20+"test on image:"+=*20)
    for i in range(3):
        train_mode_2(data=image, rho_negative=rho_negative_list[i], rho_positive=rho_positive_list[i], index=i)

if __name__ == __main__:
    # 方法一在问题一的人造数据上进行测试
    method_1_on_synthetic_data()
    # 方法一在UCI数据上进行测试
    method_1_on_UCI_benchmark()
    # 方法二在香蕉数据上的可视化展示
    method_2_on_banana_data()
    # 方法二在UCI数据上进行测试
    method_2_on_UCI_data()

二分线性可分:

原数据:

技术分享图片

 

 

 噪声概率不同情况下的结果:

技术分享图片技术分享图片

 

 

 技术分享图片技术分享图片

技术分享图片技术分享图片

 

 

 

测试结果:

==========在正确数据下的logistic预测==========
   0 Loss: 655.0571765768963
   1 Loss: 523.6231639912271
   2 Loss: 508.3195448331075
   3 Loss: 506.8197572187573
   4 Loss: 506.8141896877468
   5 Loss: 506.68658996045286
   6 Loss: 507.0420107165859
测试集上的训练准确度: 0.993
theta参数为: [ 0.00242018 -0.16241579  0.15456798]
分割函数:0=0.002+(-0.162)*x1+(0.155)*x2
==========在类标签噪声条件下的logistic预测==========
rho_positive:0.2 rho_negative:0.2 accuracy:0.9955
rho_positive:0.3 rho_negative:0.1 accuracy:0.97
rho_positive:0.4 rho_negative:0.4 accuracy:1.0
====================
test on breastCancer:
rho_positive=0.2  rho_negative=0.2 accuracy:0.6314
rho_positive=0.3  rho_negative=0.1 accuracy:0.5319
rho_positive=0.4  rho_negative=0.4 accuracy:0.6239
test on diabetis:
rho_positive=0.2  rho_negative=0.2 accuracy:0.7474
rho_positive=0.3  rho_negative=0.1 accuracy:0.6654
rho_positive=0.4  rho_negative=0.4 accuracy:0.6107
test on thyroid:
rho_positive=0.2  rho_negative=0.2 accuracy:0.8184
rho_positive=0.3  rho_negative=0.1 accuracy:0.4797
rho_positive=0.4  rho_negative=0.4 accuracy:0.5445
test on german:
rho_positive=0.2  rho_negative=0.2 accuracy:0.696
rho_positive=0.3  rho_negative=0.1 accuracy:0.5609
rho_positive=0.4  rho_negative=0.4 accuracy:0.507
test on heart:
rho_positive=0.2  rho_negative=0.2 accuracy:0.8
rho_positive=0.3  rho_negative=0.1 accuracy:0.7667
rho_positive=0.4  rho_negative=0.4 accuracy:0.6704
test on image:
rho_positive=0.2  rho_negative=0.2 accuracy:0.7171
rho_positive=0.3  rho_negative=0.1 accuracy:0.6961
rho_positive=0.4  rho_negative=0.4 accuracy:0.6424

二维线性不可分:

原图:

技术分享图片

 

 

不同噪声情况下的分类结果:

技术分享图片技术分享图片

 

 

 

 技术分享图片技术分享图片

 

 技术分享图片技术分享图片

 

高维的C-SVM结果比较长,不展示了。

 

机器学习 类条件随机标签噪声情况下的二分类问题研究 复现NIPS论文learning with noisy label(logistic & C-SVM)

原文:https://www.cnblogs.com/randy-lo/p/14050456.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!