38
35

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【強化学習】マルチエージェントによる追跡

Last updated at Posted at 2016-05-31

はじめに

pythonでマルチエージェント追跡問題を解いてみました.
教科書として『学習とそのアルゴリズム』を使いました.

本記事の構成

  • はじめに
  • マルチエージェント追跡
    • ルール
    • Q 値の更新
  • 実装
    • コード
    • ポイント
  • 結果
  • おわりに

マルチエージェント追跡

$2$ つのエージェントで $1$ つのターゲットを捕獲するための最適な行動を学習します.
エージェントがターゲットを挟み込んだ状態を,捕獲した状態と定義します.
「挟み込む」とは,ターゲットの $4$ 近傍のうち $2$ 箇所にエージェントが存在する状態を指します.

ルール

詳細なルールを以下に示します.『学習とそのアルゴリズム』のルールを採用しています.

  1. 環境として $20 \times 20$ の格子状トーラス平面を設ける.
  2. $2$ つのエージェントと $1$ つのターゲットを環境中にランダムに配置する.
  3. エージェントは自分を中心とした $2$ 格子分の状態を観測することができる.
    すなわち,自分のいる格子を含め $25$ 格子分を状態として認識する.
  4. それぞれのエージェントは,独立に Q-learning を進める.
    学習結果に基づいて,$1$ ステップにつき上下左右のいずれかに $1$ 格子移動する行動が選択される.
    各行動につき $-1$ の報酬がつねに与えられる.
    移動先にすでに別のエージェントあるいはターゲットが存在する場合はその場に留まる.
  5. ターゲットは,$1$ ステップにつき上下左右のいずれかの方向へランダムに移動する.移動先にエージェントが存在する場合は,別のランダムな行動を選択する.
  6. $2$ つのエージェントがターゲットを挟み込んだ時点で $2$ つのエージェントそれぞれに報酬を与え,$1$ 回の試行を終了する.その後,2. に戻り次の試行を開始する.

Q 値の更新

以下の式により $Q$ 値を更新します.

Q_{new}(s_t, a) = Q_{old}(s_t, a) + \alpha\bigl[r_{t+1} + \gamma max_{a'}Q(s_{t+1}, a') - Q_{old}(s_t, a)\bigr]

$Q(s_t, a)$ は,ある状態 $s_t$ での行動 $a$ の価値を表します.状態 $s_t$ での行動 $a$ により報酬 $r_{t+1}$ が得られます.
$Q$ 値を更新する際,直近の報酬 $r_{t+1}$ に,その次の状態 $s_{t+1}$ における行動 $a'$ によって得られる価値の最大値に $\gamma$ を乗じたものを加えます.
これは,ある状態での行動を直近の報酬だけでなく,その後の状態における価値を加味して行動を選択することを意味します.
$\gamma$ を割引率,$\alpha$ を学習率と呼びます.

実装

コードとポイントに分けて載せます.

コード

以下のように実装しました.

.
├── main.py
├── model
│   ├── agent.py
│   ├── area.py
│   ├── movingobject.py
│   ├── target.py
└── service
    └── gameservice.py
movingobject.py
import numpy
import random
import copy

class MovingObject:

    def __init__(self, area):
        self.state = self.__random_state(area)
        self.actions = self.__create_actions(area.shape)
        self.moves = self.__create_moves()


# public method
    def reset_state(self, area):
        self.state = self.__random_state(area)


    def get_act_index(self, epsilon):
        return self.__select_action(epsilon)


    def move_state(self, act_index):
        return self.state + self.moves.get(act_index)


# private method
    def __random_state(self, area):
        h, w = area.shape
        mergin = area.mergin

        while True:
            y = numpy.random.randint(mergin, h - mergin)
            x = numpy.random.randint(mergin, w - mergin)

            if area.state[y, x] == 0:
                break

        return numpy.array([y, x])


    def __create_actions(self, area_shape):
        actions = []
        h, w = area_shape

        for j in range(h):
            actions.append([])
            for i in range(w):
                action_org = [0, 1, 2, 3]
                action = self.__remove_actions(action_org, h, w, j, i)
                actions[j].append(action)

        return numpy.array(actions)


    def __remove_actions(self, action_org, h, w, j, i):
        action = copy.deepcopy(action_org)
        mergin = 2
        if j == mergin:
            action.remove(3)
        if j == h - mergin - 1:
            action.remove(1)
        if i == mergin:
			action.remove(0)
        if i == w - mergin - 1:
			action.remove(2)

        return action


    def __create_moves(self):
        # 0->left, 1->down, 2-> right, 3->up
        return {0: numpy.array([0, -1]), 1: numpy.array([1, 0]), 2: numpy.array([0, 1]), 3: numpy.array([-1, 0])}


    def __select_action(self, epsilon):
        y, x = self.state
        action = self.actions[y, x]
        if numpy.random.rand() > epsilon:
            return self.__greedy_action(action)
        else:
            return self.__random_action(action)


    def __greedy_action(self, action):
        sy, sx = self.around
        qmax = self.q[action, sy, sx].max()
        indexes = list(numpy.argwhere(self.q[action, sy, sx] == qmax))
        random.shuffle(indexes)

        return action[indexes[0]]


    def __random_action(self, action):
        act_indexes = copy.deepcopy(action)
        random.shuffle(act_indexes)

        return act_indexes[0]
