1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonでマインクラフトサーバーを書き直してみるよ: #4 ユーザーログインと通信の暗号化

Posted at

はじめに

こんにちは。Pythonでマインクラフトサーバーを書き直してみる開発記録4日目です。
今回はLogin Intentが定義するログインプロトコルを実装します。

おさらいと変更点

ソケット周りのIOと抽象的なパケット定義が確立したのでサーバーリストPingプロトコルを実装しました。問題なく動いてくれたのでこの調子で今回はログインプロトコルを実装します。
プロジェクトの規模が大きくなってきたのでgitを使ってバージョンコントロールを行います。これに伴ってソースコードをsrcに移動しsetup.pyでアプリケーションを管理するようにしました。以降はコンソールでpyncraftコマンドを使うとサーバーが起動します。

マイクラにおける通信の暗号化

暗号化のプロトコルはこちらで解説されているものを参考にしています。
https://minecraft.wiki/w/Java_Edition_protocol/Encryption

mcryptoモジュールを作りここに暗号化関連のコードを書いていきます。

サーバーは1024bit幅のRSA暗号をつかってクライアントから共有鍵を受け取ります。共有鍵は今後のAES暗号通信の為のencryptorとdecryptorの生成に使われます。
RSA暗号鍵の作成にはcryptographyライブラリを使用します。Pythonで暗号をやるならまず安定したライブラリです

$ pip install cryptography

このライブラリを使用してサーバーが保持するRSA秘密鍵と公開鍵を作ります。

# networking/mcrypto.py
from cryptography.hazmat.primitives.asymmetric import rsa

def gen_rsa_key_pair(key_size=1024):
    # RSA秘密鍵と公開鍵を生成する
    # バニラではbit幅は1024bit
    private_key = rsa.generate_private_key(key_size=key_size, public_exponent=65537)
    public_key = private_key.public_key()
    return private_key, public_key

サーバーは公開鍵をDERエンコードしてクライアントへ送ります。なのでderエンコードをするためのコードを書きます。

# networking/mcrypto.py
def encode_public_key_der(public_key: rsa.RSAPublicKey) -> bytes:
    der_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    return der_bytes

この鍵を使って暗号化/復号化させるためのヘルパーメソッドを作ります。

# networking/mcrypto.py
def encrypt_rsa(data: bytes, public_key: rsa.RSAPublicKey) -> bytes:
    return public_key.encrypt(data, padding.PKCS1v15())

def decrypt_rsa(data: bytes, private_key: rsa.RSAPrivateKey) -> bytes:
    return private_key.decrypt(data, padding.PKCS1v15())

次にクライアントから受け取るAES共有鍵からencryotprとdecryptorを取得するヘルパーを作ります。

# networking/mcrypto.py
def gen_ciphers(shared_secret):
    encryptor = Cipher(
        algorithms.AES(shared_secret), modes.CFB8(shared_secret), backend=default_backend()
    ).encryptor()
    decryptor = Cipher(
        algorithms.AES(shared_secret), modes.CFB8(shared_secret), backend=default_backend()
    ).decryptor()
    return encryptor, decryptor

サーバーはここから得たサイファーを接続ステートに保持し、今後の通信の暗号化を行います。

# networking/connection.py
class JEConnectionState:
    def __init__(self):
        self.state = JEPacketConnectionState.HANDSHAKING # 接続状態を管理するための変数
        self.compression_threshold = -1 # 圧縮プロトコルのしきい値 (-1は圧縮なし)

        # 暗号化
        self.verify_token = None
        self.cipher_pair = None

        # ユーザー情報
        self.username = None
        self.uuid = None

self.cipher_pairNoneでない場合、通信は暗号化されるのでこのロジックを組み込みます。
ソケットへの送受信を担当するのはラッパークラスのrecv_to_buffer()flush()関数なのでこれらを書き換えます。

