In [1]:
import numpy as np
import os
# Move the working directory up one level -- presumably so the project-local
# `ml_models` package imported in the next cell can be found (TODO confirm the
# notebook's location). NOTE: this is relative to the current directory, so
# re-running this cell moves up one more level each time.
os.chdir('../')
import matplotlib.pyplot as plt
%matplotlib inline

简介

上一讲我们实现了一个简单的二元分类器:LogisticRegression,但通常情况下,我们面对的更多是多分类问题。将二分类器扩展到多分类的通常做法很朴素,一般分为两种:one-vs-rest以及one-vs-one。顾名思义,one-vs-rest每次将多个类别中的某一类作为正类,其余所有类别作为负类,对于$n\_class$个类别的分类问题,需要构建$n\_class$个分类器;而one-vs-one是指对类别进行两两分类,这样将会构造$n\_class*(n\_class-1)/2$个分类器。由于实现思路很简单,这里直接贴出代码:多分类的实现封装在MultiClassWrapper类中,并放到ml_models.wrapper_models

In [2]:
from ml_models.linear_model import *
from ml_models.wrapper_models import *
In [3]:
# Prepare the handwritten-digits data
from sklearn.metrics import f1_score
from sklearn import model_selection
from sklearn import datasets
digits = datasets.load_digits()
data = digits['data']
target = digits['target']
# Hold out 30% of the samples as the test split; random_state=0 fixes the split.
X_train, X_test, y_train, y_test = model_selection.train_test_split(data, target, test_size=0.3,
                                                                    random_state=0)
In [4]:
# Build the base binary model; MultiClassWrapper deep-copies it once per sub-problem.
# (LogisticRegression presumably comes from the `ml_models.linear_model` star import -- confirm.)
lr = LogisticRegression()
In [5]:
# Train with the one-vs-rest strategy and evaluate on the test split
ovr = MultiClassWrapper(lr, mode='ovr')
ovr.fit(X_train, y_train)

y = ovr.predict(X_test)
# Macro-averaged F1 over the classes
print('ovr:', f1_score(y_test, y, average='macro'))
ovr: 0.9492701335705958
In [6]:
# Train with the one-vs-one strategy and evaluate on the test split
ovo = MultiClassWrapper(lr, mode='ovo')
ovo.fit(X_train, y_train)

y = ovo.predict(X_test)
# Macro-averaged F1 over the classes
print('ovo:', f1_score(y_test, y, average='macro'))
ovo: 0.959902103714483

MultiClassWrapper类实现细节

In [7]:
import threading
import copy
import numpy as np

# Thread subclass used to retrieve the return value of the function it runs.


class MyThread(threading.Thread):
    """Thread that runs ``target(*args, **kwargs)`` and keeps its return value.

    The call is executed inside :meth:`run`, i.e. on the worker thread once
    :meth:`start` has been called -- not in the constructor -- so several
    instances can actually overlap. An exception raised by ``target`` is
    captured on the worker thread and re-raised in the caller by :meth:`join`,
    so failures are not silently lost.

    NOTE: callers must make sure ``target`` is safe to run concurrently with
    the other threads they start (no shared mutable state).

    :param target: callable to execute on the worker thread
    :param args: positional arguments for ``target`` (``None`` means none)
    :param kwargs: keyword arguments for ``target`` (``None`` means none)
    :param name: optional thread name; when empty the default name assigned by
        ``threading.Thread`` is kept
    """

    def __init__(self, target, args=(), kwargs=None, name=''):
        threading.Thread.__init__(self)
        if name:
            self.name = name
        self.target = target
        self.args = tuple(args) if args is not None else ()
        self.kwargs = dict(kwargs) if kwargs is not None else {}
        # Filled in by run(); stay None until the target has finished.
        self.result = None
        self.exception = None

    def run(self):
        """Execute the target on the worker thread and record its outcome."""
        try:
            self.result = self.target(*self.args, **self.kwargs)
        except Exception as exc:
            # Keep the error so join() can surface it in the calling thread.
            self.exception = exc

    def join(self, timeout=None):
        """Wait for the thread, then re-raise any exception raised by the target.

        :param timeout: forwarded to ``threading.Thread.join``
        :raises Exception: whatever ``target`` raised, once the thread has finished
        """
        threading.Thread.join(self, timeout)
        if self.exception is not None and not self.is_alive():
            raise self.exception

    def get_result(self):
        """Return the target's return value.

        :return: the value returned by ``target``, or ``None`` if the thread has
            not finished (or the target failed)
        """
        return self.result