tartget.py
from movingobject import *

class Target(MovingObject):

	def __init__(self, area):
		MovingObject.__init__(self, area)
		self.value = 127


# public method
	def act(self, area):
		_y, _x = self.state
		if self.__check_catched(area, _y, _x) == False:
			area.state[_y, _x] = 0
			while True:
				act_index = MovingObject.get_act_index(self, 1.0)
				y, x = MovingObject.move_state(self, act_index)
				if area.state[y, x] == 0:
					self.state = numpy.array([y ,x])
					area.state[y, x] = self.value
					break


# private method
	def __check_catched(self, area, _y, _x):
		check = self.__is_surrounded(area)
		check *= self.__is_atcorners(area, _y, _x)

		return check


	def __is_surrounded(self, area):
		t_state = numpy.argwhere(area.state == 127)
		a_state = numpy.argwhere(area.state == 255)

		return numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))


	def __is_atcorners(self, area, _y, _x):
		h, w = area.shape
		mergin = 2
		c = _y == mergin or _y == h - mergin - 1
		c *= _x == mergin or _x == w - mergin - 1

		return c
agent.py
from movingobject import *

class Agent(MovingObject):

	def __init__(self, area):
		MovingObject.__init__(self, area)
		self.value = 255
		self.q = self.__creat_q_table()
		self.around = numpy.zeros(2).astype('int')
		self.act_index = -1
		self.reward = 0


# public method
	def lookout(self, area):
		y, x = self.state
		self.around = self.__get_object_states(area, y, x)


	def act(self, area, epsilon):
		_y, _x = self.state
		area.state[_y, _x] = 0
		if self.__is_alone():
			epsilon = 1.0
		self.act_index = MovingObject.get_act_index(self, epsilon)
		y, x = MovingObject.move_state(self, self.act_index)
		if area.state[y, x] == 0:
			self.state = numpy.array([y, x])
			area.state[y, x] = self.value
		else:
			area.state[_y, _x] = self.value


	def update_q(self, area, alpha, gamma):
		sy, sx = self.around
		self.reward = self.__get_reward(area)
		act_index = self.act_index
		q = self.q[act_index, sy, sx]
		self.q[act_index, sy, sx] = q + alpha * (self.reward + gamma * self.__get_max_q() - q)


	def save_q(self, fname):
		numpy.save(fname, self.q)


# private method
	def __creat_q_table(self):
		q = numpy.zeros((4, 26, 26))

		return q

	def __get_object_states(self, area, y, x):
		mergin = area.mergin
		around = area.state[y - mergin: y + mergin + 1, x - mergin: x + mergin + 1].reshape((1, -1))
		t_state = numpy.argwhere(around == 127)[:, 1]
		_a_state = numpy.argwhere(around == 255)[:, 1]
		a_state = numpy.delete(_a_state, numpy.argwhere(_a_state == 12)[:, 0], axis = 0)
		if numpy.array_equal(t_state, numpy.array([])):
			t_state = numpy.array([25])
		if numpy.array_equal(a_state, numpy.array([])):
			a_state = numpy.array([25])

		return numpy.r_[t_state, a_state]


	def __is_alone(self):
		return numpy.array_equal(self.around, numpy.array([25, 25]))


	def __get_reward(self, area):
		return 3 if self.__is_surrounding(area) else -1


	def __is_surrounding(self, area):
		t_state = numpy.argwhere(area.state == 127)
		a_state = numpy.argwhere(area.state == 255)

		check = numpy.array_equal(numpy.abs((a_state - t_state).sum(axis = 1)), numpy.array([1, 1]))
		check += numpy.array_equal(a_state.mean(axis = 0), t_state)

		return check


	def __get_max_q(self):
		y, x = self.state
		actions = self.actions[y, x]
		sy, sx = self.around
		_sy, _sx = self.__next_around(sy, sx)
		return self.q[actions, _sy, _sx].max()


	def __next_around(self, sy, sx):
		act_index = self.act_index
		_sy = self.__next_state(act_index, sy)
		_sx = self.__next_state(act_index, sx)

		return numpy.array([_sy, _sx])


	def __next_state(self, act_index, z):
		if z != 25:
			if act_index == 0 and (z + 1) % 5 != 0:
				z += 1
			elif act_index == 1 and z / 5 != 0:
				z -= 5
			elif act_index == 2 and z % 5 != 0:
				z -= 1
			elif act_index == 3 and z / 5 != 4:
				z += 5
			else:
				z = 25

		return z
area.py
import numpy

class Area:

    def __init__(self, shape, mergin):
        self.shape = shape
        self.mergin = mergin
        self.state = self.__init_state()


