Help us understand the problem. What is going on with this article?

tf.learnとtf.slimで主なDCNNを書く

More than 3 years have passed since last update.

tf.learn/tf.slimとは

TensorFlowのリポジトリで開発中(TensorFlow0.11時点)の、TensorFlow向け高レベルインターフェース
Cloud Machine Learningがオープンベータになって、今後使えそうだけどまともなドキュメントもないので調べてみた
練習用なので、わかりやすさよりもtf.slimを使い倒すことを重視している

学習・推論

n_classは分類したいクラスの数
X_trainは訓練用画像データ(sample, width, height, channel)
y_trainは正解ラベル(長さsampleのリスト)

import tensorflow as tf
learn = tf.contrib.learn
slim = tf.contrib.slim

class CustomMonitor(learn.monitors.EveryN):
    """Training monitor that prints the loss tensor every N steps."""

    # Graph tensor fetched and printed at each monitored step.
    _LOSS_TENSOR = 'loss/value:0'

    def begin(self, max_steps):
        """Announce the start of training."""
        super(CustomMonitor, self).begin(max_steps)
        print('Start training')

    def end(self):
        """Announce that training has finished."""
        super(CustomMonitor, self).end()
        print('Completed')

    def every_n_step_begin(self, step):
        """Request the loss tensor as an extra fetch for this step."""
        return [self._LOSS_TENSOR]

    def every_n_step_end(self, step, outputs):
        """Print the fetched loss value for the current step."""
        loss_value = outputs[self._LOSS_TENSOR]
        print('Step %d - loss: %s' % (step, loss_value))

# Build an Estimator around a model_fn and train it.
# NOTE(review): assumes `model`, `X_train` and `y_train` are defined
# earlier in the script -- confirm before running.
classifier = learn.Estimator(model_fn=model, model_dir='/tmp/my_model')
# Default monitors handle summaries/checkpoints every 1000 steps;
# CustomMonitor adds a loss printout every 10 steps.
classifier.fit(x=X_train, y=y_train, steps=20000, batch_size=128, 
               monitors=learn.monitors.get_default_monitors(save_summary_steps=1000)+\
                        [CustomMonitor(every_n_steps=10, first_n_steps=0)])

VGG

畳み込み→プーリングを繰り返して特徴を抽出し、最後に全結合層を通す
思想がわかりやすく、書きやすいのでDCNNが初めての人向き
conv2dで全結合を書くサンプルがあったので、両方使ってみた

vgg.py
mode = 'conv'

def fc_drop(inputs, *args, **kwargs):
    """Fully connected layer followed by dropout (keep prob 0.5)."""
    hidden = slim.fully_connected(inputs, *args, **kwargs)
    return slim.dropout(hidden, 0.5)

def conv_drop(inputs, *args, **kwargs):
    """ELU convolution followed by dropout (keep prob 0.5).

    The first positional argument is the kernel size; everything else
    is forwarded to slim.conv2d.
    """
    kernel = args[0]
    conv = slim.conv2d(inputs, kernel_size=kernel, activation_fn=tf.nn.elu,
                       **kwargs)
    return slim.dropout(conv, 0.5)

def model(x, y):
    """VGG-style model_fn: conv/pool stacks plus a classifier head.

    Returns (predictions, loss, train_op) in the tf.contrib.learn
    Estimator model_fn format.
    """
    net = x
    y = slim.one_hot_encoding(y, n_class)

    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        activation_fn=tf.nn.elu, normalizer_fn=slim.batch_norm):
        # Feature extractor: repeated 3x3 convolutions, then 2x2 pooling.
        # Channel depth doubles at every stage: 64, 128, 256, 512.
        for layer, n_repeat in enumerate([2, 2, 3, 3], start=1):
            depth = 64 * 2 ** (layer - 1)
            net = slim.repeat(net, n_repeat, slim.conv2d, depth, [3, 3],
                              scope='conv%d' % layer)
            net = slim.max_pool2d(net, [2, 2], scope='pool%d' % layer)

        if mode == 'fc':
            # Classic head: flatten, two FC+dropout layers, linear logits.
            net = slim.flatten(net, scope='flatten')
            net = slim.stack(net, fc_drop, [4096, 4096], scope='fc')
            logits = slim.fully_connected(net, n_class, activation_fn=None,
                                          normalizer_fn=None, scope='logits')
        else:
            # Convolutional head: a VALID conv over the whole remaining
            # feature map emulates the first FC layer, then 1x1 convs.
            kernel_sizes = [net.get_shape()[1:3], [1, 1]]
            net = slim.stack(net, conv_drop, kernel_sizes,
                             num_outputs=4096, padding='VALID', scope='fc')
            net = slim.conv2d(net, n_class, [1, 1], activation_fn=None,
                              normalizer_fn=None, scope='logits')
            logits = slim.flatten(net)

        loss = slim.losses.softmax_cross_entropy(logits, y, scope='loss')
        train_op = slim.optimize_loss(loss, slim.get_global_step(),
                                      learning_rate=0.01, optimizer='Adam')

    predictions = {'class': tf.argmax(logits, 1), 'prob': slim.softmax(logits)}
    return predictions, loss, train_op

