
Multi-Armed Bandits: Methods for Best Arm Identification with Fixed Confidence

Posted at 2019-08-18

This article was written for a presentation at the counterfactual model study group (held at CyberAgent) on August 21, 2019. It covers implementations of representative methods for best arm identification.

The code is available on Google Colaboratory:
https://colab.research.google.com/drive/1-tDFJgrP2g0U8rOwfMWT9vr3J4Azfti2

Overview

Since the policies share a lot of common functionality, we first define a base class.

import numpy as np
import matplotlib.pyplot as plt


class BasicPolicy(object):
    def __init__(self, num_arm, epsilon=None, T=None, delta=None):
        self.num_arm = num_arm                   # number of arms
        self.sum_of_reward = np.zeros(num_arm)   # cumulative reward per arm
        self.num_of_trial = np.zeros(num_arm)    # number of pulls per arm
        self.T = T                               # horizon (unused below)
        self.delta = delta                       # confidence parameter
        self.epsilon = epsilon                   # tolerance
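
All of the policies below are used through the same interaction loop: ask the policy for an arm, observe a reward, and feed it back. A small driver like the following captures the pattern (the helper name run_policy is my own shorthand, not part of the original notebook; each policy exposes best_arm once it stops):

def run_policy(policy, X):
    # Interact until the policy commits to a best arm.
    # Returns the chosen arm and the number of samples consumed.
    for t in range(len(X)):
        arm = policy()                 # which arm to pull
        policy.update(arm, X[t, arm])  # feed back the observed reward
        if policy.best_arm is not None:
            break
    return policy.best_arm, t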

Successive Elimination Policy

We implement a successive elimination policy based on uniform sampling.
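
Concretely, with $K$ arms and confidence parameter $\delta$, after $n$ rounds of pulling every surviving arm once, the implementation below uses

$$
\beta_n = \log\frac{4Kn^2}{\delta},\qquad
U_i(n) = \hat{\mu}_i + \sqrt{\frac{\beta_n}{2n}},\qquad
L_i(n) = \hat{\mu}_i - \sqrt{\frac{\beta_n}{2n}},
$$

where $\hat{\mu}_i$ is the empirical mean of arm $i$. Writing $\hat{\imath}$ for the empirically best surviving arm, it eliminates arm $i$ when $L_{\hat{\imath}}(n) > U_i(n)$, and stops with output $\hat{\imath}$ when $L_{\hat{\imath}}(n) + \epsilon > \max_{i \neq \hat{\imath}} U_i(n)$. (The constants follow the code below rather than any particular paper.)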

class SR(BasicPolicy):
    def __init__(self, num_arm, epsilon, delta):
        super().__init__(num_arm, epsilon=epsilon, delta=delta)
        self.n = 1                      # current round number
        self.R = list(range(num_arm))   # surviving (candidate) arms
        self.index = 0                  # position within the current round
        self.best_arm = None
        self.log = []

    def __call__(self):
        # Pull each surviving arm once per round, in order.
        pull_arm = self.R[self.index]
        self.index += 1

        return pull_arm

    def update(self, arm, reward):
        self.sum_of_reward[arm] += reward
        self.num_of_trial[arm] += 1

        if len(self.R) == self.index:   # the current round is complete
            if len(self.R) == 1:
                # Only one candidate left: it must be the best arm.
                self.best_arm = self.R[0]
                return

            mean = self.sum_of_reward/self.num_of_trial
            beta = np.log((4*self.num_arm*self.n**2)/(self.delta))
            ucb_score = mean + np.sqrt(beta/(2*self.n))
            lcb_score = mean - np.sqrt(beta/(2*self.n))

            # Empirically best arm among the survivors.
            arg_max_i = max(self.R, key=lambda i: mean[i])

            # Largest UCB among the other survivors.
            max_ucb = max(ucb_score[i] for i in self.R if i != arg_max_i)

            if lcb_score[arg_max_i] + self.epsilon > max_ucb:
                # The leader is epsilon-optimal with high probability: stop.
                self.best_arm = arg_max_i
            else:
                # Eliminate arms whose UCB falls below the leader's LCB.
                # Iterate over a copy because we mutate self.R.
                for i in list(self.R):
                    if i != arg_max_i and lcb_score[arg_max_i] > ucb_score[i]:
                        self.R.remove(i)

            self.log.append(len(self.R))
            self.index = 0
            self.n += 1

Successive Elimination: Experiment 1

We observe how arms are eliminated when there are 5 arms. First, we generate synthetic data.

N = 1000000
num_arm = 5
# Arm means: arm 0 is the best with mean 1, the rest are uniform on [0, 0.9].
means = np.random.uniform(0, 0.9, size=(num_arm))
means[0] = 1
# Independent Gaussian rewards with variance 1/4, clipped to [0, 1].
variances = (1/4)*np.identity(num_arm)
X = np.random.multivariate_normal(means, variances, size=N)
X[X < 0] = 0
X[X > 1] = 1

We run the successive elimination policy on this synthetic data with confidence parameter $\delta=0.05$ and tolerance $\epsilon=0.01$.

delta = 0.05
epsilon = 0.01

sr = SR(num_arm, epsilon=epsilon, delta=delta)

for t in range(N):
    sr_arm = sr()    
    sr.update(sr_arm, X[t, sr_arm])
    
    if sr.best_arm is not None:
        break
    
print('best arm', sr.best_arm)
print('number of samples used', t)

We plot the result.

plt.plot(sr.log)

(Figure: number of surviving arms after each round)

We can see the arms being eliminated little by little.

Successive Elimination: Experiment 2

Next, we observe how arms are eliminated when there are 100 arms. First, we generate synthetic data.

N = 1000000
num_arm = 100
means = np.random.uniform(0, 0.9, size=(num_arm))
means[0] = 1
variances = (1/4)*np.identity(num_arm)
X = np.random.multivariate_normal(means, variances, size=N)
X[X < 0] = 0
X[X > 1] = 1

We run the successive elimination policy on this data with confidence parameter $\delta=0.05$ and tolerance $\epsilon=0.01$. The code is the same as above, so it is omitted. Plotting the result gives the following.

plt.plot(sr.log)

(Figure: number of surviving arms after each round, 100 arms)

We can again see the arms being eliminated little by little, but it takes quite a long time for the candidate set to shrink to a reasonable size. With many arms, the policy therefore spends many pulls on arms that are unlikely to be optimal.
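
To see this concretely, you can plot the number of pulls each arm received before the policy stopped (a quick check using the sr instance from the run above):

# Pull counts per arm when SR stops: many suboptimal arms still receive a large number of pulls.
plt.bar(np.arange(num_arm), sr.num_of_trial)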

LUCB Policy

At each step, the LUCB policy pulls a pair of arms: the empirically best arm (the leader) and the arm with the highest UCB among the rest (the challenger). It stops once the challenger's UCB drops below the leader's LCB plus $\epsilon$.
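
In the notation of the implementation, with $t$ the total number of pulls so far and $N_i(t)$ the number of pulls of arm $i$, the bounds are

$$
\beta_t = \log\frac{5Kt^4}{4\delta},\qquad
U_i(t) = \hat{\mu}_i + \sqrt{\frac{\beta_t}{2N_i(t)}},\qquad
L_i(t) = \hat{\mu}_i - \sqrt{\frac{\beta_t}{2N_i(t)}},
$$

and the policy stops when $U_{u_t}(t) < L_{\hat{\imath}_t}(t) + \epsilon$, where $\hat{\imath}_t$ is the empirically best arm and $u_t$ the challenger.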

class LUCB(BasicPolicy):
    def __init__(self, num_arm, epsilon, delta):
        super().__init__(num_arm, epsilon=epsilon, delta=delta)
        self.t = 1                  # total number of pulls so far
        self.initialize = True      # pull every arm once first
        self.initialize_index = 0
        self.best_arm = None
        self.next_pull_arm = None   # challenger queued for the next call
        self.log = []

    def __call__(self):
        if self.initialize:
            # Initialization: pull each arm once.
            pull_arm = self.initialize_index
            self.initialize_index += 1

            self.t += 1

            if self.num_arm == self.initialize_index:
                self.initialize = False

        elif self.next_pull_arm is not None:
            # Second pull of the pair: the challenger chosen last time.
            pull_arm = self.next_pull_arm
            self.next_pull_arm = None
            self.t += 2

        else:
            mean = self.sum_of_reward/self.num_of_trial
            beta = np.log((5*self.num_arm*self.t**4)/(4*self.delta))
            ucb_score = mean + np.sqrt(beta/(2*self.num_of_trial))
            lcb_score = mean - np.sqrt(beta/(2*self.num_of_trial))

            # Leader: empirically best arm.
            arg_max_i1 = np.argmax(mean)

            # Challenger: highest UCB among the remaining arms.
            ucb_score[arg_max_i1] = -np.inf   # exclude the leader
            arg_max_i2 = np.argmax(ucb_score)

            # Stop when the challenger's UCB drops below the leader's LCB + epsilon.
            self.log.append(ucb_score[arg_max_i2] - lcb_score[arg_max_i1])
            if ucb_score[arg_max_i2] < lcb_score[arg_max_i1] + self.epsilon:
                self.best_arm = arg_max_i1

            pull_arm = arg_max_i1
            self.next_pull_arm = arg_max_i2

        return pull_arm

    def update(self, arm, reward):
        self.sum_of_reward[arm] += reward
        self.num_of_trial[arm] += 1

delta = 0.05
epsilon = 0.01

lucb = LUCB(num_arm, epsilon=epsilon, delta=delta)

for t in range(N):
    lucb_arm = lucb()
    lucb.update(lucb_arm, X[t, lucb_arm])
    
    if lucb.best_arm is not None:
        break
    
print('best arm', lucb.best_arm)
print('number of samples used', t)

In my run, it correctly returned the optimal arm 0 using 189,298 samples. Next, we plot the number of times each arm was pulled before the algorithm stopped.

left = np.arange(num_arm)
plt.bar(left, lucb.num_of_trial)

(Figure: number of pulls per arm under LUCB)

The figure shows that the optimal arm 0 is pulled far more often than necessary. The UGapEc policy below improves on this.

UGapEc Policy

This algorithm improves on the LUCB policy. Its performance is better, but the results change drastically with the value of the hyperparameter $b$.
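
In the notation of the implementation, each arm $i$ carries a confidence width $\beta_i(t) = b\sqrt{\log(4Kt^3/\delta)/(2N_i(t))}$ and the UGapE index

$$
B_i(t) = \max_{j \neq i} U_j(t) - L_i(t).
$$

The policy takes the candidate $J(t) = \arg\min_i B_i(t)$, then pulls whichever of the candidate (lowest LCB among ties) or the strongest challenger (highest UCB among the rest) has the wider confidence interval, and stops once the confidence widths of all candidates fall below $\epsilon$.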

class UGapEc(BasicPolicy):
    def __init__(self, num_arm, epsilon, delta, b=0.5):
        super().__init__(num_arm, epsilon=epsilon, delta=delta)
        self.t = 1                       # total number of pulls so far
        self.initialize = True           # pull every arm once first
        self.initialize_index = 0
        self.best_arm = None
        self.B = np.zeros(self.num_arm)  # UGapE indices
        self.b = b                       # scaling of the confidence width
        self.log = []

    def __call__(self):
        if self.initialize:
            # Initialization: pull each arm once.
            pull_arm = self.initialize_index
            self.initialize_index += 1

            self.t += 1

            if self.num_arm == self.initialize_index:
                self.initialize = False

        else:
            mean = self.sum_of_reward/self.num_of_trial
            nume = np.log((4*self.num_arm*self.t**3)/(self.delta))/2
            beta = self.b*np.sqrt(nume/(self.num_of_trial))
            ucb_score = mean + beta
            lcb_score = mean - beta

            # UGapE index: B_i = max_{j != i} U_j - L_i.
            for i in range(self.num_arm):
                self.B[i] = np.max(np.delete(ucb_score, i)) - lcb_score[i]

            J = np.min(self.B)   # value of the smallest index

            # Stop when the confidence width of every candidate is below epsilon.
            self.log.append(np.max(beta[self.B == J]))
            if np.all(beta[self.B == J] < self.epsilon):
                self.best_arm = np.where(self.B == J)

            # Candidate with the lowest LCB vs. challenger with the highest UCB.
            temp_value0 = -np.inf
            temp_value1 = np.inf

            for i in range(self.num_arm):
                if self.B[i] == J:
                    if temp_value1 > lcb_score[i]:
                        temp_value1 = lcb_score[i]
                        L = i
                else:
                    if temp_value0 < ucb_score[i]:
                        temp_value0 = ucb_score[i]
                        U = i

            # Pull whichever of the two has the wider confidence interval.
            if beta[L] < beta[U]:
                pull_arm = U
            else:
                pull_arm = L

            self.t += 1

        return pull_arm

    def update(self, arm, reward):
        self.sum_of_reward[arm] += reward
        self.num_of_trial[arm] += 1

delta = 0.05
epsilon = 0.01

ugapec = UGapEc(num_arm, epsilon=epsilon, delta=delta)

for t in range(N):
    ugapec_arm = ugapec()
    ugapec.update(ugapec_arm, X[t, ugapec_arm])
    
    if ugapec.best_arm is not None:
        break
    
print('best arm', ugapec.best_arm)
print('number of samples used', t)

In my run, it correctly returned the set of optimal arms (array([0]),) using 180,716 samples. Next, we plot the number of times each arm was pulled before the algorithm stopped.

left = np.arange(num_arm)
plt.bar(left, ugapec.num_of_trial)

(Figure: number of pulls per arm under UGapEc)

The optimal arm 0 is no longer pulled excessively.

Comparing the LUCB and UGapEc Policies

We examine how quickly the gap between the LCB of the best-arm candidate and the UCB of the runner-up shrinks.
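
For reference, the two curves track slightly different quantities as logged by the code above: the LUCB curve plots $U_{u_t}(t) - L_{\hat{\imath}_t}(t)$ (challenger UCB minus leader LCB), while the UGapEc curve plots the largest confidence width $\beta_i(t)$ among the current candidates, which is what its stopping rule compares against $\epsilon$.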

plt.ylabel('UCB of the challenger - LCB of the best-arm candidate')
plt.xlabel('Number of Samples')
plt.ylim(0, 0.5)
plt.plot(lucb.log, label='LUCB')
plt.plot(ugapec.log, label='UGapEc')
plt.legend()

(Figure: gap vs. number of samples for LUCB and UGapEc)

The gap under the UGapEc policy converges far more quickly.
