9
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

カートポール問題を量子強化学習で解く(QiskitとPytorchで実装に挑戦)

Last updated at Posted at 2023-03-20

まえがき

量子ソフトウェア研究拠点主催の量子ソフトウェア勉強会のグループワークに参加し、量子強化学習に取り組んでみました。
量子強化学習は、量子回路を含むモデルで強化学習を行う手法です。
すでに、量子強化学習によりカートポール問題を解く記事がTensorflow Quantumのチュートリアルにおいて公開されていたので、
Parametrized Quantum Circuits for Reinforcement Learning
ワークではそちらを別のライブラリを用いて追実装してみることにしました。
※カートポール問題は強化学習でベンチマークとして扱われる典型的な問題です。

今回は、その追実装のためにQiskitとPytorchを用いてカートポール問題を解いてみることにしました。QiskitとPytorchを選んだのは、それらのライブラリを触ったことがあり、多少使い勝手がわかっているからです。最終的に実装したコードを記事の末尾に乗せました(2023年当時のものなのでライブラリのバージョンアップによりコードが動かない箇所があるかもしれません(2025年追記))。

本題の量子強化学習の説明に入る前に、まず強化学習の説明をしようと思います。
注:本記事では、量子コンピュータの基礎や機械学習の基礎は説明していないため、適宜他の情報源を参照いただければと思います。

強化学習とは

強化学習は、機械学習のアルゴリズムの一種です。
強化学習では、事前に教師データが与えられていない中で、ある環境においてどのような行動を起こすのが最適か(どの方策が最適か)、エージェントと環境の相互作用を通じて学習します。エージェントは環境の中で行動を起こし、その行動の良し悪しに応じて環境から報酬または罰則という形でフィードバックを受けます。このフィードバックについて未来にとる行動を考慮して、フィードバックの総和の期待値(価値)を求め、それを最大化する方策をエージェントに学習させます。

この強化学習の中にも、いくつかのアルゴリズムがあります。
アルゴリズムは、価値ベースのアルゴリズムと方策ベースのアルゴリズムにわけて説明されることがあります。
価値ベースのアルゴリズムとは、価値(環境からのフィードバックの総和の期待値)を最大化するように方策を決定するアルゴリズムです。
対して、方策ベースのアルゴリズムとは、直接方策を最適化するアルゴリズムです。
ふんわりとした説明ですので、より詳しくは、このQittaQiitaの記事などを必要に応じてご参照ください。
深層強化学習アルゴリズムまとめ
以下の強化学習のアルゴリズムの説明においても、この記事を大いに参考にしております。

Q学習とは

今回の量子強化学習では、価値ベースのアルゴリズムの一つであるQ学習と量子コンピューティングを組み合わせます。
Q学習は、行動価値関数$Q(s,a)$を最大化するように学習を進めます。行動価値関数$Q(s,a)$とは、ある状態$s$においてある行動$a$をとったときの点数(価値)のことです。この点数は、その行動をとった時点のフィードバックだけでなく、未来に受け取るフィードバックについても勘案しています。
Q学習における$Q(s,a)$のアップデート方法はTD法として知られています。現在の状態における$Q(s_t,a_t)$を次の状態における行動価値関数の最大値 $\max_aQ(s_{t+1},a)$を使って更新します。
数式で書くと、以下のようになります。

Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \alpha \{r_t + \gamma \max_aQ(s_{t+1},a) - Q(s_t,a_t) \}

$s_t, a_t, r_t, s_{t+1}$は、それぞれ、時点$t$での状態、その時とった行動と報酬、そして次の状態であり、環境での行動を通じてサンプリングして得られるものです。$\alpha, \gamma$は学習時のパラメータとなります。
「なぜこの式で行動価値関数$Q(s,a)$を更新して最大化できるのか」については説明を割愛しますが、
ベルマン最適方程式とQ学習というキーワードで調べていただくと理解が深まるかと思います。

さて、この$Q(s,a)$をプログラム上で表すことを考えます。このとき、状態$s$と行動$a$を指定して要素が決まる2次元配列として表現できますが、状態$s$が多数あるときや連続的な状態$s$を環境が持つ場合では、そのような表現は配列のサイズを考慮すると現実的ではないです。そのため、$Q(s,a)$をニューラルネットワークで近似的に置き換える深層Qネットワーク(DQN)やその派生がQ学習で頻繁に用いられます。今回も、深層Qネットワークと量子コンピューティングを組み合わせることで、量子Q学習を実現しようと思うので、DQNについてもう少し深堀します。

深層Qネットワーク(DQN)とは

DQNとは

DQNは、最適な行動価値関数を推定するために深層ニューラルネットワークを使用するものです。
例えば、カートポール問題では図1ような入出力を持つ深層ニューラルネットワークを利用します。

DQNre.jpg
図1 第14回 深層強化学習DQN(Deep Q-Network)の解説 からDQNの模式図をお借りしました。

このようにニューラルネットワークをDQNで使用するため、その学習はニューラルネットワークの出力と目的値の誤差を小さくするように、パラメータを更新していく必要があります。そこで、先ほどの$Q(s,a)$の更新式を参考にして、図1の画像中にもありますが、次のような誤差関数を最小化するように学習を進めます。

