Python
DeepLearning
強化学習
DQN
TensorFlow

TensorFlowでDQNを実装(したかった・・・)

More than 1 year has passed since last update.

下記のような記事を読み、DQN(Deep Q-Network)って面白そうだな~と感心。最近話題のAlpha-GoもDQNの延長・・・なのかな?(よく分かってない)
DQNの生い立ち + Deep Q-NetworkをChainerで書いた
倒立振子で学ぶ DQN (Deep Q Network)
Chainerで機械学習と戯れる: 足し算ゲームをChainerを使って強化学習できるか?

そんなわけで、TensorFlowで実装してみようとしましたが・・・・(-_-;)???
よく分かりません。いや、理論も数式もよく分かってないでやろうとしている私が問題なんですけど。TensorFlowの例も少な過ぎじゃないでしょうか。
とりあえず見よう見真似で頑張ってみたので、勘違いや修正すべきところ等ありましたらコメントしていただけると幸いです。「この辺は正しい」「この辺はオカシイ」等でも大変助かります。

その他参考にしたサイト:
Deep-Q learning Pong with Tensorflow and PyGame
このページの上半分に載っているソースコードを多分に参考にしました。

実装内容

下記のようなゲームを考えます。
- 0から100までの数直線を考えます。
- プログラムは0から100に向けて出発します。
- プログラムの選択肢は2択で、+1か+2進むことが出来ます。
- 止まった場所が2の倍数のとき、+1の報酬を貰え、8の倍数のときはー1の報酬(というか罰則)とします。
何回か訓練してみて、プログラムがどう動くかを見ます。

環境

TensorFlow 0.7
Ubuntu 14.04
GCE vCPU x8 インスタンス

実装

ソースコードは一番下に張っておきますが、部分部分を説明していきます。

グラフ作成

def inference(x_ph):

    with tf.name_scope('hidden1'):
        weights = tf.Variable(tf.truncated_normal([NUM_IMPUT, NUM_HIDDEN1], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_HIDDEN1], dtype=tf.float32), name='biases')
        hidden1 = tf.matmul(x_ph, weights) + biases

    with tf.name_scope('hidden2'):
        weights = tf.Variable(tf.truncated_normal([NUM_HIDDEN1, NUM_HIDDEN2], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_HIDDEN2], dtype=tf.float32), name='biases')
        hidden2 = tf.matmul(hidden1, weights) + biases

    with tf.name_scope('output'):
        weights = tf.Variable(tf.truncated_normal([NUM_HIDDEN2, NUM_OUTPUT], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], dtype=tf.float32), name='biases')
        y = tf.matmul(hidden2, weights) + biases

    return y

隠れ層は2つ、ユニット数はそれぞれ100,100とか。この辺の数は適当です。(いじりつつ試しています)
入力は現在の位置を示す数字1つのみ。出力は2つ、+1したときと+2したときの予想報酬になる(と思う)。
変数の初期化は、ゼロ初期化しているのを多く見かけましたが、そうすると上手く動かなかったためランダム初期化。(問題あり?)
活性化関数はreluを使ってたら上手くいかず、活性化関数無しで繋いだら多少マトモに動いたので無しのままです。(問題あり?)

損失計算

def loss(y, y_ph):
    return tf.reduce_mean(tf.nn.l2_loss((y - y_ph)))

損失計算は2乗して1/2するらしいので、それに相当するAPIで実装。

実際に訓練する部分

def getNextPositionReward(choice_position):

    if choice_position % 8 == 0:
        next_position_reward = -1.
    elif choice_position % 2 == 0:
        next_position_reward = 1.
    else:
        next_position_reward = 0.

    return next_position_reward

次に進む場所が8の倍数なら罰則、2の倍数なら報酬を返す関数。

def getNextPosition(position, action_reward1, action_reward2):

    if random.random() < RANDOM_FACTOR:
        if random.randint(0, 1) == 0:
            next_position = position + 1
        else:
            next_position = position + 2
    else:
        if action_reward1 > action_reward2:
            next_position = position + 1
        else:
            next_position = position + 2

    return next_position

2つの報酬を比較して、+1進むのか+2進むのかを考えている部分。
訓練時は一定のランダム要素を入れて進行するようにしています。

    for i in range(REPEAT_TIMES):
        position = 0.
        position_history = []
        reward_history = []

        while(True):
            if position >= GOAL:
                break

            choice1_position = position + 1.
            choice2_position = position + 2.

            next_position1_reward = getNextPositionReward(choice1_position)
            next_position2_reward = getNextPositionReward(choice2_position)

            reward1 = sess.run(y, feed_dict={x_ph: [[choice1_position]]})[0]
            reward2 = sess.run(y, feed_dict={x_ph: [[choice2_position]]})[0]

            action_reward1 = next_position1_reward + GAMMA * np.max(reward1)
            action_reward2 = next_position2_reward + GAMMA * np.max(reward2)

            position_history.append([position])
            reward_history.append([action_reward1, action_reward2])

            position = getNextPosition(position, action_reward1, action_reward2)

        sess.run(train_step, feed_dict={x_ph: position_history, y_ph: reward_history})

