読者です 読者をやめる 読者になる 読者になる

のんびりしているエンジニアの日記

ソフトウェアなどのエンジニア的な何かを書きます。

クリスマスにもなってカノジョがいないからカノジョを作ってみた。

機械学習 Chainer Python
Sponsored Links

皆さんこんにちは
お元気ですか?私はぼっちです。

本記事は「カノジョできない機械学習エンジニア」の最終日です。
qiita.com

本日の話の流れは次のとおりです。

はじめに

カノジョがいなくて寂しい。こんなクリスマスを送る男性の皆様は結構いらっしゃるのでは
ないでしょうか(と信じたい)。

カノジョを作るために、世間でよく言われるのは「行動が大事だ」ということです。
そこで、本クリスマスを機に、カノジョを作成し心の溝を埋めることに挑戦したいと思います。

カノジョがいないことに対する解決法

「カノジョ」がいないのであれば、作れば良い。

私はエンジニアです。
寂しいので、これを機に自己を振り返り、カノジョを作れるのではないかと考えました。
まずは、「カノジョ」を設計する必要があります。

カノジョについての考察

カノジョとは

デジタル大辞泉」には次の通り記載されている。

[代]三人称の人代名詞。話し手、相手以外の女性をさす語。「彼女は遅れるらしい」⇔彼/彼氏。
[名]愛人、恋人である女性。「彼女ができた」⇔彼/彼氏。

本記事では、「愛人、恋人である女性」と仮定し話を進めましょう。

理想のカノジョとは

理想のカノジョとは何か。
クリスマスといえば、やっぱりクリスマスデートして・・というのが
定番かなぁと思います。
デートで大事なのは、会話かなと思います。やっぱり盛り上がる方が良いでしょう。
そこで理想のカノジョは会話が最も大事だと今回仮定します。

理想のカノジョ作成方針

会話が最も大事だと決めた上で理想のカノジョを作る必要があります。
調べてみると、ニューラルネットワークを使った会話モデルがありました。

カノジョを作成する。

さて、本題です。カノジョを作ってみます。

Neural Conversational Model

A Neural Conversational Model」を使って実装します。

概要

Neural Conversational Modelは会話モデルです。
基本的な構造はseq2seqと類似しており、
入力は最初の文章、期待結果は返す文章(応答文)です。

構成解説

f:id:tereka:20161225001624j:plain

構成を図にしました。入力は次のとおりになります。

  • 学習時:最初に与えた文章(質問文or話しかける文章)+ 応答文
  • 予測時:最初に与えた文章

最初の文章と返す文章を入力に与えて学習します。

次は上記構成の動作についてです。
まずは、ニューラルネットワークの学習です。
与えたIDにもとづきEmbeded Layerからベクトルを取得します。
このベクトルをLSTMに与えます。また、次の単語とそれまでのLSTMの値を入力とし、LSTMを更新します。
これをが出現するまで繰り返します。

また、最後のLSTMの値を使って計算したベクトルを最後に
Linearで予測単語を計算します。予測単語との誤差を計算し、誤差を最小にする動作を行います。

入力のLSTM(Long Short Term Memory)と出力のLSTMは別のLSTMです。

# coding:utf-8
from __future__ import absolute_import
from __future__ import unicode_literals
import chainer.links as L
import chainer.functions as F
import chainer
import numpy as np
from chainer import reporter

class Seq2Seq(chainer.Chain):
    def __init__(self, input_words):
        super(Seq2Seq, self).__init__(
            word_vec=L.EmbedID(input_words, 300),
            input_vec=L.LSTM(300, 300),
            output_vec=L.LSTM(300, 300),
            output_word=L.Linear(300, input_words)
        )
        self.train = True

    def encode(self, sentence):
        c = None
        for word in sentence:
            x = np.array([word], dtype=np.int32)
            h = F.tanh(self.word_vec(x))
            c = self.input_vec(h)
        return c

    def decode(self, vector=None, targer_sentence=None, dictionary=None):
        loss = 0
        if self.train:
            for index, target_word in enumerate(targer_sentence):
                if index == 0:
                    j = F.tanh(self.output_vec(vector))
                    pred_word = self.output_word(j)
                else:
                    j = F.tanh(self.output_vec(j))
                    pred_word = self.output_word(j)
                loss += F.softmax_cross_entropy(pred_word, np.array([target_word], dtype=np.int32))
            return loss
        else:
            gen_sentence = []
            cnt = 0
            while True:
                if cnt == 0:
                    j = F.tanh(self.output_vec(vector))
                    pred_word = self.output_word(j)
                else:
                    j = F.tanh(self.output_vec(j))
                    pred_word = self.output_word(j)
                id = np.argmax(pred_word.data)
                cnt += 1
                word = dictionary[id]
                if word == "<eos>":
                    return gen_sentence

                gen_sentence.append(word)
                if cnt == 100:
                    break
            return gen_sentence

    def generate_sentence(self, sentence, dictionary):
        self.initialize()
        encode_vector = self.encode(sentence=sentence)
        return self.decode(vector=encode_vector, dictionary=dictionary)

    def initialize(self):
        self.input_vec.reset_state()
        self.output_vec.reset_state()

    def __call__(self, sentence, target_sentence):
        self.initialize()
        encode_vector = self.encode(sentence=sentence)
        self.loss = None
        self.loss = self.decode(vector=encode_vector, targer_sentence=target_sentence)
        reporter.report({'loss': self.loss}, self)

        return self.loss