L(w) = ( r_t + \gamma \max_aQ(s_{t+1},a;w) - Q(s_t,a_t;w) )^2

もちろん2乗誤差以外にも誤差関数の形を選ぶことができ、2乗誤差と似ていますが2乗誤差よりも誤差評価が緩やかなHuber関数もDQNの誤差関数として利用できます。今回もHuber関数を誤差関数として利用します。

DQNで利用される典型的なテクニック

DQNでは、単にQ関数をニューラルネットワークで表すだけでなく、アルゴリズムの安定性と収束性を向上させるために、経験再生とターゲットネットワークという工夫を行うのが常です。経験再生は過去の経験からまんべんなく学習することを可能にし、ターゲットネットワークは学習プロセスを安定化させるのに役立ちます。

それぞれのテクニックについてより詳細に説明すると、

経験リプレイ

経験リプレイは、環境と相互作用するエージェントの過去の経験(すなわち、状態、行動、報酬、次の状態)をリプレイバッファ保存します。学習中、モデルはリプレイバッファから経験をバッチでサンプリングし、それを使って重みを更新します。この手法により、データを非相関化し、直近の経験に過剰適合するリスクを低減することができます。

ターゲットネットワーク

ターゲットネットワークは、DQNのニューラルネットワーク(メインネットワーク)を定期的にコピーして作成します。学習時において、ターゲットネットワークの出力をメインネットワークの目的値に利用します。ターゲットネットワークの更新頻度をメインネットワークの更新頻度よりも落とすことで、学習時に目的値がぶれにくくなり、学習プロセスを安定化します。
数式で表現すると、誤差関数のかっこ内部の左側のQ関数がメインネットワークの目標になっており、これをパラメータを$w^-$で表現したターゲットネットワークで表現します。

L(w) = ( r_t + \gamma \max_aQ(s_{t+1},a;w^-) - Q(s_t,a_t;w) )^2

これらのテクニックも今回使用します。

量子強化学習とは

さて、ようやくタイトルにある量子強化学習の説明に入ります。
量子強化学習では、強化学習モデル(の一部)を量子回路による期待値計算に置き換えて学習を行います。
量子Q学習であれば、深層Qネットワーク(DQN)において、Q関数を表現するニューラルネットワークを量子回路による期待値計算で置き換えることになります。

古典的には先ほどの図1のようなニューラルネットワークを教師学習のモデルとするところを、量子的には図2のような量子回路から期待値を計算をするモデルに置き換えます。

回路.png
図2 量子回路(この回路をカートポール問題に使います)

カートポール問題

今回取り扱う量子Q学習のモデルの詳細を理解するためには、カートポール問題について知らなければなりません。
そこで、モデルの詳細説明の前に、カートポール問題について説明します。

