2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[翻譯]人間のための中斷と時間制限

Posted at

たとえあなたの書いたコードが完璧だったとしてもそれ以外の部分は信用できない。ある時は他の人の書いたプログラムが落ちたり、ある時はネットワークが機能しなくなったり、またある時はプリンターに火が付くかもしれない。あなたはそういった事態に備えないといけないのだ。ネットワークでいうなら少なくとも

  • 成功
  • 失敗
  • 日が経ち、秋が過ぎ、冬が来れど未だ返事は来ない(応答なし)

の三つの可能性に備えないといけない。最初の二つは特に難しくないだろう。問題は三つ目で、適切に対応するには制限時間を設けないといけない。基本的に外部とやりとりする全ての場所でそれは必要で、もししなければ不具合の種となりうる。

お互い腹を割って話そう。もしあなたが並の開発者なら制限時間を設けなかったことによる不具合がたくさんあったはずだ。私はもちろんある。不思議なのが、時間制限というのは入出力を正しく行う為の基本的な機能であるから ほとんどのライブラリがその為の簡単で安全な方法を提供してくれているとあなたは期待するかもしれない。でも実際はそんなことはなく書くのが面倒くさかったり間違えやすかったりで実用的ではない場合がほとんどなのだ。だから気を落とすことなんてない。不具合があったとしてもそれは君のせいじゃなくて使っているライブラリのせいなんだから!

そこを何とかしたくて私は今Trioというライブラリを作っている。でも使いやすい時間制限のAPIというのはそう一筋縄ではいかないんだ。...(省略)

で、時間制限の何がそんなに難しんだろう?

単純な時間制限ではAPIの実装が面倒

最初に思いつく最も単純な時間制限の方法は 呼び出し元を長く待たす可能性のある関数(以後は"ブロッキング関数"と呼ぶことにする) 全てに制限時間を引数として渡せるようにすることだろう。これはthreading.Lock.acquire()のようなPython標準ライブラリで目にする物だ。

lock = threading.Lock()

# lockを得るまで最大で10秒待つ
lock.acquire(timeout=10)

またsocketmoduleでは制限時間を一度だけ設定すれば済むものの

sock = socket.socket()

# 制限時間は一度だけ設定
sock.settimeout(10)
# 接続が確立されるまで最大で10秒待つ
sock.connect(...)
# 接続先からの受信を最大で10秒待つ
sock.recv(...)

各method呼び出しそれぞれに独立した10秒の制限時間を設けていることに変わりはないからthreading.Lockとの違いは表面的なものだ。

「これの何がいけないんだ?十分わかりやすいだろ!」だって?もし私達がいつもこれら下位層のAPIを触っているのならこれでも十分かもしれない。でも例えばAWS S3からfileを落とすためにboto3S3.Client.get_objectを使うとしたら?S3.Client.get_objectrequestsを使ってS3 serverに一連のHTTP requestを送り、requestsmoduleはsocketmoduleに実際のネットワーク処理を委ねる事になる。つまり以下の3つのAPIを通る事になるだろう。(実際はもっとある1)。

s3client.get_object(...)
requests.get("https://...")
sock.recv(...)

そしてこれらに上のやり方で制限時間を設けると次のようになる。

s3client.get_object(..., timeout=10)
requests.get("https://...", timeout=10)
sock.recv(..., timeout=10)

すると問題が起こるのだ。これらAPIの実装が面倒になるのである。何故かって?例えばHTTP Responseを捌く場面を考えて欲しい。何処かでheaderに記されたbodyの長さに従ってbodyを受けとるための次のようなloopが出てくるだろう。

def read_body(sock, content_length):
    body = bytearray()
    while len(body) < content_length:
        max_to_receive = content_length - len(body)
        body += sock.recv(max_to_receive)
    assert len(body) == content_length
    return body

これに制限時間を設けることを考えて欲しい。これを使う側は「bodyを受けとるのに最大で10秒待つ」などとしたいはずだがその際に制限時間をそのまま下位層のsock.recv()に渡すことはできない。何故なら例えば最初のsock.recv()呼び出しに6秒かかって かつbodyの一部分しか受け取れなかった場合を考えて欲しい。処理全体を10秒で終える為には二回目のsock.recv()には4秒の制限時間を与えないといけないだろう。つまりこのやり方ではAPIの層を跨ぐ度に以下のような制限時間の再計算をする羽目になりかねないのだ。