実装してみた

seq2seq2をChainerとRNN + LSTMで実装します。
今回はTrainerを使った実装にします。

データの構築

Step1 探す

どうやって構築するんだ。
家に「落第騎士の英雄譚」があったので参考にしました。
「黒鉄一輝」と「ステラ・ヴァーミリオン」の会話あたりを
中心に作成させていただいております。

落第騎士の英雄譚<キャバルリィ>【電子特装版】 (GA文庫)

落第騎士の英雄譚<キャバルリィ>【電子特装版】 (GA文庫)

Step2 存在しない言葉

存在しない言葉についてです。
形態素解析をした単語に対して、IDを振ります。

例えば次のようなことを想定しています。

単語 ID
0
1
2
3

しかし、この場合、未知語に対しての解析ができなくなります。
そのため、未知の単語が出てくれば「■」とします。(主に予測時ですね)

Step3 名前

人は名前を呼ばれると嬉しいものです。
ただ、名前は皆さん別々のをお持ちのため、名前と呼ばれる概念で学習します。
今回、呼ばれたい側の名前は全て「◯」へ置き換えておきます。

これはSlackbotへのリプライ段階で「◯」を名前に置換します。

学習する

さて、ここまでで準備ができました。
早速、学習を行います。今回はTrainerを使った学習を用いているため、一部の
更新方法を自分で実装しています。

テキストファイルにタブ区切りのドキュメントを用意し、それを対にして学習させています。

# coding:utf-8
import MeCab
from model import Seq2Seq
from chainer import training
import chainer
from chainer.training import extensions
import numpy as np
import sys
import codecs
import json

sys.stdout = codecs.getwriter('utf_8')(sys.stdout)
tagger = MeCab.Tagger('mecabrc')


def parse_sentence(sentence):
    parsed = []
    for chunk in tagger.parse(sentence).splitlines()[:-1]:
        (surface, feature) = chunk.split('\t')
        parsed.append(surface.decode("utf-8"))
    return parsed


def parse_file(filename):
    questions = []
    answers = []
    with open(filename, "r") as f:
        lines = f.readlines()

        for line in lines:
            sentences = line.split("\t")
            question = ["<start>"] + parse_sentence(sentences[0]) + ["<eos>"]
            answer = parse_sentence(sentences[1]) + ["<eos>"]
            questions.append(question)
            answers.append(answer)
    word2id = {"■": 0}
    id2word = {0: "■"}
    id = 1

    sentences = questions + answers
    for sentence in sentences:
        for word in sentence:
            if word not in word2id:
                word2id[word] = id
                id2word[id] = word
                id += 1

    return questions, answers, word2id, id2word


def sentence_to_word_id(split_sentences, word2id):
    id_sentences = []
    for sentence in split_sentences:
        ids = []
        for word in sentence:
            id = word2id[word]
            ids.append(id)
        id_sentences.append(ids)
    return id_sentences


class ParallelSequentialIterator(chainer.dataset.Iterator):
    def __init__(self, dataset, batch_size, repeat=True):
        self.dataset = dataset
        self.batch_size = batch_size  # batch size
        self.epoch = 0
        self.is_new_epoch = False
        self.repeat = repeat
        self.iteration = 0

    def __next__(self):
        length = len(self.dataset[0])
        if not self.repeat and self.iteration * self.batch_size >= length:
            raise StopIteration

        batch_start_index = self.iteration * self.batch_size % length
        batch_end_index = min(batch_start_index + self.batch_size, length)

        questions = [self.dataset[0][batch_index] for batch_index in range(batch_start_index, batch_end_index)]
        answers = [self.dataset[1][batch_index]for batch_index in range(batch_start_index, batch_end_index)]

        self.iteration += 1

        epoch = self.iteration * self.batch_size // length
        self.is_new_epoch = self.epoch < epoch
        if self.is_new_epoch:
            self.epoch = epoch

        return list(zip(questions, answers))