訓練部分(抜粋)。
2択の選択肢があり、その報酬を比較して、報酬が高い方を選ぶ。報酬は「次のポジションで(確実に)得られる報酬」と「さらにその後で得られる(であろう)報酬の予測値」の最大値の合計。
また、2つの報酬と現在のポジションをリストにまとめて、教師あり学習をさせる。
これを1000回ほど繰り返しています。
⇒この辺不安です。盛大に勘違いしているんじゃないかと思ってます。

結果

訓練後に実際にどう動いたかの軌跡を見てみます。

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]

完全にただの偶数を返しています。8の倍数も完璧に踏んでるやん!
プラスとなる報酬は取りに行っている感じはありますが、マイナスを避けている感じはありません。
報酬のマイナス値を大きくしたりと色々試したところ、上記数値はバラけるので数値が固定値になっているとかそういうことではないようですが、理想的な動きはしてくれませんでした・・・・。
ちなみに損失は収束はしています。

ソースコード(全部)

import tensorflow as tf
import numpy as np
import random

# definition
NUM_IMPUT = 1
NUM_HIDDEN1 = 100
NUM_HIDDEN2 = 100
NUM_OUTPUT = 2
LEARNING_RATE = 0.1
REPEAT_TIMES = 100
GOAL = 100
LOG_DIR = "tf_log"
GAMMA = 0.8
stddev = 0.01
RANDOM_FACTOR = 0.1

def inference(x_ph):

    with tf.name_scope('hidden1'):
        weights = tf.Variable(tf.truncated_normal([NUM_IMPUT, NUM_HIDDEN1], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_HIDDEN1], dtype=tf.float32), name='biases')
        hidden1 = tf.matmul(x_ph, weights) + biases

    with tf.name_scope('hidden2'):
        weights = tf.Variable(tf.truncated_normal([NUM_HIDDEN1, NUM_HIDDEN2], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_HIDDEN2], dtype=tf.float32), name='biases')
        hidden2 = tf.matmul(hidden1, weights) + biases

    with tf.name_scope('output'):
        weights = tf.Variable(tf.truncated_normal([NUM_HIDDEN2, NUM_OUTPUT], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], dtype=tf.float32), name='biases')
        y = tf.matmul(hidden2, weights) + biases

    return y

def loss(y, y_ph):
    return tf.reduce_mean(tf.nn.l2_loss((y - y_ph)))

def optimize(loss):
    optimizer = tf.train.AdamOptimizer(LEARNING_RATE)
    train_step = optimizer.minimize(loss)
    return train_step

def getNextPositionReward(choice_position):

    if choice_position % 8 == 0:
        next_position_reward = -1.
    elif choice_position % 2 == 0:
        next_position_reward = 1.
    else:
        next_position_reward = 0.

    return next_position_reward

def getNextPosition(position, action_reward1, action_reward2):

    if random.random() < RANDOM_FACTOR:
        if random.randint(0, 1) == 0:
            next_position = position + 1
        else:
            next_position = position + 2
    else:
        if action_reward1 > action_reward2:
            next_position = position + 1
        else:
            next_position = position + 2

    return next_position

if __name__ == "__main__":

    x_ph = tf.placeholder(tf.float32, [None, NUM_IMPUT])
    y_ph = tf.placeholder(tf.float32, [None, NUM_OUTPUT])

    y = inference(x_ph)
    loss = loss(y, y_ph)
    tf.scalar_summary("Loss", loss)
    train_step = optimize(loss)

    sess = tf.Session()
    summary_op = tf.merge_all_summaries()
    init = tf.initialize_all_variables()
    sess.run(init)
    summary_writer = tf.train.SummaryWriter(LOG_DIR, graph_def=sess.graph_def)

    for i in range(REPEAT_TIMES):
        position = 0.
        position_history = []
        reward_history = []

        while(True):
            if position >= GOAL:
                break

            choice1_position = position + 1.
            choice2_position = position + 2.

            next_position1_reward = getNextPositionReward(choice1_position)
            next_position2_reward = getNextPositionReward(choice2_position)

            reward1 = sess.run(y, feed_dict={x_ph: [[choice1_position]]})[0]
            reward2 = sess.run(y, feed_dict={x_ph: [[choice2_position]]})[0]

            action_reward1 = next_position1_reward + GAMMA * np.max(reward1)
            action_reward2 = next_position2_reward + GAMMA * np.max(reward2)

            position_history.append([position])
            reward_history.append([action_reward1, action_reward2])

            position = getNextPosition(position, action_reward1, action_reward2)

        sess.run(train_step, feed_dict={x_ph: position_history, y_ph: reward_history})
        summary_str = sess.run(summary_op, feed_dict={x_ph: position_history, y_ph: reward_history})
        summary_writer.add_summary(summary_str, i)
        if i % 10 == 0:
            print "Count: " + str(i)

    # TEST
    position = 0
    position_history = []
    while(True):
        if position >= GOAL:
                break

        position_history.append(position)

        rewards = sess.run(y, feed_dict={x_ph: [[position]]})[0]
        choice = np.argmax(rewards)
        if choice == 0:
            position += 1
        else:
            position += 2

    print position_history