カートポール問題は、制御理論の分野でよく知られている強化学習の問題の1つであり、カート上の不安定な棒のバランスを制御することを目的としています。
具体的には、長い棒がカートに垂直に立てて取り付けられており、その棒が倒れないようにカートを水平方向に動かして制御します。
このカートポールをプログラムで利用するために、OpenAIから強化学習用の環境がライブラリとして提供されているので、それをインポートして利用します(OpenAI Gym CartPole

OpenAI Gym CartPoleの環境では、
カートポールの状態が”カートの位置”、”カートの速度”、”ポールの角度”、”ポールの角速度”の4つの要素で定義されています。
カートの動作が、カートを左右に押す2つの動きで定義されています。この押す動きによる速度の増減は一定ではなく、ポールの角度で変化します。
そして、カートポールの報酬には、できるだけ長く棒を絶たせておくことをカートポールの目標としているので、棒が立っている間の毎度のステップに対して+1の報酬が割り当てられています。
このような環境を持つカートポール問題をDQNで解く際には、カートポールの状態とカートの動作がDQNの入力と出力になっていることが、図1を見るとわかります。

OpenAI Gym CartPoleには、v0とv1の二つのバージョンがありますが、今回はv1のバージョンを利用します。
v1では、カートポールが以下の状態になると、ポールの制御タスクを終了します。
・ポールの角度が±12°より大きい。
・カートの位置が±2.4以上(カートの中心がディスプレイの端に達する)
・エピソードの長さが500(v0は200)を超える。
※エピソードとは、ポール制御タスクの開始から終了までを指す言葉です。

今回取り扱う量子Q学習のモデルの詳細

このカートポール問題を解くために用いる量子Q学習のモデルについて説明します。

モデルに使う量子回路

使う量子回路は図2として載せたもので、Tensorflow Quantumのチュートリアルで使用されていた回路をそのまま流用しています。
学習に用いる量子回路なので、入力を受け取る回転ゲートと、学習対象となるパラメータを持つ回転ゲートが配置されています。
量子ビットは、カートポールの状態を定義する4要素を量子ビット1つ1つに対して入力できるようにするため、カートポールの状態の数と同じく4個利用されます。

回路.png
図2 量子回路(再掲)

モデルの工夫

re-Uploading

量子回路内において、学習対象となるパラメータを持つ回転ゲートが複数個あるのは自然なことですが、それと同時に入力を受け取る回転ゲートも複数個あります。これは、re-Uploadingと呼ばれるテクニックで回路の表現力を向上させるために使用されています。具体的には、用意した量子ビットにカートポールの状態を入力する際に、カートポールの状態の各要素を$\arctan$関数で角度にしたうえで、対応する量子ビットのRxゲート(input_i_j)の回転角として、複数回回路に入力しています。

Outputへの正規化と重みづけ

図2の量子回路からカートの動作を出力するためには、2種類の期待値 $ < Z_0 Z_1 > $ と $ <Z_1 Z_2> $ を計算します(説明のため図3を追加しました。)
この期待値を単純に出力すると-1 ~ 1 の値しか取れないので、それぞれの期待値について0 ~ 1の値に正規化してさらに学習可能な重み$Wo_i$ を掛け合わせます。量子Q学習ではQ関数を最大化しようとしているので、Q関数が頭打ちになることを避けるため、このoutputへの正規化と重みづけが導入されています。
また、学習可能な重みを使用することで、重みを固定した場合に比べて、初期パラメータのランダムネスやハイパーパラメータの選び方に敏感でなくなるというメリットがあります。

Inputへの重みづけ

量子Q学習のモデルについて、Tensorflow Quantumのチュートリアルや参考論文などを見ると、もう一点工夫がなされています。
それは、モデルのinputデータ、つまり、カートポールの状態への学習可能な重みづけです。図3上、$Wi_i$と表現しています。
この重みを導入することで、モデルの表現力が増すと論文で報告されています。

回路2.png
図3 量子Q学習のモデル。図2に、量子回路への入力と量子回路からの出力を追加しました。

モデルの学習方法

量子Q学習においても、DQNとモデルは異なれど、学習方針は同じです。
再掲となりますが、以下の誤差関数を小さくするように、パラメータを更新していきます。

L(w) = ( r_t + \gamma \max_aQ(s_{t+1},a;w^-) - Q(s_t,a_t;w) )^2

パラメータの更新は、勾配降下法をもとに行います。
勾配を用いない最適化もあるとは思うのですが、あまり詳しくなく今回はこの方式を採用します。

パラメータ付き量子回路で、パラメータの変化に対する出力の変化(つまり勾配)を求めるための方法には、
パラメータシフト法を利用します。以下の式がパラメータシフトによる勾配計算の式です。

\frac{\partial\langle\psi\left(\theta\right)|\hat{O}\left(\omega\right)|\psi\left(\theta\right)\rangle}{\partial\theta} =  \left(\langle\psi\left(\theta+\pi/2\right)|\hat{O}\left(\omega\right)|\psi\left(\theta+\pi/2\right)\rangle -\langle\psi\left(\theta-\pi/2\right)|\hat{O}\left(\omega\right)|\psi\left(\theta-\pi/2\right)\rangle\right) / 2.

一見すると、普通の差分で勾配を計算しているように見えますが、パラメータの差分のサイズが微小量ではなく$\pi/2$となっています。
詳しい証明は、別の資料をご確認いただきたいです(Quantum Circuit Learning)。

この手法のミソは、差分のサイズが大きくなることで、勾配計算が安定して取得できることにあります。
GYUDONさんが、Pytorchで前進差分法の実装を行っていたので、私のコードでもそちらを参考にして実装しています(PyTorch 計算グラフを ぶった切り)。

入出力の重みの最適化は、誤差逆伝搬法(BackPropagation)により行います。

結果

量子Q学習でカートポール問題に取り組んだ結果を図4に示します。

matome2.png
図4 1000エピソードの間カートポール問題に取り組んだ結果(3回トライ)。右下の図は3回の平均です。
   横軸はエピソード数、縦軸は棒を立て続けていられたステップ数(期間)を表しています。

以下のようなパラメータと初期値の設定で、3回、1000エピソードの間カートポール問題に取り組んでいます。

【パラメータ】
量子回路の深さ(回転ゲートの繰り返しの数) : 5
Q関数更新に必要なハイパーパラメータ $\gamma$ : 0.99
学習を行う頻度 : 1回/10ステップ
ターゲットネットワークと同期を行う頻度 : 1回/30 ステップ
1回の学習で扱うデータセット数(BATCH SIZE) : 16
経験再生のメモリのサイズ : 10000
データセットをためるときにランダムに行動する確率$\epsilon$の最大値 : 1.0
データセットをためるときにランダムに行動する確率$\epsilon$の最小値$\epsilon$ : 0.01
データセットをためるときにランダムに行動する確率$\epsilon$が減少する割合 : 0.99/episode
入力の重みの学習率 : 1e-3 (AdamW)
量子回路のパラメータの重みの学習率 : 1e-3 (AdamW)
出力の重みの学習率 : 1e-1 (AdamW)

【初期値】
入力の重み : すべて1.0
量子回路のパラメータ : すべて0 ~ 3.14のランダムな値(一様分布に従う)
出力の重み : すべて1.0

このパラメータと初期値設定は、同様の量子回路で問題を解いているTensor Quantum flowチュートリアルの設定とほとんどが一致しています。
異なる点は、
・学習コストを抑えるため、入力の重みの最適化を行っていない点(実質的には入力の重みがない状態です)。
・オプティマイザーがAdamWの点(Tensor Quantum のチュートリアルではAdam)
入力の重みの最適化を行わなかった理由は、コードが洗練されておらず、学習スピードがとても遅いためです。
入力の重みの最適化を取り除いても、1000エピソードの学習に数日かかりました。
オプティマイザーが異なる理由は、ただ単に合わせたつもりが合わせられていなかったという、実装ミスです。。。

上記のパラメータ・初期値の設定と学習方法では、残念ながら一度もカートポール問題で最高点数の500ステップを達成することができませんでした。
3トライの結果がかなりばらついていることから、量子Q学習のパフォーマンスは不安定だと推察されますが、図5に示すようにTensorflow Quantumのチュートリアルの実装では、数回試した限りだと、そのすべての試行において最高点数の500ステップを継続的に獲得できるまで成長していました。

output_sSRMtk-swpUe_0.png
図 5 Tensorflow Quantumを使ってカートポール問題に取り組んだ結果

カートポール問題でTensorflow Quantumのチュートリアルと同様の結果(500ステップの間ポールを立て続けることに達成)ができなかった理由を少し考えますと以下のような理由が考えられます。
・私の方の学習で入力の重みの最適化を組み込んでいない
・オプティマイザーAdamWに適した学習率を選べなかった
・たまたまパフォーマンスが下振れした
・コードにバグがある

テストできるコード、もしくは、品質が保証されたライブラリを使いたかった。。。

参考論文 (Quantum agents in the Gym: a variational quantum algorithm for deep Q-learning)

参考にした論文の情報では、カートポール問題以外にもFlozen lakeという別の問題にDQNの量子版(量子Q学習とこの記事で呼んでいるもの)で取り組んでいますが、ここではカートポール問題への取り組みに注目して実験結果をまとめます。

古典DQNと量子Q学習の比較

実験では、古典DQNと量子Q学習の二つのモデルでカートポール問題に取り組んでいました。
両タイプのモデルにおいて、ハイパーパラメータの値を変えながら、それぞれ10回の試行が行われてモデルが評価されました。
モデルは、モデルがタスククリアまでに必要としたエピソード数が少ないほど、パフォーマンスが高いと評価されます。
このカートポール問題のタスククリアの合格条件は、Tensorflowのチュートリアルや今回私が取り組んだカートポール問題よりも低く設定され、200ステップの間棒を立てておけばクリアとなっています。

各タイプ各設定で、10回分の試行を平均した結果が図5で示したものです。

DQN_PQC_VS_NN_1.png
図6 古典DQNと量子Q学習でハイパーパラメータを変えてパフォーマンスを比較した結果

古典DQNと量子Q学習のベストパフォーマンスを比較したときに、
古典モデル((20,20),562パラメータ)では、量子モデル(25, 302パラメータ)に比べて、約半分のエピソードで学習を完了しています。
ただし、この古典モデルは、比較した量子モデルよりも倍のパラメータを持っています。

次に、10回分の平均ではなく、1回の試行において、各モデルのベストパフォーマンスを比較した図6を示します。
DQN_PQC_VS_NN_2.png
図7 古典DQNと量子Q学習において、ベストパフォーマンスであった試行を比較

この場合、量子と古典で、ほぼパフォーマンスに差はないです。
そして、量子モデルは、比較した古典モデルの$1/20$ぐらいのパラメータしか持っていません。

量子モデルは試行ごとのパフォーマンスのばらつきが大きいものの、軽量なモデルで課題を解けるという強みがありそうです。

まとめ

今回は、量子強化学習の理解を深めるために、カートポール問題と量子Q学習について調べ、量子Q学習のコードをQiskitとPytorchで実装できないか挑戦しました。
調べた参考論文では、量子モデルは古典モデルに比べて試行ごとのパフォーマンスのばらつきが大きいものの、軽量なモデルでベンチマーク課題が解けることが報告されていました。また、古典と量子のベストパフォーマンスの試行を比較したとき、違いがほぼ見られませんでした。量子モデルをベストパフォーマンスで安定化させることができれば、古典よりも優位なモデルとなり、魅力的になると思います。現状の研究でもパフォーマンスの安定化について、多くの知見があると思いますので、調べてみる価値はありそうです。
この量子Q学習について、私もQiskitとPytorchでコードを追実装しましたが、Tensorflow Quantumのチュートリアルと同等の結果が得られませんでした。自身の実装には、実行速度が遅い、バグが取り除けているかわからない、という課題があるので、実行速度が速いライブラリQulacsや量子機械学習のライブラリscikit-qulacsなどを使い、その点を解決していきたいです。
満足いく結果にはならなかったものの、量子コンピュータと機械学習の両面で、学ぶことが大きい取り組みでした。

参考

当時(2023年)に作成したコードです。Goole Colabで無料版で実行できました。

サンプルコード
# %%bash
!pip3 install gym[classic_control]
!pip install qiskit[visualization]
!pip install torchvision
!pip install pylatexenc

from google.colab import drive
drive.mount('/content/drive')

import os

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

import qiskit
from qiskit.quantum_info.operators import Operator, Pauli
from qiskit.quantum_info import Statevector

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Function

import math
import pickle
 
# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

量子回路のクラスを用意。
次の回路は、入力データをゲートパラメータとして複数回取り込む。Re-Uploadingという。

class ReUploadingPQC:
    """ 
    This class provides a simple interface for interaction 
    with this runnable quantum circuit 
    """
    
    def __init__(self, n_qubits, c_depth, backend, shots):
        # --- Circuit definition ---
        self.n_qubits = n_qubits
        self.c_depth = c_depth
        self.backend = backend
        self.shots = shots
        self.num_parameters = 0 # it will be inclimented when you add a parmetrized circuit.
                                # See the method get_parameterized_circuit.

        # prepare small circuits that are components to create the whole reuploading circuit
        self.PQC_list = []
        self.parameters = []
        self.input_circuit_list = []
        self.input_parameters = []
        for d in range(c_depth):
            self.PQC_list.append(self.get_parameterized_circuit())
            for param in self.PQC_list[-1].parameters: # appendしたcircuitのparameters
                self.parameters.append(param) 
            self.PQC_list.append(self.get_CZs_circuit())
            self.input_circuit_list.append(self.get_input_circuit(d))
            for input_param in self.input_circuit_list[-1].parameters:
                self.input_parameters.append(input_param)
        self.PQC_list.append(self.get_parameterized_circuit())
        for param in self.PQC_list[-1].parameters:
            self.parameters.append(param)

        # create the whole reuploading circuit
        self._circuit = self.compose_circuit()
        self._circuit = qiskit.transpile(self._circuit) 

    def get_input_circuit(self, depth):
        qc = qiskit.QuantumCircuit(self.n_qubits) 
        for i in range(self.n_qubits):
            qc.rx(qiskit.circuit.Parameter("input_" + str(depth) + "_" + str(i)), i) # input_depht_posofqubit
        return qc

    def get_parameterized_circuit(self):
        qc = qiskit.QuantumCircuit(self.n_qubits)
        for i in range(self.n_qubits):
            qc.rx(qiskit.circuit.Parameter("'" + str(self.num_parameters + qc.num_parameters) + "'"), i)
            qc.ry(qiskit.circuit.Parameter("'" + str(self.num_parameters + qc.num_parameters) + "'"), i)
            qc.rz(qiskit.circuit.Parameter("'" + str(self.num_parameters + qc.num_parameters) + "'"), i)
        self.num_parameters += qc.num_parameters # counting parameters
        return qc

    def get_CZs_circuit(self):
        qc = qiskit.QuantumCircuit(self.n_qubits)
        for i in range(self.n_qubits):
            qc.cz(i, (i+1)%self.n_qubits)
        return qc

    def compose_circuit(self):
        qc = qiskit.QuantumCircuit(self.n_qubits)
        for d in range(self.c_depth):
            qc.compose(self.PQC_list[d*2], inplace=True)
            qc.compose(self.PQC_list[d*2 + 1], inplace=True)
            qc.barrier()
            qc.compose(self.input_circuit_list[d], inplace=True)
        qc.compose(self.PQC_list[-1], inplace=True)
        # qc.measure_all() # if you get a expectation value by sampling circuits result, you need to measure the qubits,
        return qc

    def run(self, x, thetas):
        params = self.parameters
        input_params = self.input_parameters
        keys = params + input_params
        values = thetas + x
        d = dict([(key, value) for key, value in zip(keys, values)])

        backend = qiskit.Aer.get_backend('statevector_simulator')
        shots = 1
        circuit_copy = self._circuit.copy()
        circuit_copy.assign_parameters(d, inplace = True)
        qobj = qiskit.assemble(circuit_copy, backend=backend)
        job = backend.run(qobj, shots=shots)
        result = job.result()

        return result

    def draw(self):
        qc = self._circuit
        fig = qc.draw(output='mpl')
        display(fig)

カートポールでは、カートを左右どちらに動かすべきか決める必要がある。
そのため、2通りの動作に対応する期待値を計算する。

def cal_expectaion_values(result):
    """
    return numpy array([float. float])
    """

    result_statevec = qiskit.quantum_info.Statevector(result.get_statevector())
    obs01 = Pauli('ZZII')
    obs02 = Pauli('IIZZ')
    expectation01 = result_statevec.expectation_value(obs01)
    expectation02 = result_statevec.expectation_value(obs02)

    expectation_values = np.array([np.real(expectation01), np.real(expectation02)])

    return expectation_values

qiskit と pytorchを連携して量子強化学習を行う。
量子回路をNNのレイヤーに取り込む。
NNの最適化に勾配を用いるので、パラメータ量子回路において、出力のパラメータ依存性(勾配)を計算する必要があり、そのためにパラメータシフト法に基づく勾配計算を行う。

# https://qiita.com/gyu-don/items/dbbe0f87bff6553655b0 を参考に
class DifferenceBySampling(Function):
    """
    The gradients of a runnable quantum circuit layer can be obtained by utilzing Parameter-Shift rule, not the auto grad of Pytorch.
    """

    @staticmethod
    def forward(ctx, f, xs, thetas):
        # print('xs in forward', xs)
        def f_each(x, thetas):
            return torch.tensor([f(x, thetas) for x in xs], dtype=torch.float32, device=device)
        ys = f_each(xs, thetas)
        ctx.save_for_backward(xs, ys, thetas)
        ctx.f_each = f_each
        return ys

# https://qiita.com/gyu-don/items/ace6f69c07feb92d49b1#f%E3%82%92f_str%E3%81%AB%E5%A4%89%E3%81%88%E3%81%A6%E3%81%BF%E3%82%8B
# https://qiita.com/notori48/items/0ab84113afc9204d3e90
# https://www.investor-daiki.com/it/qiskit-parameter-shift
    @staticmethod
    def backward(ctx, grad_output):
        xs, ys, thetas = ctx.saved_tensors

        dtheta = np.pi/2
        diff_thetas = []
        thetas = thetas.detach() # auto_gradを使わず
        for i in range(len(thetas)):
            thetas[i] += dtheta
            forward = ctx.f_each(xs, thetas)
            thetas[i] -= 2 * dtheta
            backward = ctx.f_each(xs, thetas)
            gradient = 0.5 * (forward - backward)
            diff_thetas.append(torch.sum(grad_output * gradient))
            thetas[i] += dtheta # shift前に戻す
        diff_thetas = torch.tensor(diff_thetas, dtype=torch.float32, device=device)

        dx = np.pi/2
        diff_xs_list = []
        xs = xs.detach() # auto_gradを使わず
        for i in range(xs.size()[1]):
            dxtensor = torch.zeros(xs.size()[1])
            dxtensor[i] = dx
            dxtensor = dxtensor.repeat(xs.size()[0], 1)
            xs += dxtensor
            forward = ctx.f_each(xs, thetas)
            xs -= 2 * dxtensor
            backward = ctx.f_each(xs, thetas)
            gradient = 0.5 * (forward - backward)
            diff_xs_list.append(torch.sum(grad_output * gradient, 1, keepdim=True)) # 3 * 2 -> 3 * 1
            xs += dxtensor # shift前に戻す
        diff_xs = torch.cat(diff_xs_list, dim=1)
        diff_xs.to(device=device, dtype=torch.float32)

        return None, diff_xs, diff_thetas

class ReUploadingPQCLayer(nn.Module):
    
    def __init__(self, n_observations, n_actions, c_depth, backend, shots):
        super(ReUploadingPQCLayer, self).__init__()
        n_qubits = n_observations
        self.re_uploading_PQC = ReUploadingPQC(n_qubits=n_qubits, c_depth=c_depth, backend=backend, shots=shots)
        self.c_depth  = c_depth
        self.shots = shots
        self.thetas = nn.Parameter(torch.rand(self.re_uploading_PQC.num_parameters, dtype=torch.float32, device=device) * np.pi)
        
    def forward(self, xs):
        def f(x, thetas):
            input_x = x.tolist() * self.c_depth # re-uploadingの回数だけxを複製
            result = self.re_uploading_PQC.run(input_x, thetas.tolist())
            expectation_values = cal_expectaion_values(result)
            expectation_values_scaled = (1 + expectation_values) /2 # 0 ~ 1に規格化
            return expectation_values_scaled.tolist()
        return DifferenceBySampling.apply(f, xs, self.thetas)

量子回路をレイヤーにもつNN。
inputとoutputに訓練可能な重みを付けた。NNの出力はQ関数にあたり、これは0~1以上の値を持つため、重みづけが必要。論文に訓練可能な重みが良いと記載あり。

class CustomMultiplyLayer(nn.Module):

    def __init__(self, in_features: int, device=None, dtype=None) -> None:
        factory_kwargs = {'device': device, 'dtype': dtype}
        super(CustomMultiplyLayer, self).__init__()
        self.in_features = in_features
        self.weight = nn.Parameter(torch.empty((in_features), **factory_kwargs))
        self.reset_parameters()

    def reset_parameters(self) -> None:
        nn.init.ones_(self.weight)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        return torch.mul(input, self.weight)


n_depthGB = 5
class QQN(nn.Module):
    """ 
    This class define a NN using a runnable quantum circuit for its internal layer.
    """  

    def __init__(self, n_observations, n_actions):
        super(QQN, self).__init__()
        self.custom_multiply_layer1 = CustomMultiplyLayer(n_observations)
        self.re_uploading_PQC_layer = ReUploadingPQCLayer(n_observations, n_actions, n_depthGB, qiskit.BasicAer.get_backend('qasm_simulator'), 100)
        self.custom_multiply_layer2 = CustomMultiplyLayer(n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        # print('xs0' ,x)
        weighted_x = self.custom_multiply_layer1(x)
        # print('weighted' ,weighted_x)
        angle = torch.arctan(weighted_x)
        # print('angle' ,angle)
        Q = self.re_uploading_PQC_layer(angle)
        # print('Q' ,Q)
        weighted_Q = self.custom_multiply_layer2(Q)
        return weighted_Q

カートポールの環境を定義

import gym

print(gym.__version__)
if gym.__version__[:4] == '0.26':
    env = gym.make('CartPole-v1')
elif gym.__version__[:4] == '0.25':
    env = gym.make('CartPole-v1', new_step_api=True)
else:
    raise ImportError(f"Requires gym v25 or v26, actual version: {gym.__version__}")

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
n_observations = env.observation_space.shape[0]
print("Action Space: {}".format(n_actions)) # 行動空間
print("Env Space: {}".format(n_observations)) # 状態空間

state = env.reset() # 環境の初期化
print(state)

Q学習を成功させるために、経験再生とターゲットネットワークという手法が良く用いられる。

from collections import namedtuple, deque
import random

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

エージェントクラスのメンバ変数にブレインクラスのインスタンスを用意。
エージェントの行動決定のために、ブレインインスタンスが利用される。
量子回路を含むNNやターゲットネットワーク、そして経験再生のためのメモリは、このブレインインスタンスが管理する。

def save_pkl(path, obj):
    with open(path, 'wb') as p:
        pickle.dump(obj, p)

def load_pkl(path):
    with open(path, 'rb') as p:
        file = pickle.load(p)
    return file

# https://book.mynavi.jp/manatee/detail/id=88997

# CartPoleで動くエージェントクラスです、棒付き台車そのものになります
class Agent:
    def __init__(self, n_observations, n_actions, env):
        # 課題の状態と行動の数を設定
        self.n_observations = n_observations  # CartPoleは状態数4を取得
        self.n_actions = n_actions  # CartPoleの行動(右に左に押す)の2を取得
        self.brain = None
        
        self.path_brain = '/content/drive/MyDrive/Colab Notebooks/brain.bin'
        is_file = os.path.isfile(self.path_brain)
        if is_file:# if file exists
            self.load_brain()
            print("The brain has been loaded.")
        else:
            self.brain = Brain(n_observations, n_actions, env)  # エージェントが行動を決定するための脳を生成
            print("There is no pretrained_brain.")

    def get_action(self, state):
        # 行動の決定
        return self.brain.get_action(state)

    def store_experience(self, state, action, next_state, reward):
        self.brain.store_experience(state, action, next_state, reward)

    def optimize_model(self):
        # Q関数の更新
        self.brain.optimize_model()

    def update_target_network(self):
        self.brain.update_target_network()

    def update_epsilon(self):
        self.brain.update_epsilon()

    def save_model(self):
        self.brain.save_model()

    def save_brain(self):
        save_pkl(self.path_brain, self.brain)

    def load_brain(self):
        self.brain = load_pkl(self.path_brain)
        print("The pretratined_paramters have been loaded.")
        print("The parameters of the nets:")
        print(self.brain.policy_net.state_dict())
        print("")
        print("The epislon was set to " + str(self.brain.epsilon) + ", where we have reached previous optimizing.")

# エージェントが持つ脳となるクラスです、Q学習を実行します
class Brain:
 
    def __init__(self, n_observations, n_actions, env):
        self.n_observations = n_observations  # CartPoleは状態数4を取得
        self.n_actions = n_actions  # CartPoleの行動(右に左に押す)の2を取得
        self.env = env

        self.policy_net = QQN(n_observations, n_actions).to(device)
        self.target_net = QQN(n_observations, n_actions).to(device)
        # self.TAU = 0.005 # for soft updating target network
        self.path_net = '/content/drive/MyDrive/Colab Notebooks/policy_net.pth'

        print("The initial parameters of the nets:")
        print(self.policy_net.state_dict())
        self.target_net.load_state_dict(self.policy_net.state_dict()) # targetネットワークと同期

        # LR = 1e-3
        # self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=LR, amsgrad=True)
        # https://pytorch.org/docs/stable/optim.html
        model_params_dic_list = []
        lrs = [1e-3, 1e-3, 1e-1]
        for param, lr in zip(self.policy_net.parameters(), lrs):
            model_params_dic_list.append({'params':param, 'lr':lr})
        self.optimizer = optim.AdamW(model_params_dic_list, amsgrad=True)

        self.GAMMA = 0.99

        # use the condition that is indicated at a Tensorflow tutorial
        self.epsilon = 1.0  # Epsilon greedy parameter
        print("The epislon was initialized to " + str(self.epsilon))
        self.epsilon_min = 0.01  # Minimum epsilon greedy parameter
        self.decay_epsilon = 0.99 # Decay rate of epsilon greedy parameter 0.99でうまくいかず

        self.memory = ReplayMemory(10000)
        self.BATCH_SIZE = 16 #3

    def get_action(self, state):
        sample = random.random()
        if sample > self.epsilon:
            with torch.no_grad():
                # t.max(1) will return largest column value of each row.
                # second column on max result is index of where max element was found,
                # so we pick action with the larger expected reward.
                state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                return self.policy_net(state).max(1)[1].view(1, 1)
        else:
            return torch.tensor([[self.env.action_space.sample()]], device=device, dtype=torch.long)

    def store_experience(self, state, action, next_state, reward):
        state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        if next_state is not None:
            next_state = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0)
        reward = torch.tensor([reward], device=device)
        self.memory.push(state, action, next_state, reward)
    
    def optimize_model(self):
        if len(self.memory) < self.BATCH_SIZE:
            return
        transitions = self.memory.sample(self.BATCH_SIZE)
        # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
        # detailed explanation). This converts batch-array of Transitions
        # to Transition of batch-arrays.
        batch = Transition(*zip(*transitions))

        # Compute a mask of non-final states and concatenate the batch elements
        # (a final state would've been the one after which simulation ended)
        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                              batch.next_state)), device=device, dtype=torch.bool)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                                    if s is not None])
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
        # columns of actions taken. These are the actions which would've been taken
        # for each batch state according to policy_net
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # Compute V(s_{t+1}) for all next states.
        # Expected values of actions for non_final_next_states are computed based
        # on the "older" target_net; selecting their best reward with max(1)[0].
        # This is merged based on the mask, such that we'll have either the expected
        # state value or 0 in case the state was final.
        next_state_values = torch.zeros(self.BATCH_SIZE, device=device)
        with torch.no_grad():
            next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0]
        # Compute the expected Q values
        expected_state_action_values = (next_state_values * self.GAMMA) + reward_batch

        # Compute Huber loss
        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        # In-place gradient clipping
        # torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()

    def update_target_network(self):
        # this update method that is indicated at a Pytorch tutorial
        # # θ′ ← τ θ + (1 −τ )θ′
        # target_net_state_dict = self.target_net.state_dict()
        # policy_net_state_dict = self.policy_net.state_dict()
        # for key in policy_net_state_dict:
        #     target_net_state_dict[key] = policy_net_state_dict[key]*self.TAU + target_net_state_dict[key]*(1-self.TAU)
        # self.target_net.load_state_dict(target_net_state_dict)

        # this update method that is indicated at a Tensorflow tutorial
        self.target_net.load_state_dict(self.policy_net.state_dict())

    def update_epsilon(self):
        self.epsilon = max(self.epsilon * self.decay_epsilon, self.epsilon_min)

    def save_model(self):
        # Save the parameters of the model
        torch.save(self.policy_net.state_dict(), self.path_net)

    def load_model(self):
        pretrained_params = torch.load(self.path_net)
        self.policy_net.load_state_dict(pretrained_params)

