本記事は2019年8月21日の反実仮想モデル勉強会(@サイバーエージェント)での発表のために書きました.内容は最適腕識別における代表的な手法の実装です.
Google Colaboratoryにコードをアップロードしました.
https://colab.research.google.com/drive/1-tDFJgrP2g0U8rOwfMWT9vr3J4Azfti2
概要
使う機能に共通のものが多いので,ベースとなるクラスを定義しておきます.
class BasicPolicy(object):
def __init__(self, num_arm, epsilon=None, T=None, delta=None):
self.num_arm = num_arm
self.sum_of_reward = np.zeros(num_arm)
self.num_of_trial = np.zeros(num_arm)
self.T = T
self.delta = delta
self.epsilon = epsilon
逐次削除方策
一様選択にもとづく逐次削除方策を実装します.
class SR(BasicPolicy):
def __init__(self, num_arm, epsilon, delta):
super().__init__(num_arm, epsilon=epsilon, delta=delta)
self.ucb_score = np.zeros(num_arm)
self.n = 1
self.R = [i for i in range(self.num_arm)]
self.index = 0
self.best_arm = None
self.log = []
def __call__(self):
pull_arm = self.R[self.index]
self.index += 1
return pull_arm
def update(self, arm, reward):
self.sum_of_reward[arm] += reward
self.num_of_trial[arm] += 1
if len(self.R) == self.index:
if len(self.R) > 1:
mean = self.sum_of_reward/self.num_of_trial
beta = np.log((4*self.num_arm*self.n**2)/(self.delta))
ucb_score = mean + np.sqrt(beta/(2*self.n))
lcb_score = mean - np.sqrt(beta/(2*self.n))
arg_max_i = np.argmax(mean)
max_ucb = 0
for i in self.R:
if i != arg_max_i:
if max_ucb < ucb_score[i]:
max_ucb = ucb_score[i]
if lcb_score[arg_max_i] + self.epsilon > max_ucb:
self.best_arm = arg_max_i
else:
for i in self.R:
if i != arg_max_i:
if lcb_score[arg_max_i] > ucb_score[i]:
self.R.remove(i)
self.log.append(len(self.R))
self.index = 0
self.n += 1
逐次削除方策の実験1
腕が5本の場合にどのように腕が削除されていくのかを観察します.まず,擬似データを作成します.
N = 1000000
num_arm = 5
means = np.random.uniform(0, 0.9, size=(num_arm))
means[0] = 1
variances = (1/4)*np.identity(num_arm)
X = np.random.multivariate_normal(means, variances, size=N)
X[X < 0] = 0
X[X > 1] = 1
信頼度$\delta=0.05$,許容度$\epsilon=0.01$で逐次削除方策を上記の擬似データに対して実行します.
delta = 0.05
epsilon = 0.01
sr = SR(num_arm, epsilon=epsilon, delta=delta)
for t in range(N):
sr_arm = sr()
sr.update(sr_arm, X[t, sr_arm])
if sr.best_arm is not None:
break
print(sr.best_arm)
print(t)
結果をプロットします.
plt.plot(sr.log)
少しずつ腕が減っていくことが分かります.
逐次削除方策の実験2
次に腕が100本の場合にどのように腕が削除されていくのかを観察します.まず,擬似データを作成します.
N = 1000000
num_arm = 100
means = np.random.uniform(0, 0.9, size=(num_arm))
means[0] = 1
variances = (1/4)*np.identity(num_arm)
X = np.random.multivariate_normal(means, variances, size=N)
X[X < 0] = 0
X[X > 1] = 1
信頼度$\delta=0.05$,許容度$\epsilon=0.01$で逐次削除方策を上記の擬似データに対して実行します.コードは上で用いたものと同じものなので省略します.結果をプロットすると以下のようになります.
plt.plot(sr.log)
少しずつ腕が減っていくことが分かりますが,ある程度の数になるまでそこそこ時間がかかります.このため腕の数が多いと最適腕の可能性が低い腕までたくさん引くことになります.
LUCB方策
class LUCB(BasicPolicy):
def __init__(self, num_arm, epsilon, delta):
super().__init__(num_arm, epsilon=epsilon, delta=delta)
self.ucb_score = np.zeros(num_arm)
self.t = 1
self.initialize = True
self.initialize_index = 0
self.best_arm = None
self.next_pull_arm = None
self.log = []
def __call__(self):
if self.initialize:
pull_arm = self.initialize_index
self.initialize_index += 1
self.t += 1
if self.num_arm == self.initialize_index:
self.initialize = False
elif self.next_pull_arm is not None:
pull_arm = self.next_pull_arm
self.next_pull_arm = None
self.t += 2
else:
mean = self.sum_of_reward/self.num_of_trial
beta = np.log((5*self.num_arm*self.t**4)/(4*self.delta))
ucb_score = mean + np.sqrt(beta/(2*self.num_of_trial))
lcb_score = mean - np.sqrt(beta/(2*self.num_of_trial))
arg_max_i1 = np.argmax(mean)
ucb_score[arg_max_i1] = -100
arg_max_i2 = np.argmax(ucb_score)
self.log.append(ucb_score[arg_max_i2] - lcb_score[arg_max_i1])
if ucb_score[arg_max_i2] < lcb_score[arg_max_i1] + self.epsilon:
self.best_arm = arg_max_i1
pull_arm = arg_max_i1
self.next_pull_arm = arg_max_i2
return pull_arm
def update(self, arm, reward):
self.sum_of_reward[arm] += reward
self.num_of_trial[arm] += 1
delta = 0.05
epsilon = 0.01
lucb = LUCB(num_arm, epsilon=epsilon, delta=delta)
for t in range(N):
lucb_arm = lucb()
lucb.update(lucb_arm, X[t, lucb_arm])
if lucb.best_arm is not None:
break
print('最適腕', lucb.best_arm)
print('使ったサンプル数', t)
私の実験では最適腕0を正しく返し,使ったサンプルは189,298でした.次にアルゴリズムが停止するまでに各腕を引いた回数のヒストグラムを表示します.
left = np.array([i for i in range(100)])
plt.bar(left, lucb.num_of_trial)
図を見ると最適腕0を引きすぎていることが分かります.これを改善するものが以下のUGapEc方策です.
UGapEc方策
LUCB方策を改良したアルゴリズムです.性能は良くなるのですが,ハイパーパラメータの$b$の値を変えると大きく結果が変わります.
class UGapEc(BasicPolicy):
def __init__(self, num_arm, epsilon, delta, b=0.5):
super().__init__(num_arm, epsilon=epsilon, delta=delta)
self.ucb_score = np.zeros(num_arm)
self.t = 1
self.initialize = True
self.initialize_index = 0
self.best_arm = None
self.next_pull_arm = None
self.B = np.zeros(self.num_arm)
self.b = b
self.log = []
def __call__(self):
if self.initialize:
pull_arm = self.initialize_index
self.initialize_index += 1
self.t += 1
if self.num_arm == self.initialize_index:
self.initialize = False
else:
mean = self.sum_of_reward/self.num_of_trial
nume = np.log((4*self.num_arm*self.t**3)/(self.delta))/2
beta = self.b*np.sqrt(nume/(self.num_of_trial))
ucb_score = mean + beta
lcb_score = mean - beta
for i in range(self.num_arm):
temp_value = 100
for j in range(self.num_arm):
if i != j:
bk = ucb_score[j]
if temp_value > bk:
temp_value = bk - lcb_score[i]
self.B[i] = temp_value
J = np.min(self.B)
self.log.append(np.max(beta[self.B == J]))
if np.all(beta[self.B == J] < self.epsilon) == True:
self.best_arm = np.where(self.B == J)
temp_value0 = -100
temp_value1 = 100
for i in range(self.num_arm):
if self.B[i] == J:
if temp_value1 > lcb_score[i]:
temp_value1 = lcb_score[i]
L = i
else:
if temp_value0 < ucb_score[i]:
temp_value0 = ucb_score[i]
U = i
if beta[L] < beta[U]:
pull_arm = U
else:
pull_arm = L
self.t += 1
return pull_arm
def update(self, arm, reward):
self.sum_of_reward[arm] += reward
self.num_of_trial[arm] += 1
delta = 0.05
epsilon = 0.01
ugapec = UGapEc(num_arm, epsilon=epsilon, delta=delta)
for t in range(N):
ugapec_arm = ugapec()
ugapec.update(ugapec_arm, X[t, ugapec_arm])
if ugapec.best_arm is not None:
break
print(ugapec.best_arm)
print(t)
私の実験では最適腕の集合(array([0]),)を正しく返し,使ったサンプルは180,716でした.次にアルゴリズムが停止するまでに各腕を引いた回数のヒストグラムを表示します.
left = np.array([i for i in range(100)])
plt.bar(left, lucb.num_of_trial)
最適腕0を引きすぎないようになっています.
LUCB方策とUGapEc方策の比較
最適腕のLCBと次善の腕のUCBとの差がどのぐらいの速さで小さくなるかを検証します.
plt.ylabel('LCB of the candidate of the best arm - UCB')
plt.xlabel('Number of Samples')
plt.ylim(0, 0.5)
plt.plot(lucb.log, label='LUCB')
plt.plot(ugapec.log, label='UGapEc')
plt.legend()
UGapEc方策の差の方が圧倒的速さで収束しているのが分かります.