ホントに

アドバイス、ダメ出しお待ちしております。

2016/04/27追記

dsanno様よりコメントでアドバイスいただきました。ありがとうございます。そちらを試してみます。

その1

この問題設定なら隠れ層なしでよく、
入力100値(現在地を表すone-hot vector)、出力2値のembedding_lookup1層だけで学習できると思います。

なるほどなるほど・・・
embedding_lookupは未だにイマイチ理解できていないので置いておいて、入力をone-hot vectorにして隠れ層無しでやってみます。

def inference(x_ph):

    with tf.name_scope('output'):
        weights = tf.Variable(tf.truncated_normal([NUM_IMPUT, NUM_OUTPUT], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], dtype=tf.float32), name='biases')
        y = tf.matmul(x_ph, weights) + biases

下記はone-hot vectorを作るための関数。

def onehot(idx):
    idx = int(idx)
    array = np.zeros(GOAL)
    array[idx] = 1.
    return array

結果

[0, 2, 4, 6, 7, 9, 10, 12, 14, 15, 17, 18, 20, 22, 23, 25, 26, 28, 30, 32, 34, 36, 38, 39, 41, 42, 44, 46, 47, 49, 50, 52, 53, 54, 55, 57, 58, 60, 62, 63, 65, 66, 68, 70, 71, 73, 74, 76, 78, 79, 81, 82, 84, 86, 88, 90, 92, 94, 95, 97, 98, 99]

なんだかそれっぽい。完璧ではないけど、なるべく2の倍数を踏みつつ8の倍数は避けようとしている感が出ています。

その2

ReLuだと出力に上限がなく相性が悪そうなので、上限があるtanhやrelu6を活性化関数にする

入力は1つのまま、隠れ層のユニット数は100,100で試しました。
こちらは活性化関数無しとあまり変わらず。

結果

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]

その3

解が周期的であることを前提としてよいなら活性化関数にtf.sinを使う(1段目はsinで2段目はreluといった使い方)
入力は1つのまま、隠れ層のユニット数は100,100で試しました。

def inference(x_ph):

    with tf.name_scope('hidden1'):
        weights = tf.Variable(tf.zeros([NUM_IMPUT, NUM_HIDDEN1], dtype=tf.float32), name='weights')
        biases = tf.Variable(tf.zeros([NUM_HIDDEN1], dtype=tf.float32), name='biases')
        hidden1 = tf.sin(tf.matmul(x_ph, weights) + biases)

    with tf.name_scope('hidden2'):
        weights = tf.Variable(tf.truncated_normal([NUM_HIDDEN1, NUM_HIDDEN2], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_HIDDEN2], dtype=tf.float32), name='biases')
        hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)

    with tf.name_scope('output'):
        weights = tf.Variable(tf.truncated_normal([NUM_HIDDEN2, NUM_OUTPUT], stddev=stddev), name='weights')
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], dtype=tf.float32), name='biases')
        y = tf.matmul(hidden2, weights) + biases

    return y

結果

[0, 2, 4, 6, 8, 9, 10, 12, 14, 15, 17, 18, 20, 22, 23, 25, 26, 28, 29, 30, 31, 33, 34, 36, 38, 39, 41, 43, 44, 46, 47, 49, 50, 51, 53, 55, 57, 58, 60, 62, 63, 64, 66, 68, 69, 71, 73, 74, 76, 78, 79, 81, 82, 83, 84, 85, 87, 89, 90, 92, 94, 95, 97, 98]

最初の8は踏んでますが、こちらもソコソコ頑張っている感が出ています。

ちょっと修正して、隠れ層のユニット数を500,100で調整。

結果

[0, 2, 4, 6, 7, 9, 10, 12, 14, 15, 17, 18, 20, 22, 23, 25, 26, 28, 30, 31, 33, 34, 36, 38, 39, 41, 42, 44, 46, 47, 49, 50, 52, 54, 55, 57, 58, 60, 62, 63, 65, 66, 68, 70, 71, 73, 74, 76, 78, 79, 81, 82, 84, 86, 87, 89, 90, 92, 94, 95, 97, 98]

完璧か。
sin()を使うとか全然考え付きもしませんでした。
dsanno様、改めてありがとうございました。

所感

人工知能と聞くと、とりあえずぶっこんでやれば何でもかんでも自分で考えてやってくれるという幻想を持っていましたが、ちゃんと入力データの特性とか作り手が考えてやらねばならないんだなと実感しました。