Python
DeepLearning
強化学習
人工知能
TensorFlow

[How to!] スーパーマリオをTensorflowで学習&プレイしよう!!

More than 1 year has passed since last update.

はじめに

前回、スーパーファミコンのソフトを学習させる内容でいろいろ要望が多かったので、今回はMacbook等で動くファミコン学習用のプログラムをシェアします。

実験で動作できるようにGitに上げときました。
Downloadする場合はStarください!
https://github.com/tsunaki00/super_mario

17038850_1863772750540489_5777848282422220640_o.jpg

実験環境

以下のマシン環境でテストしました。

環境
PC Macbook PRO 2016 OSX
CPU i7
MEM 16GB
開発言語 python

※ Macbook 12でも動きました

補足

Tensorflowでの学習は適当に作ってありますので、改善をお願いいたします。

環境についての注意

実行すると以下のエラーが発生するのでライブラリを修正する必要があります。

$ python3 start.py 
  Traceback (most recent call last):
    File "start.py", line 22, in <module>
      import gym_pull
    File "/usr/local/lib/python3.6/site-packages/gym_pull/__init__.py", line 41, in <module>
      import gym_pull.monitoring.monitor
    File "/usr/local/lib/python3.6/site-packages/gym_pull/monitoring/monitor.py", line 10, in <module>
      class Monitor(gym.monitoring.monitor.Monitor):
  AttributeError: module 'gym.monitoring' has no attribute 'monitor'

↓ 以下に修正

$ vi /usr/local/lib/python3.6/site-packages/gym_pull/monitoring/monitor.py
   :
   :
  class Monitor(gym.monitoring.monitor.Monitor):
   ↓
  class Monitor(gym.monitoring.monitor_manager.MonitorManager):

プログラムについての簡単な説明

学習方法は強化学習(Reinforcement Learning)で行います。
教師あり学習とも教師なし学習とも少し違い実行したアクションの評価を学習します。

[参考] Deep Q-Network

以下に記事にDQNの説明がありました。
DQNの生い立ち + Deep Q-NetworkをChainerで書いた

[参考] 人の操作を学習する。

自分達で実行した結果を評価に学習させると以下の様に最速マリオを作ることもできました!
評価は距離、時間、スコアです。

[【人工知能】人工知能SIVAにマリオをプレイさせてみた!【WORLD1-1】]
https://www.youtube.com/watch?v=T4dO1GKPx4Y

ソースコード

  • Actionはきちんと絞ったほうがいいです。
  • ミニバッチ処理にできるように配列にしていますので、適宜変更してください。
  • たまにランダムを入れるとさらに良くなります!
import tensorflow as tf
import gym
import gym_pull
import ppaquette_gym_super_mario
from gym.wrappers import Monitor
import random
import numpy as np
class Game :

  def __init__(self):
    self.episode_count = 10000;
    ## select stage
    self.env = gym.make('ppaquette/SuperMarioBros-1-1-Tiles-v0')

  def weight_variable(self, shape):
    initial = tf.truncated_normal(shape, stddev = 0.01)
    return tf.Variable(initial)


  def bias_variable(self, shape):
    initial = tf.constant(0.01, shape = shape)
    return tf.Variable(initial)

  def conv2d(self, x, W, stride):
    return tf.nn.conv2d(x, W, strides = [1, stride, stride, 1], padding = "SAME")

  def max_pool_2x2(self, x):
    return tf.nn.max_pool(x, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = "SAME")


  def create_network(self, action_size):
    #2層のレイヤーにする
    W_conv1 = self.weight_variable([8, 8, 1, 16])
    b_conv1 = self.bias_variable([16])
    W_conv2 = self.weight_variable([4, 4, 16, 32])
    b_conv2 = self.bias_variable([32])
    W_conv3 = self.weight_variable([4, 4, 32, 64])
    b_conv3 = self.bias_variable([64])
    W_fc1 = self.weight_variable([512, action_size])
    b_fc1 = self.bias_variable([action_size])
    s = tf.placeholder("float", [None, 13, 16, 1])
    # hidden layers
    h_conv1 = tf.nn.relu(self.conv2d(s, W_conv1, 2) + b_conv1)
    h_conv2 = tf.nn.relu(self.conv2d(h_conv1, W_conv2, 2) + b_conv2)
    h_conv3 = tf.nn.relu(self.conv2d(h_conv2, W_conv3, 1) + b_conv3)
    h_conv3_flat = tf.reshape(h_conv3, [-1, 512])
    readout = tf.matmul(h_conv3_flat, W_fc1) + b_fc1
    return s, readout 


  def play_game(self) :
    action_list = []
    for i in range(64) :
      command = format(i, 'b')
      command = '{0:06d}'.format(int(command))
      actions = []
      for cmd in list(command) :
        actions.append(int(cmd))
      action_list.append(actions)
    sess = tf.InteractiveSession()
    s, readout = self.create_network(len(action_list))
    a = tf.placeholder("float", [None, len(action_list)])
    y = tf.placeholder("float", [None, 1])
    readout_action = tf.reduce_sum(tf.multiply(readout, a), reduction_indices = 1)
    cost = tf.reduce_mean(tf.square(y - readout_action))
    train_step = tf.train.AdamOptimizer(1e-6).minimize(cost)
    saver = tf.train.Saver()
    sess.run(tf.initialize_all_variables())
    checkpoint = tf.train.get_checkpoint_state("./saved_networks/checkpoints")
    if checkpoint and checkpoint.model_checkpoint_path:
      saver.restore(sess, checkpoint.model_checkpoint_path)
      print ("Successfully loaded:", checkpoint.model_checkpoint_path)
    else:
      print ("Could not find old network weights")
    for episode in range(self.episode_count):
      self.env.reset()
      total_score = 0
      distance = 0
      is_finished = False
      actions, rewards, images = [], [] ,[]
      while is_finished == False :
        # 画面左上のドットを取得します(画像でやる場合はself.env.screenでとれます)
        screen = np.reshape(self.env.tiles, (13, 16, 1))
        if episode < 10 :
          action_index = random.randint(0, len(action_list) - 1)
        else :
          readout_t = readout.eval(feed_dict = {s : [screen]})[0]
          action_index = np.argmax(readout_t)
        # (1) 画面のマリオに処理してます(self.env.step)
        obs, reward, is_finished, info = self.env.step(action_list[action_index])
        ## MiniBatch化するように配列に
        action_array = np.zeros(len(action_list))
        action_array[action_index] = 1
        actions.append(action_array)
        # (2) 報酬を与えます
        rewards.append([float(info['distance'])])
        images.append(screen)

        train_step.run(feed_dict = {
          a : actions, y : rewards, s : images
        })
        print('Episode : ', episode, 'Actions : ', action_list[action_index], 'Rewards', reward)
        actions, rewards, images = [], [] ,[]

        self.env.render()
      saver.save(sess, 'saved_networks/model-dqn', global_step = episode)

if __name__ == '__main__' :
  game = Game()
  game.play_game()

実行

$ python3 start.py

最後に

Docker + ブラウザ版は別途書きます。

昨今、AI等の技術本など多く出てますが、なかなかハードルが高いと感じますよね・・・・
こういう身近なもので練習するといいかと思います!

GameAIやPythonについてプログラムについての詳細には別途、勉強会を実施します。
よかったら参加してください、

Tech Twitter始めました。随時更新していきますのでよかったらフォローお願いします。
https://twitter.com/gauss_club