28
29

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonで認証コードを返すだけのマイクラサーバーを作ってみる

Last updated at Posted at 2024-03-20

はじめに

接続したときに「あなたの認証コードは〇〇です」みたいなの返すやつ作ってみたい
前の記事書いてて「これサーバー側も作れそう」ってなったからやってみる

追記
GitHubにレポジトリを作成しました。
Pull RequestやIssueなどお待ちしています。
https://github.com/n-mache/pymcauthserver

Quarry

Quarryってモジュールを見つけたけど、どうやら更新が止まってて1.20.4では機能しないらしい...
1から実装するしか無さそう?

実際にやってみる

認証コード返すだけのサーバーだし、ステータスとログイン処理さえあればいいはず

モジュール

cryptographyrequests だけ標準で入ってないから pip install で入れないと動かない

import socket #TCP通信とかをするやつ
import json #JSONをパースしてくれるやつ
import struct #バイナリを簡単にするやつ(?)
import threading #同時接続とかをするために使うやつ
from cryptography.hazmat.backends import default_backend #暗号化関係
from cryptography.hazmat.primitives.asymmetric import rsa, padding #暗号化関係
from cryptography.hazmat.primitives.ciphers import algorithms, modes #暗号化関係
from cryptography.hazmat.primitives import ciphers, serialization #暗号化関係
import os #OSの機能を使うためのやつ
import random #ランダム生成に使うやつ
import string #文字とかを使うやつ
import requests #HTTP通信をするためのやつ
import hashlib #ハッシュを生成するためのやつ
import uuid #UUIDをいろいろ扱えるやつ
import time #時間を扱うためのやつ

VarInt

前の記事のやつをそのまま使う

def encode_varint(num):
    res = b""   
    while num:
        b = num & 127
        num = num >> 7
        if num != 0:
            b |= 128
        res += bytes([b])
    return res

def decode_varint(data):
    val = 0
    shift = 0
    for d in data:
        val |= (d & 127) << shift
        if not (d & 128):break
        shift += 7
    return val

TCPサーバー

0.0.0.0:25565で建ててみる

svhost = "0.0.0.0"
svport = 25565

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((svhost, svport))
server_socket.listen()
print(f"{svhost}:{svport}で接続を受け入れています。")

接続を処理する

同時接続とかもしたいからhandle_client関数で処理するようにする

def handle_client(client, address):

handshake

handshakeはStateが1で「ステータス」、2で「ログイン」ってこの記事に書かれてるからそれで分岐させてみる

    res = b""
    while True:
        r = client.recv(1)
        res += r
        if not r[0] & 128:break
    size = decode_varint(res)
    res = client.recv(size)
    recv = b""
    pos = 0
    for r in res:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    packet_id = decode_varint(recv)
    if packet_id != 0:
        client.close()
        return
    recv = b""
    for r in res[pos:]:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    protocol_version = decode_varint(recv)
    recv = b""
    for r in res[pos:]:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    size = decode_varint(recv)
    addr = res[pos:pos+size].decode("utf-8")
    pos += size
    port = struct.unpack(">H", res[pos:pos+2])[0]
    pos += 2
    recv = b""
    for r in res[pos:]:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    next_state = decode_varint(recv)
    if next_state != 1 and next_state != 2:
        client.close()
        return

ステータス応答

image.png
このコードをそのまま実行すればこの画像みたいな表示になる
data = json.dumps...の部分を書き換えれば表示も変更できる

    if next_state == 1:
        res = b""
        while True:
            r = client.recv(1)
            res += r
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        if res != b"\x00":
            client.close()
            return
        data = json.dumps({"version": {"name": "認証サーバー", "protocol": protocol_version}, "players": {"max": 1, "online": 0}, "description": {"text": "認証サーバー"}}).encode()
        data = b"\x00"+encode_varint(len(data))+data
        client.send(encode_varint(len(data))+data)
        pos = 0
        res = b""
        while True:
            r = client.recv(1)
            res += r
            pos += 1
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        if res[0] != 1:
            client.close()
            return
        data = res
        data = encode_varint(len(data))+data
        client.send(data)
        client.close()

ログイン