# networking/connection.py
# クライアントソケットにデータがある場合は受信してバッファに追加
if connection in read_ready:
    decryptor = connection.con_state.cipher_pair[1] if connection.con_state.cipher_pair else None
    connection.packet_wrapper.recv_to_buffer(decryptor)
# networking/connection.py
# クライアントソケットが書き込み可能な場合はパケットを送信
if connection in write_ready:
    encryptor = connection.con_state.cipher_pair[0] if connection.con_state.cipher_pair else None
    connection.packet_wrapper.flush(encryptor)
# networking/mcpacket/io.py
def recv_to_buffer(self, decryptor):
    # この時点でクライアントソケットにデータが存在していることを前提とする
    data = self._client_socket.recv(4096)
    if data == b'':
        return -1  # ソケットが閉じられた場合は-1を返す
    if decryptor:
        # データを復号化する
        data = decryptor.update(data)
    self._input_buffer.extend(data)
    return 0
# networking/mcpacket/io.py
def flush(self, encryptor):
    if self._output_buffer is None:
        raise ValueError("Output buffer is not initialized.")
    packet_content = self._output_buffer.get_value()
    if packet_content == b'':
        return
    if encryptor:
        # データを暗号化する
        packet_content = encryptor.update(packet_content)
    self._output_buffer.clear()
    self._client_socket.send(packet_content)

これで通信の暗号化は完成です。

ユーザーログインプロトコルを実装する

クライアントが送るハンドシェイクの三つある意図の一つです(前回の記事参照)。マルチプレイメニューの「サーバーに接続」若しくは「ダイレクト接続」をすると始まるプロトコルがこれです。この場合、接続はハンドシェイクステート、ログインステート、コンフィグステート、プレイステートの順番に切り替わります。

まずはserverboundパッケージとclientboundパッケージにログインモジュールを作ります。ここにログインパケットを書いていきます。

以下がログインプロトコルの解説と実装です。こちらはWikiで明確に説明がされているようです。
https://minecraft.wiki/w/Java_Edition_protocol/Packets#Login

1). C -> S ハンドシェイク(Login Intent)
まず、クライアントはサーバーにログインを意図したハンドシェイクパケットを送ります。サーバーはクライアントとの接続をハンドシェイクステートからログインステートに切り替え、続くログイン開始パケットを待ちます。

2). C -> S ログイン開始
ハンドシェイク送信後、クライアントは速やかにログイン開始パケットをサーバーに送ります。

# networking/mcpacket/serverbound/login.py
@ServerboundPacket.register_packet(JEPacketConnectionState.LOGIN, 0x00)
class SLoginStart(ServerboundPacket):
    def __init__(self, username: str, uuid: uuid.UUID):
        self.username = username
        self.uuid = uuid

    @property
    def packet_id(self):
        return self._packet_id
    
    def handle(self, con_state) -> login.CEncryptionRequest:
        # C -> S: LoginStart (ログイン開始)
        # S -> C: EncryptionRequest (暗号化リクエスト)
        con_state.username = self.username
        con_state.uuid = self.uuid
        public_der = pyncraftserver.public_der
        # 適当なバイト列を生成(後々の照合に使用)
        verify_token = os.urandom(16)
        con_state.verify_token = verify_token
        # 暗号化リクエストをクライアントへ送信
        return login.CEncryptionRequest(public_der, verify_token)

    @classmethod
    def from_bytes(cls, packet_buffer):
        # Nameを読み込む
        username = packet_buffer.read_utf8_string(16)
        # UUIDを読み込む
        uuid = packet_buffer.read_uuid()
        return cls(username, uuid)

3). S -> C 暗号化リクエスト
これに対して、サーバー側はDERエンコードされたRSA公開鍵とランダムに生成したバイト列(認証トークン)をクライアントに暗号化リクエストとして返信します。