Network in Network

畳み込み層の後に局所全結合をつけたもの
例えば特徴マップの座標(1,1)に注目すると次の層の座標(1,1)と全結合になっている
畳み込みのカーネルや活性化関数を複雑・抽象化するイメージ
最後はGlobal Average Poolingで推論値を出す
元論文
日本語解説

nin.py
# Kernel counts and spatial kernel sizes for the three NIN blocks
# (AlexNet-like 11/5/3 size progression).
n_kernel = [96, 256, 384]
size_kernel = [11, 5, 3]
# 'conv' downsamples with strided convolutions; 'max_pool' uses max
# pooling everywhere; any other value mixes max (block 1) and average.
mode = 'conv'

def nin(inputs, *args, **kwargs):
    """Convolution whose kernel size arrives as the first positional arg.

    Used with slim.stack to build one NIN block: a spatial convolution
    followed by 1x1 "mlpconv" layers.
    """
    kernel = args[0]
    return slim.conv2d(inputs, kernel_size=kernel, **kwargs)

def model(x, y):
    """Network in Network model_fn for tf.contrib.learn Estimators.

    Three mlpconv blocks with downsampling, then a 1024-wide head and
    global average pooling over n_class feature maps.
    """
    net = x
    y = slim.one_hot_encoding(y, n_class)

    with slim.arg_scope([slim.conv2d], activation_fn=tf.nn.elu, normalizer_fn=slim.batch_norm):
        for layer, (kernels, size) in enumerate(zip(n_kernel, size_kernel), 1):
            # One NIN block: a size x size conv followed by two 1x1 convs.
            net = slim.stack(net, nin, [size, 1, 1], num_outputs=kernels, scope='conv%d' % layer)

            scope = 'pool%d' % layer
            if mode == 'conv':
                # Downsample with a strided convolution instead of pooling.
                net = slim.conv2d(net, kernels, [3, 3], stride=2, padding='VALID', scope=scope)
            elif layer == 1 or mode == 'max_pool':
                net = slim.max_pool2d(net, [3, 3], scope=scope)
            else:
                net = slim.avg_pool2d(net, [3, 3], scope=scope)
            # Dropout after the last two blocks only.
            if layer > len(n_kernel) - 2:
                net = slim.dropout(net, 0.5)

        layer = len(n_kernel) + 1
        net = slim.conv2d(net, 1024, [3, 3], scope='conv%d_1' % layer)
        net = slim.conv2d(net, 1024, [1, 1], scope='conv%d_2' % layer)
        net = slim.conv2d(net, n_class, [1, 1], scope='conv%d_3' % layer)
        # Global average pooling collapses each class map to one logit.
        net = slim.avg_pool2d(net, net.get_shape()[1:3], scope='logits')
        logits = slim.flatten(net)

        loss = slim.losses.softmax_cross_entropy(logits, y, scope='loss')
        train_op = slim.optimize_loss(loss, slim.get_global_step(),
                                      learning_rate=0.01, optimizer='Adam')

    predictions = {'class': tf.argmax(logits, 1), 'prob': slim.softmax(logits)}
    return predictions, loss, train_op

Residual Network

DCNNの層を深くしすぎると性能が悪化する問題を解決するため、元の入力と最適な出力の差を学習するようにしたもの
元論文
日本語解説

res_net.py
# Residual units per stage; [3, 8, 36, 3] matches ResNet-152.
# NOTE(review): "reputations" is presumably a typo for "repetitions";
# kept as-is because other functions below reference this name.
n_reputations = [3, 8, 36, 3]
# Base channel count per stage: 128, 256, 512, 1024.
n_outputs = [64 * 2 ** (i + 1) for i in range(len(n_reputations))]
# 'bottleneck' uses 1x1-3x3-1x1 units; 'basic' uses two 3x3 convs.
mode = 'bottleneck'

def bn_actv_conv(inputs, stride, *args, **kwargs):
    """Pre-activation unit: batch norm (+ activation) then convolution.

    The activation comes from the surrounding slim.arg_scope on
    slim.batch_norm; the convolution itself is linear.
    """
    normalized = slim.batch_norm(inputs)
    return slim.conv2d(normalized, stride=stride, activation_fn=None,
                       *args, **kwargs)