正直ここでコード発行して切っちゃっても良いけど...
それだと他人のMCIDとUUIDで発行できちゃう可能性があるので認証までやります。

    if next_state == 2:
        res = b""
        while True:
            r = client.recv(1)
            res += r
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        pos = 0
        if res[0] != 0:
            client.close()
            return
        pos += 1
        recv = b""
        for r in res[pos:]:
            recv += bytes([r])
            pos += 1
            if not r & 128:break
        size = decode_varint(recv)
        username = res[pos:pos+size].decode("utf-8")

暗号化リクエスト

キーとかサーバーIDとかを生成してクライアントに送るらしい

        key = rsa.generate_private_key(public_exponent=65537, key_size=1024, backend=default_backend())
        public_key = key.public_key().public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
        verify_token = os.urandom(4)
        server_id = ("".join(random.choices(string.ascii_lowercase+string.digits,k=10))).encode('ascii')
        data = b"\x01"+encode_varint(len(server_id))+server_id+encode_varint(len(public_key))+public_key+encode_varint(len(verify_token))+verify_token
        data = encode_varint(len(data))+data
        client.send(data)

暗号化応答

暗号化応答ではクライアント側から共有暗号鍵っていうのが送られてくるらしい

        res = b""
        while True:
            r = client.recv(1)
            res += r
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        if res[0] != 1:
            client.close()
            return
        pos = 1
        recv = b""
        for r in res[pos:]:
            recv += bytes([r])
            pos += 1
            if not r & 128:break
        size = decode_varint(recv)
        shared_secret = res[pos:pos+size]
        shared_secret = key.decrypt(shared_secret, padding.PKCS1v15())
        pos += size
        recv = b""
        for r in res[pos:]:
            pos += 1
            if len(recv) == 0 and r == 0:continue
            recv += bytes([r])
            if not r & 128:break
        size = decode_varint(recv)
        client_verify_token = res[pos:pos+size]
        client_verify_token = key.decrypt(client_verify_token, padding.PKCS1v15())
        if verify_token != verify_token:
            client.close()
            return
        cipher = ciphers.Cipher(algorithms.AES(shared_secret), modes.CFB8(shared_secret), backend=default_backend())
        encryptor = cipher.encryptor()
        decryptor = cipher.decryptor()

認証

ここだけうまくいかなくてめちゃくちゃ苦戦した。
wiki.vgを見てみたら「Minecraftで使用されるSha1.hexdigest()メゾットは非標準であることに注意してください」ってちゃんと書いてあった

        hash = hashlib.sha1()
        hash.update(server_id)
        hash.update(shared_secret)
        hash.update(public_key)
        hash = int(hash.hexdigest(), 16)
        if hash >> 156 & 8:
            hash = "-"+format(hash*-1 & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,"x")
        else:
            hash = format(hash,"x")
        has_joined = requests.get("https://sessionserver.mojang.com/session/minecraft/hasJoined",params={"username":username,"serverId":hash})
        has_joined_response = {}
        try:has_joined_response = has_joined.json()
        except:pass
        if has_joined.status_code != 200 or not "id" in has_joined_response:
            data = json.dumps({"text": "認証できませんでした。", "color": "red"}).encode()
            data = b"\x00"+encode_varint(len(data))+data
            data = encode_varint(len(data))+data
            data = encryptor.update(data)
            client.send(data)
            client.close()
            return

認証コードを発行する

ランダムな数字6桁を適当に発行してみてる
(この部分変えれば任意のものにできる)

        verify_code = "".join(random.choices(string.digits,k=6))
        print(has_joined_response["name"]+" ("+has_joined_response["id"]+") に認証コードを発行しました。")
        data = json.dumps({"text": "§a認証コードを発行しました§r\n§bユーザー名:§r §f"+has_joined_response["name"]+"§r\n§b認証コード:§r §f"+verify_code+"§r"}).encode()
        data = b"\x00"+encode_varint(len(data))+data
        data = encode_varint(len(data))+data
        data = encryptor.update(data)
        client.send(data)
        client.close()

認証コードを保存する

ファイルに保存してもいいしDBに保存してもいいし...
今回はファイルに保存するようにしてみる
(mcauths という名前でディレクトリ作らないと動かないので注意)

        with open("mcauths/"+has_joined_response["id"], "w") as f:
            f.write(json.dumps({"code": verify_code, "expire": time.time()+600}))

接続を受け入れる

サーバーのsocketに対してaccept関数を使うと受け入れができるらしい