def read_body(sock, content_length, timeout):
    read_body_deadline = timeout + time.monotonic()  # 訳注:与えられた制限時間(timeout)を時刻に変換している (deadlineは期限という意味)
    body = bytearray()
    while len(body) < content_length:
        max_to_receive = content_length - len(body)
        recv_timeout = read_body_deadline - time.monotonic()  # 訳注:制限時間を再計算
        body += sock.recv(max_to_receive, timeout=recv_timeout)
    assert len(body) == content_length
    return body

(しかもこのcodeはこれでも簡略化されている。実際にはsocktimeout引数じゃなくてsock.settimeout()を使う事になるからtry/finallyを使って確実に元の制限時間に戻す処理が要るだろう。)

現実にこのような事をする者は誰も居ない。私の知るPython用ライブラリで制限時間を引数にとる物は全てそのまま下位層へ渡すのだ。そしてそれは結果としてAPIの意味自体を変えてしまう。例えば現在もよく使われているであろう次のライブラリを見てほしい。

import threading
lock = threading.Lock()
lock.acquire(timeout=10)

import requests
requests.get("https://...", timeout=10)

どちらもtimeout引数を取っているが意味は完全に異なっている。1つ目が意味しているのは「lockの取得を試みるが最大で10秒しか待たない」だ。2つ目が意味しているのは「指定されたURLからの取得を試みるが、もし下層のsocket操作が一度でも10秒以上かかったら処理全体を諦める」だ。これはもし意地悪なserverが10秒毎に1byteしか送って来なかったら制限時間はその都度10秒から数え直され処理は終わらなくなるという事だ。requestsが愛されている理由がそういった下層の事を気にしなくていいからだったと思うが気の毒な事にしないといけない部分があるのだ。

私は何もrequestsにケチをつけようとしているのではない。この問題はPython用ライブラリの至るところに在るのだ。requestsを例に用いたのはその開発者であるKenneth Reitzが人間にとって分かりやすいAPIを作る事で有名だからで、timeoutは彼がそれをしくじった数少ない場所の一つである。

彼にすらできなかった事がどうやって私達にできるというのだ。諦めてtimeoutを引っ叩くだけにして終わってもいいが、それで終わりたくはない。

時刻を用いれば実装し易くなるが...

timeoutが駄目なら代わりに何があるだろう?実は一部の人々が薦めているやり方がある。上に出てきたread_bodyの例で時間を時刻に変換したのに気付いただろうか?もし全てのAPIが時間ではなく時刻を受け取るようになっていれば実装はうんと楽になる。時刻はそのまま異なる層に渡せるからだ。

def read_body(sock, content_length, deadline):
    body = bytearray()
    while len(body) < content_length:
        max_to_receive = content_length - len(body)
        body += sock.recv(max_to_receive, deadline=deadline)  # そのまま渡す
    assert len(body) == content_length
    return body

 # bodyを受け取るのに最大で10秒待つ
 deadline = time.monotonic() + 10
 read_body(sock, content_length, deadline)

(このような設計でよく知られているAPIにGo言語のsocket layerがある)

しかしこのやり方にも欠点はある。APIの実装者にとって煩わしい部分をAPIの外側に追い出すことには成功したものの、その代償を利用者が払う事になるからだ。利用者は依然として「〜するまで最大で10秒待つ」といった風に時間で指定したいはずだから時間から時刻への変換作業を利用者がする羽目になるだろう。あるいは全ての関数がtimeout=deadline=のどちらでも受け取れるようにする手もあるかもしれないが、その場合はその全ての関数に

  • timeoutdeadlineに変換する処理
  • timeoutdeadlineの両方が渡された時に例外を投げる処理

のための定型codeが入ってしまう。時刻による時間制限は時間よりも優れてはいるがまだ何か足りていない。

cancel token

cancel tokenが実装者の都合を隠す

足りていない物はこれだ。次のように二種類の時間制限に対応する代わりに