def shortcut(identity, residual, stride, channel):
    """Add the identity path to the residual path.

    When the residual path downsampled (stride > 1), the identity is
    downsampled the same way with a 1x1 max pool.  When the channel
    count grew, the identity is zero-padded by `channel` on each side
    of the channel axis so the shapes match.
    """
    if stride > 1:
        identity = slim.max_pool2d(identity, 1, stride, scope='downsample')
    if channel:
        paddings = [[0, 0], [0, 0], [0, 0], [channel, channel]]
        identity = tf.pad(identity, paddings, name='projection')
    return identity + residual

def basic_unit(inputs, stride, num_outputs, *args, **kwargs):
    """Basic residual unit: two pre-activation 3x3 convolutions.

    Only the first convolution may be strided.  The shortcut pads the
    identity by half the channel growth on each side of the last axis.
    """
    residual = slim.stack(inputs, bn_actv_conv, [stride, 1],
                          num_outputs=num_outputs, kernel_size=[3, 3], *args,
                          **kwargs)
    in_channels = inputs.get_shape().as_list()[-1]
    pad = (num_outputs - in_channels) // 2
    return shortcut(inputs, residual, stride, pad)

def bottleneck(inputs, stride, num_outputs, *args, **kwargs):
    """Bottleneck residual unit: 1x1 reduce, 3x3, 1x1 expand (x4).

    slim.stack passes a `scope` kwarg; it is dropped here -- presumably
    to avoid reusing one scope across the three convolutions -- so
    slim's automatic naming applies instead.
    """
    kwargs.pop('scope')
    net = bn_actv_conv(inputs, stride, num_outputs=num_outputs,
                       kernel_size=[1, 1], *args, **kwargs)
    net = bn_actv_conv(net, 1, num_outputs=num_outputs,
                       kernel_size=[3, 3], *args, **kwargs)
    residual = bn_actv_conv(net, 1, num_outputs=num_outputs * 4,
                            kernel_size=[1, 1], *args, **kwargs)
    in_channels = inputs.get_shape().as_list()[-1]
    pad = (num_outputs * 4 - in_channels) // 2
    return shortcut(inputs, residual, stride, pad)

def residual_block(inputs, id_block, *args, **kwargs):
    """Stack the residual units that make up one stage of the network.

    Args:
        inputs: input feature-map tensor.
        id_block: 0-based stage index into n_reputations / n_outputs.

    Returns:
        The stage's output tensor.

    Fixes over the original: removed the unused `channel_diff` local,
    replaced the `False if id_block == 0 else True` anti-idiom, and
    collapsed the duplicated slim.stack call into a single dispatch on
    `mode`.
    """
    n_reputation = n_reputations[id_block]
    num_outputs = n_outputs[id_block]
    # Every stage except the first downsamples by striding its first unit.
    downsample = id_block != 0
    strides = [2 if downsample and i == 0 else 1 for i in range(n_reputation)]
    scope = 'conv%d' % (id_block + 2)

    unit = basic_unit if mode == 'basic' else bottleneck
    return slim.stack(inputs, unit, strides, num_outputs=num_outputs,
                      scope=scope)

def model(x, y):
    """ResNet model_fn: stem, residual stages, global pooling, FC head."""
    net = x
    y = slim.one_hot_encoding(y, n_class)

    with slim.arg_scope([slim.batch_norm], activation_fn=tf.nn.elu):
        # Stem: strided 7x7 convolution and 3x3 max pooling.
        net = slim.conv2d(net, n_outputs[0] // 2, [7, 7], stride=2,
                          activation_fn=None, scope='conv1')
        net = slim.max_pool2d(net, [3, 3], padding='SAME', scope='pool1')

        # One residual stage per entry in n_reputations.
        stage_ids = list(range(len(n_reputations)))
        net = slim.stack(net, residual_block, stage_ids)

        # Final pre-activation, then global average pooling.
        net = slim.batch_norm(net)
        net = slim.avg_pool2d(net, net.get_shape()[1:3])
        net = slim.flatten(net)
        logits = slim.fully_connected(net, n_class, activation_fn=None,
                                      scope='logits')

    loss = slim.losses.softmax_cross_entropy(logits, y, scope='loss')
    train_op = slim.optimize_loss(loss, slim.get_global_step(),
                                  learning_rate=0.01, optimizer='Adam')

    predictions = {'class': tf.argmax(logits, 1), 'prob': slim.softmax(logits)}
    return predictions, loss, train_op

Inception

時間がかかりそうなので、そのうち追加予定

shngt
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. You can read useful information later efficiently
    By "stocking" the articles you like, you can search right away