• Theano is a Python library that combines the strengths of symbolic and numerical computation; you can think of it as a mix of Mathematica and MATLAB. Its main use here is as a foundation for implementing machine learning algorithms.
  • Image convolution is a weighted average of an image under a set of weight matrices; these weight matrices are called convolution kernels, or convolution templates (a small sketch follows this list).
  • A convolutional neural network is typically built from four different types of layers:
    • The final output layer is an ordinary logistic regression.
    • The layer feeding the regression is the familiar hidden layer of an MLP, with fully connected neurons.
    • At the very front is a convolutional layer, which maps the raw input matrix to feature maps through randomly initialized convolution kernels.
    • There are several such kernels: n kernels turn the raw input into n feature maps, all sitting in the same layer.
    • The convolutional layer is followed by a pooling layer, which downsamples each feature map (max pooling in the code below) to reduce the number of neurons.
    • Several of these conv-pool layers are stacked, and the last one feeds into the hidden layer.
    • A good reference: http://ibillxia.github.io/blog/2013/04/06/Convolutional-Neural-Networks/
  • The example below is the Kaggle handwritten-digit recognition task; it modifies the file reading and prediction/output functions on top of the reference tutorial.
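A minimal sketch of what a convolution kernel does, using a fixed 3x3 averaging kernel on a toy 5x5 "image" (assumes scipy is installed; the network below learns its kernels rather than fixing them):
In [ ]:
import numpy as np
from scipy.signal import convolve2d

img = np.arange(25, dtype=float).reshape(5, 5)  # toy 5x5 "image"
kernel = np.ones((3, 3)) / 9.0                  # 3x3 averaging kernel (a weight matrix)
# 'valid' keeps only positions where the kernel fully overlaps the image,
# so a 5x5 image convolved with a 3x3 kernel yields a 3x3 feature map
print convolve2d(img, kernel, mode='valid')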
In [2]:
from sklearn.cross_validation import train_test_split
from sklearn.metrics import classification_report
import numpy as np
import pandas as pd
In [3]:
data = pd.read_csv('train.csv')
In [4]:
# select the X column names (everything except the label)
select_x = data.columns != 'label'
# split into X and y; pixel values are scaled from 0-255 into [0, 1]
data_x = (data.ix[:, select_x]/255).values
data_y = data.label.values
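A quick optional sanity check; the Kaggle digit-recognizer train.csv should have 42000 rows, each a 28x28 image flattened to 784 pixel columns plus a label:
In [ ]:
print data.shape     # expect (42000, 785)
print data_x.shape   # expect (42000, 784), scaled into [0, 1]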
In [6]:
import matplotlib.pylab as plt
%matplotlib inline
In [7]:
# reshape one flattened row back into a 28x28 image and display it
temp = data_x[1,:].reshape(28,28)
plt.imshow(temp);
In [8]:
import cPickle
import time
import numpy
import theano
import theano.tensor as T
# conv and downsample are used by LeNetConvPoolLayer below
from theano.tensor.nnet import conv
from theano.tensor.signal import downsample
# a single RNG, reused for weight initialization in evaluate_lenet5() and prediction()
rng = numpy.random.RandomState(23455)
In [114]:
# Build the logistic regression class, used as the final output layer

class LogisticRegression(object):
    # initialize the parameter attributes
    def __init__(self, input, n_in, n_out):
        # initialize the weights W as a zero matrix of shape (n_in, n_out)
        self.W = theano.shared(
            value=numpy.zeros(
                (n_in, n_out),
                dtype=theano.config.floatX
            ),
            name='W',
            borrow=True
        )
        # initialize the biases b as a zero vector of length n_out
        self.b = theano.shared(
            value=numpy.zeros(
                (n_out,),
                dtype=theano.config.floatX
            ),
            name='b',
            borrow=True
        )
        # class probabilities via softmax, and the most likely class per sample
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)
        self.params = [self.W, self.b]

    
    def negative_log_likelihood(self, y):
        # [T.arange(y.shape[0]), y] picks out each sample's log-probability
        # of its true class; the mean of their negatives is the NLL
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])
 
    def errors(self, y):
        # check if y has same dimension of y_pred
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type)
            )
        # check if y is of the correct datatype
        if y.dtype.startswith('int'):
            # the T.neq operator returns a vector of 0s and 1s, where 1
            # represents a mistake in prediction
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()
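To make negative_log_likelihood concrete, here is the same computation in plain numpy on a toy batch of two samples and three classes (made-up numbers, for illustration only):
In [ ]:
import numpy
scores = numpy.array([[2.0, 1.0, 0.1],
                      [0.5, 2.5, 0.2]])
# softmax turns raw scores into class probabilities, row by row
probs = numpy.exp(scores) / numpy.exp(scores).sum(axis=1, keepdims=True)
y_true = numpy.array([0, 1])
# probs[numpy.arange(2), y_true] mirrors T.log(self.p_y_given_x)[T.arange(y.shape[0]), y]
print -numpy.mean(numpy.log(probs[numpy.arange(2), y_true]))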
In [10]:
# helper that loads numeric data into Theano's shared-variable format
def shared_dataset(data_x,data_y):
    shared_x = theano.shared(numpy.asarray(data_x, dtype=theano.config.floatX))
    # labels are stored as floatX (shared variables on the GPU must be floats)
    # and then cast back to int32 for use as class indices
    shared_y = theano.shared(numpy.asarray(data_y, dtype=theano.config.floatX))
    return shared_x, T.cast(shared_y, 'int32')
In [11]:
# Define the hidden-layer class, i.e. the fully connected layer of an MLP
class HiddenLayer(object):
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input
        if W is None:
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4

            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        # parameters of the model
        self.params = [self.W, self.b]
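The uniform bound sqrt(6 / (n_in + n_out)) above is the Glorot & Bengio initialization heuristic for tanh units; the code scales it by 4 for sigmoid. For this notebook's hidden layer (n_in = 800, n_out = 500) the bound works out to:
In [ ]:
print numpy.sqrt(6. / (800 + 500))      # about 0.068: small weights centered at 0
print 4 * numpy.sqrt(6. / (800 + 500))  # the corresponding sigmoid bound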
In [21]:
# Define the conv-pool layer class
class LeNetConvPoolLayer(object):
    # the constructor takes the input shape, the kernel shape, and the pooling size
    # image_shape is a 4-tuple: (batch size, number of input feature maps, height, width)
    # filter_shape is a 4-tuple: (number of kernels, number of input feature maps, height, width)
    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
        # the number of input feature maps must agree between the two shapes
        assert image_shape[1] == filter_shape[1]
        self.input = input
        fan_in = numpy.prod(filter_shape[1:])
        fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) /
                   numpy.prod(poolsize))
        # initialize weights with random weights
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        self.W = theano.shared(
            numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX
            ),
            borrow=True
        )
        b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True)

        # convolve input feature maps with filters
        conv_out = conv.conv2d(
            input=input,
            filters=self.W,
            filter_shape=filter_shape,
            image_shape=image_shape
        )

        # downsample each feature map individually, using maxpooling
        pooled_out = downsample.max_pool_2d(
            input=conv_out,
            ds=poolsize,
            ignore_border=True
        )
        # add the per-feature-map bias (broadcast over batch and spatial dims),
        # then apply the tanh nonlinearity
        self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        self.params = [self.W, self.b]
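A small helper (not part of the original code) to trace how a conv-pool layer shrinks each feature map; with 5x5 filters and 2x2 pooling it reproduces the 28 -> 24 -> 12 and 12 -> 8 -> 4 progression used below:
In [ ]:
def conv_pool_size(in_size, filter_size=5, pool_size=2):
    # a 'valid' convolution shrinks each side by (filter_size - 1);
    # non-overlapping pooling then divides the result by pool_size
    return (in_size - filter_size + 1) // pool_size

print conv_pool_size(28)  # 12
print conv_pool_size(12)  # 4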
In [337]:
# The main training function
def evaluate_lenet5(learning_rate=0.1, n_epochs=100,
                    data_x=data_x,data_y=data_y,
                    nkerns=[20, 50], batch_size=500):
    # randomly split the data into train/validation/test (roughly 70/20/10)
    ind = numpy.random.choice([1,2,3],size=len(data_y),replace=True,p=[0.7,0.2,0.1])
    train_x = data_x[ind==1,:]
    train_y = data_y[ind==1]
    valida_x = data_x[ind==2,:]
    valida_y = data_y[ind==2]
    test_x = data_x[ind==3,:]
    test_y = data_y[ind==3]
    train_set_x, train_set_y = shared_dataset(train_x,train_y)
    valid_set_x, valid_set_y = shared_dataset(valida_x,valida_y)
    test_set_x, test_set_y = shared_dataset(test_x, test_y)

    # compute number of minibatches for training, validation and testing
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size
    n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] / batch_size
    n_test_batches = test_set_x.get_value(borrow=True).shape[0] / batch_size

    # allocate symbolic variables for the data
    index = T.lscalar()  # index to a [mini]batch

    # start-snippet-1
    x = T.matrix('x')   # the data is presented as rasterized images
    y = T.ivector('y')  # the labels are presented as 1D vector of
                        # [int] labels

    ######################
    # BUILD ACTUAL MODEL #
    ######################
    print '... building the model'

    layer0_input = x.reshape((batch_size, 1, 28, 28))
    # layer 0: convolve the 28*28 input down to 24*24;
    # with 20 kernels this yields 20 feature maps (20*24*24 neurons in total),
    # which pooling then reduces to 12*12
    layer0 = LeNetConvPoolLayer(
        rng,
        input=layer0_input,
        image_shape=(batch_size, 1, 28, 28),
        filter_shape=(nkerns[0], 1, 5, 5),
        poolsize=(2, 2)
    )
    # layer 1: convolve the 12*12 maps down to 8*8,
    # producing 50 feature maps, which pooling reduces to 4*4
    layer1 = LeNetConvPoolLayer(
        rng,
        input=layer0.output,
        image_shape=(batch_size, nkerns[0], 12, 12),
        filter_shape=(nkerns[1], nkerns[0], 5, 5),
        poolsize=(2, 2)
    )

    # flatten the previous layer's output and feed it into the fully
    # connected hidden layer: 50*4*4 = 800 inputs, 500 hidden neurons
    layer2_input = layer1.output.flatten(2)

    # construct a fully-connected sigmoidal layer
    layer2 = HiddenLayer(
        rng,
        input=layer2_input,
        n_in=nkerns[1] * 4 * 4,
        n_out=500,
        activation=T.tanh
    )

    # finally feed into the logistic regression layer:
    # 500 inputs, 10 outputs (one per digit)
    layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)

    # the cost to minimize: the negative log-likelihood of the model
    cost = layer3.negative_log_likelihood(y)

    # create a function to compute the mistakes that are made by the model
    test_model = theano.function(
        [index],
        layer3.errors(y),
        givens={
            x: test_set_x[index * batch_size: (index + 1) * batch_size],
            y: test_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    validate_model = theano.function(
        [index],
        layer3.errors(y),
        givens={
            x: valid_set_x[index * batch_size: (index + 1) * batch_size],
            y: valid_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )

    # collect the parameters of all four layers
    # (note the ordering: layer3's W and b come first; prediction() below relies on it)
    params = layer3.params + layer2.params + layer1.params + layer0.params

    # the gradient of the cost with respect to each parameter
    grads = T.grad(cost, params)

    # use stochastic gradient descent to update the parameters
    updates = [
        (param_i, param_i - learning_rate * grad_i)
        for param_i, grad_i in zip(params, grads)
    ]
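    # each pair implements the SGD step param <- param - learning_rate * grad;
    # the updates list is handed to train_model below, so every call to
    # train_model(minibatch_index) performs one gradient step on one minibatch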

    train_model = theano.function(
        [index],
        cost,
        updates=updates,
        givens={
            x: train_set_x[index * batch_size: (index + 1) * batch_size],
            y: train_set_y[index * batch_size: (index + 1) * batch_size]
        }
    )
    # end-snippet-1

    ###############
    # TRAIN MODEL #
    ###############
    print '... training'
    # early-stopping parameters
    patience = 10000  # look as this many examples regardless
    patience_increase = 2  # wait this much longer when a new best is
                           # found
    improvement_threshold = 0.995  # a relative improvement of this much is
                                   # considered significant
    validation_frequency = min(n_train_batches, patience / 2)
                                  # go through this many
                                  # minibatche before checking the network
                                  # on the validation set; in this case we
                                  # check every epoch

    best_validation_loss = numpy.inf
    best_iter = 0
    test_score = 0.
    start_time = time.clock()

    epoch = 0
    done_looping = False

    while (epoch < n_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):

            iter = (epoch - 1) * n_train_batches + minibatch_index

            if iter % 100 == 0:
                print 'training @ iter = ', iter
            cost_ij = train_model(minibatch_index)

            if (iter + 1) % validation_frequency == 0:

                # compute zero-one loss on validation set
                validation_losses = [validate_model(i) for i
                                     in xrange(n_valid_batches)]
                this_validation_loss = numpy.mean(validation_losses)
                print('epoch %i, minibatch %i/%i, validation error %f %%' %
                      (epoch, minibatch_index + 1, n_train_batches,
                       this_validation_loss * 100.))

                # if we got the best validation score until now
                if this_validation_loss < best_validation_loss:

                    #improve patience if loss improvement is good enough
                    if this_validation_loss < best_validation_loss *  \
                       improvement_threshold:
                        patience = max(patience, iter * patience_increase)

                    # save best validation score and iteration number
                    best_validation_loss = this_validation_loss
                    best_iter = iter

                    # test it on the test set
                    test_losses = [
                        test_model(i)
                        for i in xrange(n_test_batches)
                    ]
                    test_score = numpy.mean(test_losses)
                    print(('     epoch %i, minibatch %i/%i, test error of '
                           'best model %f %%') %
                          (epoch, minibatch_index + 1, n_train_batches,
                           test_score * 100.))

            if patience <= iter:
                done_looping = True
                break

    end_time = time.clock()
    print('Optimization complete.')
    print('Best validation score of %f %% obtained at iteration %i, '
          'with test performance %f %%' %
          (best_validation_loss * 100., best_iter + 1, test_score * 100.))
    return params
In [371]:
# start training
dig_para = evaluate_lenet5()
In [ ]:
# build the prediction function
In [339]:
def prediction(data_x=data_x, dig_para=dig_para, batch_size = 500):
    # cast to floatX so the shared variable matches the trained parameters
    shared_x = theano.shared(numpy.asarray(data_x, dtype=theano.config.floatX))
    index = T.lscalar()  # index to a minibatch
    x = T.matrix('x')    # the data is presented as rasterized images
    layer0_input = x.reshape((batch_size, 1, 28, 28))
    layer0 = LeNetConvPoolLayer(
            rng,
            input=layer0_input,
            image_shape=(batch_size, 1, 28, 28),
            filter_shape=(20, 1, 5, 5),
            poolsize=(2, 2)
    )
    # dig_para is ordered layer3, layer2, layer1, layer0 (W, b pairs),
    # so layer0's trained weights sit at indices 6 and 7
    layer0.W.set_value(dig_para[6].get_value())
    layer0.b.set_value(dig_para[7].get_value())
    layer1 = LeNetConvPoolLayer(
        rng,
        input=layer0.output,
        image_shape=(batch_size, 20, 12, 12),
        filter_shape=(50,20, 5, 5),
        poolsize=(2, 2)
    )
    layer1.W.set_value(dig_para[4].get_value())
    layer1.b.set_value(dig_para[5].get_value())
    layer2_input = layer1.output.flatten(2)

    layer2 = HiddenLayer(
            rng,
            input=layer2_input,
            n_in=50 * 4 * 4,
            n_out=500,
            activation=T.tanh
    )
    layer2.W.set_value(dig_para[2].get_value())
    layer2.b.set_value(dig_para[3].get_value())
    layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
    layer3.W.set_value(dig_para[0].get_value())
    layer3.b.set_value(dig_para[1].get_value())
    pred_model = theano.function(
        [index],
        layer3.y_pred,
        givens={
            x: shared_x[index * batch_size: (index + 1) * batch_size]   
        }
    )
    # integer division: any leftover rows beyond a full batch are dropped
    # (the Kaggle test set has 28000 rows, a multiple of 500, so nothing is lost)
    n_batches=shared_x.get_value(borrow=True).shape[0] / batch_size
    result = numpy.array([])
    for i in xrange(n_batches):
        result = numpy.append(result,pred_model(i))
    return result
In [ ]:
# predict on the test.csv data and write the output to a CSV file; submitting it to Kaggle placed around 50th
In [340]:
test_data = pd.read_csv('test.csv')
test_x = (test_data/255).values
In [344]:
result = prediction(test_x)
In [370]:
Label = pd.Series(result)
ImageId = pd.Series(range(len(Label))) + 1
sub = pd.concat([ImageId, Label], axis=1)
sub.columns = ['ImageId', 'Label']
sub['Label'] = sub.Label.astype('int')
sub.to_csv('sub.csv', index=False)
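For a quick local check, classification_report (imported above) can score predictions on a held-out slice of the training data. A sketch; note that prediction() drops any incomplete final batch, so the true labels are aligned to the returned predictions:
In [ ]:
tr_x, va_x, tr_y, va_y = train_test_split(data_x, data_y, test_size=0.2)
va_pred = prediction(va_x)
# prediction returns floor(len(va_x) / 500) * 500 labels, so trim the truth to match
print classification_report(va_y[:len(va_pred)], va_pred.astype('int'))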