12
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【強化学習】KaggleのConnectXへの投稿方法を解説(スコア900越えコード付き)

Last updated at Posted at 2022-06-25

この記事は自作している強化学習フレームワーク SimpleDistributedRL の解説記事です。

はじめに

機械学習を実践で試すなら Kaggle はうってつけです。
強化学習の問題もいくつかあり、投稿方法を紹介したいと思います。
基本は本フレームワークを使用しての投稿ですが、使わない場合も参考になるかと思います。

ConnectX

Kaggleが用意している強化学習のチュートリアルコンペです。
(タイタニックの強化学習版です)

サイトはこちら:ConnectX
内容はコネクトフォーという特殊な4マス並べゲームと同じです。(Switchゲームの世界のアソビ大全にもあるやつです)

image.png

これの強いAIを作るのがコンペ内容となります。

投稿形式に関して

他のコンペは予測したcsvファイルを提出だったりとイメージしやすいんですが、強化学習のコンペは提出ファイルがpyファイルになるので少しハードルが高くなります。
pyファイル単体でも提出できますが、本記事ではより汎用的な外部ライブラリを含めた複数ファイルの提出方法を説明します。
(提出後にエージェントが実行される環境はオフラインなので、外部ライブラリを使う場合は提出ファイルに含める必要があります)

複数のファイルを提出する場合、tar.gz形式にして main.py が含まれている必要があります。(これが実行される)
main.py はエージェントを返す関数で終わる必要があるようです。

最小コードは以下です。
(関数名はなんでもよさそう)

main.py
def agent(observation, configuration):
    action = 0
    return action

ただ、実行ディレクトリと tar.gz が展開されるパスが違うようで、他のファイルを使う場合はそのパスを通す必要があります。

main.py
import sys

# 外部ライブラリの import path を通す場合
sys.path.append("/kaggle_simulations/agent/") 

# main.py 以外のファイルを利用する場合
actor = build_actor()
actor.load_weights('/kaggle_simulations/agent/model.actor.h5')

def agent(observation, configuration):
    action = actor.predict(...)
    return action

・参考
https://www.kaggle.com/c/google-football/discussion/191257

1.ローカルから提出(windows)

ディレクトリ構造は以下です。

.
│ // ローカル用ファイル
├─ create_submission.bat  // (オプション)提出ファイル作成用バッチ
├─ train.bat              // (オプション)学習用バッチ
├─ train.py               // 学習用py
│
│ // 提出用ファイル
├─ srl           // 本フレームワーク
├─ model.py      // model
├─ parameter.dat // parameter
├─ main.py       // main
│
│ // 提出ファイル
└─ submission.tar.gz

create_submission.bat は提出用ファイルを作成するバッチファイルで以下となります。

create_submission.bat
del /S *.pyc
tar -czvf submission.tar.gz main.py model.py parameter.dat srl

1行目のdelはpythonのキャッシュ(*.pyc ファイル)を消すコマンドです。
2行目に圧縮したいファイルを列挙します。(tarコマンドはwindows10から追加されているようです)

1-1. 本フレームワークのダウンロード

直接 github から zip をDLして、ライブラリ直下にある srl フォルダを配置してください。
直接ではなくコマンドから実行する場合は以下です。(コマンドプロンプト例)

git clone https://github.com/pocokhc/simple_distributed_rl.git
move simple_distributed_rl/srl srl

# (オプション)余分なフォルダを削除します
rd /s /q simple_distributed_rl

1-2. model.py

train.pymain.py で同じ設定・モデルを使いたいので model.py で共有させます。
DQNの例を書いておきます。

model.py
import srl
from srl import runner

from srl.algorithms import dqn
from srl.envs import connectx

def create_config():
    env_config = srl.EnvConfig("ConnectX")
    rl_config = dqn.Config()

    config = sequence.Config(env_config, rl_config)
    return config