# 利用者側にとって好ましい形
requests.get(..., timeout=...)

# 実装者側にとって好ましい形
requests.get(..., deadline=...)

# 両者の都合を考えた実装
def get(..., deadline=None, timeout=None):
    deadline = normalize_deadline(deadline, timeout)
    ...

時刻を別のオブジェクトに隠した上で便利なコンストラクタを用意してあげたらどうだろう?

class Deadline:
    def __init__(self, deadline):
        self.deadline = deadline

def after(timeout):
    return Deadline(time.monotonic() + timeout)

# URLからの取得を試み、最大で10秒待つ
requests.get("https://...", deadline=after(10))

これは利用者にとって扱いやすいうえ内部では時刻を持っているため実装者にとっても優しい。

こうなるとさらに汎用性を高めたくなる。制限時間だけが何も処理を諦める理由ではない。「10秒経ったら諦める」はつまる所「何らかの条件を満たしたら諦める」の一種でしかない。例えばウェブブラウザーを作っているとしたら「URLからの取得を試みるがユーザーが中止ボタンを押したら諦める」といった事もしたいだろう。そして上位層のライブラリはDeadlineオブジェクトには触らずにただ下位層に渡すだけにし、いずれ下位層のどこかで適切に中断が行われるようにするのだ。だからこのオブジェクトをただ「期限(deadline)だけを表す物」とは考えずに「任意の中断条件を表す物」として考え、名前もDeadlineからCancelTokenに変えてしまおう。

# このライブラリは実在しないよ、ごめん
from cancel_tokens import cancel_after, cancel_on_callback

# 10秒後に"中断"状態になるCancelTokenオブジェクト
cancel_token = cancel_after(10)
# だからこのrequestは10秒後に中断される
requests.get("https://...", cancel_token=cancel_token)

# 'cancel_callback'が呼ばれたら"中断"状態になるCancelTokenオブジェクト
cancel_callback, cancel_token = cancel_on_callback()
# 中止ボタンが押されたらcancel_callbackが呼ばれるようにする
stop_button.on_press = cancel_callback
# だからこのrequestは中止ボタンが押されたら中断される
requests.get("https://...", cancel_token=cancel_token)

"中断条件"を一級市民へ昇格させた事でAPIは使いやすくなっただけでなく劇的に強力にもなった。時間制限だけでなくあらゆる中断に対応できるようになったのだ。例えば「2つのrequestを同時に送り、片方が完了次第もう片方を中断する」といったよくありそうな並行処理にも利用できるだろう、素晴らしい。私の知る限りこの案を最初に考えたのはJoe Duffyで、彼のC#におけるcancellation tokenから来ている。またGo言語のcontext objectも基本的に同じ考え方だ。そして実はcancel tokenは従来のやり方にあった様々な問題も解いてくれる。

cancel tokenの中断は状態である

この投稿は時間制限に始まり中断全般にまで話が広がってきた。でももし中断の事から考えていたならとある手法が巷でよく使われている事に気付いただろう。それはtask/threadの中で例外のような物を起こす手法でasyncio.Task.cancel() curio.Task.cancel()、pthread、JavaのThread.interrupt等で見られる物だ。ここではこの手法の事を"スレッド割り込み方式"の中断と呼ぶことにする。スレッド割り込み方式は瞬間的なeventであり、また常に1対1の対応になっているのだが(一回の中断がtask/thread内での一回の例外に対応する)このやり方には2つ問題がある。

一つ目は分かりやすい。もし普通に呼びたい関数があってそれを中断できるようにしたい時、その為だけに新たなtask/threadを作る必要がある事だ。

http_thread = spawn_new_thread(requests.get, "https://...")
# 中止ボタンが押された時にhttp_thread.interrupt()が呼ばれるようにする
stop_button.on_click = http_thread.interrupt
try:
    http_response = http_thread.wait_for_result()
except Interrupted:
    ...

このようにtask/threadが並行処理の為ではなくただ単に中断を実現する為だけに使われる事になる。それにもし中断対象の関数が内部で別のthreadを立てていたらどうなると思う?この例のrequests.getがもしそうなっていたら中断された時にそのthreadは取り残されるかもしれない。この問題に対応する為にはthreadに対する繊細な監視が求められるだろう。

