この記事は、株式会社Jijでのインターンシップ中に作成した記事です。
会社のホームページ j-ij.com
この記事の内容
- PyQUBOでクラスタリングのハミルトニアン→QUBO変換
- OpenJijのSQAsamplerでクラスタリングを実装
- クラスタリングのベンチマークを取得(SA/SQA)
参考資料
ライブラリのインストール
PyQUBO, OpenJijがない人は、インストールしましょう。
※OpenJijは、ver0.0.7とver0.0.8でベンチマーク関数の仕様変更がありました。古いバージョンの方は、アップデートしましょう。
pip install openjij # ver0.0.8
pip install pyqubo
データの準備
平均の異なるガウス分布により、明らかに分割できそうな点の集合を用意します。
# ライブラリをインポート
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
>
# 人工データの生成
data = []
for i in range(100): # 100個のデータを生成
p = np.random.uniform(0, 1)
cls =1 if p>0.5 else -1 #点の配置は平均の異なる2つのガウス分布のどちらかに従う。
data.append(np.random.normal(0, 0.5, 2) + np.array([cls, cls]))
>
df1 = pd.DataFrame(data, columns=["x", "y"], index=range(len(data)))
>
# 可視化
df1.plot(kind='scatter', x="x", y="y")
plt.show()
<シュミレーテッドアニーリングでクラスタリング問題を解く より引用>
PyQUBO+Openjijでの実装
参考にした記事の手法で、クラスタリングができているか確認します。
# ライブラリをインポート
from pyqubo import Array
import openjij as oj
from scipy.spatial import distance_matrix
# 関数を定義
def clustering_openjij(df):
# 距離行列(d_ij)(各サンプル同士の距離を要素に持つ行列)を計算
d_ij = distance_matrix(df, df)
# spinを定義
spin = Array.create("spin", shape= len(df), vartype="SPIN")
# ハミルトニアンを定義
H = - 0.5* sum(
[d_ij[i,j]* (1 - spin[i]* spin[j]) for i in range(len(df)) for j in range(len(df))]
)
# ハミルトニアンをコンパイル
model = H.compile()
# コンパイルしたモデルをqubo化
qubo, offset = model.to_qubo()
# openjijのシュミレーテッドアニーリング(SASampler)で解を求めたいので、サンプラーのインスタンス化
step_num_list = [20, 40, 60, 80, 100]
iteration = 10
for step_num in step_num_list:
sampler = oj.SASampler(beta_max=10.0, beta_min=0.01, step_num=step_num, iteration=iteration)
# 実際に解を求める
response = sampler.sample_qubo(qubo)
# pyquboのdecode_solutionを使いたいのでラベルと解を合わせて辞書化する。
# openjijのsamplerではいくつか解が返ってくるので、response.energiesが最小のものを最適解として採用する。
raw_solution = dict(zip(response.indices, response.states[np.argmin(response.energies)]))
# pyquboのdecoded_solutionでデコードされた解、壊れたスピン、エネルギーを得る
decoded_solution, broken, energy= model.decode_solution(raw_solution, vartype="SPIN")
# 最終的にほしい各点ごとのラベルのリスト
labels = [int(decoded_solution["spin"][idx] ) for idx in range(len(df))]
return labels, response.states[np.argmin(response.energies)], response.energies
クラスタリング用のラベル、最適解のスピン配列、残留エネルギーを表示します。
labels, correct_state, energies =clustering_openjij(df1[["x", "y"]])
print('Optimal Labels : ', labels) # クラスタリングに必要な代表ラベル
print('Correct State : ', correct_state) # 代表ラベルを与えるスピン配列
print('energies : ', energies) # サンプリング10回分の各残留エネルギー
残留エネルギーの値が収束しているのが確認できるので、、解が最適解までたどり着いたとみなしてよいでしょう。
よって、最小エネルギーを取るスピン配列を取得し、プロットに必要なラベルを計算しています。
では、実際にプロットしてみましょう。
for idx, label in enumerate(labels):
if label:
plt.scatter(df1.loc[idx]["x"], df1.loc[idx]["y"], color="b")
else:
plt.scatter(df1.loc[idx]["x"], df1.loc[idx]["y"], color="r")
確かに、クラスタリングできていますね!!
ベンチマーク取得
OpenJijにはデフォルトでTTS(Time To Solution)、残留エネルギー、成功確率を評価してくれる関数が付いています。
これを使って、SAsampler のベンチマークを取得します。
# 最適解は、先ほど計算したので、、
correct_state = correct_state
# ステップ数とアニーリングの反復数を与えます
step_num_list = list(range(20, 101, 20)) #[20, 40, 60, 80, 100]
iteration = 10
# 距離行列(d_ij)(各サンプル同士の距離を要素に持つ行列)を計算
df = df1[["x", "y"]]
d_ij = distance_matrix(df, df)
# spinを定義
spin = Array.create("spin", shape= len(df), vartype="SPIN")
# ハミルトニアンを定義
H = - 0.5* sum(
[d_ij[i,j]* (1 - spin[i]* spin[j]) for i in range(len(df)) for j in range(len(df))]
)
# ハミルトニアンをコンパイル
model = H.compile()
# コンパイルしたモデルをqubo化
qubo, offset = model.to_qubo()
# benchmark 関数で TTS 残留エネルギー 成功確率を計算。
result = oj.solver_benchmark(
solver= lambda time, **args: oj.SASampler(step_num=time, iteration=iteration).sample_qubo(qubo),
time_list=step_num_list, solutions=[correct_state], p_r=0.99
)
TTS(Time To Solution)、残留エネルギー、成功確率はそれぞれ次のようになります。
fig, (axL,axC,axR) = plt.subplots(ncols=3, figsize=(15,3))
plt.subplots_adjust(wspace=0.4)
fontsize = 10
axL.plot(result['time'], result['tts'])
axL.set_xlabel('annealing time', fontsize=fontsize)
axL.set_ylabel('TTS', fontsize=fontsize)
axC.plot(result['time'], result['residual_energy'])
axC.set_xlabel('annealing time', fontsize=fontsize)
axC.set_ylabel('Residual energy', fontsize=fontsize)
axR.plot(result['time'], result['success_prob'])
axR.set_xlabel('annealing time', fontsize=fontsize)
axR.set_ylabel('Success probability', fontsize=fontsize)
TTSと成功確率が増加傾向になりましたね。
残留エネルギーについては、ほとんど同じ値が10回分出力されていましたので、確かにこのグラフの通りになりそうです。
同様のやり方で、シミュレーティド・量子アニーリング(SQA)などのベンチマークも取得できますので、試しに取ってみます。
# benchmark 関数で TTS 残留エネルギー 成功確率を計算。
result = oj.solver_benchmark(
solver= lambda time, **args: oj.SQASampler(step_num=time, iteration=iteration).sample_qubo(qubo),
time_list=step_num_list, solutions=[correct_state], p_r=0.99
fig, (axL,axC,axR) = plt.subplots(ncols=3, figsize=(15,3))
plt.subplots_adjust(wspace=0.4)
fontsize = 10
axL.plot(result['time'], result['tts'])
axL.set_xlabel('annealing time', fontsize=fontsize)
axL.set_ylabel('TTS', fontsize=fontsize)
axC.plot(result['time'], result['residual_energy'])
axC.set_xlabel('annealing time', fontsize=fontsize)
axC.set_ylabel('Residual energy', fontsize=fontsize)
axR.plot(result['time'], result['success_prob'])
axR.set_xlabel('annealing time', fontsize=fontsize)
axR.set_ylabel('Success probability', fontsize=fontsize)
このように、クラスタリングアルゴリズムについてSAやSQAでベンチマークを取得することができました!
ただし、今回のクラスタリングアルゴリズムに用いた最適化関数(ハミルトニアン)は、k-meansに用いられる最適化関数と異なっています。
実際に、従来のソルバーと比較する際には、注意が必要です。