Q学習の進捗具合を可視化

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()
    
def plot_durations(episode_durations):
    plt.figure(1)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training by QuantumQLearning')
    plt.xlabel('Epsiode')
    plt.ylabel('Collected rewards')
    plt.plot(durations_t.numpy())
    plt.show()

量子強化学習実行

from itertools import count

agent = Agent(n_observations, n_actions, env)

if torch.cuda.is_available():
    num_episodes = 6000
else:
    num_episodes = 2000

total_steps = 0
path_total_steps = '/content/drive/MyDrive/Colab Notebooks/path_total_steps.bin'
is_file = os.path.isfile(path_total_steps)
if is_file:# if file exists
    total_steps = load_pkl(path_total_steps)
    print("The training has already finish" + str(total_steps) + "steps.")
    print("We continue the trainning.")
else:
    print("The training starts")

steps_per_update = 10#1 # Train the model every x steps
steps_per_target_update = 30#1 # Update the target model every x steps

episode_durations = [] # あるエピソードにおいて、どの程度期間カートポールを立てていられていたか
path_epdu = '/content/drive/MyDrive/Colab Notebooks/episode_duration.bin'
is_file = os.path.isfile(path_epdu)
if is_file:# if file exists
    episode_durations = load_pkl(path_epdu)
    print("The episode_durations was loaded.")
else:
    print("The episode_durations is empty.")

for i_episode in range(num_episodes):
    # Initialize the environment and get it's state
    state = env.reset()

    for t in count():
        action = agent.get_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = observation

        # Store the transition in memory
        agent.store_experience(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        total_steps += 1
        # Perform one step of the optimization (on the policy network)
        if total_steps % steps_per_update == 0:
            agent.optimize_model()

        # Soft update of the target network's weights
        if total_steps % steps_per_target_update == 0:
            agent.update_target_network()

        if done:
            agent.update_epsilon()
            episode_durations.append(t+1)

            #↓のデータをトランザクションにして、これらのデータセットの整合性持たせたい
            save_pkl(path_total_steps, total_steps)
            save_pkl(path_epdu, episode_durations)
            agent.save_brain()
            agent.save_model() # modelを自体も確保しておく。
            
            plot_durations(episode_durations)
            break

    if (i_episode+1)%10 == 0:
        avg_rewards = np.mean(episode_durations[-10:])
        print("Episode {}/{}, average last 10 rewards {}".format(i_episode+1, num_episodes, avg_rewards))
        if avg_rewards >= 500.0:
            break

print('Complete')
plt.ioff()
plt.show()
9
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?