はじめに
pythonでマルチエージェント追跡問題を解いてみました.
教科書として『学習とそのアルゴリズム』を使いました.
本記事の構成
- はじめに
- マルチエージェント追跡
- ルール
- Q 値の更新
- 実装
- コード
- ポイント
- 結果
- おわりに
マルチエージェント追跡
$2$ つのエージェントで $1$ つのターゲットを捕獲するための最適な行動を学習します.
エージェントがターゲットを挟み込んだ状態を,捕獲した状態と定義します.
「挟み込む」とは,ターゲットの $4$ 近傍のうち $2$ 箇所にエージェントが存在する状態を指します.
ルール
詳細なルールを以下に示します.『学習とそのアルゴリズム』のルールを採用しています.
- 環境として $20 \times 20$ の格子状トーラス平面を設ける.
- $2$ つのエージェントと $1$ つのターゲットを環境中にランダムに配置する.
- エージェントは自分を中心とした $2$ 格子分の状態を観測することができる.
すなわち,自分のいる格子を含め $25$ 格子分を状態として認識する. - それぞれのエージェントは,独立に Q-learning を進める.
学習結果に基づいて,$1$ ステップにつき上下左右のいずれかに $1$ 格子移動する行動が選択される.
各行動につき $-1$ の報酬がつねに与えられる.
移動先にすでに別のエージェントあるいはターゲットが存在する場合はその場に留まる. - ターゲットは,$1$ ステップにつき上下左右のいずれかの方向へランダムに移動する.移動先にエージェントが存在する場合は,別のランダムな行動を選択する.
- $2$ つのエージェントがターゲットを挟み込んだ時点で $2$ つのエージェントそれぞれに報酬を与え,$1$ 回の試行を終了する.その後,2. に戻り次の試行を開始する.
Q 値の更新
以下の式により $Q$ 値を更新します.
Q_{new}(s_t, a) = Q_{old}(s_t, a) + \alpha\bigl[r_{t+1} + \gamma max_{a'}Q(s_{t+1}, a') - Q_{old}(s_t, a)\bigr]
$Q(s_t, a)$ は,ある状態 $s_t$ での行動 $a$ の価値を表します.状態 $s_t$ での行動 $a$ により報酬 $r_{t+1}$ が得られます.
$Q$ 値を更新する際,直近の報酬 $r_{t+1}$ に,その次の状態 $s_{t+1}$ における行動 $a'$ によって得られる価値の最大値に $\gamma$ を乗じたものを加えます.
これは,ある状態での行動を直近の報酬だけでなく,その後の状態における価値を加味して行動を選択することを意味します.
$\gamma$ を割引率,$\alpha$ を学習率と呼びます.
実装
コードとポイントに分けて載せます.
コード
以下のように実装しました.
.
├── main.py
├── model
│ ├── agent.py
│ ├── area.py
│ ├── movingobject.py
│ ├── target.py
└── service
└── gameservice.py
import numpy
import random
import copy
class MovingObject:
def __init__(self, area):
self.state = self.__random_state(area)
self.actions = self.__create_actions(area.shape)
self.moves = self.__create_moves()
# public method
def reset_state(self, area):
self.state = self.__random_state(area)
def get_act_index(self, epsilon):
return self.__select_action(epsilon)
def move_state(self, act_index):
return self.state + self.moves.get(act_index)
# private method
def __random_state(self, area):
h, w = area.shape
mergin = area.mergin
while True:
y = numpy.random.randint(mergin, h - mergin)
x = numpy.random.randint(mergin, w - mergin)
if area.state[y, x] == 0:
break
return numpy.array([y, x])
def __create_actions(self, area_shape):
actions = []
h, w = area_shape
for j in range(h):
actions.append([])
for i in range(w):
action_org = [0, 1, 2, 3]
action = self.__remove_actions(action_org, h, w, j, i)
actions[j].append(action)
return numpy.array(actions)
def __remove_actions(self, action_org, h, w, j, i):
action = copy.deepcopy(action_org)
mergin = 2
if j == mergin:
action.remove(3)
if j == h - mergin - 1:
action.remove(1)
if i == mergin:
action.remove(0)
if i == w - mergin - 1:
action.remove(2)
return action
def __create_moves(self):
# 0->left, 1->down, 2-> right, 3->up
return {0: numpy.array([0, -1]), 1: numpy.array([1, 0]), 2: numpy.array([0, 1]), 3: numpy.array([-1, 0])}
def __select_action(self, epsilon):
y, x = self.state
action = self.actions[y, x]
if numpy.random.rand() > epsilon:
return self.__greedy_action(action)
else:
return self.__random_action(action)
def __greedy_action(self, action):
sy, sx = self.around
qmax = self.q[action, sy, sx].max()
indexes = list(numpy.argwhere(self.q[action, sy, sx] == qmax))
random.shuffle(indexes)
return action[indexes[0]]
def __random_action(self, action):
act_indexes = copy.deepcopy(action)
random.shuffle(act_indexes)
return act_indexes[0]
from movingobject import *
class Target(MovingObject):
def __init__(self, area):
MovingObject.__init__(self, area)
self.value = 127
# public method
def act(self, area):
_y, _x = self.state
if self.__check_catched(area, _y, _x) == False:
area.state[_y, _x] = 0
while True:
act_index = MovingObject.get_act_index(self, 1.0)
y, x = MovingObject.move_state(self, act_index)
if area.state[y, x] == 0:
self.state = numpy.array([y ,x])
area.state[y, x] = self.value
break
# private method
def __check_catched(self, area, _y, _x):
check = self.__is_surrounded(area)
check *= self.__is_atcorners(area, _y, _x)
return check
def __is_surrounded(self, area):
t_state = numpy.argwhere(area.state == 127)
a_state = numpy.argwhere(area.state == 255)
return numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
def __is_atcorners(self, area, _y, _x):
h, w = area.shape
mergin = 2
c = _y == mergin or _y == h - mergin - 1
c *= _x == mergin or _x == w - mergin - 1
return c
from movingobject import *
class Agent(MovingObject):
def __init__(self, area):
MovingObject.__init__(self, area)
self.value = 255
self.q = self.__creat_q_table()
self.around = numpy.zeros(2).astype('int')
self.act_index = -1
self.reward = 0
# public method
def lookout(self, area):
y, x = self.state
self.around = self.__get_object_states(area, y, x)
def act(self, area, epsilon):
_y, _x = self.state
area.state[_y, _x] = 0
if self.__is_alone():
epsilon = 1.0
self.act_index = MovingObject.get_act_index(self, epsilon)
y, x = MovingObject.move_state(self, self.act_index)
if area.state[y, x] == 0:
self.state = numpy.array([y, x])
area.state[y, x] = self.value
else:
area.state[_y, _x] = self.value
def update_q(self, area, alpha, gamma):
sy, sx = self.around
self.reward = self.__get_reward(area)
act_index = self.act_index
q = self.q[act_index, sy, sx]
self.q[act_index, sy, sx] = q + alpha * (self.reward + gamma * self.__get_max_q() - q)
def save_q(self, fname):
numpy.save(fname, self.q)
# private method
def __creat_q_table(self):
q = numpy.zeros((4, 26, 26))
return q
def __get_object_states(self, area, y, x):
mergin = area.mergin
around = area.state[y - mergin: y + mergin + 1, x - mergin: x + mergin + 1].reshape((1, -1))
t_state = numpy.argwhere(around == 127)[:, 1]
_a_state = numpy.argwhere(around == 255)[:, 1]
a_state = numpy.delete(_a_state, numpy.argwhere(_a_state == 12)[:, 0], axis = 0)
if numpy.array_equal(t_state, numpy.array([])):
t_state = numpy.array([25])
if numpy.array_equal(a_state, numpy.array([])):
a_state = numpy.array([25])
return numpy.r_[t_state, a_state]
def __is_alone(self):
return numpy.array_equal(self.around, numpy.array([25, 25]))
def __get_reward(self, area):
return 3 if self.__is_surrounding(area) else -1
def __is_surrounding(self, area):
t_state = numpy.argwhere(area.state == 127)
a_state = numpy.argwhere(area.state == 255)
check = numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
check += numpy.array_equal(a_state.mean(axis = 0), t_state)
return check
def __get_max_q(self):
y, x = self.state
actions = self.actions[y, x]
sy, sx = self.around
_sy, _sx = self.__next_around(sy, sx)
return self.q[actions, _sy, _sx].max()
def __next_around(self, sy, sx):
act_index = self.act_index
_sy = self.__next_state(act_index, sy)
_sx = self.__next_state(act_index, sx)
return numpy.array([_sy, _sx])
def __next_state(self, act_index, z):
if z != 25:
if act_index == 0 and (z + 1) % 5 != 0:
z += 1
elif act_index == 1 and z / 5 != 0:
z -= 5
elif act_index == 2 and z % 5 != 0:
z -= 1
elif act_index == 3 and z / 5 != 4:
z += 5
else:
z = 25
return z
import numpy
class Area:
def __init__(self, shape, mergin):
self.shape = shape
self.mergin = mergin
self.state = self.__init_state()
# public method
def update_state(self, mvobj):
y, x = mvobj.state
self.state[y, x] = mvobj.value
def reset_state(self):
self.state = self.__init_state()
# private method
def __init_state(self):
return numpy.zeros(self.shape).astype('uint8')
from model.area import Area
from model.agent import Agent
from model.target import Target
class GameService:
def __init__(self):
'SERVICE'
def construct(self, area_shape, mergin):
area = Area(area_shape, mergin)
agent1 = Agent(area)
area.update_state(agent1)
agent2 = Agent(area)
area.update_state(agent2)
target = Target(area)
area.update_state(target)
return (area, agent1, agent2, target)
def turn(self, area, agent1, agent2, target, epsilon):
agent1.lookout(area)
agent2.lookout(area)
agent1.act(area, epsilon)
agent2.act(area, epsilon)
target.act(area)
def reset(self, area, agent1, agent2, target):
area.reset_state()
agent1.reset_state(area)
area.update_state(agent1)
agent2.reset_state(area)
area.update_state(agent2)
target.reset_state(area)
area.update_state(target)
from service.gameservice import GameService
import numpy
import cv2
from matplotlib import pyplot
if __name__ == '__main__':
# init
h = 24
w = 24
area_shape = (h, w)
mergin = 2
epsilon = 0.3
alpha = 0.3
gamma = 0.7
image_shape = (200, 200)
# count
total_plays = 300
total_steps = numpy.zeros(total_plays).astype('int')
play = 0
steps = 0
# construct
gameservice = GameService()
area, agent1, agent2, target = gameservice.construct(area_shape, mergin)
# video writer
writer = cv2.VideoWriter('q-learning.mv4', cv2.cv.FOURCC('m', 'p', '4', 'v'), 20, image_shape)
while True:
# disp
print u'play: %d, steps: %d' % (play, steps)
# act
gameservice.turn(area, agent1, agent2, target, epsilon)
# update area and agents' q talbe
agent1.update_q(area, alpha, gamma)
agent2.update_q(area, alpha, gamma)
# show image
image = cv2.resize(area.state[mergin:h - mergin, mergin:w - mergin], image_shape, interpolation = cv2.INTER_NEAREST)
writer.write(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
cv2.imshow('', image)
if cv2.waitKey() == 27:
break
# refresh state if agents catch the taget
steps += 1
if agent1.reward == 3:
print '\033[32m' + '!!!catch the target!!!' + '\033[0m'
gameservice.reset(area, agent1, agent2, target)
agent1.save_q('q1.npy')
agent2.save_q('q2.npy')
total_steps[play] = steps
steps = 0
play += 1
# count
if play == total_plays:
break
pyplot.plot(numpy.arange(0, total_plays), total_steps)
pyplot.savefig('graph.png')
cv2.destroyAllWindows()
print '!!!finish!!!'
ポイント
それぞれのエージェントは独立した $Q$ 値を持ちます.
状態 $s$ はエージェントの座標とターゲットの座標で表現します.座標は左上から順に決定しています.
認識領域内にエージェントあるいはターゲットがいない場合を $25$ という数値で表します.
行動は,左下右上の順に $0, 1, 2, 3$ の数値で表現します.
(行動数)$\times$(ターゲットの座標)$\times$(エージェントの座標)となるので,$Q$ 値のサイズは $4 \times 26 \times 26$ となります.
$Q$ 値の更新の際,エージェントやターゲットがどのように行動するかを知ることはできないので,
移動先での周囲の状態は自分の行動にのみ依存し,その行動の分変化するものとしています.
0 | 1 | 2 | 3 | 4 |
5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 |
↑エージェントの認識する範囲とその座標
結果
グラフ
ターゲットを捕獲するのにかかったステップ数を縦軸に,試行回数を横軸にしたグラフを以下に示します.
独立した $300$ 回の試行を $30$ 回行い,各試行におけるステップ数の平均を縦軸の値としています.
学習が進むにつれて,ターゲットを捕獲するのにかかるステップ数が減少しているのが確認できます.
行動
学習された行動の一例を画像にしました.白色がエージェント,灰色がターゲットです.
ターゲットを追い詰めるような行動を学習できています.
実際は学習がうまくいっていない画像もありますが,考察できていないので割愛します.
おわりに
pythonでマルチエージェント追跡問題を解いてみました.
実装がうまくいっていない点もあるかもしれないので,指摘あればお願いします.
今回エージェントとターゲットが見えている状態での学習結果しか確認していませんが,
どちらか一方しか見えない状態での学習結果も確認してみようと思います.
発見があったら随時追記していきます.