# public method
    def update_state(self, mvobj):
        y, x = mvobj.state
        self.state[y, x] = mvobj.value


    def reset_state(self):
        self.state = self.__init_state()


# private method
    def __init_state(self):
        return numpy.zeros(self.shape).astype('uint8')
gameservice.py
from model.area import Area
from model.agent import Agent
from model.target import Target

class GameService:

    def __init__(self):
        'SERVICE'

    def construct(self, area_shape, mergin):
        area = Area(area_shape, mergin)
    	agent1 = Agent(area)
        area.update_state(agent1)
    	agent2 = Agent(area)
        area.update_state(agent2)
    	target = Target(area)
        area.update_state(target)

        return (area, agent1, agent2, target)


    def turn(self, area, agent1, agent2, target, epsilon):
        agent1.lookout(area)
        agent2.lookout(area)
        agent1.act(area, epsilon)
        agent2.act(area, epsilon)
        target.act(area)


    def reset(self, area, agent1, agent2, target):
        area.reset_state()
        agent1.reset_state(area)
        area.update_state(agent1)
        agent2.reset_state(area)
        area.update_state(agent2)
        target.reset_state(area)
        area.update_state(target)
main.py
from service.gameservice import GameService
import numpy
import cv2
from matplotlib import pyplot

if __name__ == '__main__':

	# init
	h = 24
	w = 24
	area_shape = (h, w)
	mergin = 2
	epsilon = 0.3
	alpha = 0.3
	gamma = 0.7
	image_shape = (200, 200)

	# count
	total_plays = 300
	total_steps = numpy.zeros(total_plays).astype('int')
	play = 0
	steps = 0

	# construct
	gameservice = GameService()
	area, agent1, agent2, target = gameservice.construct(area_shape, mergin)

	# video writer
	writer = cv2.VideoWriter('q-learning.mv4', cv2.cv.FOURCC('m', 'p', '4', 'v'), 20, image_shape)

	while True:
		# disp
		print u'play: %d, steps: %d' % (play, steps)

		# act
		gameservice.turn(area, agent1, agent2, target, epsilon)

		# update area and agents' q talbe
		agent1.update_q(area, alpha, gamma)
		agent2.update_q(area, alpha, gamma)

		# show image
		image = cv2.resize(area.state[mergin:h - mergin, mergin:w - mergin], image_shape, interpolation = cv2.INTER_NEAREST)
		writer.write(cv2.cvtColor(image, cv2.COLOR_GRAY2BGR))
		cv2.imshow('', image)
		if cv2.waitKey() == 27:
		 	break

		# refresh state if agents catch the taget
		steps += 1
		if agent1.reward == 3:
			print '\033[32m' + '!!!catch the target!!!' + '\033[0m'
			gameservice.reset(area, agent1, agent2, target)
			agent1.save_q('q1.npy')
			agent2.save_q('q2.npy')
			total_steps[play] = steps
			steps = 0
			play += 1

		# count
		if play == total_plays:
			break

	pyplot.plot(numpy.arange(0, total_plays), total_steps)
	pyplot.savefig('graph.png')
	cv2.destroyAllWindows()
	print '!!!finish!!!'

ポイント

それぞれのエージェントは独立した $Q$ 値を持ちます.
状態 $s$ はエージェントの座標とターゲットの座標で表現します.座標は左上から順に決定しています.
認識領域内にエージェントあるいはターゲットがいない場合を $25$ という数値で表します.
行動は,左下右上の順に $0, 1, 2, 3$ の数値で表現します.
(行動数)$\times$(ターゲットの座標)$\times$(エージェントの座標)となるので,$Q$ 値のサイズは $4 \times 26 \times 26$ となります.
$Q$ 値の更新の際,エージェントやターゲットがどのように行動するかを知ることはできないので,
移動先での周囲の状態は自分の行動にのみ依存し,その行動の分変化するものとしています.

0 1 2 3 4
5 6 7 8 9
10 11 12 13 14
15 16 17 18 19
20 21 22 23 24

↑エージェントの認識する範囲とその座標

結果

グラフ

ターゲットを捕獲するのにかかったステップ数を縦軸に,試行回数を横軸にしたグラフを以下に示します.
独立した $300$ 回の試行を $30$ 回行い,各試行におけるステップ数の平均を縦軸の値としています.
学習が進むにつれて,ターゲットを捕獲するのにかかるステップ数が減少しているのが確認できます.

行動

学習された行動の一例を画像にしました.白色がエージェント,灰色がターゲットです.
ターゲットを追い詰めるような行動を学習できています.
実際は学習がうまくいっていない画像もありますが,考察できていないので割愛します.

おわりに

pythonでマルチエージェント追跡問題を解いてみました.
実装がうまくいっていない点もあるかもしれないので,指摘あればお願いします.
今回エージェントとターゲットが見えている状態での学習結果しか確認していませんが,
どちらか一方しか見えない状態での学習結果も確認してみようと思います.
発見があったら随時追記していきます.

38
35
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
38
35

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?