LoginSignup
8
5

More than 3 years have passed since last update.

[TensorFlow 2/Keras] KerasでもCTC Lossを使った学習を動かす方法

Last updated at Posted at 2020-04-27

はじめに

前回の記事で、TensorFlow 2.xで可変長データを入出力に取るモデル (RNN) をCTC (Connectionist Temporal Classification) Lossを使って学習する方法を書きました。
[TensorFlow 2] RNNをCTC Lossを使って学習してみる - Qiita

しかし、一つ積み残していたものがありました。KerasでCTC Lossをうまく扱う方法です。
前回の記事で試してはみたものの、怪しげなハックだらけになった上に処理が遅いという、目も当てられない結果になってしまいました。
今回、その解決策が見つかったのでメモしておきます。

ここに書いた方法は、CTC Lossのみならず、特殊な損失関数を定義して学習したい場合に応用できるのではないかと思います。

検証環境

  • Ubuntu 18.04
  • Python 3.6.9
  • TensorFlow 2.1.0 (CPU)

損失関数の定義方法は compile() だけではない

前回の敗因は、つまるところKerasの Model.compile() で無理やりCTC Lossを定義しようとしてドツボにはまった、ということに尽きると思います。
しかし、実は Model.compile() 以外にも損失関数や評価尺度(正解率など)を追加する方法が存在していました。

Train and evaluate with Keras | TensorFlow Core

The overwhelming majority of losses and metrics can be computed from y_true and y_pred, where y_pred is an output of your model. But not all of them. For instance, a regularization loss may only require the activation of a layer (there are no targets in this case), and this activation may not be a model output.

In such cases, you can call self.add_loss(loss_value) from inside the call method of a custom layer. Here's a simple example that adds activity regularization (note that activity regularization is built-in in all Keras layers -- this layer is just for the sake of providing a concrete example):
(略)
You can do the same for logging metric values:
(略)

独自レイヤーを定義して add_loss() を使用すれば、(y_true, y_pred) のプロトタイプにこだわらずに損失関数を定義できるとのことです!
いやはや、ちゃんとチュートリアルは読まないとダメですね…orz

損失関数を定義する add_loss() と、評価尺度を定義する add_metric() のAPIの説明は、以下のページに書いてあります。
tf.keras.layers.Layer | TensorFlow Core v2.1.0

チュートリアルのサンプルコードからも分かるように、損失関数や評価尺度は Tensor であり、これらを Tensor の演算で組み立てます。サンプルコードに含まれている x1 は、レイヤーからの出力を表す Tensor であり、損失関数を表現するために使用することができます。

