はじめに
こんにちは!N8のN高生のchikachanです!
わし普段Qiita使わないんですけどN高のOrganizationも入りたいし今回はQiitaで書いてみました。
今回このマイクラ鯖のステータスだけを返す鯖というかアプリを作ろうと思った理由なんですけど今作ってるN高生限定のマイクラ鯖をDDOS対策で国外からのアクセスを禁止する予定なんですけどなんかそのまま通信遮断するのもかわいそうだなぁって思って作ってみました。
あとメンテナンス中とかの表示に便利そうだし、てかそっちの方が需要ありそう。
まずはそもそもの仕組みを調べてみる。
なかなか情報がないけど調べてたら頭おかしい(褒め言葉)Wikiを見つけた。
このWikiにサーバー側からいろんな情報をとる仕組みが書いてあったから順番に書いていく!
ハンドシェイク
まずはWiki通りに通信を始めるためにハンドシェイクの部分を書いていくんだけどデバックのしやすさも考えてまずはクライアントを作っていく。
まずはパケットID 0x00番にプロトコルのバージョンをVarInt形式で、サーバーのアドレスをStringでサーバーのポートをUnsigned Shortで次にどの通信をするかをVarIntで送ります!
わかりやすくするために一つづつ説明しつつ書いていく。
まず今回使うライブラリをインポートしつつ使う情報を変数に入れる。
import socket
import struct
import json
address = "サーバーのアドレス"
port = 25565
次にはパケットID 0x00から始まるBytearrayを作る。
handshake = bytearray([0x00])
次はプロトコルのバージョンを入れるんだけどここで初めて聴くVarIntなるものが現れた。
ってことでVarIntでパックする関数を書く。
VarIntって数字を効率的に送るための形式で先頭の1bitで次のバイトまで続くかを指定して残りの7bitで数字を入れる。ちなリトルエンディアンなのだけ注意
たとえば200をVarIntでラップしたい時
まずは200を2進数にする
200 = 0b11001000
そして7bitごとに区切る
1 1001000
リトルエンディアンだから逆にする。
1001000 1
そして今回は2byteにまたがってるから1byte目の1bitを1にする。
11001000 1
後はbyteにするために0で埋める
11001000 00000001
これで完成後はコード化
def pack_varint(data: int) -> bytes:
o = b''
while True:
byte = data & 0x7F
data >>= 7
o += struct.pack('B', byte | (0x80 if data > 0 else 0))
if data == 0:
break
return o
この関数使ってバージョンをパックしてhadshakeのbytearryに追加
バージョンに対応した数字はWiki参照
handshake += pack_varint(764)
次はアドレスを送りたいんやけどそのまま送ってもどこまでがアドレスかわからへんからまずながさを送る。
handshake += pack_varint(len(address))
そしてアドレスを追加する。
handshake += bytearray(address, "utf8")
次はポートこれはUnsigned Shortで書く。
handshake += struct.pack(">H", port)
次はNext stateって欄なんだけど1ならステータスを取得するだけ、2ならこの後ログインするよってこと。
今回は1を追加する。
handshake += bytearry([0x01])
後は全体の長さを一番最初につけてパケット完成!!
handshake = pack_varint(len(handshake)) + handshake
ソケットAPI使ってどっかの鯖に送りつけよう!!
ちな変なパケット送りつけて怒られても僕は責任取りません
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.send(handshake)
ステータスリクエスト
ハンドシェイクの時に次はステータスを取得しますっていっちゃたんで取得するためのパケットを送りつけましょう!
めっちゃ簡単でパケットID 0x00 になんも指定しないで送りつけるだけです。
sock.send(bytearray([0x01,0x00]))
これでやっと鯖はステータスの情報を送ってくれます。なんで読んでいきましょう。
まずはパケット全体の長さを読むけど次はVarIntをアンパックしないといけないから関数どーん
def unpack_varint_socket(sock: socket.socket) -> int:
data = 0
for i in range(5):
o = sock.recv(1)
if len(o) == 0:
break
byte = ord(o)
data |= (byte & 0x7F) << 7 * i
if not byte & 0x80:
break
return data
特殊なとこは必要な分だけ読むからsocketごと与えてるところと連番が欲しいからforを使ってるところぐらい
この関数使って全体の長さを読む。
packet_len = unpack_varint_socket(sock)
print(f"パケットの長さ:{packet_len}")
次はパケットIDを読む。
packet_id = unpack_varint_socket(sock)
print(f"パケットID:{packet_id}")
次はコンテンツの長さを読む。
content_len = unpack_varint_socket(sock)
print(f"コンテンツの長さ:{content_len}")
次はステータス本体を読んでいくけど長いからすぐ読んでもまだ全てのデータを受信しきれない可能性があるから全部読み切るまで読み続ける関数を作っとく。
def read_long_data(sock: socket.socket, size:int ) -> bytearray:
data = bytearray()
while len(data) < size:
data += bytearray(sock.recv(size - len(data)))
return data
この関数使ってステータスの本文を読む。ちなjsonだからめっちゃみやすい
payload_raw = read_long_data(sock, content_len)
payload = json.loads(payload_raw.decode('utf8'))
print(payload)
帰ってくるJsonの形はこんな感じ
{
"version": {
"name": "1.19.4",
"protocol": 762
},
"players": {
"max": 100,
"online": 5,
"sample": [
{
"name": "chikachan0522",
"id": "b662166f-f32b-47f1-876f-69d72648b070"
}
]
},
"description": {
"text": "Hello world"
},
"favicon": "data:image/png;base64,<data>",
"enforcesSecureChat": true,
"previewsChat": true
}
ステータスだけを返すサーバーを作ってみる
こっからやっと本題
まずはソケットAPIでソケット作ってbindしてlistenする。
ちなsocket.AF_INET
がIPv4って意味でsocket.SOCK_STREAM
がTCPって意味
接続があったらすぐacceptしてhandle_clientを呼び出す。
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen()
print(f"Listening on {host}:{port}")
try:
while True:
sock, addr = server_socket.accept()
handle_client(sock, addr)
except KeyboardInterrupt:
pass
finally:
server_socket.close()
あとはクライアントの真逆をするだけ
こっちも一個づつ読んでもいいけどゆうて必要なものはNext stateだけだからめっちゃ省略する。
def handle_client(sock: socket.socket, addr: str):
packet_len = unpack_varint_socket(sock)
data = read_long_data(sock, packet_len)
if data[-1] == 1:
status(sock)
elif data[-1] == 2:
login(sock)
あとはstatusの時とloginの時の処理書いていくだけ。
先にJsonを返す関数だけ書いちゃう
def send_json(sock: socket.socket, json_data: dict):
response = json.dumps(json_data).encode('utf8')
response = pack_varint(len(response)) + response
response = pack_varint(0x00) + response
response = pack_varint(len(response)) + response
sock.send(response)
レスポンスは結構簡単でさっき帰ってきたフォーマット通りに返すだけ
def status(sock: socket.socket):
status = {
"version": {
"name": "ばーじょんのなまえ",
"protocol": 47
},
"players": {
"max": 200,
"online": 100
},
"description": {
"text": "せつめい"
}
}
send_json(sock, status)
こんなこんじ
何気に普通に書いてたけどloginの方はJsonでtextを返すだけで表示してくれる
def login(sock: socket.socket):
login = {
"text": "Hello, world!"
}
send_json(sock, login)
こんな感じで表示される。
完成したやつ
import socket
import json
import struct
def pack_varint(data: int) -> bytes:
o = b''
while True:
byte = data & 0x7F
data >>= 7
o += struct.pack('B', byte | (0x80 if data > 0 else 0))
if data == 0:
break
return o
def unpack_varint_socket(sock: socket.socket) -> int:
data = 0
for i in range(5):
o = sock.recv(1)
if len(o) == 0:
break
byte = ord(o)
data |= (byte & 0x7F) << 7 * i
if not byte & 0x80:
break
return data
def send_json(sock: socket.socket, json_data: dict):
response = json.dumps(json_data).encode('utf8')
response = pack_varint(len(response)) + response
response = pack_varint(0x00) + response
response = pack_varint(len(response)) + response
sock.send(response)
def read_long_data(sock: socket.socket, size:int ) -> bytearray:
data = bytearray()
while len(data) < size:
data += bytearray(sock.recv(size - len(data)))
return data
def handle_client(sock: socket.socket, addr: str):
packet_len = unpack_varint_socket(sock)
data = read_long_data(sock, packet_len)
if data[-1] == 1:
status(sock)
elif data[-1] == 2:
login(sock)
def status(sock: socket.socket):
status = {
"version": {
"name": "ばーじょんのなまえ",
"protocol": 47
},
"players": {
"max": 0,
"online": 0
},
"description": {
"text": "せつめい"
}
}
send_json(sock, status)
def login(sock: socket.socket):
login = {
"text": "Hello, world!"
}
send_json(sock, login)
if __name__ =="__main__":
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(('127.0.0.1', 25565))
server_sock.listen()
try:
while True:
sock, addr = server_sock.accept()
handle_client(sock, addr)
except KeyboardInterrupt:
pass
finally:
server_sock.close()
最後に
レポートで時間なくて最後の方めっちゃ適当になったけどとりあえず動いてくれてよかった。
ほんとはこの後にGoで書き直してバイナリ化してdockerイメージで公開するとこまでいきたかった。欲を言えば統合版対応までさせる予定だったけど全然できなかった。時間があったら追記で対応させたいなぁ...
てかクライアントはまぁまぁ時間かけたのに結局サーバー側ハンドシェイク全く読まなかったのおもろい
仕組み知れて面白かったけど
今回参考にしたWikiにPingとか認証の方まで載ってたからやる気あったら実際にワールドに入るところまで書いてみたいなぁ
来年度はちゃんと余裕持ってレポート終わらせよ
一応Github載せといた