のんびりしているエンジニアの日記

ソフトウェアなどのエンジニア的な何かを書きます。

Deep Learning Tutorials Denoising AutoEncoder編

Sponsored Links

皆さんこんにちは
お元気ですか。私は非常に眠いです。はよ寝ろよってことか?

さて、本日はDeepLearningTutorialのDenoising AutoEncoderの解説(?)もとい勉強メモを書きます。

AutoEncoder

端的に申しますと、AutoEncoderとは生データから自動で特徴量を抽出できる偉大なマシンです。
(次元削減を繰り返す)。
具体的な動作として「入力と学習データを同じにし、中間層(HiddenLayer)の重みを学習する」といったことを行っています。

画像で書くとこんな感じ

f:id:tereka:20140515023806p:plain

x = zであり、それらを入力、教師データとして学習します。

Denoising Auto Encoder

Denoising AutoEncoderは一部を欠損させたデータを入力として学習することによって
元にデータを戻す作業を行っている感じです。
入力にある程度様々なパターンを与えることによって、堅牢な特徴量を作成する感じでしょうか。

入力→隠れ層の式(encode)は以下の通り

{ \displaystyle
y = s(Wx+b)
}

隠れ層→出力層の式は以下の通り

{ \displaystyle
y = s(W^{T}x+b^{T})
}

損失関数は

{ \displaystyle
L_H(x,z) = -\sum_{k=1}^{d}x_k logz_k + (1 - x_k)log(1-z_k)
}

そこで、これらを微分し、勾配法を使って更新を行います。ここはTheanoのgradを使っています。

因みに欠損部分ですが、Tutorialでは以下のような感じで作っていますね。ただの2項分布です。0が出たら欠損します。それだけなんだ。

from theano.tensor.shared_randomstreams import RandomStreams

def get_corrupted_input(self, input, corruption_level):
      """
      いくつかの値を0にする為に実行する。(欠損データを生成する)
   corruption_level = 欠損率
      """
      return  self.theano_rng.binomial(size=input.shape, n=1, p=1 - corruption_level) * input

これらを盛り込んだクラスの構成はこんな感じ(Tutorialそのまま)
以下、殆どチュートリアルのままです。
因みにLogisticRegressionにあるload_dataとutils(util)にあるコードが必要なので、取得してください。
殆ど、チュートリアルのまま実装しています。(ええ、コメントを変えて圧縮してるだけです)。
因みにcorruption_levelを変動させると、画像の形が違うので、試してみるのもいいでしょう。

