はじめに
接続したときに「あなたの認証コードは〇〇です」みたいなの返すやつ作ってみたい
前の記事書いてて「これサーバー側も作れそう」ってなったからやってみる
追記
GitHubにレポジトリを作成しました。
Pull RequestやIssueなどお待ちしています。
https://github.com/n-mache/pymcauthserver
Quarry
Quarryってモジュールを見つけたけど、どうやら更新が止まってて1.20.4では機能しないらしい...
1から実装するしか無さそう?
実際にやってみる
認証コード返すだけのサーバーだし、ステータスとログイン処理さえあればいいはず
モジュール
cryptography
と requests
だけ標準で入ってないから pip install
で入れないと動かない
import socket #TCP通信とかをするやつ
import json #JSONをパースしてくれるやつ
import struct #バイナリを簡単にするやつ(?)
import threading #同時接続とかをするために使うやつ
from cryptography.hazmat.backends import default_backend #暗号化関係
from cryptography.hazmat.primitives.asymmetric import rsa, padding #暗号化関係
from cryptography.hazmat.primitives.ciphers import algorithms, modes #暗号化関係
from cryptography.hazmat.primitives import ciphers, serialization #暗号化関係
import os #OSの機能を使うためのやつ
import random #ランダム生成に使うやつ
import string #文字とかを使うやつ
import requests #HTTP通信をするためのやつ
import hashlib #ハッシュを生成するためのやつ
import uuid #UUIDをいろいろ扱えるやつ
import time #時間を扱うためのやつ
VarInt
前の記事のやつをそのまま使う
def encode_varint(num):
res = b""
while num:
b = num & 127
num = num >> 7
if num != 0:
b |= 128
res += bytes([b])
return res
def decode_varint(data):
val = 0
shift = 0
for d in data:
val |= (d & 127) << shift
if not (d & 128):break
shift += 7
return val
TCPサーバー
0.0.0.0:25565で建ててみる
svhost = "0.0.0.0"
svport = 25565
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((svhost, svport))
server_socket.listen()
print(f"{svhost}:{svport}で接続を受け入れています。")
接続を処理する
同時接続とかもしたいからhandle_client関数で処理するようにする
def handle_client(client, address):
handshake
handshakeはStateが1で「ステータス」、2で「ログイン」ってこの記事に書かれてるからそれで分岐させてみる
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
recv = b""
pos = 0
for r in res:
recv += bytes([r])
pos += 1
if not r & 128:break
packet_id = decode_varint(recv)
if packet_id != 0:
client.close()
return
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
protocol_version = decode_varint(recv)
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
size = decode_varint(recv)
addr = res[pos:pos+size].decode("utf-8")
pos += size
port = struct.unpack(">H", res[pos:pos+2])[0]
pos += 2
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
next_state = decode_varint(recv)
if next_state != 1 and next_state != 2:
client.close()
return
ステータス応答
このコードをそのまま実行すればこの画像みたいな表示になる
data = json.dumps...
の部分を書き換えれば表示も変更できる
if next_state == 1:
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
if res != b"\x00":
client.close()
return
data = json.dumps({"version": {"name": "認証サーバー", "protocol": protocol_version}, "players": {"max": 1, "online": 0}, "description": {"text": "認証サーバー"}}).encode()
data = b"\x00"+encode_varint(len(data))+data
client.send(encode_varint(len(data))+data)
pos = 0
res = b""
while True:
r = client.recv(1)
res += r
pos += 1
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
if res[0] != 1:
client.close()
return
data = res
data = encode_varint(len(data))+data
client.send(data)
client.close()
ログイン
正直ここでコード発行して切っちゃっても良いけど...
それだと他人のMCIDとUUIDで発行できちゃう可能性があるので認証までやります。
if next_state == 2:
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
pos = 0
if res[0] != 0:
client.close()
return
pos += 1
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
size = decode_varint(recv)
username = res[pos:pos+size].decode("utf-8")
暗号化リクエスト
キーとかサーバーIDとかを生成してクライアントに送るらしい
key = rsa.generate_private_key(public_exponent=65537, key_size=1024, backend=default_backend())
public_key = key.public_key().public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
verify_token = os.urandom(4)
server_id = ("".join(random.choices(string.ascii_lowercase+string.digits,k=10))).encode('ascii')
data = b"\x01"+encode_varint(len(server_id))+server_id+encode_varint(len(public_key))+public_key+encode_varint(len(verify_token))+verify_token
data = encode_varint(len(data))+data
client.send(data)
暗号化応答
暗号化応答ではクライアント側から共有暗号鍵っていうのが送られてくるらしい
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
if res[0] != 1:
client.close()
return
pos = 1
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
size = decode_varint(recv)
shared_secret = res[pos:pos+size]
shared_secret = key.decrypt(shared_secret, padding.PKCS1v15())
pos += size
recv = b""
for r in res[pos:]:
pos += 1
if len(recv) == 0 and r == 0:continue
recv += bytes([r])
if not r & 128:break
size = decode_varint(recv)
client_verify_token = res[pos:pos+size]
client_verify_token = key.decrypt(client_verify_token, padding.PKCS1v15())
if verify_token != verify_token:
client.close()
return
cipher = ciphers.Cipher(algorithms.AES(shared_secret), modes.CFB8(shared_secret), backend=default_backend())
encryptor = cipher.encryptor()
decryptor = cipher.decryptor()
認証
ここだけうまくいかなくてめちゃくちゃ苦戦した。
wiki.vgを見てみたら「Minecraftで使用されるSha1.hexdigest()メゾットは非標準であることに注意してください」ってちゃんと書いてあった
hash = hashlib.sha1()
hash.update(server_id)
hash.update(shared_secret)
hash.update(public_key)
hash = int(hash.hexdigest(), 16)
if hash >> 156 & 8:
hash = "-"+format(hash*-1 & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,"x")
else:
hash = format(hash,"x")
has_joined = requests.get("https://sessionserver.mojang.com/session/minecraft/hasJoined",params={"username":username,"serverId":hash})
has_joined_response = {}
try:has_joined_response = has_joined.json()
except:pass
if has_joined.status_code != 200 or not "id" in has_joined_response:
data = json.dumps({"text": "認証できませんでした。", "color": "red"}).encode()
data = b"\x00"+encode_varint(len(data))+data
data = encode_varint(len(data))+data
data = encryptor.update(data)
client.send(data)
client.close()
return
認証コードを発行する
ランダムな数字6桁を適当に発行してみてる
(この部分変えれば任意のものにできる)
verify_code = "".join(random.choices(string.digits,k=6))
print(has_joined_response["name"]+" ("+has_joined_response["id"]+") に認証コードを発行しました。")
data = json.dumps({"text": "§a認証コードを発行しました§r\n§bユーザー名:§r §f"+has_joined_response["name"]+"§r\n§b認証コード:§r §f"+verify_code+"§r"}).encode()
data = b"\x00"+encode_varint(len(data))+data
data = encode_varint(len(data))+data
data = encryptor.update(data)
client.send(data)
client.close()
認証コードを保存する
ファイルに保存してもいいしDBに保存してもいいし...
今回はファイルに保存するようにしてみる
(mcauths
という名前でディレクトリ作らないと動かないので注意)
with open("mcauths/"+has_joined_response["id"], "w") as f:
f.write(json.dumps({"code": verify_code, "expire": time.time()+600}))
接続を受け入れる
サーバーのsocketに対してaccept関数を使うと受け入れができるらしい
while True:
client, address = server_socket.accept()
threading.Thread(target=handle_client,args=(client, address,)).start()
全体のコード
import socket #TCP通信とかをするやつ
import json #JSONをパースしてくれるやつ
import struct #バイナリを簡単にするやつ(?)
import threading #同時接続とかをするために使うやつ
from cryptography.hazmat.backends import default_backend #暗号化関係
from cryptography.hazmat.primitives.asymmetric import rsa, padding #暗号化関係
from cryptography.hazmat.primitives.ciphers import algorithms, modes #暗号化関係
from cryptography.hazmat.primitives import ciphers, serialization #暗号化関係
import os #OSの機能を使うためのやつ
import random #ランダム生成に使うやつ
import string #文字とかを使うやつ
import requests #HTTP通信をするためのやつ
import hashlib #ハッシュを生成するためのやつ
import uuid #UUIDをいろいろ扱えるやつ
import time #時間を扱うためのやつ
def encode_varint(num):
res = b""
while num:
b = num & 127
num = num >> 7
if num != 0:
b |= 128
res += bytes([b])
return res
def decode_varint(data):
val = 0
shift = 0
for d in data:
val |= (d & 127) << shift
if not (d & 128):break
shift += 7
return val
svhost = "0.0.0.0"
svport = 25565
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((svhost, svport))
server_socket.listen()
print(f"{svhost}:{svport}で接続を受け入れています。")
def handle_client(client, address):
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
recv = b""
pos = 0
for r in res:
recv += bytes([r])
pos += 1
if not r & 128:break
packet_id = decode_varint(recv)
if packet_id != 0:
client.close()
return
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
protocol_version = decode_varint(recv)
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
size = decode_varint(recv)
addr = res[pos:pos+size].decode("utf-8")
pos += size
port = struct.unpack(">H", res[pos:pos+2])[0]
pos += 2
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
next_state = decode_varint(recv)
if next_state != 1 and next_state != 2:
client.close()
return
if next_state == 1:
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
if res != b"\x00":
client.close()
return
data = json.dumps({"version": {"name": "認証サーバー", "protocol": protocol_version}, "players": {"max": 1, "online": 0}, "description": {"text": "認証サーバー"}}).encode()
data = b"\x00"+encode_varint(len(data))+data
client.send(encode_varint(len(data))+data)
pos = 0
res = b""
while True:
r = client.recv(1)
res += r
pos += 1
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
if res[0] != 1:
client.close()
return
data = res
data = encode_varint(len(data))+data
client.send(data)
client.close()
if next_state == 2:
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
pos = 0
if res[0] != 0:
client.close()
return
pos += 1
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
size = decode_varint(recv)
username = res[pos:pos+size].decode("utf-8")
key = rsa.generate_private_key(public_exponent=65537, key_size=1024, backend=default_backend())
public_key = key.public_key().public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
verify_token = os.urandom(4)
server_id = ("".join(random.choices(string.ascii_lowercase+string.digits,k=10))).encode('ascii')
data = b"\x01"+encode_varint(len(server_id))+server_id+encode_varint(len(public_key))+public_key+encode_varint(len(verify_token))+verify_token
data = encode_varint(len(data))+data
client.send(data)
res = b""
while True:
r = client.recv(1)
res += r
if not r[0] & 128:break
size = decode_varint(res)
res = client.recv(size)
if res[0] != 1:
client.close()
return
pos = 1
recv = b""
for r in res[pos:]:
recv += bytes([r])
pos += 1
if not r & 128:break
size = decode_varint(recv)
shared_secret = res[pos:pos+size]
shared_secret = key.decrypt(shared_secret, padding.PKCS1v15())
pos += size
recv = b""
for r in res[pos:]:
pos += 1
if len(recv) == 0 and r == 0:continue
recv += bytes([r])
if not r & 128:break
size = decode_varint(recv)
client_verify_token = res[pos:pos+size]
client_verify_token = key.decrypt(client_verify_token, padding.PKCS1v15())
if verify_token != client_verify_token:
client.close()
return
cipher = ciphers.Cipher(algorithms.AES(shared_secret), modes.CFB8(shared_secret), backend=default_backend())
encryptor = cipher.encryptor()
decryptor = cipher.decryptor()
hash = hashlib.sha1()
hash.update(server_id)
hash.update(shared_secret)
hash.update(public_key)
hash = int(hash.hexdigest(), 16)
if hash >> 156 & 8:
hash = "-"+format(hash*-1 & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF,"x")
else:
hash = format(hash,"x")
has_joined = requests.get("https://sessionserver.mojang.com/session/minecraft/hasJoined",params={"username":username,"serverId":hash})
has_joined_response = {}
try:has_joined_response = has_joined.json()
except:pass
if has_joined.status_code != 200 or not "id" in has_joined_response:
data = json.dumps({"text": "認証できませんでした。", "color": "red"}).encode()
data = b"\x00"+encode_varint(len(data))+data
data = encode_varint(len(data))+data
data = encryptor.update(data)
client.send(data)
client.close()
return
verify_code = "".join(random.choices(string.digits,k=6))
print(has_joined_response["name"]+" ("+has_joined_response["id"]+") に認証コードを発行しました。")
data = json.dumps({"text": "§a認証コードを発行しました§r\n§bユーザー名:§r §f"+has_joined_response["name"]+"§r\n§b認証コード:§r §f"+verify_code+"§r"}).encode()
data = b"\x00"+encode_varint(len(data))+data
data = encode_varint(len(data))+data
data = encryptor.update(data)
client.send(data)
client.close()
# 認証コードを保存する処理をここへ
# ファイルに保存する例
with open("mcauths/"+has_joined_response["id"], "w") as f:
f.write(json.dumps({"code": verify_code, "expire": time.time()+600}))
while True:
client, address = server_socket.accept()
threading.Thread(target=handle_client,args=(client, address,)).start()
ここまでの進捗
サイト側の実装例
これはPHPで実装する例だけど...
$mcauths_dir
の部分だけさっきのディレクトリのパスに変更すればそのまま使えるはず
注意
サイトで公開される場所にmcauthsディレクトリを置くと認証コードが盗み見られる可能性があります。
<?php
if (isset($_POST["mcid"]) && isset($_POST["code"])){
$mcauths_dir = "/path/to/mcauths/"; // ディレクトリのパスには最後にスラッシュ("/")をつける
$mcid = $_POST["mcid"];
$code = $_POST["code"];
$uuid = @file_get_contents("https://api.mojang.com/users/profiles/minecraft/".$mcid);
if ($uuid === false){
http_response_code(400);
exit("UUIDの取得に失敗しました。");
}
$res = json_decode($uuid, true);
$uuid = $res["id"];
$mcid = $res["name"];
if (!is_file($mcauths_dir.$uuid)){
http_response_code(400);
exit("認証情報が存在しません。");
}
$auth = json_decode(file_get_contents($mcauths_dir.$uuid), true);
if ($auth["code"] !== $code){
http_response_code(400);
exit("認証に失敗しました。");
}
if ($auth["expire"] < time()){
http_response_code(400);
unlink($mcauths_dir.$uuid);
exit("認証コードの有効期限が切れています。");
}
// 認証成功
unlink($mcauths_dir.$uuid);
exit("認証に成功しました。<br>UUID: ".$uuid."<br>NAME: ".$mcid);
}
?>
<form method="POST">
MCID: <input name="mcid"><br>
CODE: <input name="code"><br>
<input type="submit">
</form>