1. はじめに
2つの電源が接続された電力系統のシミュレータをマルチエージェント強化学習で制御するというタスクに取り組みました。
学習のアルゴリズムは、TF-Agents で実装しました。特に、Environment を実装するときに、シングルエージェントの強化学習のアルゴリズムに比べて、苦労しました。1つの Environment のインスタンスで、2つのエージェント、それぞれに応じて異なる応答を返すように実装する必要があるためです。
この記事では、Environment の実装の工夫を紹介します。
2. タスクの定義
状態方程式
電源Aおよび電源Bが供給した電力を負荷が消費する状況を想定します。制御対象の状態方程式は以下の通りです。なお、サセプタンスは$B=1$としました。
\def\a{{\rm a}}
\def\b{{\rm b}}
\def\load{{\rm load}}
\begin{align*}
\dot{\theta}_{\a} &= \omega_{\a},\\
\dot{\theta}_{\b} &= \omega_{\b},\\
\dot{\theta}_{\load} &= \omega_{\load},\\
%
\dot{\omega}_{\a} &= p_{\a} - B\sin(\theta_{\a} - \theta_{\load}) - B\sin(\theta_{\a} - \theta_{\b}),\\
\dot{\omega}_{\b} &= p_{\b} - B\sin(\theta_{\b} - \theta_{\load}) - B\sin(\theta_{\b} - \theta_{\a}),\\
\dot{\omega}_{\load} &= -p_{\load} + B\sin(\theta_{\a} - \theta_{\load}) + B\sin(\theta_{\b} - \theta_{\load}).\\
\end{align*}
コスト関数
制御要件は、負荷変動に対して角周波数を抑えることです。コスト関数は次の通り設定しました。
\def\a{{\rm a}}
\def\b{{\rm b}}
\def\load{{\rm load}}
\begin{align*}
|\omega_{\a}| + |\omega_{\b}| + |\omega_{\load}|
\end{align*}
コントローラの仕様
エージェントは、SacAgent で実装しました。ポリシーの入出力は以下の通りです。
特に、入力の第5引数で、エージェントAはエージェントBの操作を見て自身の操作を決定しています。また、エージェントBはエージェントAの操作を見て操作量を決めます。制御対象だけではなく、お互いの操作に配慮している点が、このタスクの特徴です。
表2-1. ポリシーの入力
引数 | エージェントA | エージェントB |
---|---|---|
第1引数 | 角周波数$\omega_{\rm a}$ | 角周波数$\omega_{\rm b}$ |
第2引数 | 角周波数$\omega_{\rm b}$ | 角周波数$\omega_{\rm a}$ |
第3引数 | 角周波数$\omega_{\rm load}$ | 同左 |
第4引数 | 負荷の電力需要$p_{\rm load}$ | 同左 |
第5引数 | 電源Bの供給電力$p_{\rm b}$ | 電源Aの供給電力$p_{\rm a}$ |
表2-2. ポリシーの出力
引数 | エージェントA | エージェントB |
---|---|---|
第1引数 | 電源Aの供給電力$p_{\rm a}$ | 電源Bの供給電力$p_{\rm b}$ |
3. 実装
はじめに、学習アルゴリズムの全体の流れを説明して、次に、Environment の実装を工夫した点を紹介します。ソースコードの詳細は、github を参照してください。
学習の全体の流れ
エージェントを、セルフプレイで学習しました。すなわち、2つのエージェントのうち、エージェントAだけを学習して、エージェントBについては、エージェントAのポリシーをコピーしたポリシーに従って駆動しました。セルフプレイについては、こちらの文献 を参照下さい。
エージェントAをセルフプレイで学習するアルゴリズムを、main.pyに実装しました。処理の順番に従って、要点を説明します。
1. エージェントAのパラメータをエージェントBにコピーします。
train_agent_b._actor_network.set_weights(train_agent_a._actor_network.get_weights())
2. エージェントAとエージェントBがパートナーを組んで制御対象を操作し、ログをreplay_buffer
に保存します。
time_step_for_agent_a = train_env.reset()
while True:
action_step_of_agent_a = train_agent_a.collect_policy.action(time_step_for_agent_a)
time_step_for_agent_b = train_env.step(action_step_of_agent_a.action)
action_step_of_agent_b = train_agent_b.policy.action(time_step_for_agent_b)
next_time_step_for_agent_a = train_env.step(action_step_of_agent_b.action)
traj = trajectory.from_transition(time_step_for_agent_a, action_step_of_agent_a, next_time_step_for_agent_a)
id_eps = replay_buffer.add_batch(traj, id_eps)
if next_time_step_for_agent_a.is_last():
break
else:
time_step_for_agent_a = next_time_step_for_agent_a
3. replay_buffer
に保存したログを使って、エージェントAのパラメータを更新します。なお、エージェントBのパラメータは更新しません。
iterator = iter(replay_buffer.as_dataset(sample_batch_size = config["sample_batch_size"], num_steps = config["num_steps"]))
for _ in range(config["num_train_iteration"]):
trajectories, _ = next(iterator)
train_agent_a.train(experience=trajectories)
Environment の実装の工夫
制御対象を初期化する
main.py で、time_step_for_agent_a = train_env.reset()
と、Environment のインスタンスを初期化しています。このときのメソッドreset
の応答は以下の通りです。
def _reset(self):
self.osc.reset()
self._episode_ended = False
self.mode = 0
return ts.restart(observation = self.get_observation_for_agent_a())
def get_observation_for_agent_a(self):
omega_a, omega_b, omega_load = self.osc.get_omega()
p_load = self.osc.get_p_load()
p_source_a, p_source_b = self.osc.get_p_source()
return np.clip([omega_a/(2*np.pi), omega_b/(2*np.pi), omega_load/(2*np.pi), p_load, p_source_b], -1, 1).astype(np.float32) # (2*n_self.oscillator+3,)
要点を以下に説明します。
- 内部変数
self.osc
は電力系統シミュレータのインスタンスです。 - 内部変数
self.mode
は、エージェントAの操作を受け付けるモード:0と、エージェントBの操作を受け付けるモード:1を切り替える変数です。self.mode = 0
として、エージェントAの操作を受け付けるモードで初期化しています。 -
return ts.restart(observation = self.get_observation_for_agent_a())
で、エージェントAに渡す観測量を返します。観測量の内訳は表2-1の通りです。
mode=0
: エージェントAの操作に対する応答
main.pyで、time_step_for_agent_b = train_env.step(action_step_of_agent_a.action)
と、エージェントAの操作に対してEnvironment のインスタンスは応答しています。このときのメソッドstep
の応答は以下の通りです。
def _step(self, action_from_an_agent):
(中略)
if self.mode == 0:
'''
When mode = 0,
interpret the value of action as p_source_a,
return observations for agent b,
and wait until the next action has arrived.
'''
self.p_source_a = action_from_an_agent[0]
self.mode = 1
reward_dummy = 0.0
observation = self.get_observation_for_agent_b()
return ts.transition(observation = observation, reward = reward_dummy, discount = self.discount)
def get_observation_for_agent_b(self):
omega_a, omega_b, omega_load = self.osc.get_omega()
p_load = self.osc.get_p_load()
p_source_a, p_source_b = self.osc.get_p_source()
return np.clip([omega_b/(2*np.pi), omega_a/(2*np.pi), omega_load/(2*np.pi), p_load, p_source_a], -1, 1).astype(np.float32) # (2*n_self.oscillator+3,)
要点は以下の通りです。
-
self.p_source_a = action_from_an_agent[0]
と、エージェントAの操作量を電源Aの電力潮流に代入しています。 -
self.mode = 1
と、Environment のインスタンスをエージェントBの操作を待つ状態に設定しています。 -
observation = self.get_observation_for_agent_b()
で、エージェントBに渡す観測量を求めます。内訳は表2-1の通りです。- 特に、エージェントAのポリシーのクローンを、エージェントBで使いまわすために、第1引数に電源Bの角周波数を、第2引数に電源Aの角周波数を代入しています。また、同じ理由で、第5引数に、電源Aの電力潮流を代入しています。
-
reward_dummy = 0.0
と、ダミーのコストを設定しています。これは、エージェントBは、エージェントAのクローンとしていて、パラメータを学習しないためです。
mode=1
: エージェントBの操作に対する応答
main.pyの51行目で、next_time_step_for_agent_a = train_env.step(action_step_of_agent_b.action)
と、エージェントBの操作に対してEnvironment のインスタンスは応答しています。このときのメソッドstep
の応答は以下の通りです。
def _step(self, action_from_an_agent):
(中略)
if self.mode == 1:
'''
When mode = 1,
interpret the value of action as p_source_b,
develop the simulation by several steps,
and return observations for agent a,
'''
self.p_source_b = action_from_an_agent[0]
self.mode = 0
rewards = []
for _ in range(self.control_interval_step):
done = self.osc.step(p_source = [self.p_source_a, self.p_source_b])
rewards.append(self.get_reward())
if done:
break
reward = np.mean(rewards)
observation = self.get_observation_for_agent_a()
if done:
self._episode_ended = True
return ts.termination(observation = observation, reward = reward)
else:
self._episode_ended = False
return ts.transition(observation = observation, reward = reward, discount = self.discount)
要点は以下の通りです。
-
self.p_source_b = action_from_an_agent[0]
と、エージェントBの操作量を電源Bの電力潮流に代入しています。 -
self.mode = 0
と、Environment のインスタンスをエージェントAの操作を待つ状態に設定しています。 -
self.osc
は、電力系統シミュレータのインスタンスです。-
done = self.osc.step(p_source = [self.p_source_a, self.p_source_b])
とすることで、単位時刻だけ、シミュレータを時間発展させています。 -
rewards.append(self.get_reward())
と、操作に対する評価を求めています。エージェントBと違って、エージェントAのパラメータは学習するため、操作に対する評価量を算出します。
-
-
observation = self.get_observation_for_agent_a()
で、エージェントAに渡す観測量を求めます。