while True:
    client, address = server_socket.accept()
    threading.Thread(target=handle_client,args=(client, address,)).start()

全体のコード

import socket #TCP通信とかをするやつ
import json #JSONをパースしてくれるやつ
import struct #バイナリを簡単にするやつ(?)
import threading #同時接続とかをするために使うやつ
from cryptography.hazmat.backends import default_backend #暗号化関係
from cryptography.hazmat.primitives.asymmetric import rsa, padding #暗号化関係
from cryptography.hazmat.primitives.ciphers import algorithms, modes #暗号化関係
from cryptography.hazmat.primitives import ciphers, serialization #暗号化関係
import os #OSの機能を使うためのやつ
import random #ランダム生成に使うやつ
import string #文字とかを使うやつ
import requests #HTTP通信をするためのやつ
import hashlib #ハッシュを生成するためのやつ
import uuid #UUIDをいろいろ扱えるやつ
import time #時間を扱うためのやつ

def encode_varint(num):
    res = b""   
    while num:
        b = num & 127
        num = num >> 7
        if num != 0:
            b |= 128
        res += bytes([b])
    return res

def decode_varint(data):
    val = 0
    shift = 0
    for d in data:
        val |= (d & 127) << shift
        if not (d & 128):break
        shift += 7
    return val

svhost = "0.0.0.0"
svport = 25565

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((svhost, svport))
server_socket.listen()
print(f"{svhost}:{svport}で接続を受け入れています。")

def handle_client(client, address):
    res = b""
    while True:
        r = client.recv(1)
        res += r
        if not r[0] & 128:break
    size = decode_varint(res)
    res = client.recv(size)
    recv = b""
    pos = 0
    for r in res:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    packet_id = decode_varint(recv)
    if packet_id != 0:
        client.close()
        return
    recv = b""
    for r in res[pos:]:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    protocol_version = decode_varint(recv)
    recv = b""
    for r in res[pos:]:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    size = decode_varint(recv)
    addr = res[pos:pos+size].decode("utf-8")
    pos += size
    port = struct.unpack(">H", res[pos:pos+2])[0]
    pos += 2
    recv = b""
    for r in res[pos:]:
        recv += bytes([r])
        pos += 1
        if not r & 128:break
    next_state = decode_varint(recv)
    if next_state != 1 and next_state != 2:
        client.close()
        return
    if next_state == 1:
        res = b""
        while True:
            r = client.recv(1)
            res += r
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        if res != b"\x00":
            client.close()
            return
        data = json.dumps({"version": {"name": "認証サーバー", "protocol": protocol_version}, "players": {"max": 1, "online": 0}, "description": {"text": "認証サーバー"}}).encode()
        data = b"\x00"+encode_varint(len(data))+data
        client.send(encode_varint(len(data))+data)
        pos = 0
        res = b""
        while True:
            r = client.recv(1)
            res += r
            pos += 1
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        if res[0] != 1:
            client.close()
            return
        data = res
        data = encode_varint(len(data))+data
        client.send(data)
        client.close()
    if next_state == 2:
        res = b""
        while True:
            r = client.recv(1)
            res += r
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        pos = 0
        if res[0] != 0:
            client.close()
            return
        pos += 1
        recv = b""
        for r in res[pos:]:
            recv += bytes([r])
            pos += 1
            if not r & 128:break
        size = decode_varint(recv)
        username = res[pos:pos+size].decode("utf-8")
        key = rsa.generate_private_key(public_exponent=65537, key_size=1024, backend=default_backend())
        public_key = key.public_key().public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
        verify_token = os.urandom(4)
        server_id = ("".join(random.choices(string.ascii_lowercase+string.digits,k=10))).encode('ascii')
        data = b"\x01"+encode_varint(len(server_id))+server_id+encode_varint(len(public_key))+public_key+encode_varint(len(verify_token))+verify_token
        data = encode_varint(len(data))+data
        client.send(data)
        res = b""
        while True:
            r = client.recv(1)
            res += r
            if not r[0] & 128:break
        size = decode_varint(res)
        res = client.recv(size)
        if res[0] != 1:
            client.close()
            return
        pos = 1
        recv = b""
        for r in res[pos:]:
            recv += bytes([r])
            pos += 1
            if not r & 128:break
        size = decode_varint(recv)
        shared_secret = res[pos:pos+size]
        shared_secret = key.decrypt(shared_secret, padding.PKCS1v15())
        pos += size
        recv = b""
        for r in res[pos:]:
            pos += 1
            if len(recv) == 0 and r == 0:continue
            recv += bytes([r])
            if not r & 128:break
        size = decode_varint(recv)
        client_verify_token = res[pos:pos+size]
        client_verify_token = key.decrypt(client_verify_token, padding.PKCS1v15())
        if verify_token != client_verify_token:
            client.close()
            return
        cipher = ciphers.Cipher(algorithms.AES(shared_secret), modes.CFB8(shared_secret), backend=default_backend())
        encryptor = cipher.encryptor()
        decryptor = cipher.decryptor()
        hash = hashlib.sha1()
        hash.update(server_id)
        hash.update(shared_secret)
        hash.update(public_key)
        hash = int(hash.hexdigest(), 16)
        if hash >> 156 & 8:
            hash = "-"+format(hash*-1 & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,"x")
        else:
            hash = format(hash,"x")
        has_joined = requests.get("https://sessionserver.mojang.com/session/minecraft/hasJoined",params={"username":username,"serverId":hash})
        has_joined_response = {}
        try:has_joined_response = has_joined.json()
        except:pass
        if has_joined.status_code != 200 or not "id" in has_joined_response:
            data = json.dumps({"text": "認証できませんでした。", "color": "red"}).encode()
            data = b"\x00"+encode_varint(len(data))+data
            data = encode_varint(len(data))+data
            data = encryptor.update(data)
            client.send(data)
            client.close()
            return
        verify_code = "".join(random.choices(string.digits,k=6))
        print(has_joined_response["name"]+" ("+has_joined_response["id"]+") に認証コードを発行しました。")
        data = json.dumps({"text": "§a認証コードを発行しました§r\n§bユーザー名:§r §f"+has_joined_response["name"]+"§r\n§b認証コード:§r §f"+verify_code+"§r"}).encode()
        data = b"\x00"+encode_varint(len(data))+data
        data = encode_varint(len(data))+data
        data = encryptor.update(data)
        client.send(data)
        client.close()
        # 認証コードを保存する処理をここへ
        # ファイルに保存する例
        with open("mcauths/"+has_joined_response["id"], "w") as f:
            f.write(json.dumps({"code": verify_code, "expire": time.time()+600}))

