のんびりしているエンジニアの日記

ソフトウェアなどのエンジニア的な何かを書きます。

MLFlow Trackingを使って、実験管理を効率化する

Sponsored Links

皆さんこんにちは
お元気でしょうか。COVIT-19起因で引きこもっているため、少しずつ自炊スキルが伸びていっています。

以前、実験管理に関していくつかのソフトウェアを紹介しました。
その中で、MLFlow Trackingが一番良さそうではあったのでパイプラインに取り込むことを考えています。
もう少し深ぼって利用方法を把握する必要があったので、メモ代わりに残しています。

nonbiri-tereka.hatenablog.com

MLFlow Trackingのおさらい

MLFlowとは

MLFlowはプラットフォームです。機械学習のデプロイやトラッキング、実装のパッケージングやデプロイなど幅広くサポートしています。
その中ではいくつかの機能があり、主にMLflow Trackingを実験管理に利用している人が増えています。
Trackingの機能については申し分がなさそうで、リモートサーバでの運用なども対応できるので、複数人で利用するにも有用でしょう。(大体以前と同じ)

github.com

基本的な利用方法

実装

実装はほぼ以前と同様ですが、以下の通りです。

import argparse
import os
import tempfile

import mlflow
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torchvision import datasets, transforms

# Command-line arguments
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                    help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                    help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
                    help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                    help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                    help='SGD momentum (default: 0.5)')
parser.add_argument('--enable-cuda', type=str, choices=['True', 'False'], default='True',
                    help='enables or disables CUDA training')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=100, metavar='N',
                    help='how many batches to wait before logging training status')
args = parser.parse_args()

enable_cuda_flag = True if args.enable_cuda == 'True' else False

args.cuda = enable_cuda_flag and torch.cuda.is_available()

torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)

kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=args.batch_size, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])),
    batch_size=args.test_batch_size, shuffle=True, **kwargs)


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=0)


model = Net()
if args.cuda:
    model.cuda()

optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)


def train(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if args.cuda:
            data, target = data.cuda(), target.cuda()
        data, target = Variable(data), Variable(target)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                       100. * batch_idx / len(train_loader), loss.data.item()))
            step = epoch * len(train_loader) + batch_idx
            log_scalar('train_loss', loss.data.item(), step)


def test(epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            data, target = Variable(data), Variable(target)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').data.item()  # sum up batch loss
            pred = output.data.max(1)[1]  # get the index of the max log-probability
            correct += pred.eq(target.data).cpu().sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100.0 * correct / len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), test_accuracy))
    step = (epoch + 1) * len(train_loader)
    log_scalar('test_loss', test_loss, step)
    log_scalar('test_accuracy', test_accuracy, step)


def log_scalar(name, value, step):
    """Log a scalar value to both MLflow and TensorBoard"""
    mlflow.log_metric(name, value)

active_run = mlflow.active_run()
for key, value in vars(args).items():
    mlflow.log_param(key, value)
# Create a SummaryWriter to write TensorBoard events locally
output_dir = dirpath = tempfile.mkdtemp()
# Perform the training
for epoch in range(1, args.epochs + 1):
    train(epoch)
    test(epoch)
# Upload the TensorBoard event logs as a run artifact
with tempfile.TemporaryDirectory()  as tmp:
    filename = os.path.join(tmp, "model.bin")
    torch.save(model.state_dict(), filename)
    mlflow.log_artifacts(tmp, artifact_path="results")

既存実装からそこまで改修を加える必要はありません。
mlflowの開始前には、active_run()を行います。
その後、mlflowの関数利用して、記録を残せます。記録を残すための関数は次の通りです。

項目 説明
log_param パラメータを記録する。batch_sizeなど
log_metric 時間で経過する数値を記録する。train_lossなど
log_artifacts 出力されたファイルを記録する。モデルなど

可視化

一覧画面

分析の一覧画面は次の通りです。
f:id:tereka:20200321195334p:plain

Searchの箇所にクエリを入力し、データを絞ることが可能です。
また、「Columns」の左にある表示のクリックによりテーブルのUIを変更できます。
更には各メトリクスの値をクリックすることでソートが可能です。

個別画面

一般的な実験条件です。こちらは特に目新しいこともなく、情報が記載されています。
f:id:tereka:20200321200048p:plain

各ステップごとのメトリクスや実験で保存したファイルをmlflowの画面から閲覧できます。
f:id:tereka:20200321200057p:plain

メトリクス

取得したメトリクスは次のようにグラフとして可視化できます。
Pointを点にしたり、対数でスケーリングしたりすることも可能です。
f:id:tereka:20200321200302p:plain

比較画面

一覧の画面から複数の実験を選択し、「Compare」を押すと次のような画面に進みます。
この画面で実験の比較を行えます。複数の実験が一覧で並んでいるため、左右見比べたりするのは容易です。

f:id:tereka:20200321214411p:plain

より使いやすくするために

これまで標準的な方式を紹介しましたが、一歩使いやすくするためのTipsを以下に紹介します。

リモートサーバへの送信

今回はリモートのサーバを利用することにします。
もともとローカルに出力されていますが、set_tracking_uriを実行することで、ファイルの配置先、リモートへの出力も可能とします。
まず、リモートにmlflowサーバの立ち上げを行います。

mlflow server -h 0.0.0.0 -p 5000

次にリモートサーバへの設定を行います。
active_run関数の前にset_tracking_url関数を実行します。
サーバのエンドポイントはログを見る限り、httpsで起動していると思われがちですが、httpで記述する必要があります。
(ここですが、httpsで記述していて、少しはまりました)

この状態でもUIは利用できるので、記録が成功しているかはアクセスすればわかります。

mlflow.set_tracking_uri("http://192.168.1.5:5000")

また、実装を変更したくない場合、MLFLOW_TRACKING_URI環境変数の設定による変更も可能です。

実験名の設定

通常だと、名前がなく、日付のみで管理し難いです。
mlflowは、<実験>/の階層で管理しています。
これらの指定により管理しやすくなります。例えば、実験とrun_idを指定するには次のコードを記載すれば良いでしょう。

eid = mlflow.create_experiment("experiment")
mlflow.start_run(experiment_id=eid, run_name="run01")

既に実験を作成した場合は、get_experiment_by_name関数を呼び出すとexperiment_idを取得できます。

セキュリティ

セキュリティはmlflow側で担保していないので、別のソフトウェア(例:nginx)を利用する必要があります。
例えば、dockerを利用する場合だと次のサイトが参考になります。
ymym3412.hatenablog.com

最後に

MLFlow Trackingの利用をすれば予想通り、リモートで集約するなども可能です。
自分の作りたいものが作れそうではあるので、これを使いこなしてより実験管理を効率化したいと感じています。