環境の ConnectX はKaggle側で用意されたものではなく、本フレームワーク内で別途作成したものです。(コードはこちら

1-3. train.py

学習用のコードです。
ローカルだけで実行し、parameter.dat を出力します。

train.py
import os

import numpy as np

from model import create_config
from srl import runner


def main():
    config = create_config()
    path = os.path.join(os.path.dirname(__file__), "parameter.dat")

    # 以前学習したパラメータをロード
    config.rl_config.parameter_path = path

    # --- set players
    config.players = [None, None]  # self play
    # config.players = [None, "alphabeta8"]  # alphabeta(8 depth)のCPUとplay

    # --- model summary
    config.model_summary()

    # --- train
    if True:
        # sequence training
        parameter, memory, history = runner.train(
            config,
            timeout=60 * 60 * 10,  # 10h
            enable_file_logger=False,
            enable_evaluation=False,
        )
    else:
        # distributed training
        parameter, memory, history = runner.mp_train(
            config,
            runner.MpConfig(actor_num=1),
            timeout=60 * 60 * 10,  # 10h
            enable_file_logger=False,
            enable_evaluation=False,
        )

    # save parameter
    parameter.save(path)

    # --- evaluate
    for players in [
        [None, None],
        [None, "random"],
        ["random", None],
        [None, "alphabeta7"],
        ["alphabeta7", None],
    ]:
        config.players = players
        rewards = runner.evaluate(config, parameter, max_episodes=5)
        print(f"{np.mean(rewards, axis=0)}, {players}")

    # --- vs human
    config.players = [None, "human"]
    runner.render(config, parameter=parameter)


if __name__ == "__main__":
    main()

毎回コマンドプロンプトを開いて実行はめんどくさいのでバッチファイルを作成します。
venv等、仮想環境を作成している場合は以下です。

train.bat
call .\venv_path\Scripts\activate & python train.py
pause

そのままの場合

train.bat
python train.py
pause

1-4. main.py

実行テストもかねてローカルでも動かせるようにしています。

また、ここのコードですが、kaggle側の環境とフレームワークとの対応が結構難しく、今後変更になる可能性があります。
(現状動作確認できているのはConnectXのみです)

import os
from typing import cast

from srl.envs import connectx
from model import create_config

# ディレクトリがあるかないかで kaggle 環境かローカルかを調べる
KAGGLE_PATH = "/kaggle_simulations/agent/"
if os.path.isdir(KAGGLE_PATH):
    is_local = False
    path = os.path.join(KAGGLE_PATH, "parameter.dat")
else:
    is_local = True
    path = os.path.join(os.path.dirname(__file__), "parameter.dat")

# model.py から configを取得
config = create_config()
config.rl_config.parameter_path = path

# 実行用にenv,parameter,workerを作成
env = config.make_env()
org_env = cast(connectx.ConnectX, env.get_original_env())
parameter = config.make_parameter()
worker = config.make_worker(parameter)


def my_agent(observation, configuration):
    step = observation.step

    # 1エピソードの最初にresetを呼ぶ必要がある
    # connectx は先行なら step==0、後攻なら step==1 がエピソードの最初
    if step == 0 or step == 1:
        # envを直接reset
        env.direct_reset(observation, configuration)
        # workerもreset
        worker.on_reset(env, org_env.player_index)

    # envを直接step
    env.direct_step(observation, configuration)

    # workerのpolicyを取得
    action = worker.policy(env)
    return action


# ローカル確認用
# 関数の下でも if で分けるなら大丈夫な模様
if is_local:
    import time

    import kaggle_environments  # pip install kaggle_environments
    import numpy as np

    config.model_summary()

    # kaggleのライブラリで動作するか検証
    kaggle_env = kaggle_environments.make("connectx", debug=True)
    for players in [
        [my_agent, "random"],
        ["random", my_agent],
        [my_agent, "negamax"],
        ["negamax", my_agent],
    ]:
        # 10episode実行
        rewards = []
        t0 = time.time()
        for _ in range(10):
            steps = kaggle_env.run(players)
            rewards.append([steps[-1][0]["reward"], steps[-1][1]["reward"]])

        # 結果
        rewards = np.mean(rewards, axis=0)
        print(f"rewards {rewards}, {time.time() - t0:.3f}s, {players}")

後は create_submission.bat を実行し、main.py,model.py,parameter.dat,srlsubmission.tar.gz に固めてこれを提出するだけです。

2. kaggle notebook の提出方法

Google Colaboratory でも動くと思います。
基本的には上と同じで、main.py,model.py,parameter.dat,srl を作成し、submission.tar.gz に固めるだけです。

本記事のnotebookコードはこちらです。
(後述する connectx.LayerProcessorMyConnectXWorker を適用した後のコードです)

ノートブック作成後の提出手順は以下です。

  1. Save Version
    ノートブック右上の Save Version を押して保存します。
    kaggle2.PNG

  2. 実行
    Saveすると裏で勝手に実行されるので終わるまで待ちます。
    (記事コードは10時間学習するので10時間待ちます)
    (左下から動いてるか確認できます)

  3. 結果を確認
    実行が終わったら、Save Version の右側の数字をクリック → Go to Viewer で実行後のページに移動できます。
    kaggle4.PNG
    kaggle5.PNG
    必要に応じて実行結果を確認します。

  4. 提出
    今回は submission.tar.gz を提出するので Viewer のタブから
    Data → submission.tar.gz → Submit を選択します。
    kaggle6.PNG
    kaggle7.PNG
    これで提出完了です。
    コンペに戻って My Submissions を見てみると提出できているかと思います。

connectx.LayerProcessor による前処理

本フレームワークの rl_config では前処理(processors)を別途定義できます。

rl_config = dqn.Config()
rl_config.processors = [connectx.LayerProcessor()]

元の connectx の状態は、石がなければ0、player1なら1、player2なら2、の値を取る columns×rows の1次元の配列です。
これに connectx.LayerProcessor() を通すと以下に変更します。

  • shape=(3, columns, rows)の3次元の配列(2次元+3層)
  • 1層目:player1の石が置いてある場所が1
  • 2層目:player2の石が置いてある場所が1
  • 3層目:playerを表す層、手番がplayer1なら全て0、player2なら全て1

層への変換のアイデアはAlphaZeroから持ってきています。

Workerの拡張(ルールベースの導入)

上記はDQNのみですべてを学習していますが、実環境に応用するには一部ルールベースを取り入れたくなります。
特にkaggleみたいなコンペでは強くするためにより拡張したいところです。

本フレームワークではWorkerの拡張機能を用意しており、その使い方の説明となります。(更新によりIFは更新になる可能性があります)
全体像は以下です。

from srl.base.env.base import EnvRun
from srl.base.rl.worker import ExtendWorker, WorkerRun
from srl.base.define import EnvAction
from typing import Tuple

# ExtendWorker を継承して作成します(中身は後述)
class MyConnectXWorker(ExtendWorker):

    # init の引数は親クラスにそのまま渡します
    def __init__(self, *args):
        super().__init__(*args)
        # self.rl_worker に元の強化学習が入ります。

    # エピソードの最初に呼ばれます
    def call_on_reset(self, env: EnvRun, worker_run: WorkerRun) -> dict:
        raise NotImplementedError()

    # アクションを返します
    # self.rl_worker.policy(env) を必要に応じて実行します
    def call_policy(self, env: EnvRun, worker_run: WorkerRun) -> Tuple[EnvAction, dict]:
        raise NotImplementedError()


def create_config():
    env_config = srl.EnvConfig("ConnectX")

    rl_config = dqn.Config()
    rl_config.processors = [connectx.LayerProcessor()]

    # extend_workerに自作したクラスをいれます
    rl_config.extend_worker = MyConnectXWorker

    config = sequence.Config(env_config, rl_config)
    return config

MyConnectXWorkerの作成例は以下です。

  • 1ターン目:探索率を0.5に変更
  • 2ターン目以降:MinMax法で探索
    • 最善手が1個ならそのアクションを実施
    • そうじゃない場合はDQNでアクションを決定
from typing import cast, Tuple
from srl.envs import connectx
from srl.base.define import EnvAction
from srl.base.env.base import EnvRun
from srl.base.rl.worker import ExtendWorker, WorkerRun

class MyConnectXWorker(ExtendWorker):
    def __init__(self, *args):
        super().__init__(*args)

        # rlのconfig
        self.rl_config = cast(dqn.Config, self.rl_worker.worker.config)

        # MinMaxの探索数
        self.max_depth = 4

    def call_on_reset(self, env: EnvRun, worker_run: WorkerRun) -> dict:
        self.is_rl = False
        self.scores = [0] * env.action_space.n
        self.minmax_time = 0
        self.minmax_count = 0
        return {}

    def call_policy(self, env: EnvRun, worker_run: WorkerRun) -> Tuple[EnvAction, dict]:
        if env.step_num == 0:
            # --- 先行1ターン目
            # DQNの探索率を0.5にして実行
            self.rl_config.epsilon = 0.5
            action = self.rl_worker.policy(env)
            self.is_rl = True
            return action, {}

        # --- 2ターン目以降
        # DQNの探索率は0.1に戻す
        self.rl_config.epsilon = 0.1

        # 元の環境を取得
        env_org = cast(connectx.ConnectX, env.get_original_env())

        # MinMaxを実施、環境は壊さないようにcopyで渡す
        self.minmax_count = 0
        t0 = time.time()
        self.scores = self._minmax(env_org.copy())
        self.minmax_time = time.time() - t0

        # 最大スコア
        max_score = np.max(self.scores)
        max_count = np.count_nonzero(self.scores == max_score)

        # 最大数が1個ならそのアクションを実施
        if max_count == 1:
            self.is_rl = False
            action = int(np.argmax(self.scores))
            return action, {}

        # 最大値以外のアクションを選択しないようにする(invalid_actionsに追加)
        new_invalid_actions = [a for a in range(env.action_space.n) if self.scores[a] != max_score]
        env.add_invalid_actions(new_invalid_actions, self.player_index)

        # rl実施
        action = self.rl_worker.policy(env)
        self.is_rl = True

        return action, {}

    # MinMax
    def _minmax(self, env: connectx.ConnectX, depth: int = 0):
        if depth == self.max_depth:
            return [0] * env.action_space.n

        self.minmax_count += 1

        # 有効なアクションを取得
        valid_actions = env.get_valid_actions(env.player_index)

        # env復元用に今の状態を保存
        env_dat = env.backup()

        if env.player_index == self.player_index:
            # 自分の番
            scores = [-9.0 for _ in range(env.action_space.n)]
            for a in valid_actions:
                # envを復元
                env.restore(env_dat)

                # env stepを実施
                _, r1, r2, done, _ = env.call_step(a)
                if done:
                    # 終了状態なら報酬をスコアにする
                    if self.player_index == 0:
                        scores[a] = r1
                    else:
                        scores[a] = r2
                else:
                    # 次のstepのスコアを取得
                    n_scores = self._minmax(env, depth + 1)
                    scores[a] = np.min(n_scores)  # 相手の番は最小を選択

        else:
            # 相手の番
            scores = [9.0 for _ in range(env.action_space.n)]
            for a in valid_actions:
                env.restore(env_dat)

                _, r1, r2, done, _ = env.call_step(a)
                if done:
                    if self.player_index == 0:
                        scores[a] = r1
                    else:
                        scores[a] = r2
                else:
                    n_scores = self._minmax(env, depth + 1)
                    scores[a] = np.max(n_scores)  # 自分の番は最大を選択

        return scores

    # 可視化用
    def call_render(self, env: EnvRun, worker_run: WorkerRun) -> None:
        print(f"- MinMax count: {self.minmax_count}, {self.minmax_time:.3f}s -")
        print("+---+---+---+---+---+---+---+")
        s = "|"
        for a in range(env.action_space.n):
            s += "{:2d} |".format(int(self.scores[a]))
        print(s)
        print("+---+---+---+---+---+---+---+")
        if self.is_rl:
            self.rl_worker.render(env)

Envの作成

新規に Env を作成する方法はこちらまたはフレームワーク内の他のenvのコードを見てください。
これにKaggle用に追加した要素を説明します。

Kaggle用に必要な関数は以下です。

class MyEnv(EnvBase):
    def direct_reset(self, *args, **kwargs) -> Tuple[EnvObservation, List[int], Info]:
        raise NotImplementedError()

    def direct_step(self, *args, **kwargs) -> Tuple[EnvObservation, List[float], bool, List[int], Info]:
        raise NotImplementedError()

direct_reset と direct_step を定義し、直接値を書き換えます。
引数は任意に変更可能です。
戻り値は元の call_reset と call_step と同じです。

ConnextXの例は以下です。

class ConnectX(TurnBase2Player):
    def call_direct_reset(self, observation, configuration) -> Tuple[np.ndarray, dict]:
        self._player_index = observation.mark - 1
        self.board = observation.board[:]
        return np.array(self.board), {}

    def call_direct_step(self, observation, configuration) -> Tuple[np.ndarray, float, float, bool, dict]:
        self._player_index = observation.mark - 1
        self.board = observation.board[:]
        return np.array(self.board), 0, 0, False, {}

これでフレームワーク外から直接resetとstepを実行できるようになります。

提出結果

kaggleの結果は他のユーザが提出したAIと戦ってその結果を元に決まります。(なので常に変動します)
オンラインゲームのレートみたいな感じですね。

今回作成した MinMax(深さ4)+DQN のアルゴリズムを10時間学習させた結果を提出しました。
スコアが安定したあたりのスコアは以下です。

kaggle9.PNG

2022/6/25現在 205エントリー中24位でした。

12
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?