class MultiClassWrapper(object):
    """Turn a binary classifier into a multi-class one (one-vs-rest or one-vs-one).

    Labels are expected to be the integers ``0 .. max(y)``: one sub-classifier is
    built per label (``'ovr'``) or per unordered label pair (``'ovo'``), and
    :meth:`predict` returns column indices, which coincide with the labels only
    under that convention.

    The base classifier must offer ``fit(x, y, **kwargs)`` and ``predict_proba(x)``.
    The per-classifier outputs are concatenated along axis 1, one column per
    classifier, so ``predict_proba`` is assumed to return the positive-class
    probability with shape ``(n_samples, 1)`` -- TODO confirm against the base
    classifier in use.
    """

    _MODES = ('ovr', 'ovo')

    def __init__(self, base_classifier, mode='ovr'):
        """
        :param base_classifier: an already instantiated binary classifier; it is
            deep-copied for every sub-problem and never fitted itself
        :param mode: 'ovr' for one-vs-rest, 'ovo' for one-vs-one
        """
        self.base_classifier = base_classifier
        self.mode = mode

    @staticmethod
    def fit_base_classifier(base_classifier, x, y, **kwargs):
        """Fit one sub-classifier; extra keyword arguments go to its ``fit``."""
        base_classifier.fit(x, y, **kwargs)

    @staticmethod
    def predict_proba_base_classifier(base_classifier, x, **kwargs):
        """Return one sub-classifier's probabilities.

        Extra keyword arguments are forwarded to its ``predict_proba``.
        """
        return base_classifier.predict_proba(x, **kwargs)

    def _check_mode(self):
        """Fail early on an unsupported ``mode`` instead of silently doing nothing."""
        if self.mode not in self._MODES:
            raise ValueError("mode must be 'ovr' or 'ovo', got %r" % (self.mode,))

    def _class_pairs(self):
        """Return every label pair ``(first, second)`` with ``first < second``."""
        return [(first_cls, second_cls)
                for first_cls in range(0, self.n_class)
                for second_cls in range(first_cls + 1, self.n_class + 1)]

    @staticmethod
    def _run_jobs(jobs, kwargs):
        """Start one MyThread per job, wait for all of them, return them by key.

        :param jobs: list of ``(key, target, args)`` tuples
        :param kwargs: keyword arguments passed to every target
        :return: dict mapping each key to its finished task
        """
        tasks = {}
        for key, target, args in jobs:
            task = MyThread(target=target, args=args, kwargs=kwargs)
            task.start()
            tasks[key] = task
        for key, _, _ in jobs:
            tasks[key].join()
        return tasks

    def fit(self, x, y, **kwargs):
        """Fit one copy of the base classifier per sub-problem.

        :param x: feature matrix, indexable as ``x[rows, :]``
        :param y: integer labels in ``0 .. max(y)``
        :param kwargs: forwarded to every sub-classifier's ``fit``
        :raises ValueError: if ``mode`` is neither 'ovr' nor 'ovo'
        """
        self._check_mode()
        y = np.asarray(y)
        # NOTE: despite its name this stores the largest label, i.e. the number
        # of classes minus one; the loops below therefore run to n_class + 1.
        self.n_class = int(np.max(y))
        if self.mode == 'ovr':
            self.classifiers = [copy.deepcopy(self.base_classifier)
                                for _ in range(0, self.n_class + 1)]
            # Sub-problem `cls`: class `cls` is positive (1), all others negative (0).
            jobs = [(cls, self.fit_base_classifier, (classifier, x, (y == cls).astype('int')))
                    for cls, classifier in enumerate(self.classifiers)]
        else:
            pairs = self._class_pairs()
            self.classifiers = {pair: copy.deepcopy(self.base_classifier) for pair in pairs}
            jobs = []
            for first_cls, second_cls in pairs:
                # Keep only the samples of the two classes; first_cls is the positive class.
                index = np.where(y == first_cls)[0].tolist() + np.where(y == second_cls)[0].tolist()
                new_x = x[index, :]
                new_y = y[index]
                jobs.append(((first_cls, second_cls), self.fit_base_classifier,
                             (self.classifiers[(first_cls, second_cls)], new_x,
                              (new_y == first_cls).astype('int'))))
        self._run_jobs(jobs, kwargs)

    def predict_proba(self, x, **kwargs):
        """Return row-normalised class scores, one column per label ``0 .. max(y)``.

        :param x: feature matrix
        :param kwargs: forwarded to every sub-classifier's ``predict_proba``
        :return: array with one row per sample; each row sums to 1
        :raises ValueError: if ``mode`` is neither 'ovr' nor 'ovo'
        """
        self._check_mode()
        if self.mode == 'ovr':
            jobs = [(cls, self.predict_proba_base_classifier, (classifier, x))
                    for cls, classifier in enumerate(self.classifiers)]
            tasks = self._run_jobs(jobs, kwargs)
            total_probas = np.concatenate(
                [tasks[cls].get_result() for cls in range(len(self.classifiers))], axis=1)
        else:
            pairs = self._class_pairs()
            jobs = [(pair, self.predict_proba_base_classifier, (self.classifiers[pair], x))
                    for pair in pairs]
            tasks = self._run_jobs(jobs, kwargs)
            probas = {}
            for first_cls, second_cls in pairs:
                # The (first, second) classifier scores `first`; the mirrored entry is its complement.
                probas[(first_cls, second_cls)] = tasks[(first_cls, second_cls)].get_result()
                probas[(second_cls, first_cls)] = 1.0 - probas[(first_cls, second_cls)]
            # Score of a class = sum of its pairwise probabilities against every other class.
            columns = []
            for first_cls in range(0, self.n_class + 1):
                versus = [probas[(first_cls, second_cls)]
                          for second_cls in range(0, self.n_class + 1)
                          if first_cls != second_cls]
                columns.append(np.concatenate(versus, axis=1).sum(axis=1, keepdims=True))
            total_probas = np.concatenate(columns, axis=1)
        # Normalise so that every row sums to 1.
        return total_probas / total_probas.sum(axis=1, keepdims=True)

    def predict(self, x):
        """Return, for every sample, the index of the highest-scoring column of ``predict_proba``."""
        return np.argmax(self.predict_proba(x), axis=1)
In [ ]: