8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【強化学習】分散強化学習を解説・実装(GORILA・A3C・Ape-X)

Last updated at Posted at 2022-06-12

この記事は自作している強化学習フレームワーク SimpleDistributedRL の解説記事です。

フレームワークのタイトルにも入れている分散強化学習についての解説です。

はじめに

プログラムを並列処理させて早くしようというのはよくある話で、それを強化学習にも応用しようというのは自然な流れです。
ただ、強化学習では学習したパラメータを元にアクションを決めるので単純に並列処理させることはできません。

分散強化学習の流れとして、GORILA・A3C・Ape-Xのアーキテクチャを紹介し、最後に本フレームワークのアーキテクチャを紹介します。

GORILA(General Reinforcement Learning Architecture)

論文:Massively Parallel Methods for Deep Reinforcement Learning

GORILLAではないので注意です(笑)
(Oの位置がおかしいあたり名前で遊んでますねw)

分散強化学習を実装して成果をだした論文としては多分初出です。
提案されたアーキテクチャは以下です。

img7.PNG

各役割を箇条書きします。

  • Actor
    • Actorは複数インスタンス化され、それぞれが独自の環境を持つ
    • 各Actorは環境から経験を取得し、メモリ(ReplayMemory)に経験を送る
    • 定期的にパラメータサーバからQ-Networkのパラメータを同期
  • ReplayMemory
    • 2種類の形態が考えられる
    • 1、Actor毎にローカルにメモリを持つ方法(Actor数分メモリが出来る)
    • 2、グローバルメモリを1つ持つ方法(通信のオーバーヘッドがかかるデメリットあり)
  • Learner
    • Learnerは複数インスタンス化される
    • 各Learnerはローカルメモリー又はグローバルメモリからミニバッチを取得し、勾配をパラメータサーバに送る
    • 定期的にパラメータサーバからQ-Networkのパラメータを同期
  • Parameter server
    • 非同期確率的勾配降下アルゴリズム(ASGD: asynchronous stochastic gradient descent algorithm)で更新
    • 各マシンでパラメータを保持し、各マシンで勾配を元にパラメータを更新

参考
averaged stochastic gradient descentのご紹介

本記事用に調べたのですが、Actor/Learner/Parameter 3つとも並列処理ができる構成だったんですね…
拡張性はすごいけど実装が大変そう…

A3C

論文:Asynchronous Methods for Deep Reinforcement Learning

アーキテクチャのみ取り上げます。A3C/A2Cは別の記事を上げる予定です。
A3Cは基本 GORILA と同じですが、以下の変更点があります。

  • パラメータサーバとLearner達を同じマシン(1台)の複数CPUに変更
    • これによりHogwild!(※1)という高速化手法が使える
    • 通信のオーバーヘッドもなくなる
  • ActorとLearnerは1対1で対応し、それぞれが学習サイクルを回し、共有パラメータ(パラメータサーバ)を更新する
    • ReplayMemoryを廃止
      • 複数Actorがいるので環境の探索範囲が広くて多彩
      • それぞれのActorは異なる探索ポリシーを持つことができる
      • これにより複数のActorによる経験の取得が、ReplayMemoryの代わりになる
    • ReplayMemoryの廃止によりOn-policy(学習の方策と経験を収集する方策が同じ)なアルゴリズムにも適用できる
      (簡単に言うと、価値ベースと方策ベース両方のアルゴリズムに適用できる)
      (そのままだとReplayMemoryは方策ベースのアルゴリズムでは使えない)
    • 実際の更新は複数経験をためてミニバッチ形式にして更新する
      (これにより各Actor-Learner間による共有パラメータの上書き確率を下げる)

※1、確率的勾配降下法(SGD)の並列処理改善による高速化手法らしいです。
参考
HOGWILD! 論文要約
HOGWILD!: A Lock-Free Approach to Parallelizing Stochastic Gradient Descent(論文)

Ape-X

論文:Distributed Prioritized Experience Replay(論文)

Rainbowでも成果を見せた Prioritized Experience Replay(優先度付き経験再生)を分散強化学習で実現した論文となります。
(GORILA,A3Cの流れも含まれています)

提案されたアーキテクチャは以下です。

img8.PNG

  • Actor
    • GORILAと同じで環境から経験を取得しメモリに送信
    • 複数インスタンス、各Actorに1CPUが割り当てられる
  • Learner
    • メモリからミニバッチを取得しパラメータを更新
    • 1Learner
    • 1GPU
  • Replay(メモリ)
    • 1メモリ(優先度を計算する必要があるため)

大きな違いはメモリと共有パラメータ(とLearner)が1つになったことでしょう。
メモリは優先順位を付けるために1カ所にまとめています。

GORILAではLearnerは勾配のみを計算し、共有パラメータへの反映はパラメータサーバが行っています。
ですので、Learner側のミニバッチ計算とパラメータへの反映が並列で実行可能でした。
Ape-XではここをまとめてLearnerで処理するようにしています。

ここの利点が論文から正確に読み取れた自信がないのですが、勾配はすぐ古くなるのに対し、経験はすぐには古いデータにならないので、システム全体を見ると多少の待ち時間を犠牲にしてもバッチ処理を優先させ、学習効率とスループットを高めているとの事です。

Priorityの計算

Rainbow の Prioritized Experience Replay ではメモリに経験を入れる時、必ず使われるように優先度を一番高くして追加していました。
しかし、分散学習では大量に経験が来て新しい経験だらけになってしまいます。
そこで追加する前にActor側で優先度を計算して追加するように変更しています。

SimpleDistributedRL

本フレームワークでの実装です。
基本はApe-Xと同じです。
より実装に向けた具体的なアーキテクチャになっています。

overview-distributed.drawio.png

基本的な構成は以下です。

  • Actor
    • 複数インスタンス作成でき、インスタンス毎にCPUが割り当て可能
    • 各Actorは独立で動き、非同期でメモリにバッチを送信
    • Boardを見て、もしパラメータが更新されていたら自分のパラメータを更新する
  • Trainer(Learner)
    • 1Trainer(GPUに割り当て可能)
    • メモリからミニバッチを取得し、ひたすら学習する
    • 定期的にBoardにパラメータを書き込む
  • RemoteMemory
    • サーバとして実装
    • バッチの受信とミニバッチの送信は非同期に実施
  • RemoteBoard
    • サーバとして実装
    • Blackboardアーキテクチャパターンがベースです(※2)
    • 書き込みは非同期に実行

※2:方向が逆なので厳密な意味としてはPub/Subを採用したほうが良さそうですが、素直に実装できたので採用しています

最初はキュー等を色々試していたのですが、RemoteMemory/RemoteBoardとのやりとりで待ち(非同期処理のロック等)が発生するとどうしても都合が悪く、この形になりました。
(例えばキューだとput中にgetはできずにputが終わるまで待ちが発生する)
また、この形はpythonだと multiprocessing.Manager で簡単に実装ができたのも大きいです。

また、まだ試していませんが、Manager はサーバプロセスとしての非同期も実現しているので、別PCからネットワーク経由でもアクセス可能との事です。
なのでこの構成のまま、各ActorとTrainerを別PCで学習する事も可能です(フレームワーク上はまだ実装していません)

実装

フレームワーク上の実装はこちら、分散処理のみを書いた最小コードのサンプルはこちらです。
記事のコードはメインロジックのみでそれ以外の細かいところ(例外処理等)は省略しています。

RemoteBoard

実装自体は簡単で以下2つです。
1、Trainerからもらったパラメータを保存
2、更新があるかどうかを知らせる(Actor側の実装もあり)

2はActor側にも実装が含まれてしまっているので完全に分離はできていません…

class Board:
    def __init__(self):
        self.params = None
        self.update_count = 0

    def write(self, params):
        self.params = params
        self.update_count += 1

    def get_update_count(self):
        return self.update_count

    def read(self):
        return self.params

get_update_count はActor側で更新があるかどうか判定する時に使います。
各Actorがそれぞれ判定する必要があるので回数を数える実装になっています。

RemoteMemory

中身の実装に指定はありません。
注意点としてはManagerでサーバプロセス化するので関数しか使えなくなる点です。
(なので length も __len__ の実装ではなく関数で実装)

例として ExperienceReplayBuffer の実装だと以下です。

import random
from collections import deque

class RemoteMemory():
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def length(self) -> int:
        return len(self.buffer)

    def add(self, batch: Any) -> None:
        self.buffer.append(batch)

    def sample(self, batch_size: int):
        return random.sample(self.memory, batch_size)

Actor

multiprocessing でプロセス化をするので関数で実装します。

def _run_actor(remote_memory, remote_board):
    # remote_memoryとremote_board はサーバプロセス化後のインスタンスが入っています

    env = Envを作成
    parameter = Parameterを作成
    worker = Workerを作成parameter  remote_memory を使用

    prev_update_count = 0

    # episode loop
    while True:
        env.reset()
        worker.on_reset(env)

        # 1 episode
        while not env.done:
            
            # 1step
            # この中でWorkerは remote_memory に経験を送る(非同期)
            action = worker.policy(env)
            env.step(action)
            worker.on_step(env)
        
        # remote_board で更新があるか確認
        # 更新回数が前回より増えていれば更新ありとする
        update_count = remote_board.get_update_count()
        if update_count != prev_update_count:
            # parameterを更新(非同期)
            prev_update_count = update_count
            params = remote_board.read()
            if params is not None:
                parameter.restore(params)

Trainer

Worker/Trainerという単語は論文ではなくRLlibからきていたり…。
同じくmultiprocessingでプロセス化をするので関数で実装しています。