inputs = keras.Input(shape=(784,), name='digits')
x1 = layers.Dense(64, activation='relu', name='dense_1')(inputs)
x2 = layers.Dense(64, activation='relu', name='dense_2')(x1)
outputs = layers.Dense(10, name='predictions')(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

model.add_loss(tf.reduce_sum(x1) * 0.1)

model.add_metric(keras.backend.std(x1),
                 name='std_of_activation',
                 aggregation='mean')

今回の場合、特徴量系列・ラベル系列の長さ情報、さらにはラベル系列そのものも、CTC Loss計算に必要な情報ですから、Tensor として持っていなければなりません。つまり、これらもモデルへの入力として(yではなく、xとして)与える必要があるわけですね。つまり、複数の入力を持つモデルを作ることになります。

うまく動いたサンプルコード

大元のソースコードは
GitHub - igormq/ctc_tensorflow_example: CTC + Tensorflow Example for ASR
です。

ctc_tensorflow_example_tf2_keras.py
#  Compatibility imports
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

import tensorflow as tf
import scipy.io.wavfile as wav
import numpy as np

from six.moves import xrange as range

try:
    from python_speech_features import mfcc
except ImportError:
    print("Failed to import python_speech_features.\n Try pip install python_speech_features.")
    raise ImportError

from utils import maybe_download as maybe_download
from utils import sparse_tuple_from as sparse_tuple_from

# Constants
SPACE_TOKEN = '<space>'
SPACE_INDEX = 0
FIRST_INDEX = ord('a') - 1  # 0 is reserved to space
FEAT_MASK_VALUE = 1e+10

# Some configs
num_features = 13
num_units = 50 # Number of units in the LSTM cell
# Accounting the 0th indice +  space + blank label = 28 characters
num_classes = ord('z') - ord('a') + 1 + 1 + 1

# Hyper-parameters
num_epochs = 400
num_layers = 1
batch_size = 2
initial_learning_rate = 0.005
momentum = 0.9

# Loading the data

audio_filename = maybe_download('LDC93S1.wav', 93638)
target_filename = maybe_download('LDC93S1.txt', 62)

fs, audio = wav.read(audio_filename)

# create a dataset composed of data with variable lengths
inputs = mfcc(audio, samplerate=fs)
inputs = (inputs - np.mean(inputs))/np.std(inputs)
inputs_short = mfcc(audio[fs*8//10:fs*20//10], samplerate=fs)
inputs_short = (inputs_short - np.mean(inputs_short))/np.std(inputs_short)
# Transform in 3D array
train_inputs = tf.ragged.constant([inputs, inputs_short], dtype=np.float32)
train_seq_len = tf.cast(train_inputs.row_lengths(), tf.int32)
train_inputs = train_inputs.to_tensor(default_value=FEAT_MASK_VALUE)

# Reading targets
with open(target_filename, 'r') as f:

    #Only the last line is necessary
    line = f.readlines()[-1]

    # Get only the words between [a-z] and replace period for none
    original = ' '.join(line.strip().lower().split(' ')[2:]).replace('.', '')
    targets = original.replace(' ', '  ')
    targets = targets.split(' ')

# Adding blank label
targets = np.hstack([SPACE_TOKEN if x == '' else list(x) for x in targets])

# Transform char into index
targets = np.asarray([SPACE_INDEX if x == SPACE_TOKEN else ord(x) - FIRST_INDEX
                      for x in targets])
# Creating sparse representation to feed the placeholder
train_targets = tf.ragged.constant([targets, targets[13:32]], dtype=np.int32) 
train_targets_len = tf.cast(train_targets.row_lengths(), tf.int32)
train_targets = train_targets.to_sparse() 

# We don't have a validation dataset :(
val_inputs, val_targets, val_seq_len, val_targets_len = train_inputs, train_targets, \
                                                        train_seq_len, train_targets_len

# THE MAIN CODE!

# add loss and metrics with a custom layer
class CTCLossLayer(tf.keras.layers.Layer):
    def call(self, inputs):
        labels = inputs[0]
        logits = inputs[1]
        label_len = inputs[2]
        logit_len = inputs[3]

        logits_trans = tf.transpose(logits, (1, 0, 2))
        label_len = tf.reshape(label_len, (-1,))
        logit_len = tf.reshape(logit_len, (-1,))
        loss = tf.reduce_mean(tf.nn.ctc_loss(labels, logits_trans, label_len, logit_len, blank_index=-1))
        # define loss here instead of compile()
        self.add_loss(loss)

        # decode
        decoded, _ = tf.nn.ctc_greedy_decoder(logits_trans, logit_len)

        # Inaccuracy: label error rate
        ler = tf.reduce_mean(tf.edit_distance(tf.cast(decoded[0], tf.int32),
                                          labels))
        self.add_metric(ler, name="ler", aggregation="mean")

        return logits  # Pass-through layer.

# Defining the cell
# Can be:
#   tf.nn.rnn_cell.RNNCell
#   tf.nn.rnn_cell.GRUCell 
cells = []
for _ in range(num_layers):
    cell = tf.keras.layers.LSTMCell(num_units)  # Or LSTMCell(num_units)
    cells.append(cell)
stack = tf.keras.layers.StackedRNNCells(cells)

input_feature     = tf.keras.layers.Input((None, num_features), name="input_feature")
input_label       = tf.keras.layers.Input((None,), dtype=tf.int32, sparse=True, name="input_label")
input_feature_len = tf.keras.layers.Input((1,), dtype=tf.int32, name="input_feature_len")
input_label_len   = tf.keras.layers.Input((1,), dtype=tf.int32, name="input_label_len")

layer_masking = tf.keras.layers.Masking(FEAT_MASK_VALUE)(input_feature)
layer_rnn     = tf.keras.layers.RNN(stack, return_sequences=True)(layer_masking)
layer_output  = tf.keras.layers.Dense(
                   num_classes,
                   kernel_initializer=tf.keras.initializers.TruncatedNormal(0.0, 0.1),
                   bias_initializer="zeros",
                   name="logit")(layer_rnn)
layer_loss = CTCLossLayer()([input_label, layer_output, input_label_len, input_feature_len])

# create models for training and prediction (sharing weights)
model_train = tf.keras.models.Model(
            inputs=[input_feature, input_label, input_feature_len, input_label_len],
            outputs=layer_loss)

model_predict = tf.keras.models.Model(inputs=input_feature, outputs=layer_output)

optimizer = tf.keras.optimizers.SGD(initial_learning_rate, momentum)
# adding no loss: we have already defined with a custom layer
model_train.compile(optimizer=optimizer)

# training: y is dummy!
model_train.fit(x=[train_inputs, train_targets, train_seq_len, train_targets_len], y=None,
                validation_data=([val_inputs, val_targets, val_seq_len, val_targets_len], None),
                epochs=num_epochs)

# Decoding
print('Original:')
print(original)
print(original[13:32])
print('Decoded:')
decoded, _ = tf.nn.ctc_greedy_decoder(tf.transpose(model_predict.predict(train_inputs), (1, 0, 2)), train_seq_len)
d = tf.sparse.to_dense(decoded[0], default_value=-1).numpy()
str_decoded = [''.join([chr(x + FIRST_INDEX) for x in np.asarray(row) if x != -1]) for row in d]
for s in str_decoded:
    # Replacing blank label to none
    s = s.replace(chr(ord('z') + 1), '')
    # Replacing space label to space
    s = s.replace(chr(ord('a') - 1), ' ')
    print(s)

実行結果は以下のようになります。

Train on 2 samples, validate on 2 samples
Epoch 1/400
2/2 [==============================] - 2s 991ms/sample - loss: 546.3565 - ler: 1.0668 - val_loss: 464.2611 - val_ler: 0.8801
Epoch 2/400
2/2 [==============================] - 0s 136ms/sample - loss: 464.2611 - ler: 0.8801 - val_loss: 179.9780 - val_ler: 1.0000
(略)
Epoch 400/400
2/2 [==============================] - 0s 135ms/sample - loss: 1.6670 - ler: 0.0000e+00 - val_loss: 1.6565 - val_ler: 0.0000e+00
Original:
she had your dark suit in greasy wash water all year
dark suit in greasy
Decoded:
she had your dark suit in greasy wash water all year
dark suit in greasy

処理時間も誤り率の値も問題なさそうで、ようやくまともに動いたようです…!
(処理時間はサンプル数で割った値なので実際の時間は表示の値の2倍になりますが、2サンプルで300ms以下なら前回と同等といえるでしょう)

解説

Loss, Metricsの追加

最初にご説明したように、Layer.add_loss() を使うことによって、モデルに関係する Tensor を自由に使って損失関数を定義することができます。上のコードでは CTCLossLayer というレイヤーを定義して、call() の中ではTensorFlow 2.x版(前回記事参照)の学習ループとほぼ同様の処理を書いています。最後に、入力された logits をそのまま出力するようにしています。

ここで call() には、self を除いて4つの引数を持たせています。この4つの情報があれば、CTC Lossおよびデコードを行うことができます。モデル構築時には、以下のようにレイヤーへの入力も4つ必要となります。

layer_loss = CTCLossLayer()(input_label, layer_output, input_label_len, input_feature_len)

入力レイヤー

先ほどの引数で与えている情報は Tensor になっていなければいけません。layer_output は普通のKerasモデルと変わらないのですが、input_label, input_label_len, input_feature_len については、入力レイヤーを追加して対応しています。

input_feature     = tf.keras.layers.Input((None, num_features), name="input_feature")
input_label       = tf.keras.layers.Input((None,), dtype=tf.int32, sparse=True, name="input_label")
input_feature_len = tf.keras.layers.Input((1,), dtype=tf.int32, name="input_feature_len")
input_label_len   = tf.keras.layers.Input((1,), dtype=tf.int32, name="input_label_len")

ご覧のように、適切な形状と dtype を指定して Input レイヤーを作っています。特徴量以外のレイヤーの dtypeint32 としておけばよいです。
tf.nn.ctc_loss の引数の指定を考えると、本当は input_feature_leninput_label_len の形状を () とでもしたいところなのですが、後段でエラーになってうまく動かせませんでした。そのため、形状は (1,) と書いておき、CTCLossLayer の中で reshape を行って使っています。

もう一つしれっと追加しているのが、input_label 作成時の sparse=True です。これを指定しておくと、input_label に対応する TensorSparseTensor になります。
tf.keras.Input | TensorFlow Core v2.1.0

この sparse=True は、デコード結果の誤り率の計算時に、正解ラベルを SparseTensor で渡さなければならないための処置です(CTC Lossの計算で使う tf.nn.ctc_lossSparseTensor を受けることができます)。Model.fit() などで与えるデータも SparseTensor で作っておきます。
tf.nn.ctc_loss | TensorFlow Core v2.1.0
tf.edit_distance | TensorFlow Core v2.1.0

同様に RaggedTensor の入力を想定する場合には ragged=True というのもあるようです。

モデルの作り方

以下のように、学習用と予測(推論)用のモデルを別個に作成しています。

model_train = tf.keras.models.Model(
            inputs=[input_feature, input_label, input_feature_len, input_label_len],
            outputs=layer_loss)

model_predict = tf.keras.models.Model(inputs=input_feature, outputs=layer_output)

学習時には4つの入力が必要でしたが、予測(デコード)時にはLogits(つまり Dense の出力)だけがあればよいので、入力としては特徴量があれば十分です。そのため、予測用のモデルは入力が1つだけで動くようにしています。もちろんLossは計算できませんが、デコード用途だけなら不要ですので、出力としては CTCLossLayer を通す前の layer_output を指定しておきます。

図で描くと、つまりこういうことになります。
image.png

重みが入っているレイヤーは共有するように作成していますので、学習用モデルで学習を行った後、そのまま予測用モデルで推論することができます。

モデルのコンパイル

CTC Lossは独自レイヤーで定義してしまったので、compile() では損失関数を定義する必要がありません。その場合、loss 引数を単に指定しなければよいです。

model_train.compile(optimizer=optimizer)

学習の実行

損失関数の計算に使用するための正解ラベルや長さ情報は、入力レイヤーに流す情報ですから Model.fit() の引数の x 側に指定する必要があります。y には指定するものがなくなりましたので、None を書いておきます。

validation_data も同様に、タプルの2番目に None を書いておきます。

model_train.fit(x=[train_inputs, train_targets, train_seq_len, train_targets_len], y=None,
                validation_data=([val_inputs, val_targets, val_seq_len, val_targets_len], None),
                epochs=num_epochs)

正直これが公式が想定する使い方なのかどうか分かりませんが、compile()loss を指定しない場合には y=None にしても問題なく動いています(loss を指定しているときは、損失関数の y_true 引数に渡すラベルが必要なので、当然 y に何かデータを与えないとエラーになります)。

デコード結果の出力

前述の通り、推論時には model_predict の方を使います。predict() の引数には特徴量系列のみを与えればOKです。

decoded, _ = tf.nn.ctc_greedy_decoder(tf.transpose(model_predict.predict(train_inputs), (1, 0, 2)), train_seq_len)

その他気になること

  • Maskinginput_feature_len は働きが似ているので、なんだか冗長な気がします…。

まとめ

ちゃんとチュートリアルを読めばKerasでもCTC Lossを使って学習を実行できました。意外とKerasも小回りが利くのですね。どうもすみませんでした。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5