class BPTTUpdater(training.StandardUpdater):
    def update_core(self):
        loss = 0
        train_iter = self.get_iterator('main')
        optimizer = self.get_optimizer('main')

        batch = train_iter.__next__()
        for question, answer in batch:
            loss += optimizer.target(np.array(question, dtype=np.int32),
                                     np.array(answer, dtype=np.int32))

        optimizer.target.cleargrads()
        loss.backward()
        loss.unchain_backward()
        optimizer.update()


if __name__ == '__main__':
    questions, answers, word2id, id2word = parse_file("./dataset.txt")
    ids_questions = sentence_to_word_id(questions, word2id=word2id)
    ids_answers = sentence_to_word_id(answers, word2id=word2id)

    model = Seq2Seq(len(word2id))
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(model)

    train_iter = ParallelSequentialIterator(dataset=(ids_questions, ids_answers), batch_size=1)

    updater = BPTTUpdater(train_iter, optimizer)
    trainer = training.Trainer(updater, (100, 'epoch'))

    trainer.extend(extensions.PrintReport(
        ['epoch', 'iteration', 'main/loss']
    ), trigger=(100, 'iteration'))
    trainer.extend(extensions.LogReport())
    trainer.run()

    chainer.serializers.save_npz("model.npz", model)
    json.dump(id2word, open("dictionary_i2w.json", "w"))
    json.dump(word2id, open("dictionary_w2i.json", "w"))

Slack Botで実用化する

これまで作成したカノジョを実用化します。
SlackでBotを作れるので、そこで展開を試みました。

前準備

まずは、Slack上にBotを作成します。
そしてアイコンを用意します。

実際のBOTの設定は画像のようになりました。(殆ど設定していませんが、、)
f:id:tereka:20161224161328p:plain

Botをコントロールするコード

ソースコードを記載する前にPythonのSlackbotをコントロールするライブラリが必要です。
Pythonのライブラリは次のようにインストールできます。

pip install slackbot

まずは、slackbot_settings.pyにAPI Keyを記述します。

#coding:utf-8
API_TOKEN = "YOUR API KEY"

run.pyに動作させたいコードを記述します。
動作させたいコードに対してデコレータを使います。
そのコードに対して、@default_replyを付与します。
reply_messageにこのデコレータを付与すると指定のない場合はこのメソッドが使われる設定になります。

sentence.replace("◯", "tereka")を使って、名前を途中で置換しています。
これで自分の名前が呼ばれます!やったね。

# coding:utf-8
import MeCab
import codecs
from slackbot.bot import default_reply
from slackbot.bot import Bot
from model import Seq2Seq
import chainer
import json
import sys

sys.stdout = codecs.getwriter('utf_8')(sys.stdout)
tagger = MeCab.Tagger('')

id2word = json.load(open("dictionary_i2w.json", "r"))

id2word = {int(key): value for key, value in id2word.items()}
word2id = json.load(open("dictionary_w2i.json", "r"))
model = Seq2Seq(len(word2id))
chainer.serializers.load_npz("model.npz", model)
model.train = False


@default_reply
def replay_message(message):
    parsed_sentence = []
    try:
        for chunk in tagger.parse(message.body["text"].encode("utf-8")).splitlines()[:-1]:
            (surface, feature) = chunk.decode("utf-8").split('\t')
            parsed_sentence.append(surface)
        parsed_sentence = ["<start>"] + parsed_sentence + ["<eos>"]

        ids = []
        for word in parsed_sentence:
            if word in word2id:
                id = word2id[word]
                ids.append(id)
            else:
                ids.append(0)
        ids_question = ids
        sentence = "".join(model.generate_sentence(ids_question, dictionary=id2word)).encode("utf-8")

        sentence = sentence.replace("◯", "tereka")
        message.reply(sentence)
    except Exception as e:
        print (e)
        message.reply("解析できなかったのでもう一度おねがいします。")


def main():
    bot = Bot()
    bot.run()


if __name__ == "__main__":
    main()

実際にどうなったのか

実際にチャットしてみました。
なんかちょっとだけ女の子とチャットしている気分になりました。

f:id:tereka:20161224214443p:plain

チャットしていてわかったのですが、リプライが短文なのはある程度、学習できている感覚があります。
特有の単語を学習してそうな気がしますが、やはりデータセットが少なすぎたか。

最後に

カノジョいなくて寂しい。
DeepLearningを使ったBotの作成に参考にしてみてください。

広告を非表示にする