はじめに
Guarded Horn Clauses (GHC) は1985年に発表された並行論理プログラミング言語です。
並行論理型プログラミング言語は、
ストリーム通信、未完成メッセージ、ショートサーキット、黒板モデルなどの
プログラミング手法が使える面白い言語です。
GHC では、
H :- G1, G2,..., Gn | B1, B2,..., Bm.
のガード付きホーン節の形でプログラムを書きます。
コミットバー(|
)の左側(H :- G1, G2,..., Gn
)をガード、
右側(B1, B2,..., Bm
)をボディと呼びます。
ガードは更に節頭部(H
) と
ガード部(G1, G2,..., Gn
)とに
分けることができます。
ガードに書かれた条件にマッチした場合に B1, B2,..., Bm
を実行する、というコミッテドチョイス動作を行います。
ガード部になにも書く必要がない場合、
H :- true | B1, B2,..., Bm.
と書いたり
H :- B1, B2,..., Bm.
と書いたりすることができます。
「並列論理型言語 GHC とその応用」に載っている、以下のような GHC コードを
SWI-Prolog 上で動かしたいと思いました。
% 素数ストリームを得る
primes(Max, Ps) :- true | gen(2, Max, Ns), sift(Ns, Ps).
% 整数ストリーム生成
gen(N0, Max, Ns0) :-
N0 =< Max | Ns0 = [N0 | Ns1], N1 is N0 + 1, gen(N1, Max, Ns1).
gen(N0, Max, Ns0) :-
N0 > Max | Ns0 = [].
% 素数フィルタを適用する
sift([P|Xs1], Zs0) :-
true | Zs0 = [P | Zs1], filter(P, Xs1, Ys), sift(Ys, Zs1).
sift([], Zs0) :-
true | Zs0 = [].
% 素数フィルタ
filter(P, [X|Xs1], Ys0) :-
X mod P =\= 0 | Ys0 = [X | Ys1], filter(P, Xs1, Ys1).
filter(P, [X|Xs1], Ys0) :-
X mod P =:= 0 | filter(P, Xs1, Ys0).
filter(_, [], Ys0) :-
true | Ys0 = [].
インタプリタの実装
実装の概要
GHC ソースコードのガード付きホーン節をそのまま Prolog 上に述語定義として書いて、
それを Prolog 上のインタプリタ ghc/1 で実行できるようにします。
述語 ghc/1 は、
与えられたクエリにマッチする定義節を探して、それを GHC プログラムとして実行する、
ということを繰り返します。
- 与えられたクエリをゴールとして実行する。クエリが連言なら個別のゴールに分解して解釈する。
- ゴールが組み込み述語 true,
=/2
,is/2
,prolog/1
なら即座に実行する。- true ならばなにもせず成功。
-
=/2
,is/2
ならば Prolog でそのまま実行する。 -
prolog/1
ならば引数を Prolog コードとして実行する。
- ゴールを元に定義節を検索する
- 定義節頭部に項引数が含まれるなら、ガード条件としてチェックする。当該ゴール引数が未定義なら定義されるまで待ち合わせを登録する。チェックに失敗したら次の定義節処理に移る(3に戻る)。
- 定義節にガード部が含まれるなら、ガード条件としてチェックする。ガード条件にマッチしたらボディ部の処理に進む。マッチしなければ次の定義節処理に移る(3に戻る)。
- ボディ部を処理する。これは、ボディ部をクエリとして再帰的に1の処理に進むことで実現できる。
トレース表示
インタプリタデバッグ用に述語 ghc_trace/1
を作っておきます。
% デバッグ時には以下を切り替える
% ghc_trace(P) :- format('trace: ~w~n', P).
ghc_trace(_).
後戻り禁止
当方の Prolog 力は弱いため、思いもよらない場所でバックトラックが起こって
わけがわからなくなることがよく起きます。
バックトラックを意図しない処理でバックトラックが発生したときに、
内部障害発生扱いとして警告したうえでバックトラックを抑止する(成功扱いにする)
述語を作ります。
% 引数に与えられた処理を実行する。
% 実行に失敗した場合には報告する。
always_success(P) :-
( call(P), !
; format('failed to execute: ~w~n', P)).
最終的に、バックトラックは絶対起きない、と確信が持てたなら
always_success(P)
を P
に置き換えてしまって差し支えありません。
失敗の報告
内部障害とは別に、与えられた GHC プログラムに間違いがあるなどして
ボディ部の処理に失敗した場合には報告することにします。
% 与えられたクエリの実行に失敗した場合の表示
report_fail(P) :-
format('fail: ~w~n', P).
連言の分解
まず、与えられたクエリが連言 P, Q
の形ならば、
個別の P
と Q
に分解して処理することにします。
% 連言の場合は分解して順次処理
ghc((P, Q)) :- !, ghc(P), ghc(Q).
% 以下のように逆順に処理しても結果は同じになるはず。
% ghc((P, Q)) :- !, ghc(Q), ghc(P).
GHC の場合、(P, Q)
の評価を P → Q の順で行っても Q → P の順序で行っても
結果に違いはないはずです。
ghc((P, Q)) :- !, ghc(Q), ghc(P).
の部分を逆順にすると、これを確かめることができます。
一方で、ストリーム入出力などの組み込み述語を作る場合には評価順序を強制したい
場合があります。
ここでは、独自の仕様として、P -> Q
の形で評価順序を明示できるようにしてみました。
% 順序を明示できるように(本来の GHC にはない)
ghc((P -> Q)) :- !, ghc(P) -> ghc(Q).
P の処理完了を待ち合わせるような処理はしていないので、単なる気持ちの表明レベルですが、
「連言で書いてある部分は順序を変えて実行するが
->
で書いてある部分は順序を変えない」
といった実験が可能です。
true, =/2, is/2, prolog/1 の実行
個別のゴール処理は以下のようにしました。
-
true
ならばなにもせず成功 -
A = B
ならばそのまま実行(アクティブユニフィケーション) -
A is B
ならばそのまま実行(アクティブユニフィケーション)- 但し、B に未実現変数が含まれるなら実現を待ち合わせてから実行する。
-
prolog(P)
ならば P を Prolog コードとしてそのまま実行
% true はなにもしない。
ghc(true) :-
!,
ghc_trace(ghc(true)).
% A = B なら(アクティブ)ユニフィケーション実施。
ghc(A = B) :-
!,
ghc_trace(ghc(A = B)),
(A = B ; report_fail(A = B)).
% A is B なら A is B を実行する。
% 但し、B に未実現変数が含まれるなら実現を待ってから実行。
ghc(A is B) :-
!,
ghc_trace(ghc(A is B)),
term_variables(B, Vars),
(Vars = [] -> (A is B ; report_fail(A is B))
; ghc_build_nonvars(Vars, Waits)
-> when(Waits, (A is B ; report_fail(A is B)))).
ghc(prolog(P)) :-
!,
ghc_trace(ghc(prolog(P))),
(call(P), ! ; report_fail(prolog(P))).
ゴール処理(頭部のパターンマッチ、待ち合わせ)
上記いずれにも属さない場合にはゴールに対応する述語定義を検索して処理を進めます。
% 組み込み述語でないなら GHC プログラムとして解釈する。
ghc(Head) :-
!,
ghc_trace(ghc(Head)),
( functor(Head, F, N),
findall(Clause,
(Clause = [Copy,Body], functor(Copy, F, N), clause(Copy, Body)),
Clauses),
ghc_execute_clauses(_, Head, Clauses), !
; report_fail(Head)).
ghc_execute_clauses(_, _, []) :- !.
ghc_execute_clauses(Flag, Head, [[Copy, GuardBody]|Clauses]) :-
!,
always_success(ghc_execute(Flag, Head, Copy, GuardBody)),
ghc_execute_clauses(Flag, Head, Clauses).
ここでは、2つの functor/3
呼び出しで頭部の空コピーを作った上で clause/2
で述語の問い合わせを行う、という処理を行っています。
一見無駄なようですが、述語として登録された頭部が変数なのか、項なのかは
待ち合わせ処理のために重要です。
複数の登録がある場合、clause/2
呼び出しを繰り返すことで異なる登録を取得できます。
ここで、インタプリタの書き方の戦略として
- 登録内容の実行を試み、失敗したらバックトラックにより別の登録内容について実行を繰り返す。
- 登録内容を一度に全て取り出したうえでそれぞれの実行を試みる。
という二種類が考えられます。
バックトラックによる繰り返しの方が実行効率は良さそうですが、
バックトラック発生要因となるいろいろなパターン全てに対応するのは結構難しい
と感じました。
なので、findall/3
で登録述語すべてを取得して、
それら全てについての実行を試みる形にします。
ghc_execute/4
の4つの引数はそれぞれ
- Flag: ゴール呼び出し完了フラグ。いずれかの呼び出しに成功したら fired を設定する
- Head: ゴールとして呼び出し側が引数に与えた値を含む
- Copy: 述語として登録された頭部
- GuardBody: 述語として登録された本体
です。例えば、sift(A, B) として呼び出そうとしているとき
sift([P|Xs1], Zs0) :-
true | Zs0 = [P | Zs1], filter(P, Xs1, Ys), sift(Ys, Zs1).
がマッチした場合ならば、
Copy は sift([P|Xs1], Zs0)
で
GuardBody は true | Zs0 = [P | Zs1], filter(P, Xs1, Ys), sift(Ys, Zs1)
です。
この場合、A
は [P|Xs1]
とのパッシブユニフィケーションのために
実体化の待ち合わせが必要です。
SWI-Prolog の when/2
を使うと実体化の待ち合わせが簡単に実装できます。
上記の例だと、
B=Zs0,
when(nonvar(A),
(A=[P|Xs1],
ghc(Zs0 = [P | Zs1], filter(P, Xs1, Ys), sift(Ys, Zs1))))
のように待ち合わせを登録しておけば、A が実体化されるまで待って処理を継続できます。
述語の定義として複数のガード付きホーン節が与えられている場合、
いずれか一つのガードがマッチすれば他のホーン節の実行は不要です。
実体化待ち合わせ&パッシブユニフィケーション(パターンマッチ)に成功すると、
Flag に fired を設定することにします。
逆に既に Flag が fired になっていたら別の定義が実行されているので
実行を停止することにします。
このため、待ち合わせ処理は若干複雑になって
B=Zs0,
when((nonvar(Flag) ; nonvar(A)), % Flag または A の実体化を待ち合わせる
( nonvar(Flag) ; % Flag が設定されていたら停止
(A=[P|Xs1],
ghc(Zs0 = [P | Zs1], filter(P, Xs1, Ys), sift(Ys, Zs1)))))
のような形になります。
ghc_execute(Flag, Head, Copy, GuardBody) :-
ghc_trace(Copy :- GuardBody),
% 待ち合わせが必要な変数があるか調べ、あればそのリストを Waits に抽出する。
always_success(ghc_wait_args(Head, Copy, Waits)),
% Waits が空なら即座に実行できる。空でないなら待ち合わせが必要。
( Waits = []
-> (Head = Copy -> always_success(ghc_process_guard_body(Flag, GuardBody))
; true)
; always_success(ghc_build_nonvars(Waits, NonVars)) ->
when((nonvar(Flag) ; NonVars),
( nonvar(Flag) % 完了済みなので停止
; ( Head = Copy
->
always_success(ghc_process_guard_body(Flag, GuardBody))
; true)))).
% Copy が実体を要求している場合には、Head が実体か調べる。
% 実体でなければ Waits に待ち合わせ対象変数として返す。
ghc_wait_args(Head, Copy, Waits) :-
Head =.. [_|HeadArgs],
Copy =.. [_|CopyArgs],
ghc_wait_args2(HeadArgs, CopyArgs, Waits).
ghc_wait_args2([], [], []).
ghc_wait_args2([H|Hs], [C|Cs], Waits) :-
( var(H), nonvar(C), functor(C, _, _)
-> Waits = [H|Waits2],
ghc_wait_args2(Hs, Cs, Waits2)
; ghc_wait_args2(Hs, Cs, Waits)).
% 変数リストを nonvar/1 でラップされた変数のリストとして返す。
ghc_build_nonvars([X], (nonvar(X))).
ghc_build_nonvars([X|Xs], (nonvar(X), Ys)) :-
ghc_build_nonvars(Xs, Ys).
ゴール処理(ガード部のパターンマッチ、待ち合わせ、ボディ部の実行)
本体部分を
G1, G2,..., Gn | B1, B2,..., Bm.
を
G1
と G2,..., Gn | B1, B2,..., Bm
に分解して順次パッシブユニフィケーションを行っていく方針で以下のように書きました。
SWI-Prolog では |
は ;
の別名扱いなので、;
でパターンを書いています
(なぜか |
で書くとうまく動かない)。
% 実行完了済みなら実行停止
ghc_process_guard_body(Flag, _) :- nonvar(Flag).
% ガード部先頭の取り出し
% G1,...|Body を G1 とそれ以外に分ける
ghc_process_guard_body(Flag, (G1, Gs) ; Body) :-
!,
ghc_process_guard_body(Flag, G1, (Gs ; Body)).
% G|Body を G と Body に分ける
ghc_process_guard_body(Flag, Guard ; Body) :-
!,
ghc_process_guard_body(Flag, Guard, Body).
ガード部先頭を見て
- ガードがない場合
-
true
の場合 -
wait(G)
の場合 -
G=[G1|G2]
の場合 - 上記以外のガードの場合
それぞれについて分岐して処理します。
ガードがない場合はそのまま実行します:
% H :- B1,B2,...,Bn の場合
ghc_process_guard_body(Flag, Body) :-
!,
Flag = fired, % 実行完了にしておく
always_success(ghc(Body)).
ガードに true が書かれている場合も同様に、無条件にボディ部を実行します:
% H :- true |B1,B2,...,Bn の場合
ghc_process_guard_body(Flag, true, GuardBody) :-
!,
always_success(ghc_process_guard_body(Flag, GuardBody)).
ガードに wait/1
が書かれている場合、wait/1
引数の実現を待ち合わせます:
% H :- wait(G1) ,G2,...,Gn|B1,B2,...,Bm の場合
ghc_process_guard_body(Flag, wait(G), GuardBody) :-
( var(G)
-> when((nonvar(Flag) ; nonvar(G)),
( nonvar(Flag) % 完了済みなので停止
; always_success(ghc_process_guard_body(Flag, GuardBody))))
; nonvar(G)
-> always_success(ghc_process_guard_body(Flag, GuardBody))).
ガードに G=[G1|G2]
が書かれている場合、G
の実現を待ち合わせて G1 と G2 に分解します:
% H :- G=[G1|G2] ,...,Gn|B1,B2,...,Bm の場合
ghc_process_guard_body(Flag, G1=[G2|G3], GuardBody) :-
( var(G1)
-> when((nonvar(Flag) ; nonvar(G1)),
( nonvar(Flag) % 完了済みなので停止
; ( G1=[G2|G3],
always_success(ghc_process_guard_body(Flag, GuardBody))
; true))) % ガード条件のマッチに失敗しても無視
; nonvar(G1)
-> ( G1=[G2|G3],
always_success(ghc_process_guard_body(Flag, GuardBody))
; true)). % ガード条件のマッチに失敗しても無視
上記いずれにも当てはまらない場合には、ガードに含まれている変数が全て実現されていることを確認したうえでガード条件チェックします。
ガード条件チェックでは未実現の変数を実現させてはいけません。
ここでは単純に、ガードの引数に実現していない変数が含まれる場合には
待ち合わせを強制するという形でこれを実現することにしました:
% passive unification
ghc_process_guard_body(Flag, G, GuardBody) :-
term_variables(G, Vars),
( ghc_build_nonvars(Vars, Waits)
-> when((nonvar(Flag) ; Waits),
( nonvar(Flag) % 完了済みなので停止
; ( call(G),
always_success(ghc_process_guard_body(Flag, G, GuardBody))
; true))) % ガード条件のマッチに失敗しても無視
; Vars = []
-> ( call(G),
always_success(ghc_process_guard_body(Flag, GuardBody))
; true)). % ガード条件のマッチに失敗しても無視
組み込み述語
outputstream/1
計算結果の出力くらいはしたいので、述語 outstream/1 を GHC 上で実装してみます。
述語 outstream/1 は引数に与えられたストリームの先頭から順に出力操作を実行します。
ここでは、必要最低限の操作として以下を受け付けられるようにしてみます:
- write(P)
- nl
- told
outstream([C|Cs]) :- outstream(C, Cs).
% write(P) が与えられたときには項を出力する。
outstream(write(P), Cmds) :- prolog(write(P)) -> outstream(Cmds).
% nl が与えられた時は改行を出力する。
outstream(nl, Cmds) :- prolog(nl) -> outstream(Cmds).
% ttyflush が与えられたときはフラッシュする。
outstream(ttyflush, Cmds) :- prolog(ttyflush) -> outstream(Cmds).
% told が与えられたときは outstream プロセスを終了する
outstream(told, _).
出力を行ってからストリームの続きの処理に移る、ということをしたいので、ここでは
上に記した独自拡張 ->
を使った書き方をしています。
実行例
primes/2
をインタプリタ上で実行すると、以下のように表示が得られました。
?- ghc(primes(20, Ps)).
Ps = [2, 3, 5, 7, 11, 13, 17, 19] ;
false.
ボディ部の記述順序を逆にしても実行可能です。例えば
sift([P|Xs1], Zs0) :-
true | Zs0 = [P | Zs1], filter(P, Xs1, Ys), sift(Ys, Zs1).
を
sift([P|Xs1], Zs0) :-
true | sift(Ys, Zs1), filter(P, Xs1, Ys), Zs0 = [P | Zs1].
に変えても同じ結果が得られます。
ストリームを先頭から順に具体化されるたびに出力していく
プログラムを書けば primes/2
がリスト全体を生成し終わるのを待たずに、
素数を生成していく様子を表示できます。
% 引数に与えられたストリームを読み取り、順次カンマ区切りで出力していく。
printstream(Ps) :- outstream(Stream), printstream(Ps, Stream).
% ストリームが空なら終わり。
printstream([], Stream) :-
true | Stream = [told].
% ストリーム先頭の出力。
printstream([P|Ps], Stream) :-
true | Stream = [write(P), ttyflush | Stream2], printstream2(Ps, Stream2).
% ストリーム終端に達したら、出力ストリームを閉じる。
printstream2([], Stream) :-
true | Stream = [nl, told].
% カンマ区切りで出力ストリームに出力する。
printstream2([P|Ps], Stream) :-
true | Stream = [write(','), write(P), ttyflush | Stream2], printstream2(Ps, Stream2).
これを使うには
?- ghc((printstream(Ps), primes(100, Ps)).
のようにします。
また別の実験として、->
/2 の定義を
ghc((P -> Q)) :- !, ghc(Q), !, ghc(P).
に変えてみると、変更前が
?- ghc((printstream(Ps), primes(100, Ps)).
2,3,5,7,11,13,17,19,23,29,31,37,41,43,47
Ps = [2, 3, 5, 7, 11, 13, 17, 19, 23|...] ;
false.
だったのに対し、変更後は
?- ghc((printstream(Ps), primes(100, Ps)).
23,5,7,11,13,17,19,23,29,31,37,41,43,47,
Ps = [2, 3, 5, 7, 11, 13, 17, 19, 23|...] ;
false.
のように順序が異なる結果が得られ、->/2
導入の効果を確認することができました。
まとめ
SWI-Prolog の when/2
を使って、簡単な GHC インタプリタを実装しました。
ガードによる待ち合わせが可能であるため、ボディ部の記述順序は前後させることができます。
但し、実行効率は悪くなります。
追記
本を読んでいるときには「ガード節に true を書くのは無駄、省略すればよい」と
思っていたのですが、実際に試してみると省略してはだめなケースがありました。
例えば、
printstream([P|Ps], Stream) :-
true |
Stream = [write(P) | Stream2],
printstream2(Ps, Stream2).
において、ガード部 true |
を省略して以下のように書いたとします:
printstream([P|Ps], Stream) :-
Stream = [write(P), write(',') | Stream2],
printstream(Ps, Stream2).
すると、clause/2
での問い合わせで得られるのは以下のようなものになってしまいます:
% '|' を省略した結果、Stream が単一化されてしまった
printstream([P|Ps], [write(P), write(',') | Stream2]) :-
printstream(Ps, Stream2).
これだと、GHC では第一引数、第二引数ともに実体化を待ち合わせてパターンマッチする、
という意味になってしまいます。
このような間違いを防ぐために、ガード条件がなにもない場合でも
true を条件として書くのが安全なようです。