Implementing iTransformer in TensorFlow

Posted at 2024-05-02

What is iTransformer?

Most Transformer-based multivariate time series forecasting models treat the multivariate values at each time step as one token. iTransformer [1] instead treats the entire time series of each variate as one token, which lets it capture correlations between variates better and thereby improve forecasting accuracy. Roughly speaking, it just transposes the input and output tensors and subtracts a few modules from the vanilla Transformer. The implementation differences from the vanilla Transformer are summarized in the table below, and a minimal sketch of the tokenization difference follows it.

| Difference | vanilla Transformer | iTransformer |
|---|---|---|
| Input/output shape | (batch size, sequence length, number of variates) | (batch size, number of variates, sequence length) |
| Embedding layer output shape | (batch size, sequence length, d_model) | (batch size, number of variates, d_model) |
| Positional encoding | Yes | No |
| Look-ahead masks | Yes | No |
| Decoder | Yes | No |
| Inference | Autoregressively predicts the multivariate values at the next time step | Predicts the whole target sequence at once |
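
To make the inversion concrete, here is a minimal sketch (the batch, sequence, and variate sizes are hypothetical) of how the same batch is turned into tokens by each model:

import tensorflow as tf

batch = tf.random.normal((32, 96, 7))  # (batch_size, seq_len, variates_num)

# vanilla Transformer: 96 tokens per sample, each one multivariate reading
vanilla_tokens = batch  # (32, 96, 7)

# iTransformer: 7 tokens per sample, each the full 96-step series of one variate
inverted_tokens = tf.transpose(batch, perm=[0, 2, 1])  # (32, 7, 96)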

Thoughts

Conventional Transformer-based multivariate forecasting models assume that every variate is sampled at the same interval and has the same sequence length. In reality, however, not all variates can necessarily be observed at the same interval, and you may well want to train with a different sequence length per variate. I have high hopes for iTransformer because it should be able to handle variates with different sampling intervals or sequence lengths without changing the architecture; one possible sketch of that idea follows.
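
As a hedged illustration (this is not from the paper and not part of the implementation below; the layers and shapes here are hypothetical), each variate could be given its own projection into d_model, so that series of different lengths end up as tokens of the same size:

import tensorflow as tf

d_model = 64
v1 = tf.random.normal((32, 96))  # variate 1: 96 steps
v2 = tf.random.normal((32, 48))  # variate 2: only 48 steps

# One Dense projection per variate maps each series to a d_model token,
# so variates of different lengths coexist in the same token sequence.
embed_v1 = tf.keras.layers.Dense(d_model)
embed_v2 = tf.keras.layers.Dense(d_model)
tokens = tf.stack([embed_v1(v1), embed_v2(v2)], axis=1)  # (32, 2, d_model)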

Implementation in TensorFlow

Finally, here is an iTransformer implementation I wrote, modeled on the vanilla Transformer source code in the official TensorFlow tutorial [2].

import tensorflow as tf


def scaled_dot_product_attention(q, k, v):
    """アテンションの重みの計算
    q, k, vは最初の次元が一致していること
    k, vは最後から2番めの次元が一致していること
    引数:
        q: query shape == (..., variates_num_q, depth)
        k: key shape == (..., variates_num_k, depth)
        v: value shape == (..., variates_num_v, depth_v)

    戻り値:
        出力
    """
    matmul_qk = tf.matmul(
        q, k, transpose_b=True
    )  # (..., variates_num_q, variates_num_k)
    # Scale matmul_qk
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # softmax normalizes along the last axis (variates_num_k)
    # so that the weights add up to 1
    attention_weights = tf.nn.softmax(
        scaled_attention_logits, axis=-1
    )  # (..., variates_num_q, variates_num_k)

    output = tf.matmul(attention_weights, v)  # (..., variates_num_q, depth_v)

    return output
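
# A quick shape check (hypothetical values): with 8 samples, 7 variate
# tokens, and depth 16, attention mixes information across the variates:
#     q = k = v = tf.random.normal((8, 7, 16))
#     scaled_dot_product_attention(q, k, v).shape  # TensorShape([8, 7, 16])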


class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """最後の次元を(num_heads, depth)に分割。
        結果をshapeが(batch_size, num_heads, variates_num, depth)となるようにリシェイプする。
        """
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, variates_num, d_model)
        k = self.wk(k)  # (batch_size, variates_num, d_model)
        v = self.wv(v)  # (batch_size, variates_num, d_model)

        q = self.split_heads(
            q, batch_size
        )  # (batch_size, num_heads, variates_num_q, depth)
        k = self.split_heads(
            k, batch_size
        )  # (batch_size, num_heads, variates_num_k, depth)
        v = self.split_heads(
            v, batch_size
        )  # (batch_size, num_heads, variates_num_v, depth)

        # scaled_attention.shape == (batch_size, num_heads, variates_num_q, depth)
        scaled_attention = scaled_dot_product_attention(q, k, v)

        scaled_attention = tf.transpose(
            scaled_attention, perm=[0, 2, 1, 3]
        )  # (batch_size, variates_num_q, num_heads, depth)

        concat_attention = tf.reshape(
            scaled_attention, (batch_size, -1, self.d_model)
        )  # (batch_size, variates_num_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, variates_num_q, d_model)

        return output
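
# Usage sketch (hypothetical hyperparameters): with d_model=64 and
# num_heads=4, each head attends over depth-16 slices of the variate tokens:
#     mha = MultiHeadAttention(d_model=64, num_heads=4)
#     x = tf.random.normal((8, 7, 64))
#     mha(x, x, x).shape  # TensorShape([8, 7, 64])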


def point_wise_feed_forward_network(d_model, dff):
    return tf.keras.Sequential(
        [
            tf.keras.layers.Dense(
                dff, activation="gelu"
            ),  # (batch_size, variates_num, dff)
            tf.keras.layers.Dense(d_model),  # (batch_size, variates_num, d_model)
        ]
    )


class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, training):
        attn_output = self.mha(x, x, x)  # (batch_size, variates_num, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)  # (batch_size, variates_num, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, variates_num, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, variates_num, d_model)

        return out2


class Encoder(tf.keras.layers.Layer):
    def __init__(
        self,
        num_layers,
        d_model,
        num_heads,
        dff,
        rate=0.1,
    ):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Dense(d_model)
        self.dropout = tf.keras.layers.Dropout(rate)

        self.enc_layers = [
            EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)
        ]

    def call(self, x, training):
        x = self.embedding(x)  # (batch_size, variates_num, d_model)
        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training=training)

        return x  # (batch_size, variates_num, d_model)


class ITransformer(tf.keras.Model):
    def __init__(
        self,
        num_layers,
        d_model,
        num_heads,
        dff,
        tar_seq_len,
        rate=0.1,
        name="i_transformer",
    ):
        super().__init__(name=name)

        self.encoder = Encoder(num_layers, d_model, num_heads, dff, rate)

        self.final_layer = tf.keras.layers.Dense(tar_seq_len)

    def call(self, inp, training):
        inp = tf.transpose(
            inp, perm=[0, 2, 1]
        )  # (batch_size, variates_num, inp_seq_len)

        enc_output = self.encoder(
            inp, training=training
        )  # (batch_size, variates_num, d_model)

        final_output = self.final_layer(
            enc_output
        )  # (batch_size, variates_num, tar_seq_len)

        final_output = tf.transpose(
            final_output, perm=[0, 2, 1]
        )  # (batch_size, tar_seq_len, variates_num)

        return final_output
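
A quick smoke test (the hyperparameters are arbitrary) confirms the input and output shapes:

model = ITransformer(num_layers=2, d_model=64, num_heads=4, dff=128, tar_seq_len=24)
x = tf.random.normal((8, 96, 7))  # (batch_size, inp_seq_len, variates_num)
y = model(x, training=False)
print(y.shape)  # (8, 24, 7) == (batch_size, tar_seq_len, variates_num)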

References

  1. Liu, Yong, et al. "iTransformer: Inverted Transformers Are Effective for Time Series Forecasting." arXiv preprint arXiv:2310.06625 (2023).
  2. https://www.tensorflow.org/tutorials/text/transformer?hl=ja