# networking/mcpacket/clientbound/login.py
class CEncryptionRequest(ClientboundPacket):
    def __init__(self, public_der: bytes, verify_token: bytes, should_authenticate: bool = True, server_id: str = None):
        self._public_der = public_der
        self._verify_token = verify_token
        self.should_authenticate = should_authenticate
        self.server_id = '' if not server_id else server_id

    @property
    def packet_id(self):
        return 0x01

    def to_bytes(self, con_state):
        packet_buffer = JEPacketBuffer()
        # Server ID (バニラは未使用)
        packet_buffer.write_utf8_string(self.server_id, 20)
        # 公開鍵
        packet_buffer.write_varint(len(self._public_der))
        packet_buffer.write(self._public_der)
        # 認証トークン
        packet_buffer.write_varint(len(self._verify_token))
        packet_buffer.write(self._verify_token)
        # クライアントがアカウント認証するかどうか (con_stateの値に依存)
        packet_buffer.write_boolean(self.should_authenticate)
        return packet_buffer

4). C -> S: 暗号化リスポンス
クライアントは受け取ったサーバーの公開鍵を使って認証トークンとAES共通鍵を暗号化させ、暗号化リスポンスパケットとしてサーバーへ送ります。サーバーからアカウント認証を求められている場合はMojangセッションサーバーと問い合わせてアカウントをログインさせます。(詳しくはこちらを参照しています: https://minecraft.wiki/w/Java_Edition_protocol/Encryption#Authentication)

# networking/mcpacket/serverbound/login.py
@ServerboundPacket.register_packet(JEPacketConnectionState.LOGIN, 0x01)
class SEncryptionResponse(ServerboundPacket):
    def __init__(self, shared_secret: bytes, verify_token: bytes):
        self._shared_secret = shared_secret
        self._verify_token = verify_token
        
    @property
    def packet_id(self):
        return self._packet_id
    
    def handle(self, con_state) -> login.CLoginSuccess | login.CDisconnect:
        # C -> S: EncryptionResponse (暗号化応答)
        # S -> C: LoginSuccess (ログイン成功)
        # 通信の暗号化
        rsa_private, _ = con_state.rsa_pair
        server_id = con_state.server_id
        public_der = con_state.public_der
        verify_token = decrypt_rsa(self._verify_token, rsa_private)
        if con_state.verify_token != verify_token:
            return login.CDisconnect("Encryption failed")
        shared_key = decrypt_rsa(self._shared_secret, rsa_private)
        # ここから先すべて暗号化
        con_state.cipher_pair = gen_ciphers(shared_key)
        # クライアントログイン
        hash = auth_hash(server_id, shared_key, public_der)
        params = {
            'username': con_state.username,
            'serverId': hash,
        }
        # Mojang APIへユーザー認証をリクエスト
        response = requests.get('https://sessionserver.mojang.com/session/minecraft/hasJoined', params=params)
        if response.status_code != 200:
            return login.CDisconnect("Authentication failed: Perhaps Mojang API is down?")
        data = response.json()
        profile_id = uuid.UUID(data.get('id'))
        player_name = data.get('name')
        name = data.get('properties')[0].get('name')
        value = data.get('properties')[0].get('value')
        signature = data.get('properties')[0].get('signature')
        logger.info(f'Player {player_name} with UUID {profile_id} has been authorized', False)
        return login.CLoginSuccess(profile_id, player_name, name, value, signature)

    @classmethod
    def from_bytes(cls, packet_buffer):
        # 共通鍵
        shared_secret_length = packet_buffer.read_varint()
        shared_secret = packet_buffer.read(shared_secret_length)
        # 認証トークン
        verify_token_length = packet_buffer.read_varint()
        verify_token = packet_buffer.read(verify_token_length)
        return cls(shared_secret, verify_token)

5). S -> C: ログイン成功
サーバー側は暗号化リスポンスを受け取ると秘密鍵でバイト列を復号化し、暗号化前のバイト列と照らし合わせます。問題ない場合はAES共通鍵を復号化し、encryptorとdecryptorを生成します。このサイファーペアは接続ステートに保持され、今後の暗号化に使用されます。この時点で通信はすべて暗号化されます。アカウント認証が有効な場合はここで照らし合わせます。まずサーバーはクライアントへ送ったserver_id、DER公開鍵と、クライアントから受け取った共通鍵で認証hashを計算します。このhashとユーザー名をMojangセッションサーバーへ問い合わせて認証を完了させます。最後にサーバーはこれをクライアントへログイン成功パケットとして返信します。
Propertyフィールドの三つはMojang APIから受け取った情報をそのまま使います。