while True:
    client, address = server_socket.accept()
    threading.Thread(target=handle_client,args=(client, address,)).start()

ここまでの進捗

image.png
最終的にこうなった

サイト側の実装例

これはPHPで実装する例だけど...
$mcauths_dir の部分だけさっきのディレクトリのパスに変更すればそのまま使えるはず

注意
サイトで公開される場所にmcauthsディレクトリを置くと認証コードが盗み見られる可能性があります。

<?php
if (isset($_POST["mcid"]) && isset($_POST["code"])){
    $mcauths_dir = "/path/to/mcauths/"; // ディレクトリのパスには最後にスラッシュ("/")をつける
    $mcid = $_POST["mcid"];
    $code = $_POST["code"];
    $uuid = @file_get_contents("https://api.mojang.com/users/profiles/minecraft/".$mcid);
    if ($uuid === false){
        http_response_code(400);
        exit("UUIDの取得に失敗しました。");
    }
    $res = json_decode($uuid, true);
    $uuid = $res["id"];
    $mcid = $res["name"];
    if (!is_file($mcauths_dir.$uuid)){
        http_response_code(400);
        exit("認証情報が存在しません。");
    }
    $auth = json_decode(file_get_contents($mcauths_dir.$uuid), true);
    if ($auth["code"] !== $code){
        http_response_code(400);
        exit("認証に失敗しました。");
    }
    if ($auth["expire"] < time()){
        http_response_code(400);
        unlink($mcauths_dir.$uuid);
        exit("認証コードの有効期限が切れています。");
    }
    // 認証成功
    unlink($mcauths_dir.$uuid);
    exit("認証に成功しました。<br>UUID: ".$uuid."<br>NAME: ".$mcid);
}
?>
<form method="POST">
    MCID: <input name="mcid"><br>
    CODE: <input name="code"><br>
    <input type="submit">
</form>

動作

mcauth_demo.gif

参考

28
29
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
28
29

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?