cancel tokenはこの問題を解いてくれる。cancel tokenによる中断はただtokenを渡すだけなので新たにtask/threadを作る必要はない。またtokenの受け手が一つである必要はなく、複数の関数でも内部で立てた複数のthreadでもその間にある物でも何でも構わないので、内部で立てたthreadに只tokenを渡すだけで良い。

2つ目の問題はスレッド割り込み方式では中断が イベント として扱われる事だ。対してcancel tokenでは 状態 として扱われる。("未中断"状態に始まりどこかで"中断"状態に変わる)。この差は些細ではあるがAPIをより強固にしてくれる。これをedge-triggeredとlevel-triggeredの違いと思ってもらっても構わない。スレッド割り込み方式がedge-triggered式で中断を知らせ、cancel tokenがlevel-triggered式で知らせるのだ。edge-triggered式は使いづらいことで有名である。...(省略)

話がイマイチ掴みづらいと思うのでここで具体的な物を見てみよう。try/finallyを用いて確実に接続を切るといった典型例を考え欲しい。以下のcodeは少しわざとらしいがwebsocketで繋いでmessageを送りsend_messageが如何なる例外を起こしても確実に接続を切るとした物だ。2

def send_websocket_messages(url, messages):
    open_websocket_connection(url)
    try:
        for message in messages:
            ws.send_message(message)
    finally:
        ws.close()

この関数の実行中に通信先が落ちてしまいsend_messageが応答しなくなった状況を思い浮かべて欲しい。こちらはいつまでも待ってられないのでいずれ中断をかけることになる。

スレッド割り込み方式ではsend_messageの地点で例外が起こり直ちにfinally節の後片付けが行われる事になる。今のところ特に問題は無い。でもここで面白い事実がある。websocketの規格では切断の際にはcloseメッセージを送ることになっているのだ(お互いが綺麗に通信を終えるためにこれは一般的に良いことである)。なのでws.close()はcloseメッセージを送ろうとする。でも今接続を切ろうとしたのは接続先がこちらの送信に対して応答しなくなったからである。だからws.close()までもが応答しなくなってしまい、結果として中断をかけたのにも関わらず処理はそこで停まってしまうのだ。

cancel tokenを使えばそのような事は起きない。

def send_websocket_messages(url, messages, cancel_token):
    open_websocket_connection(url, cancel_token=cancel_token)
    try:
        for message in messages:
            ws.send_message(message, cancel_token=cancel_token)
    finally:
        ws.close(cancel_token=cancel_token)

tokenが一度"中断"状態になるとそのtokenを使った全ての操作は中断されるためws.close()が詰まる事はない。こちらの方が堅牢な設計である事が分かるだろう。

多くの古いAPIがこの部分で間違えているのが不思議である。もしこの投稿のように複数のブロッキング操作で構成された一つの上流操作に制限時間を設ける事を考えていたなら、最初のブロッキング操作が制限時間を使い切ったら続く操作を直ちに失敗させなければいけない事は明らかである。時間切れは状態で表す事が合っているのだ。それは時間制限を中断へ一般化させた場合にも当てはまる。でももし下位層のAPIの事しか考えていなかったらそれに気付く事は無かっただろう。あるいはasyncioやTwistedのように一般的な中断手法を用いて時間制限を実装しようとしていた場合も中断を状態で表す事の優位点は簡単に見過ごされてしまっただろう。

人間は怠惰なのでcancel tokenでは不十分

cancel tokenは直接timeoutやdeadlineを扱うより間違いなく優れているがまだ使い勝手に難がある。中断に対応させたい関数は必ずcancel_token=を引数として受け取らねばならないし その関数が内部で呼ぶ別の関数へいちいちtokenを渡さないといけないのだ。...(省略)

人間はそのような同じ事の繰り返しが苦手だ。いや君がそうだとは言わない。私は君が全ての関数に中断の機能を実装して毎日歯間ブラシを使うような真面目なプログラマーだと信じている。でももし君の仕事仲間がそこまで真面目じゃなかったら?あるいはプロジェクトが依存している外部ライブラリがあったとしてどれだけそれを信用できる?プロジェクトの規模が大きくなるにつれて全ての人間が間違えずにプログラムを作る可能性は零に近くなる。