utilsはこちらへ(http://deeplearning.net/tutorial/code/utils.py
)LogisticRegressionは前回の記事(http://nonbiri-tereka.hatenablog.com/entry/2014/05/11/231555)へ
DenoisingAutoEncoder.py

#coding:utf-8

import numpy

import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

class DenoisingAutoEncoder(object):
	"""docstring for ClassName"""
	def __init__(self, numpy_rng, theano_rng=None, input=None,
	             n_visible=784, n_hidden=500,
	             W=None, bhid=None, bvis=None):
	    """
            
	    """
	    self.n_visible = n_visible
	    self.n_hidden = n_hidden

	    # create a Theano random generator that gives symbolic random values
	    if not theano_rng:
	        theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

	    # note : W' was written as `W_prime` and b' as `b_prime`
	    if not W:
	        # 重みを生成する。
	        initial_W = numpy.asarray(numpy_rng.uniform(
	                  low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),
	                  high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),
	                  size=(n_visible, n_hidden)), dtype=theano.config.floatX)
	        W = theano.shared(value=initial_W, name='W', borrow=True)

	    if not bvis:
	        bvis = theano.shared(value=numpy.zeros(n_visible,
	                                     dtype=theano.config.floatX),
	                             borrow=True)

	    if not bhid:
	        bhid = theano.shared(value=numpy.zeros(n_hidden,
	                                               dtype=theano.config.floatX),
	                             name='b',
	                             borrow=True)

	    self.W = W
	    # 転置
	    self.W_prime = self.W.T
	    # b corresponds to the bias of the hidden
	    self.b = bhid
	    # b_prime corresponds to the bias of the visible
	    self.b_prime = bvis
	    self.theano_rng = theano_rng
	    # if no input is given, generate a variable representing the input
	    if input == None:
	        # we use a matrix because we expect a minibatch of several
	        # examples, each example being a row
	        self.x = T.dmatrix(name='input')
	    else:
	        self.x = input

	    self.params = [self.W, self.b, self.b_prime]

        """欠損する箇所をランダムで決定する関数"""
	def get_corrupted_input(self,input,corruption_level):
		return  self.theano_rng.binomial(size=input.shape, n=1,
                                         p=1 - corruption_level,
                                         dtype=theano.config.floatX) * input

	"""隠れ層の出力"""
	def get_hidden_values(self,input):
		return T.nnet.sigmoid(T.dot(input, self.W) + self.b)

	"""出力層の出力"""
	def get_reconstructed_input(self,hidden):
		return  T.nnet.sigmoid(T.dot(hidden, self.W_prime) + self.b_prime)

	def get_cost_updates(self,corruption_level,learning_rate):
		tilde_x = self.get_corrupted_input(self.x,corruption_level)
		"""隠れ層と出力層の出力"""
		y = self.get_hidden_values(tilde_x)
		z = self.get_reconstructed_input(y)

		"""損失を計算"""
		L = -T.sum(self.x * T.log(z) + (1 - self.x) * T.log(1 - z), axis=1)
		cost = T.mean(L)

		gparams = T.grad(cost,self.params)
		updates = []
		for param,gparam in zip(self.params,gparams):
			updates.append((param,param-learning_rate * gparam))

		return (cost,updates)

ExecAutoEncoder.py
MNISTを実行するメソッド
pilがなければインストールしてください(sudo pip install pil)

#coding:utf-8

import os
import sys
import time

import numpy
import theano
import theano.tensor as T

from LogisticRegression import load_data
from utils import tile_raster_images

from theano.tensor.shared_randomstreams import RandomStreams
import DenoisingAutoEncoder
import PIL.Image

def test_dA(learning_rate=0.1, training_epochs=15,
            dataset=[],
            batch_size=20, output_folder='dA_plots'):

    #####################################
    #           欠損率 30%        #
    #####################################
    print len(dataset)
    dataset_x = theano.shared(numpy.asarray(dataset,dtype=theano.config.floatX),borrow=True)

    index = T.lscalar()    # index to a [mini]batch
    x = T.matrix('x')  # the data is presented as rasterized images

    rng = numpy.random.RandomState(123)
    theano_rng = RandomStreams(rng.randint(2 ** 30))

    """オートエンコーダーの初期化"""
    da = DenoisingAutoEncoder.DenoisingAutoEncoder(numpy_rng=rng, theano_rng=theano_rng, input=x,
            n_visible=28 * 28, n_hidden=500)
    n_train_batches = len(dataset) / batch_size
    """コストとアップデートを一括りに"""
    cost, updates = da.get_cost_updates(corruption_level=0.3,
                                        learning_rate=learning_rate)

    train_da = theano.function([index], cost, updates=updates,
         givens={x: dataset_x[index * batch_size:(index + 1) * batch_size]})

    start_time = time.clock()

    ##########################
    # トレーニングと時間計測 #
    ##########################
    print 'training start!!!'
    for epoch in xrange(training_epochs):
        c = []
        for batch_index in xrange(n_train_batches):
            c.append(train_da(batch_index))

        print 'Training epoch %d, cost ' % epoch, numpy.mean(c)

    end_time = time.clock()

    training_time = (end_time - start_time)

    print >> sys.stderr, ('The 30% corruption code for file ' +
                          os.path.split(__file__)[1] +
                          ' ran for %.2fm' % (training_time / 60.))
       """可視化メソッド"""
    image = PIL.Image.fromarray(tile_raster_images(
        X=da.W.get_value(borrow=True).T,
        img_shape=(28, 28), tile_shape=(10, 10),
        tile_spacing=(1, 1)))
    image.save('filters_corruption_30.png')