# networking/mcpacket/clientbound/login.py
class CLoginSuccess(ClientboundPacket):
    def __init__(self, profile_id: uuid.UUID, player_name: str, property_name: str, value: str, signature: str):
        self.uuid = profile_id
        self.username = player_name
        self.property_name = property_name
        self.value = value
        self.signature = signature
        
    @property
    def packet_id(self):
        return 0x02
    
    def to_bytes(self, con_state):
        packet_buffer = JEPacketBuffer()
        packet_buffer.write_uuid(self.uuid)
        packet_buffer.write_utf8_string(self.username, 16)
        # プロパティフィールド
        packet_buffer.write_varint(1)
        packet_buffer.write_utf8_string(self.property_name, 64)
        packet_buffer.write_utf8_string(self.value, 32767)
        packet_buffer.write_boolean(self.signature is not None)
        if self.signature:
            # 署名がある場合は書き込む
            packet_buffer.write_utf8_string(self.signature, 1024)
        return packet_buffer

6). C -> S: ログイン承知
これに対してクライアントはログインプロセスが完了を承知したことを表すログイン承知(Login Acknowledged)パケットを返信します。このパケットを最後にクライアントとサーバーはそれぞれ接続をログインステートからコンフィグステートへ切り替えます。

# networking/mcpacket/serverbound/login.py
@ServerboundPacket.register_packet(JEPacketConnectionState.LOGIN, 0x03)
class SLoginAcknowledged(ServerboundPacket):
    def __init__(self):
        pass

    @property
    def packet_id(self):
        return self._packet_id
    
    def handle(self, con_state):
        # S -> C: LoginSuccess 
        # C -> S: LoginAcknowledged (ログイン完了)
        # 接続をCONFIGに変更
        logger.info(f'{con_state.username} logged in', False)
        con_state._switch_state(JEPacketConnectionState.CONFIGURATION)
        return None
    
    @classmethod
    def from_bytes(cls, packet_buffer):
        return cls()

コンフィグプロトコルを実装する

ログイン認証が終わった後、接続はコンフィグステートに切り替わります。
コンフィグステートではクライアント側はユーザーの設定している描写距離などの情報をサーバーへ送ります。サーバーは同じ要領でサーバー側の設定を送ります。

コンフィグステートでは具体的なパケットのフローは定義されていないのでお互いが必要な設定をパケットでステートレスにやり取りします。これが終わるとサーバー側はコンフィグ完了パケットをクライアントへ送り接続はプレイステートに切り替わります。

今回の実装ではサーバー側は設定の送受信の準備ができていないので即座にコンフィグ完了パケットを送ってプレイステートへ移行させます。

# networking/mcpacket/clientbound/configuration.py
@ClientboundPacket.repliable(SFinishConfigurationAcknowledged)
class CFinishConfiguration(ClientboundPacket):
    def __init__(self):
        pass

    @property
    def packet_id(self):
        return 0x03
    
    def to_bytes(self, con_state):
        packet_buffer = JEPacketBuffer()
        return packet_buffer

