この記事は自作している強化学習フレームワーク SimpleDistributedRL の解説記事です。
はじめに
機械学習を実践で試すなら Kaggle はうってつけです。
強化学習の問題もいくつかあり、投稿方法を紹介したいと思います。
基本は本フレームワークを使用しての投稿ですが、使わない場合も参考になるかと思います。
ConnectX
Kaggleが用意している強化学習のチュートリアルコンペです。
(タイタニックの強化学習版です)
サイトはこちら:ConnectX
内容はコネクトフォーという特殊な4マス並べゲームと同じです。(Switchゲームの世界のアソビ大全にもあるやつです)
これの強いAIを作るのがコンペ内容となります。
投稿形式に関して
他のコンペは予測したcsvファイルを提出だったりとイメージしやすいんですが、強化学習のコンペは提出ファイルがpyファイルになるので少しハードルが高くなります。
pyファイル単体でも提出できますが、本記事ではより汎用的な外部ライブラリを含めた複数ファイルの提出方法を説明します。
(提出後にエージェントが実行される環境はオフラインなので、外部ライブラリを使う場合は提出ファイルに含める必要があります)
複数のファイルを提出する場合、tar.gz形式にして main.py
が含まれている必要があります。(これが実行される)
main.py
はエージェントを返す関数で終わる必要があるようです。
最小コードは以下です。
(関数名はなんでもよさそう)
def agent(observation, configuration):
action = 0
return action
ただ、実行ディレクトリと tar.gz が展開されるパスが違うようで、他のファイルを使う場合はそのパスを通す必要があります。
import sys
# 外部ライブラリの import path を通す場合
sys.path.append("/kaggle_simulations/agent/")
# main.py 以外のファイルを利用する場合
actor = build_actor()
actor.load_weights('/kaggle_simulations/agent/model.actor.h5')
def agent(observation, configuration):
action = actor.predict(...)
return action
・参考
https://www.kaggle.com/c/google-football/discussion/191257
1.ローカルから提出(windows)
ディレクトリ構造は以下です。
.
│ // ローカル用ファイル
├─ create_submission.bat // (オプション)提出ファイル作成用バッチ
├─ train.bat // (オプション)学習用バッチ
├─ train.py // 学習用py
│
│ // 提出用ファイル
├─ srl // 本フレームワーク
├─ model.py // model
├─ parameter.dat // parameter
├─ main.py // main
│
│ // 提出ファイル
└─ submission.tar.gz
create_submission.bat
は提出用ファイルを作成するバッチファイルで以下となります。
del /S *.pyc
tar -czvf submission.tar.gz main.py model.py parameter.dat srl
1行目のdelはpythonのキャッシュ(*.pyc ファイル)を消すコマンドです。
2行目に圧縮したいファイルを列挙します。(tarコマンドはwindows10から追加されているようです)
1-1. 本フレームワークのダウンロード
直接 github から zip をDLして、ライブラリ直下にある srl フォルダを配置してください。
直接ではなくコマンドから実行する場合は以下です。(コマンドプロンプト例)
git clone https://github.com/pocokhc/simple_distributed_rl.git
move simple_distributed_rl/srl srl
# (オプション)余分なフォルダを削除します
rd /s /q simple_distributed_rl
1-2. model.py
train.py
と main.py
で同じ設定・モデルを使いたいので model.py
で共有させます。
DQNの例を書いておきます。
import srl
from srl import runner
from srl.algorithms import dqn
from srl.envs import connectx
def create_config():
env_config = srl.EnvConfig("ConnectX")
rl_config = dqn.Config()
config = sequence.Config(env_config, rl_config)
return config
環境の ConnectX
はKaggle側で用意されたものではなく、本フレームワーク内で別途作成したものです。(コードはこちら)
1-3. train.py
学習用のコードです。
ローカルだけで実行し、parameter.dat
を出力します。
import os
import numpy as np
from model import create_config
from srl import runner
def main():
config = create_config()
path = os.path.join(os.path.dirname(__file__), "parameter.dat")
# 以前学習したパラメータをロード
config.rl_config.parameter_path = path
# --- set players
config.players = [None, None] # self play
# config.players = [None, "alphabeta8"] # alphabeta(8 depth)のCPUとplay
# --- model summary
config.model_summary()
# --- train
if True:
# sequence training
parameter, memory, history = runner.train(
config,
timeout=60 * 60 * 10, # 10h
enable_file_logger=False,
enable_evaluation=False,
)
else:
# distributed training
parameter, memory, history = runner.mp_train(
config,
runner.MpConfig(actor_num=1),
timeout=60 * 60 * 10, # 10h
enable_file_logger=False,
enable_evaluation=False,
)
# save parameter
parameter.save(path)
# --- evaluate
for players in [
[None, None],
[None, "random"],
["random", None],
[None, "alphabeta7"],
["alphabeta7", None],
]:
config.players = players
rewards = runner.evaluate(config, parameter, max_episodes=5)
print(f"{np.mean(rewards, axis=0)}, {players}")
# --- vs human
config.players = [None, "human"]
runner.render(config, parameter=parameter)
if __name__ == "__main__":
main()
毎回コマンドプロンプトを開いて実行はめんどくさいのでバッチファイルを作成します。
venv等、仮想環境を作成している場合は以下です。
call .\venv_path\Scripts\activate & python train.py
pause
そのままの場合
python train.py
pause
1-4. main.py
実行テストもかねてローカルでも動かせるようにしています。
また、ここのコードですが、kaggle側の環境とフレームワークとの対応が結構難しく、今後変更になる可能性があります。
(現状動作確認できているのはConnectXのみです)
import os
from typing import cast
from srl.envs import connectx
from model import create_config
# ディレクトリがあるかないかで kaggle 環境かローカルかを調べる
KAGGLE_PATH = "/kaggle_simulations/agent/"
if os.path.isdir(KAGGLE_PATH):
is_local = False
path = os.path.join(KAGGLE_PATH, "parameter.dat")
else:
is_local = True
path = os.path.join(os.path.dirname(__file__), "parameter.dat")
# model.py から configを取得
config = create_config()
config.rl_config.parameter_path = path
# 実行用にenv,parameter,workerを作成
env = config.make_env()
org_env = cast(connectx.ConnectX, env.get_original_env())
parameter = config.make_parameter()
worker = config.make_worker(parameter)
def my_agent(observation, configuration):
step = observation.step
# 1エピソードの最初にresetを呼ぶ必要がある
# connectx は先行なら step==0、後攻なら step==1 がエピソードの最初
if step == 0 or step == 1:
# envを直接reset
env.direct_reset(observation, configuration)
# workerもreset
worker.on_reset(env, org_env.player_index)
# envを直接step
env.direct_step(observation, configuration)
# workerのpolicyを取得
action = worker.policy(env)
return action
# ローカル確認用
# 関数の下でも if で分けるなら大丈夫な模様
if is_local:
import time
import kaggle_environments # pip install kaggle_environments
import numpy as np
config.model_summary()
# kaggleのライブラリで動作するか検証
kaggle_env = kaggle_environments.make("connectx", debug=True)
for players in [
[my_agent, "random"],
["random", my_agent],
[my_agent, "negamax"],
["negamax", my_agent],
]:
# 10episode実行
rewards = []
t0 = time.time()
for _ in range(10):
steps = kaggle_env.run(players)
rewards.append([steps[-1][0]["reward"], steps[-1][1]["reward"]])
# 結果
rewards = np.mean(rewards, axis=0)
print(f"rewards {rewards}, {time.time() - t0:.3f}s, {players}")
後は create_submission.bat
を実行し、main.py
,model.py
,parameter.dat
,srl
を submission.tar.gz
に固めてこれを提出するだけです。
2. kaggle notebook の提出方法
Google Colaboratory でも動くと思います。
基本的には上と同じで、main.py
,model.py
,parameter.dat
,srl
を作成し、submission.tar.gz
に固めるだけです。
本記事のnotebookコードはこちらです。
(後述する connectx.LayerProcessor
と MyConnectXWorker
を適用した後のコードです)
ノートブック作成後の提出手順は以下です。
-
実行
Saveすると裏で勝手に実行されるので終わるまで待ちます。
(記事コードは10時間学習するので10時間待ちます)
(左下から動いてるか確認できます) -
結果を確認
実行が終わったら、Save Version の右側の数字をクリック → Go to Viewer で実行後のページに移動できます。
必要に応じて実行結果を確認します。 -
提出
今回はsubmission.tar.gz
を提出するので Viewer のタブから
Data → submission.tar.gz → Submit を選択します。
これで提出完了です。
コンペに戻って My Submissions を見てみると提出できているかと思います。
connectx.LayerProcessor による前処理
本フレームワークの rl_config では前処理(processors)を別途定義できます。
rl_config = dqn.Config()
rl_config.processors = [connectx.LayerProcessor()]
元の connectx の状態は、石がなければ0、player1なら1、player2なら2、の値を取る columns×rows の1次元の配列です。
これに connectx.LayerProcessor()
を通すと以下に変更します。
- shape=(3, columns, rows)の3次元の配列(2次元+3層)
- 1層目:player1の石が置いてある場所が1
- 2層目:player2の石が置いてある場所が1
- 3層目:playerを表す層、手番がplayer1なら全て0、player2なら全て1
層への変換のアイデアはAlphaZeroから持ってきています。
Workerの拡張(ルールベースの導入)
上記はDQNのみですべてを学習していますが、実環境に応用するには一部ルールベースを取り入れたくなります。
特にkaggleみたいなコンペでは強くするためにより拡張したいところです。
本フレームワークではWorkerの拡張機能を用意しており、その使い方の説明となります。(更新によりIFは更新になる可能性があります)
全体像は以下です。
from srl.base.env.base import EnvRun
from srl.base.rl.worker import ExtendWorker, WorkerRun
from srl.base.define import EnvAction
from typing import Tuple
# ExtendWorker を継承して作成します(中身は後述)
class MyConnectXWorker(ExtendWorker):
# init の引数は親クラスにそのまま渡します
def __init__(self, *args):
super().__init__(*args)
# self.rl_worker に元の強化学習が入ります。
# エピソードの最初に呼ばれます
def call_on_reset(self, env: EnvRun, worker_run: WorkerRun) -> dict:
raise NotImplementedError()
# アクションを返します
# self.rl_worker.policy(env) を必要に応じて実行します
def call_policy(self, env: EnvRun, worker_run: WorkerRun) -> Tuple[EnvAction, dict]:
raise NotImplementedError()
def create_config():
env_config = srl.EnvConfig("ConnectX")
rl_config = dqn.Config()
rl_config.processors = [connectx.LayerProcessor()]
# extend_workerに自作したクラスをいれます
rl_config.extend_worker = MyConnectXWorker
config = sequence.Config(env_config, rl_config)
return config
MyConnectXWorkerの作成例は以下です。
- 1ターン目:探索率を0.5に変更
- 2ターン目以降:MinMax法で探索
- 最善手が1個ならそのアクションを実施
- そうじゃない場合はDQNでアクションを決定
from typing import cast, Tuple
from srl.envs import connectx
from srl.base.define import EnvAction
from srl.base.env.base import EnvRun
from srl.base.rl.worker import ExtendWorker, WorkerRun
class MyConnectXWorker(ExtendWorker):
def __init__(self, *args):
super().__init__(*args)
# rlのconfig
self.rl_config = cast(dqn.Config, self.rl_worker.worker.config)
# MinMaxの探索数
self.max_depth = 4
def call_on_reset(self, env: EnvRun, worker_run: WorkerRun) -> dict:
self.is_rl = False
self.scores = [0] * env.action_space.n
self.minmax_time = 0
self.minmax_count = 0
return {}
def call_policy(self, env: EnvRun, worker_run: WorkerRun) -> Tuple[EnvAction, dict]:
if env.step_num == 0:
# --- 先行1ターン目
# DQNの探索率を0.5にして実行
self.rl_config.epsilon = 0.5
action = self.rl_worker.policy(env)
self.is_rl = True
return action, {}
# --- 2ターン目以降
# DQNの探索率は0.1に戻す
self.rl_config.epsilon = 0.1
# 元の環境を取得
env_org = cast(connectx.ConnectX, env.get_original_env())
# MinMaxを実施、環境は壊さないようにcopyで渡す
self.minmax_count = 0
t0 = time.time()
self.scores = self._minmax(env_org.copy())
self.minmax_time = time.time() - t0
# 最大スコア
max_score = np.max(self.scores)
max_count = np.count_nonzero(self.scores == max_score)
# 最大数が1個ならそのアクションを実施
if max_count == 1:
self.is_rl = False
action = int(np.argmax(self.scores))
return action, {}
# 最大値以外のアクションを選択しないようにする(invalid_actionsに追加)
new_invalid_actions = [a for a in range(env.action_space.n) if self.scores[a] != max_score]
env.add_invalid_actions(new_invalid_actions, self.player_index)
# rl実施
action = self.rl_worker.policy(env)
self.is_rl = True
return action, {}
# MinMax
def _minmax(self, env: connectx.ConnectX, depth: int = 0):
if depth == self.max_depth:
return [0] * env.action_space.n
self.minmax_count += 1
# 有効なアクションを取得
valid_actions = env.get_valid_actions(env.player_index)
# env復元用に今の状態を保存
env_dat = env.backup()
if env.player_index == self.player_index:
# 自分の番
scores = [-9.0 for _ in range(env.action_space.n)]
for a in valid_actions:
# envを復元
env.restore(env_dat)
# env stepを実施
_, r1, r2, done, _ = env.call_step(a)
if done:
# 終了状態なら報酬をスコアにする
if self.player_index == 0:
scores[a] = r1
else:
scores[a] = r2
else:
# 次のstepのスコアを取得
n_scores = self._minmax(env, depth + 1)
scores[a] = np.min(n_scores) # 相手の番は最小を選択
else:
# 相手の番
scores = [9.0 for _ in range(env.action_space.n)]
for a in valid_actions:
env.restore(env_dat)
_, r1, r2, done, _ = env.call_step(a)
if done:
if self.player_index == 0:
scores[a] = r1
else:
scores[a] = r2
else:
n_scores = self._minmax(env, depth + 1)
scores[a] = np.max(n_scores) # 自分の番は最大を選択
return scores
# 可視化用
def call_render(self, env: EnvRun, worker_run: WorkerRun) -> None:
print(f"- MinMax count: {self.minmax_count}, {self.minmax_time:.3f}s -")
print("+---+---+---+---+---+---+---+")
s = "|"
for a in range(env.action_space.n):
s += "{:2d} |".format(int(self.scores[a]))
print(s)
print("+---+---+---+---+---+---+---+")
if self.is_rl:
self.rl_worker.render(env)
Envの作成
新規に Env を作成する方法はこちらまたはフレームワーク内の他のenvのコードを見てください。
これにKaggle用に追加した要素を説明します。
Kaggle用に必要な関数は以下です。
class MyEnv(EnvBase):
def direct_reset(self, *args, **kwargs) -> Tuple[EnvObservation, List[int], Info]:
raise NotImplementedError()
def direct_step(self, *args, **kwargs) -> Tuple[EnvObservation, List[float], bool, List[int], Info]:
raise NotImplementedError()
direct_reset と direct_step を定義し、直接値を書き換えます。
引数は任意に変更可能です。
戻り値は元の call_reset と call_step と同じです。
ConnextXの例は以下です。
class ConnectX(TurnBase2Player):
def call_direct_reset(self, observation, configuration) -> Tuple[np.ndarray, dict]:
self._player_index = observation.mark - 1
self.board = observation.board[:]
return np.array(self.board), {}
def call_direct_step(self, observation, configuration) -> Tuple[np.ndarray, float, float, bool, dict]:
self._player_index = observation.mark - 1
self.board = observation.board[:]
return np.array(self.board), 0, 0, False, {}
これでフレームワーク外から直接resetとstepを実行できるようになります。
提出結果
kaggleの結果は他のユーザが提出したAIと戦ってその結果を元に決まります。(なので常に変動します)
オンラインゲームのレートみたいな感じですね。
今回作成した MinMax(深さ4)+DQN のアルゴリズムを10時間学習させた結果を提出しました。
スコアが安定したあたりのスコアは以下です。
2022/6/25現在 205エントリー中24位でした。