この記事では Prolog を使ったモデル検査器の実装を紹介します。
モデル検査器は、プログラムやアルゴリズムについて主に次の2つの性質の検証を目的としています:
- 望ましい状態にいつかなる
- たとえば、いつかロックを獲得できる、いつか全ノードの状態が一致する、…
- 望ましくない状態に陥らない
- たとえば、デッドロックにならない、結果が不定にならない、…
上記の性質の検証はいずれも従来のテストでは難しく、ここにモデル検査が活躍できる余地があります。このうち、今回はデッドロックを検知するプログラムを実装します。
このプログラムでは、検査対象のプログラムやアルゴリズムの取りうる状態を洗い出し、ここにまずいものが含まれていないことを検査します。まずは状態の洗い出しから説明します。
状態の洗い出し
今回の実装ではプログラムの状態を変数と実行位置の2つに絞り、これらの変化を調べていきます。
たとえば、以下の Go 言語で書かれたプログラムの状態を洗い出してみましょう:
var x int
x = 0
x = 1
簡単にわかるのは、x
が未定義のときと 0
のとき、1
のときの3つの状態です。そして、この3つの状態は次の流れで変わっていきます:
x の値は未定義
|
v
x の値は 0
|
v
x の値は 1
次に、並列処理の概念を加えます。というのも、モデル検査器は非同期処理の領域で高い威力を発揮できるからで、今回の実装でもここまでは拡張します。では、次の並列処理を含むプログラムの状態の遷移を考えてみましょう:
x := 0
for i := 0; i < 2; i++ {
go func() {
y := x // 処理1
x = y + 1 // 処理2
}()
}
上のプログラムでは、処理1と2と書かれた部分は非同期に実行されます。とりあえず並列に実行されると単純化1して考えると、次の6通りの処理の順番がありえます:
- スレッドAで処理1 → スレッドAで処理2 → スレッドBで処理1 → スレッドBで処理2
- スレッドAで処理1 → スレッドBで処理1 → スレッドAで処理2 → スレッドBで処理2
- スレッドAで処理1 → スレッドBで処理1 → スレッドBで処理2 → スレッドAで処理2
- スレッドBで処理1 → スレッドAで処理1 → スレッドBで処理2 → スレッドAで処理2
- スレッドBで処理1 → スレッドAで処理1 → スレッドAで処理2 → スレッドBで処理2
- スレッドBで処理1 → スレッドBで処理2 → スレッドAで処理1 → スレッドAで処理2
この処理の流れの順番を列挙する方法は、(1) 実行できるスレッドを1つ選び、(2) このスレッド内で実行されるこれ以上分割できない操作(アトミックな操作)を実行して、(1) に戻るの繰り返しです。
そして、それぞれの処理の流れを次のような状態遷移としても表現できます(変数 y
はスレッドAとBで別々の変数なので便宜上 y_A
と y_B
と分けて表現します):
x の値は 0
|
+---------------+----------------+---- (略)
| | |
v v v
x の値は 0 x の値は 0 x の値は 0
y_A の値は 0 y_A の値は 0 y_A の値は 0
| | |
v v v
x の値は 1 x の値は 0 x の値は 0
y_A の値は 0 y_A の値は 0 y_A の値は 0
y_B の値は 0 y_B の値は 0
| | |
v v v
x の値は 1 x の値は 1 x の値は 1
y_A の値は 0 y_A の値は 0 y_A の値は 0
y_B の値は 1 y_B の値は 0 y_B の値は 0
| | |
v v v
x の値は 2 x の値は 1 x の値は 1
y_A の値は 0 y_A の値は 0 y_A の値は 0
y_B の値は 1 y_B の値は 0 y_B の値は 0
| | |
| +--------+-------+
| |
v v
x の値は 2 x の値は 1
これまでの例で、プログラムのとりえる状態の洗い出しの方法はなんとなく掴めたと思います。次に、洗い出された状態を検査する方法を説明します。
状態の検査
次のプログラムは、典型的なデッドロックを引き起こしうるプログラムです:
f := func (a, b *sync.Mutex) {
a.Lock()
b.Lock()
b.Unlock()
a.Unlock()
}
var mu1 sync.Mutex
var mu2 sync.Mutex
go f(&mu1, &mu2) // go文1
go f(&mu2, &mu1) // go文2
ここでも、前の例と同様の単純な仮定(go 文1, 2 は並列で処理される)をおきます。
前の節では、並列処理の流れを洗い出す方法として、(1) 実行できるスレッドを1つ選び、(2) このスレッド内で実行されるこれ以上分割できない操作を実行して、(1) に戻ると説明しました。実はここに落とし穴があって、(1) で実行できるスレッドが1つもないもないということがありえます。
上のコードでは、go文1で a.Lock
(= mu1
)→ go文2で a.Lock
(= mu2
)→ という順番で処理されると、go文1と2のいずれも b.Lock
以降を実行できず、デッドロックになります。これを状態遷移でみてみると、次のようになっています:
mu1 は lock されていない
mu2 は lock されていない
|
+--------------+-------------+------------(略)
| |
v v
mu1 は lock されている mu1 は lock されている
mu2 は lock されていない mu2 は lock されていない
| |
v v
mu1 は lock されていない mu1 は lock されている
mu2 は lock されていない mu2 は lock されている
|
v :
mu1 は lock されていない (実行できるスレッドがない)
mu2 は lock されている
|
v
mu1 は lock されていない
mu2 は lock されていない
|
v
終了
重要なのは、デッドロックは次の行き先がない状態として現れるということです。つまり、次の行き先がない状態を探せば、デッドロックを見つけられます2。
Prolog による実装
このような組み合わせの列挙や探索に向いている言語が Prolog です。これまでの処理を Prolog で記述してみましょう。
アトミックな操作の抽出
ソースコードを解析してアトミックな処理の繋がりを抽出できれば便利ですが、これはとても大変です。そのため、ここではすでにこのデータが用意されている状態から始めます:
// 検査対象のプログラム
f := func (a, b *sync.Mutex) {
a.Lock() // lock_1
b.Lock() // lock_2
b.Unlock() // unlock_2
a.Unlock() // unlock_1
}
var mu1 sync.Mutex
var mu2 sync.Mutex
go f(&mu1, &mu2) // GO_A
go f(&mu2, &mu1) // GO_B
% 対応するデータ
% 各スレッドの初期の実行位置(GO_A_(数字) と GO_B_(数字) は上のコメントに対応する goroutine の実行位置を表している)。
init_locations(['GO_A_0', 'GO_B_0']).
% 追跡する変数(リストの左が mu1、右が mu2 の Mutex の状態を表している)。
init_vars(['unlocked', 'unlocked']).
% 1つめの goroutine (GO_A)内の、アトミックな操作による状態遷移の記述。
% 読み方は以下のとおり:
% transit(操作のラベル, 実行位置, 次の実行位置, 元の変数の状態、後の変数の状態).
transit('GO_A_lock_1', 'GO_A_0', 'GO_A_1', ['unlocked', Mu2], ['locked', Mu2]).
transit('GO_A_lock_2', 'GO_A_1', 'GO_A_2', [Mu1, 'unlocked'], [Mu1, 'locked']).
transit('GO_A_unlock_2', 'GO_A_2', 'GO_A_3', [Mu1, 'locked'], [Mu1, 'unlocked']).
transit('GO_A_unlock_1', 'GO_A_3', 'GO_A_4', ['locked', Mu2], ['unlocked', Mu2]).
% 2つめの goroutine (GO_B)内の、アトミックな操作による状態遷移の記述。
transit('GO_B_lock_1', 'GO_B_0', 'GO_B_1', [Mu1, 'unlocked'], [Mu1, 'locked']).
transit('GO_B_lock_2', 'GO_B_1', 'GO_B_2', ['unlocked', Mu2], ['locked', Mu2]).
transit('GO_B_unlock_2', 'GO_B_2', 'GO_B_3', ['locked', Mu2], ['unlocked', Mu2]).
transit('GO_B_unlock_1', 'GO_B_3', 'GO_B_4', [Mu1, 'locked'], [Mu1, 'unlocked']).
簡単にここでの Prolog の読み方を説明します。
Prolog で中心となる概念は「述語」です。大雑把にいうと、述語とはなんらかのものごとに対して真偽のいずれかを答えるものだと思ってください。例えば数値の大小比較は述語とみなせて、変数 x
より変数 y
が大きければ真となる述語とできます。
このような述語をうまく使うとデータベースのようなものを構成できます。さらに、このデータベースへの問い合わせも述語で記述します。今回のコードでも「事実」と「規則」の2種類の述語を宣言しています。
init_locations
と init_vars
は事実と呼ばれるもので、事実(値1, 値2, ...).
のように記述します3。事実の述語は、宣言されている値に一致する組み合わせについては真、そうでなければ偽と判定されます。ここでは、検査対象のプログラムの初期実行位置(init_locations
)、変数の初期状態(init_vars
)を事実として宣言しています。
次の transit
も事実と呼ばれると思いますが(自信ない)、前の2つとは違って変数を含んでいます。Prolog では大文字始まりの識別子を変数とみなします。上のコードでは Mu1
と Mu2
が変数に当たります。上のコードを抜粋した次のコードでは、変数をつかって &mu2
の値(locked
または unlocked
)がこの処理の前後で変わらないという制約を課しています:
transit('GO_A_lock_1', 'GO_A_0', 'GO_A_1', ['unlocked', Mu2], ['locked', Mu2]).
^^^^^^^^^^^^^ ^^^^^^^^ ^^^^^^ ^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^
A B C D E
% A: GO_A の goroutine の a.Lock() の対応する状態遷移であることを説明
% B: GO_A の goroutine の操作前の実行位置
% C: GO_A の goroutine の操作後の実行位置
% D: 操作前の &mu1 と &mu2 の状態
% E: 操作後の &mu1 と &mu2 の状態
% (&mu1 は 'unlocked' から 'locked' へ変わり、&mu2 は変わらない)
まとめると、ここではプログラムの初期の実行位置と変数の状態、そしてアトミックな操作の前後の状態の変化を宣言しています。
状態の洗い出し
これまでの宣言をつかって、検査対象のプログラムのとりえる状態を洗い出します。先にここでの実装のビジョンを示した方が実装の流れを追いやすいので、次に示します:
% 与えられた状態をプログラムがとりえれば真、そうでなければ偽を帰す。変数の意味は:
% Ls: 各スレッドの実行位置
% Vs: 処理前の変数の状態
reachable(Ls_n, Vs_n) :- TODO.
% 「:-」は左の述語が真となる条件を書くときの構文。
reachable
は与えられた実行位置と変数の状態の組み合わせをとりえるかどうかを答える述語です。Prolog は、述語を満たす答えを自動で見つけられるという強力な機能を持っています4。そのため、reachable
が定義できればプログラムのとりえる状態を洗い出せます。
では、reachable
の部品をボトムアップで実装し、最終的に reachable
の中身を実装していきます。
まず、並列処理の順番を洗い出す部分を前に宣言した transit
から実装します。この実装は、なんども登場している「実行できるスレッドを1つ選び、…」に相当します:
% 実行できるスレッドを1つ選び、操作を実行できれば真、そうでなければ偽。変数の意味は:
% T: 操作のラベル
% Ls: 各スレッドの実行位置
% Ls_1: 実行後の各スレッドの実行位置
% Vs: 処理前の変数の状態
% Vs_1: 処理後の変数の状態
transit_composition(T, Ls, Ls_1, Vs, Vs_1) :-
select(L, Ls, L_1, Ls_1),
transit(T, L, L_1, Vs, Vs_1).
% select でスレッドの実行位置を Ls から1つ選び L に入れ、L を後で確定する L_1 で
% 置き換えたリストを Ls_1 とする。select は Ls が具体的なリストでかつ空でなければ真。
% 「,」は前後の述語がいずれも真であれば全体として真となる述語をつくる構文(論理積に相当)。
% 上で選ばれた L と Vs の位置で実行できる操作があれば真。
% つまり、与えられた Ls と Vs のもとで実行できる transit が見つかれば、
% 真となる T と Ls_1 と Vs_1 が自動的に決まる規則になっている。
select
(厳密には select/4
)が独特ですが、かなり短いコードで並列処理の順番の規則を実装できています。
次に、ここで実装した transit_composition
に「ある状態Aから次の状態Bへ遷移でき、ここからさらに状態Cへ遷移できれば、状態Aから状態Cへも遷移できるとみなす(推移律)」性質を導入します。この作業はこの節のゴールである reachable
の実装を簡単にするためのものです。
推移律は次のような規則の宣言によって実装できます:
% 遷移できる状態からさらに遷移できる状態ならば真。変数の意味は:
% Ls: 各スレッドの実行位置
% Ls_1: 処理後の各スレッドの実行位置
% Vs: 処理前の変数の状態
% Vs_1: 処理後の変数の状態
transit_composition_transitive(Ls, Ls_1, Vs, Vs_1) :-
transit_composition(_, Ls, Ls_1, Vs, Vs_1).
% 直接遷移できる状態は遷移できるとみなす。
transit_composition_transitive(Ls, Ls_n, Vs, Vs_n) :-
transit_composition(_, Ls, Ls_1, Vs, Vs_1),
transit_composition_transitive(Ls_1, Ls_n, Vs_1, Vs_n).
% 間接的に遷移できる状態も遷移できるとみなす。
Prolog では、同じ名前で値または変数の数が同じものを複数書くことで、このうちいずれかが真となれば全体として真となる(論理和に相当)ように記述できます。
さて、これでここでのゴールだった reachable
を実装する準備が整いました:
% 初期状態から遷移できる状態であれば真。変数の意味は:
% Ls: 各スレッドの実行位置
% Vs: 変数の状態
reachable(Ls_n, Vs_n) :-
init_locations(Ls_0),
init_vars(Vs_0),
transit_composition_transitive(Ls_0, Ls_n, Vs_0, Vs_n).
% 初期状態から直接/間接的に遷移できる状態が、プログラムのとりえる状態である。
reachable(Ls_0, Vs_0) :-
init_locations(Ls_0),
init_vars(Vs_0).
% 初期状態もプログラムのとりえる状態に含めておく。
ここまでかければ、Prolog は reachable
をつかってプログラムのとりえる状態を列挙できるようになります。これまでのコードを ./main.pl
として以下のように Prolog の対話環境を起動します(今回は SWI-Prolog をつかっています):
ここまでのコード
reachable(Ls_n, Vs_n) :-
init_locations(Ls_0),
init_vars(Vs_0),
transit_composition_transitive(Ls_0, Ls_n, Vs_0, Vs_n).
reachable(Ls_0, Vs_0) :-
init_locations(Ls_0),
init_vars(Vs_0).
transit_composition_transitive(Ls, Ls_1, Vs, Vs_1) :-
transit_composition(_, Ls, Ls_1, Vs, Vs_1).
transit_composition_transitive(Ls, Ls_n, Vs, Vs_n) :-
transit_composition(_, Ls, Ls_1, Vs, Vs_1),
transit_composition_transitive(Ls_1, Ls_n, Vs_1, Vs_n).
transit_composition(T, Ls, Ls_1, Vs, Vs_1) :-
select(L, Ls, L_1, Ls_1),
transit(T, L, L_1, Vs, Vs_1).
% ---- Target specific ----
init_locations(['GO_A_0', 'GO_B_0']).
init_vars(['unlocked', 'unlocked']).
transit('GO_A_lock_1', 'GO_A_0', 'GO_A_1', ['unlocked', Mu2], ['locked', Mu2]).
transit('GO_A_lock_2', 'GO_A_1', 'GO_A_2', [Mu1, 'unlocked'], [Mu1, 'locked']).
transit('GO_A_unlock_2', 'GO_A_2', 'GO_A_3', [Mu1, 'locked'], [Mu1, 'unlocked']).
transit('GO_A_unlock_1', 'GO_A_3', 'GO_A_4', ['locked', Mu2], ['unlocked', Mu2]).
transit('GO_B_lock_1', 'GO_B_0', 'GO_B_1', [Mu1, 'unlocked'], [Mu1, 'locked']).
transit('GO_B_lock_2', 'GO_B_1', 'GO_B_2', ['unlocked', Mu2], ['locked', Mu2]).
transit('GO_B_unlock_2', 'GO_B_2', 'GO_B_3', ['locked', Mu2], ['unlocked', Mu2]).
transit('GO_B_unlock_1', 'GO_B_3', 'GO_B_4', [Mu1, 'locked'], [Mu1, 'unlocked']).
$ swipl -f ./main.pl
Welcome to SWI-Prolog (threaded, 64 bits, version 8.0.2)
SWI-Prolog comes with ABSOLUTELY NO WARRANTY. This is free software.
Please run ?- license. for legal details.
For online help and background, visit http://www.swi-prolog.org
For built-in help, use ?- help(Topic). or ?- apropos(Word).
?-
ここで、reachable
にあてはまる(=プログラムのとりえる)スレッドの実行位置と変数の状態を探索するよう指示します:
?- reachable(Ls, Vs).
すると、Prolog は一番はじめにみつかった状態を提示し、探索を続けるかどうかを聞いてきます:
?- reachable(Ls, Vs).
Ls = ['GO_A_0', 'GO_B_1'],
Vs = [unlocked, locked]
探索を中断したい場合は .
を入力し、探索を継続して別の答えを見たい場合は ;
を入力します。試しに数回 ;
を入力すると次のようにプログラムのとりえる状態を次々と列挙してくれます:
?- reachable(Ls, Vs).
Ls = ['GO_A_0', 'GO_B_1'],
Vs = [unlocked, locked] ;
Ls = ['GO_A_0', 'GO_B_3'],
Vs = [unlocked, locked] ;
Ls = ['GO_A_0', 'GO_B_4'],
Vs = [unlocked, unlocked] ;
Ls = ['GO_A_0', 'GO_B_2'],
Vs = [locked, locked] ;
Ls = ['GO_A_1', 'GO_B_0'],
Vs = [locked, unlocked] ;
Ls = ['GO_A_1', 'GO_B_3'],
Vs = [locked, locked]
ここまでの実装で、プログラムの状態の洗い出しができるようになりました。
デッドロックの検出
ここまでくればあと一歩です。
おさらいになりますが、デッドロックは次に遷移できる状態がない状態として見つかります。つまり、この条件にあてはまるもので真になる規則を実装すればデッドロックを見つけられます:
% deadlock であれば真。
% Ls: 各スレッドの実行位置
% Vs: 変数の状態
deadlock(Ls_n, Vs_n) :-
reachable(Ls_n, Vs_n),
\+ transit_composition(_, Ls_n, _, Vs_n, _).
% プログラムのとりうるスレッドの実行位置と変数の状態のうち、
% そこから先の遷移が存在しなければ真(\+ は否定の意味)。
見てのとおり、たった3行のコードでデッドロックを見つけられます。Prolog の威力を思い知らされる短さです。
では、この述語をつかってデッドロックを見つけてみましょう。先ほどと同様に Prolog の対話環境を開き、deadlock
にあてはまるものを問い合わせます:
?- deadlock(Ls, Vs).
Ls = ['GO_A_1', 'GO_B_1'],
Vs = [locked, locked] ;
Ls = ['GO_A_4', 'GO_B_4'],
Vs = [unlocked, unlocked] ;
false.
2つの状態がデッドロックとして検出されました!前者は確かにデッドロックしたときの状態です。後者は正常に終了したケースなのでデッドロックではありません。しかし、正常な終了も次の状態がないという条件を満たしているので出てきてしまっています。
実用的には、次の行き先がない状態がデッドロックとプログラムの終了のどちらなのかを区別する工夫が必要です。簡単ですが融通の効かない方法としては、スレッドに終了フラグをもたせ、すべてのスレッドの終了フラグが立っていないものをデッドロックとみなす方法があります。
状態グラフの可視化
せっかくなので、可視化もやってみましょう(コードは少し長くなるので説明は端折ります)。
可視化のためのコード追加後
deadlock(Ls_n, Vs_n) :-
reachable(Ls_n, Vs_n),
\+ transit_composition(_, Ls_n, _, Vs_n, _).
reachable(Ls_n, Vs_n) :-
init_locations(Ls_0),
init_vars(Vs_0),
transit_composition_transitive(Ls_0, Ls_n, Vs_0, Vs_n).
reachable(Ls_0, Vs_0) :-
init_locations(Ls_0),
init_vars(Vs_0).
transit_composition_transitive(Ls, Ls_1, Vs, Vs_1) :-
transit_composition(_, Ls, Ls_1, Vs, Vs_1).
transit_composition_transitive(Ls, Ls_n, Vs, Vs_n) :-
transit_composition(_, Ls, Ls_1, Vs, Vs_1),
transit_composition_transitive(Ls_1, Ls_n, Vs_1, Vs_n).
transit_composition(T, Ls, Ls_1, Vs, Vs_1) :-
select(L, Ls, L_1, Ls_1),
transit(T, L, L_1, Vs, Vs_1).
write_composition :-
writeln("strict digraph {"),
findall(_, (reachable(Ls0, Vs0), transit_composition(T, Ls0, Ls1, Vs0, Vs1), write_composition_edge(T, Ls0, Ls1, Vs0, Vs1)), _),
findall(_, (deadlock(Ls, Vs), write(" "), write_state(Ls, Vs), write("[style=\"solid,filled\", fillcolor=\"#FF7777\"]\n")), _),
writeln("}").
write_composition_edge(T, Ls0, Ls1, Vs0, Vs1) :-
write(" "),
write_state(Ls0, Vs0),
write(" -> "),
write_state(Ls1, Vs1),
format(" [label=~w]\n", [T]).
write_state(Ls, Vs) :- write_atoms(Ls, "_"), write("_"), write_atoms(Vs, "_").
write_thread :-
writeln("strict digraph {"),
findall(_, (transit(T, L0, L1, _, _), write_edge(T, L0, L1)), _),
writeln("}").
write_edge(T, L0, L1) :- format(" ~w -> ~w [label=~w]\n", [L0, L1, T]).
write_atoms([X0,X1|Xs], C) :- write(X0), write(C), write_atoms([X1|Xs], C).
write_atoms([X], _) :- write(X).
% ---- Target specific ----
init_locations(['GO_A_0', 'GO_B_0']).
init_vars(['unlocked', 'unlocked']).
transit('GO_A_lock_1', 'GO_A_0', 'GO_A_1', ['unlocked', Mu2], ['locked', Mu2]).
transit('GO_A_lock_2', 'GO_A_1', 'GO_A_2', [Mu1, 'unlocked'], [Mu1, 'locked']).
transit('GO_A_unlock_2', 'GO_A_2', 'GO_A_3', [Mu1, 'locked'], [Mu1, 'unlocked']).
transit('GO_A_unlock_1', 'GO_A_3', 'GO_A_4', ['locked', Mu2], ['unlocked', Mu2]).
transit('GO_B_lock_1', 'GO_B_0', 'GO_B_1', [Mu1, 'unlocked'], [Mu1, 'locked']).
transit('GO_B_lock_2', 'GO_B_1', 'GO_B_2', ['unlocked', Mu2], ['locked', Mu2]).
transit('GO_B_unlock_2', 'GO_B_2', 'GO_B_3', ['locked', Mu2], ['unlocked', Mu2]).
transit('GO_B_unlock_1', 'GO_B_3', 'GO_B_4', [Mu1, 'locked'], [Mu1, 'unlocked']).
SWI-Prolog はコマンドライン引数から問い合わせの述語を指定できるので、ここから可視化のための述語を指定します:
$ swipl -f ./main.pl -q -g 'write_composition, halt.'
strict digraph {
GO_A_4_GO_B_1_unlocked_locked -> GO_A_4_GO_B_2_locked_locked [label=GO_B_lock_2]
GO_A_4_GO_B_2_locked_locked -> GO_A_4_GO_B_3_unlocked_locked [label=GO_B_unlock_2]
...
GO_A_4_GO_B_4_unlocked_unlocked[style="solid,filled", fillcolor="#FF7777"]
GO_A_1_GO_B_1_locked_locked[style="solid,filled", fillcolor="#FF7777"]
}
ここで出力されているのは DOT 言語で、これを Graphviz などのツールに入力すると図を描画できます。ここで生成された図は次のとおりです(赤い丸がデッドロックとして検出されたもの):
素晴らしいですね!
終わりに
この記事では、Prolog をつかって簡易的なモデル検査器を実装してみました。
モデル検査や Prolog に興味を持ってもらえれば嬉しい限りです。
謝辞
この記事は 2019/10/19 に開催された「デッドロック発見器を作って学ぶマルチスレッドプログラミング ★共有変数編★」に参加して作成した成果物を、主催者の @hatsugai さんから許諾をいただき執筆したものです。