def _run_trainer(remote_memory, remote_board):
    # remote_memoryとremote_board はサーバプロセス化後のインスタンスが入っています

    parameter = Parameterを作成
    trainer = Trainerを作成parameter  remote_memory を使用

    # ハイパーパラメータ(学習を送る間隔)
    send_interval_by_train_count = 100

    train_count = 0

    # 無限ループ
    while True:
        # 仮で10000回学習したら終了とする
        if train_count > 10000:
            break
        
        # 学習
        # この中で、remote_memory からミニバッチを取得(非同期)
        trainer.train()
        train_count += 1
        
        # 一定学習毎に Board にパラメータを送る(非同期)
        if train_count % send_interval_by_train_count == 0:
            remote_board.write(parameter.backup())

    # 最後に学習結果を送信
    remote_board.write(parameter.backup())

main

上で定義したもろもろをmultiprocessingで動かします。

import multiprocessing as mp
from multiprocessing.managers import BaseManager

# Managerクラスを作成
class MPManager(BaseManager):
    pass

def main():

    # actor数
    actor_num = 2

    # Manager登録
    MPManager.register("RemoteMemory", RemoteMemory)
    MPManager.register("Board", Board)

    # Managerプロセスを起動
    with MPManager() as manager:

        # --- サーバプロセス作成
        remote_memory = manager.RemoteMemory()
        remote_board = manager.Board()

        # --- actor プロセス作成
        actors_ps_list = []
        for actor_id in range(actor_num):
            params = (remote_memory, remote_board)
            ps = mp.Process(target=_run_actor, args=params)
            actors_ps_list.append(ps)

        # --- trainer プロセス作成
        params = (remote_memory, remote_board)
        trainer_ps = mp.Process(target=_run_trainer, args=params)

        # --- 各プロセスの並列実行を開始
        [p.start() for p in actors_ps_list]
        trainer_ps.start()

        # trainerプロセスが終わるまで待つ
        trainer_ps.join()

        # 学習後の結果を受け取る、remote_boardから最後のパラメータを取得する
        parameter = Parameterを作成
        params = remote_board.read()
        parameter.restore(params)

ハイパーパラメータ

フレームワーク上のハイパーパラメータです。

@dataclass
class Config:

    # actor数です。
    actor_num: int = 1

    # TrainerがBoardにパラメータを書き込む間隔(学習回数)
    trainer_parameter_send_interval_by_train_count: int = 100

    # ActorがBoardに確認しに行く間隔(step数)
    actor_parameter_sync_interval_by_step: int = 100

    # 各プロセスのCPU/GPU割り当て
    allocate_main: str = "/CPU:0"
    allocate_trainer: str = "/GPU:0"
    allocate_actor: Union[List[str], str] = "/CPU:0"

multiprocessing による実行の注意点

メイン関数は if __name__ == '__main__': の中に記述しないといけないようで、グローバルに記述すると以下のエラーがでました。

RuntimeError:
    An attempt has been made to start a new process before the
    current process has finished its bootstrapping phase.
    This probably means that you are not using fork to start your
    child processes and you have forgotten to use the proper idiom
    in the main module:

        if __name__ == '__main__':
            freeze_support()
            ...

    The "freeze_support()" line can be omitted if the program
    is not going to be frozen to produce an executable.

if __name__ == '__main__':の中で実行しましょう。

実行/比較

分散学習しない場合とする場合で比較してみました。
アルゴリズムはDQNです。

import matplotlib.pyplot as plt
import numpy as np
import srl
from srl import runner

# --- env & algorithm load
import gym  # isort: skip # noqa F401
from srl.algorithms import dqn  # isort: skip

def main():
    env_config = srl.EnvConfig("Pendulum-v1")
    rl_config = dqn.Config(hidden_block_kwargs=dict(hidden_layer_sizes=(64, 64)))
    config = runner.Config(env_config, rl_config)

    # --- train
    parameter, remote_memory, history1 = runner.train(
        config,
        timeout=60 * 10,
        enable_evaluation=False,
    )
    rewards = runner.evaluate(config, parameter, max_episodes=100)
    print(f"Average reward for 100 episodes: {np.mean(rewards)}")

    # --- dist
    parameter, remote_memory, history2 = runner.mp_train(
        config,
        timeout=60 * 10,
        enable_evaluation=False,
    )
    rewards = runner.evaluate(config, parameter, max_episodes=100)
    print(f"Average reward for 100 episodes: {np.mean(rewards)}")

    # --- plot
    plt.xlabel("time")
    plt.ylabel("reward")
    for name, h in [("sequence", history1), ("mp", history2)]:
        df = h.get_df()
        df = df.groupby("time").mean()
        plt.plot(df["episode_reward0"].rolling(20).mean(), label=name)
    plt.grid()
    plt.legend()
    plt.tight_layout()
    plt.show()

if __name__ == "__main__":
    main()

Figure_1.png

実行した私のPCのスペックは以下です。(CPU/GPU共に1個です)

  • CPU: Core i7-8700 3.2GHz
  • GPU: NVIDIA GeForce GTX 1060 3GB

このレベルだと分散処理の恩師はほとんどありませんね(画像処理レイヤーもないので
GPUの効果もほとんどありません、たぶんCPUの方が早いかも)
ちなみにApe-Xの論文だと360actorです。(360CPU使用)
360個のCPUとか個人ではちょっと実験できないレベルですね…

8
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?