幾つか例を挙げてもいいかな?上記のような設計を数年に渡って推し進めている有名な言語にC#とGoがあるけど彼らの最下層のネットワーク機能は未だcancel tokenには対応していない3。(省略)。彼らは代わりに制限時間期限をsocket objectに設定するという古いやり方を採っている。だからもしcancel tokenを使いたいのならそれらのAPIとの橋渡しをどうするか自分で考えないといけない。

実はその見本をGo言語の標準ライブラリが示してくれてはいる。ネットワーク接続を確立するための関数(Pythonのsocket.connect相当)はcancel tokenを受け付けるのだ。ただその実現には40行のコードと裏方taskを要し、また業務環境で一年経ってから競合状態に関する不具合が見つかりもした。なので...もしGoおけるsocket操作でcancel token(いやGoだからContextと呼ぶべきか)を使いたいのならそれがやり方になるのかもしれない...ご幸運を。

私は別にからかっているわけじゃない、中断というのはそれだけ難解なものなのだ。でもC#とGoというのは優秀な常勤開発者と50の大手企業によって支えられている巨大なプロジェクトだ。そんな彼らができない事を誰ができるというんだ?私ではない。私はPythonの入出力機能を作り直そうとしている一介の人間に過ぎない。そんな複雑な事はできっこない。

cancel scope:Trioの人に優しい解決策

投稿の最初の方で触れたPythonのsocketが、各操作毎に制限時間を受け取るのではなく一度だけ設定すればその後の全ての操作にそれが適用される物だったのを覚えているだろうか?また直前の章で述べたようにC#とGoも同じ事をしている。おそらく彼らは気付いているのだろう、全ての関数呼び出しに跨って渡すべき何らかのデータがある時には怠惰な人間ではなく機械にやらせるべきだと。ただし今必要なのはsocket専用の物じゃなくより汎用化された物だ。

以下がTrioにおいてHTTP Requestに10秒の制限時間を設けるやり方だ。

# Trioの下位層のAPIを用いたやり方
with trio.open_cancel_scope() as cancel_scope:
    cancel_scope.deadline = trio.current_time() + 10
    await request.get("https://...")

(訳注:trio.open_cancel_scope()は古いAPIで、現在はtrio.CancelScope()としないといけない)

もちろん普通はwrapperを用いて以下のように書く。

# Trioでの普通の書き方
with trio.move_on_after(10):
    await request.get("https://...")

でもこの投稿は内部の設計に焦点を当てたいので前者を使って話を進めることにする。(謝辞:with構文を時間制限に用いる手法はDave Beazley氏のCurioで初めて見かけた物だ。そこから大分いじったんだけどね。詳しくは4)

with trio.open_cancel_scope()はcancel tokenを作りはするけど表には出さないと考えてほしい。その代わりtokenは内部のスタックに積まれ、with block内で行われる全てのブロッキング操作に自動で適用される。だからrequestはtokenを受け渡す必要はなく下層のsocket操作が行われる際に自動で時間制限が適用されるのだ。

...(省略)

  1. boto3 → botocore → requests → urllib3 → http.client → socket

  2. 実際にはwithを用いてwith open_websocket_connection(url) as ws:とするだろうが、そうするとws.closeが見えなくなるため余計に問題の原因がわかりにくくなる。

  3. 驚くことにC#の高水準async network APIはcancel tokenを受け取りはするが使わない

  4. Curioの時間制限はthread割り込み方式が基になっていて(JavaやC#のThread.interruptに似ている)、私がDaveに訴えるまでは入れ子には対応していなかったし、自身のtaskにしか適用されないため子taskを作っていたとしても無視されてしまう。Trioのcancel scopeは言ってみれば Curioの時間制限 + C#のcancel token + 分かりやすい入れ子 + shield + 子task達にstack構造を尊重させる為のnursery式の並行処理 を足した物だ。これらが何なのかは投稿を読んでいくと分かる。

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?