ここで新しくClientboundPacket.repliableデコレータを追加しました。今まではクライアントのパケットに返信するパケットのみを定義していたのでサーバー側は送ったパケットに対しての後処理をする必要がありませんでした。しかしコンフィグ完了パケットをはじめとしたパケットはサーバー側が返信を待たなければいけないので、後々送ったパケットに返信が到着しているかを確認するためのロジックが必要になります。repliableでデコレートされたクライアント行きのパケットはその返信を監視するため、タイムアウト等のロジックを提供します。サーバーはパケットを適当なタイミングで送り、タイムアウトを設けてクライアントからの返信を待たせる仕組みです。
サーバー側はConnectionオブジェクトに用意したqueue_packet()から個々のクライアントにパケットを送信できます。

ここで論理サーバーの土台となるPyncraftServerクラスを定義します。
コンフィグステートに移行したらサーバーのゲームループにクライアント接続を登録し、以降の通信はそちらに任せる方向にしました。

# core/pyncraftserver.py
class PyncraftServer:
    def __init__(self):
        # クライアント接続
        self._connected_clients = []
        self._connected_clients_lock = threading.Lock()
        self._processor = get_listener()._connection_processor

    def start_loop(self):
        while True:
            # サーバーに接続するすべてのクライアントを取得
            all_connections = self._processor.all_connections()
            # 接続したクライアントがコンフィグステートならサーバーコンフィグを設定
            config_connections = [c for c in all_connections if c.con_state.get_state() == JEPacketConnectionState.CONFIGURATION]
            self.configurations(config_connections)
            # 接続したクライアントがPLAYステートなら最後のtickで更新されたサーバー状態のパケットを送信
            play_connections = [c for c in all_connections if c.con_state.get_state() == JEPacketConnectionState.PLAY]
            self.send_server_updates(play_connections)
            # ここでサーバーtick処理
            time.sleep(0.2)

    def configurations(self, connections: list[Connection]):
        for con in connections:
            con_state = con.con_state
            client_infos = con_state.configs()
            if client_infos is None:
                continue
            client_info, plugin_message = client_infos
            # ここでコンフィグ設定
            # クライアントへPLAY状態へ移行するためのパケットを送信
            if con.queue_packet(configuration.CFinishConfiguration(), 1) is None:
                logger.error(f'Failed to send CFinishConfiguration to {con._address}')
                continue
            logger.info(f'Configuration setup for {con._address} completed. Switching to PLAY')
    
    def send_server_updates(self, connections: list[Connection]):
        for con in connections:
            con.queue_packet(play.CDisconnect('ユーザー認証、通信暗号化、コンフィグ設定が完了しました'))

とりあえずループだけ用意し、これをメインスレッドで動かします。通信関係のコードは別スレッドで動いているのでメインのループは適当なタイミングでパケットを送受信する事になります。接続ステートはフィールドを直接読み込むと同期が取れないのでthreading.Lockで同期させ、get_state()で読み込むようにします。

configurations()がコンフィグステートにある接続を処理します。すべての接続はコンフィグステートに切り替わると必ずここを一回だけ通過し、プレイステートに切り替えます。プレイステート状態にある接続はsend_server_updates()関数がサーバー状態をパケットとして送信します。

マイクラから接続テストしてみる

サーバーに接続する際、サーバーとクライアントはユーザーログイン、通信の暗号化、コンフィグのやりとりを行っています。この一連のやり取りが完了すると接続はプレイステートに切り替わります。
プレイステートに移行した接続では、サーバーはすぐにゲーム情報を送り始めます。今回はまだゲームロジックを入れていないのでサーバーが閉じた際に送られるパケット、Disconnectパケットを送って強制的に接続を閉じています。この時に、「ユーザー認証、通信暗号化、コンフィグ設定が完了しました」というメッセージが表示されていれば今回の実装は完了です。

ezgif-4343d5275786ce.gif

まとめ

  • 暗号化にはcryptographyライブラリを使用する
  • サーバーはRSAキーペアでクライアントから受け取る共通鍵を暗号化させる
  • ユーザーログイン、暗号化、コンフィグの一連のプロトコルが実装完了
  • 次回からチャンクの読み込みとプレイヤーのスポーンを目指します

四日目のレポジトリはこちら

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?