やりたい事
Gradioアプリをあるサーバにホストしました。このサーバにはグローバルIPが割り振られてお、外側からアクセスできます。この状況下で、事前に決めたパスワードを知っている者しかGradioアプリの機能を使えないようにしたいです。そのために、ここではチャレンジレスポンス認証を実装しようと思います。
セキュリティに関してはとても素人なので、参考程度にお願いします。また、誤りがありましたら教えていただけると幸いです。
チャレンジレスポンス認証
サーバを$S$、クライアントを$C$、チャレンジを$c$、正解のパスワードを$p^*$、ハッシュ関数を$\texttt{hash}$とします。チャレンジレスポンス認証は次のように行われます:
番号 | 概要 | 詳細 |
---|---|---|
1 | パスワードの作成 | 適当な$p^*$を作り、$S$に保存する。 |
2 | チャレンジの作成 | $S$が適当な$c$を生成する。 |
3 | チャレンジの送信 | $S$が$c$を送信し、$C$が受け取る。 |
4 | レスポンスの作成 | $C$がユーザなどからパスワード$p$を訊き、$r:=\texttt{hash}(c+p)$を計算する。 |
5 | レスポンスの送信 | $C$が$r$を送信し、$S$が受け取る。 |
6 | レスポンスの照合 | $S$は$r==\texttt{hash}(c+p^*)$かどうかを判断する。真の場合、認証は成功したと言える。 |
パスワード$p$を一度も平文でやり取りすることなく、代わりに$r$を介して、$p=p^*$かどうかを間接的に確認しているのです。
この方法では、中間者による若干の攻撃が想定されます。
仮に何かしらの方法で$r$が傍受されると、チャレンジ$c$が有効である期間内においては、その$r$を再び送れば認証がされてしまいます。チャレンジ$c$の有効期間を短めに設定すれば、そのような攻撃の可能性を減らせます。
今回は、レスポンスの照合が成功するたびにチャレンジ$c$を更新するようにします。
実装
今回は次のようなモジュールを使います:
import gradio as gr
from pathlib import Path
import nanoid
from dataclasses import dataclass
import hashlib
from typing import Dict
また、こういうフロントを組みます:
with gr.Blocks(title="Gradio Challenge Response", fill_width=True, head=head) as app:
with gr.Row():
challenge_tb = gr.Textbox(label="チャレンジ", value="")
password_tb = gr.Textbox(label="パスワード", value="")
with gr.Row():
content1_tb = gr.Textbox(label="内容1", value="1")
content2_tb = gr.Textbox(label="内容2", value="2")
content3_tb = gr.Textbox(label="内容3", value="3")
output_md = gr.Markdown(label="返り")
submit_button = gr.Button(value="送信")
1. パスワードの作成
パスワードを作成し、サーバ側に保存しておきます。このパスワードはサーバと私の記憶の中以外には出さないようにします。
PASSWORD_ANSWER = "hoge"
2. チャレンジの作成
Gradioで新しいセッションが開始されたら新しいチャレンジを作成し、そのセッションのIDに紐づけるようにします。セッションのIDはgradio.Request.session_hash
から取得できます。また、Gradioのハンドラはgradio.Request
引数を受け取ることができます。
# 2. セッションハッシュ to チャレンジ
challenges: Dict[str, str] = {}
# 2. チャレンジを作成してセッションのIDに紐づける
def update_or_initialize_challenge(request: gr.Request) -> str:
session_hash = request.session_hash
challenge = nanoid.generate()
challenges[session_hash] = challenge
print(f"update_or_initialize_challenge: {session_hash=} {challenge=}")
return challenge
# 2. セッションの終わりに呼び出される
def cleanup_instance(request: gr.Request):
session_hash = request.session_hash
if session_hash in challenges:
del challenges[session_hash]
print(f"cleanup_instance: {session_hash}")
with gr.Blocks(title="Gradio Challenge Response", fill_width=True, head=head) as app:
# 中略
# 2. セッションの初めに呼び出される
app.load(update_or_initialize_challenge, inputs=None, outputs=challenge_tb)
# 2. セッションの終わりに呼び出される
app.close(cleanup_instance)
セッションの扱いについては、Gradio DocsのManaging Stateを参照ください。
3. チャレンジの送信
作成したチャレンジの値をフロントまで飛ばさないといけません。そのために、セッション開始時に専用のTextbox
にチャレンジの値をセットするようにします。上でも既に示していますが、次の行が該当箇所です:
app.load(update_or_initialize_challenge, inputs=None, outputs=challenge_tb)
4. レスポンスの作成
レスポンスの作成は、実際にGradioの機能を動かす時に裏で行うことになります。レスポンスは、サーバから受け取ったチャレンジ$c$とユーザから入力してもらうパスワード$p$を使って生成されます。ここで、$p$はネットワーク通信に流してはいけません。
そこで、フロント側でレスポンス$r$を作成するカスタムコードを作り、$r$のみが通信として流れるようにします。カスタムコードについてはCustomizing your demo with CSS and Javascriptを参照してください。
Gradioでは、
- Webページのヘッダに任意のHTMLを書き足すこと
- イベント(ボタンクリック等)発生時に、サーバに値を渡す前に任意のJavaScriptを実行し、値を加工すること
などができます。
まずはチャレンジ$c$とパスワード$p$を元にレスポンス$r$を作るメソッドをヘッダにて定義しておきます。
<script src="http://cdn.rawgit.com/h2non/jsHashes/master/hashes.js"></script>
<script defer>
function pre_submit(challenge, password, ...args) {
const engine = new Hashes.SHA256()
const response = engine.hex(challenge + password)
return [response, response, ...args]
}
</script>
head = Path("./head.html").read_text(encoding="utf-8")
任意のGradioの機能を呼び出す時に使う事を想定し、引数を(challenge, password, 任意の引数)
としています。
返り値の先頭2つがどちらもresponse
である理由:
どうやらGradioのカスタムJSでは、引数の数と戻り値の要素数が同じじゃないといけないみたいなんです(この事はドキュメントでは明言されていません)。なので、戻り値の第2要素目も渋々response
にしています。
今回はSHA256をハッシュ関数として用いることにします。また、次のようなハッシュライブラリを使いました。
5&6. レスポンスの送信&レスポンスの照合
そのレスポンス$c$が、対応するセッションのチャレンジ$c$から生成されたものなのかを判定するメソッドを作ります。
def is_authorized(response: str, request: gr.Request) -> bool:
session_hash = request.session_hash
print(f"is_authorized: {response=} {session_hash=}")
if session_hash not in challenges:
return False
challenge = challenges[session_hash]
response_answer = hashlib.sha256((challenge + PASSWORD_ANSWER).encode()).hexdigest()
return response == response_answer
アプリの機能を使う時に認証を行うことを想定し、認証に必要な値(チャレンジ$c$とパスワード$p$)と任意の引数をセットで送るようにします。
submit_button.click(
submit,
inputs=[challenge_tb, password_tb, # 認証に必要な値
content1_tb, content2_tb, content3_tb], # 機能使用のための任意の引数
outputs=[output_md, # 機能の戻り値
challenge_tb], # 新しいチャレンジを受け取る
js="pre_submit"
)
例のJSコードを経由し、レスポンス$r$が生成された後、ハンドラsubmit
に送信されます。submit
では5つのコンポーネントの値に加え、request: gr.Request
も受け取るようにしています。
def submit(response: str, response_2: str, content1: str, content2: str, content3: str, request: gr.Request):
if not is_authorized(response=response, request=request):
raise gr.Error(f"Autorization Error")
return f"Success: {content1=} {content2=} {content3=}", update_or_initialize_challenge(request=request)
認証が成功するたびにチャレンジを変更するようにしています。また、機能からの戻り値と一緒に、新しく設定されたチャレンジも送信するようにし、これを再び例の専用のTextbox
にセットしています。
処理の過程をまとめると、
(challenge, password, 任意の引数)
$\xrightarrow{\text{カスタムJS: pre_submit}}$(response, response, 任意の引数)
$\xrightarrow{\text{ネットワーク通信}}$$\xrightarrow{\text{Python: submit}}$(機能の戻り値, challenge)
という感じになっていて、password
がネットワーク通信されることを防いでいます。
コード全体
説明がごちゃごちゃしちゃったのでここでコードの全体を示します。
head.html
<script src="http://cdn.rawgit.com/h2non/jsHashes/master/hashes.js"></script>
<script defer>
function pre_submit(challenge, password, ...args) {
const engine = new Hashes.SHA256()
const response = engine.hex(challenge + password)
return [response, response, ...args]
}
</script>
main.py
import gradio as gr
from pathlib import Path
import nanoid
from dataclasses import dataclass
import hashlib
from typing import Dict
PASSWORD_ANSWER = "hoge"
challenges: Dict[str, str] = {}
def update_or_initialize_challenge(request: gr.Request) -> str:
session_hash = request.session_hash
challenge = nanoid.generate()
challenges[session_hash] = challenge
print(f"update_or_initialize_challenge: {session_hash=} {challenge=}")
return challenge
def cleanup_instance(request: gr.Request):
session_hash = request.session_hash
if session_hash in challenges:
del challenges[session_hash]
print(f"cleanup_instance: {session_hash}")
def is_authorized(response: str, request: gr.Request) -> bool:
session_hash = request.session_hash
print(f"is_authorized: {response=} {session_hash=}")
if session_hash not in challenges:
return False
challenge = challenges[session_hash]
response_answer = hashlib.sha256(
(challenge + PASSWORD_ANSWER).encode()).hexdigest()
return response == response_answer
def submit(response: str, response_2: str, content1: str, content2: str, content3: str, request: gr.Request):
if not is_authorized(response=response, request=request):
raise gr.Error(f"Autorization Error")
return f"Success: {content1=} {content2=} {content3=}", update_or_initialize_challenge(request=request)
head = Path("./head.html").read_text(encoding="utf-8")
with gr.Blocks(title="Gradio Challenge Response", fill_width=True, head=head) as app:
with gr.Row():
challenge_tb = gr.Textbox(label="チャレンジ", value="")
password_tb = gr.Textbox(label="パスワード", value="")
with gr.Row():
content1_tb = gr.Textbox(label="内容1", value="1")
content2_tb = gr.Textbox(label="内容2", value="2")
content3_tb = gr.Textbox(label="内容3", value="3")
output_md = gr.Markdown(label="返り")
submit_button = gr.Button(value="送信")
submit_button.click(
submit,
inputs=[challenge_tb, password_tb,
content1_tb, content2_tb, content3_tb],
outputs=[output_md, challenge_tb],
js="pre_submit"
)
app.load(update_or_initialize_challenge, inputs=None, outputs=challenge_tb)
app.close(cleanup_instance)
app.launch()
動いている風景
Webページを開きます。
開いた瞬間にチャレンジにランダムな文字列が入っています。
コンソールを見ると、セッションが作成され、チャレンジも作成されたことが分かります。
update_or_initialize_challenge: session_hash='0r4j2sz2mhc' challenge='PbNGWRDXPV3--Jri6mmVu'
パスワードにデタラメな文字を入力すると、認証に失敗し例外が送出されます。
パスワードに私が決めた値、"hoge"を入れると、認証が成功します。それと同時にチャレンジが更新されます。
ここでもう一つ新しいタブを開いてみます。するとコンソールに新しいsession_hash
が現れ、それに対応するチャレンジが生成されます。
update_or_initialize_challenge: session_hash='eelpmhutfs' challenge='4OoiWIbD4YB6FVBuSz3cD'