SteamSDK
低レイヤーからプログラムを書くのは大変ですので、Steamの機能を使用します。
steamにはサーバー(ここでいうLobby)を作成して公開する機能があります。
またこれなら相手のIPアドレスやPort番号も知らなくてよいかつ、何気に難しいポートフォワーディングも必要としません。
さらにはSteamでフレンドであるだけで簡単にマッチングできます
Pygameの前に
まずは input() を使ってメッセージを送る仕組みを作りましょう。
仕組みは以下のようになります。
-
クライアントがサーバーにメッセージを送る(input()で入力)
-
サーバーが受信する
-
サーバーが全クライアントにブロードキャストする
-
クライアントがメッセージを受信する
送信時には相手の SteamID を使用します。
やりたいことはこんな感じです。
それと同じ実行フォルダ内にsteam_appid.txtを作成し
480と入力してください、
これはSteamに標準で存在しているSpaceWarというゲームのゲームIDです
テスト用のゲームということです。
ラッパ
前に書いた記事を試してもらえれば、とりあえずPythonでSteamSDKが使えるようになります
PythonでDLLファイルを使用するときはこんな感じでかきます。
# DLL をロード
dll_path = os.path.abspath("./SteamNetworkingWrapper.dll")
steam_dll = ctypes.CDLL(dll_path)
send_p2p_message = steam_dll.SendP2PMessage
send_p2p_message.argtypes = [ctypes.c_uint64, ctypes.c_char_p]
send_p2p_message.restype = ctypes.c_bool
receive_p2p_message = steam_dll.ReceiveP2PMessage
receive_p2p_message.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.POINTER(ctypes.c_uint64)]
receive_p2p_message.restype = ctypes.c_bool
流れ的にはこんな記述を行いますが、機能をクラスで分けるのが一般的です。
また私はこれと併用でSteamworksPyを使用しています。
自前でDLLファイルを作ってしまうのもよいのですが、SteamWorksPyなら、P2P通信以外は全部備わっているので、
P2P以外はSteamWorksPyを使用するのが楽です。
まずはSteamWorksを初期化しましょう。
from steamworks import STEAMWORKS
import SteamNetworking as sn
# Steam API 初期化
steamworks = STEAMWORKS()
steamworks.initialize()
if sn.initialize_steam():
print("✅ Steam API 初期化成功")
# 自分の Steam ID を取得
server_id = sn.get_steam_id()
print(f"🎮 サーバーの Steam ID: {server_id}")
SteamNetworkingはDllファイルを読み込んだメソッドが含まれているファイルです。
クラス宣言せず直接書いてあるのでsnからいきなりメソッド名になっています。
ですが本来はform steam_networking import SteamNetworkとかいて
network = SteamNetwork()とインスタンスを作成するのがおすすめです。
(これは一般的なプログラミングでの話ですので、あまり気にしなくてよいです)
次にロビーを作成します
# **ロビーを作成**
LOBBY_TYPE = 1 # 1 = フレンドのみ, 2 = 公開, 3 = 非公開
MAX_PLAYERS = 4
lobby_id = sn.create_lobby(LOBBY_TYPE, MAX_PLAYERS)
sn.set_lobby_rich_presence(lobby_id) # ロビー情報を通知
if lobby_id == 0:
print("❌ ロビーの作成に失敗しました")
exit()
print(f"🏠 ロビーを作成しました! ロビー ID: {lobby_id}")
lobbyを作成しました。またlobby作成者もロビーに参加していますので(Host)
作成者のrichPresenceにlobby情報を載せておきます。
これはSteamSDKにフレンドが参加しているロビーIDを取得するメソッドが存在しないためです。
# PING の送信ループ
def send_ping():
while True:
for player_id in get_clients():
if player_id and player_id != server_id:
ping_message = json.dumps({"message": "PING"})
sn.send_p2p_message(player_id, ping_message.encode())
time.sleep(1) # **2秒ごとに PING を送信**
# **P2P セッションを確立**
def accept_p2p_sessions():
while True:
for player_id in get_clients():
if player_id and player_id != server_id:
sn.accept_p2p_session(player_id)
time.sleep(0.1)
これは必要でないのですが、pingを定期的に送信して、通信状態を検知します。
clientは一定時間Pingが届かなかった場合、接続が切れたと検知します。
またセッションはLobbyと接続された瞬間に確率しますので必要ありませんが
明確に記述を行います。
import threading
session_thread = threading.Thread(target=accept_p2p_sessions, daemon=True)
session_thread.start()
# **PING 送信を並列実行**
ping_thread = threading.Thread(target=send_ping, daemon=True)
ping_thread.start()
並列処理です。updateに記述するのが面倒だったからです
(Update = While内)
# メッセージ受信ループ
while True:
buffer = ctypes.create_string_buffer(512)
sender_steam_id = ctypes.c_uint64()
# メッセージ受信
if sn.receive_p2p_message(buffer, 512, ctypes.byref(sender_steam_id)):
data = json.loads(buffer.value.decode())
# **PING は無視**
if data["message"] == "PING":
continue
print(f"📩 {sender_steam_id.value} から受信: {data}")
# クライアント全員にメッセージをブロードキャスト
for player_id in get_clients():
if player_id and player_id != server_id:
sn.send_p2p_message(player_id, json.dumps(data).encode())
joined_steam_id = ctypes.c_uint64()
joined_lobby_id = ctypes.c_uint64()
# **ロビー参加チェック**
if sn.check_lobby_join(ctypes.byref(joined_steam_id), ctypes.byref(joined_lobby_id)):
name_buffer = ctypes.create_string_buffer(128)
sn.get_steam_name(joined_steam_id.value, name_buffer, 128)
print(f"🎉 {name_buffer.value.decode()} がロビー {joined_lobby_id.value} に参加しました!")
# **ロビー退室チェック**
left_steam_id = ctypes.c_uint64()
left_lobby_id = ctypes.c_uint64()
if sn.check_lobby_leave(ctypes.byref(left_steam_id), ctypes.byref(left_lobby_id)):
name_buffer = ctypes.create_string_buffer(128)
sn.get_steam_name(left_steam_id.value, name_buffer, 128)
print(f"🚪 {name_buffer.value.decode()} がロビー {left_lobby_id.value} から退出しました!")
time.sleep(0.05)
メッセージを送信受信、またlobbyの参加と退出を検知します。
UnityのMirrorなどでは、この時にプレイヤーにNetIDを付与する処理をします。
これによりコントローラーの入力をそのプレイヤーのオブジェクトのみに適応することができるからです。
ゲームを作っていくとなると、ここにCmdを入力して、Json形式で変数を渡すのがよいと思います
例えばCmd:Transform, x: 100, y:100みたいな感じでサーバーが全員にブロードキャストします。
clientはその座標をもとに、描写位置を調整します
クライアント予想をしている場合は、その位置を絶対値にして、予想した位置となめらかに遷移させる処理を入れるでしょう。
ロールバックするゲームなどは、これが原因です。
予想した位置よりサーバーが処理した位置のほうが後ろだった場合などに起こります。
TransformをUnityと同様にコンポーネントとして定義し、NetworkGameObjectクラスにリストとして存在させれば、自動で同期されるTransformがかけると思います。
正確にはNetTransformコンポーネントを作成して一緒に付与させることになると思います。
これは全体的な設計の話です。ですがゲーム開発はほぼこの全体設計が80%を締めます
クライアント
クライアント側ではどのようなプログラムになるでしょうか
# **ロビーに参加**
lobbies = sn.get_friend_lobbies_richpresence()
if len(lobbies) > 0:
lobby_id = lobbies[0]
sn.join_lobby(lobby_id)
sn.set_lobby_rich_presence(lobby_id) # ロビー情報を通知
print(f"✅ ロビー {lobby_id} に参加しました!")
else:
print("❌ 参加できるロビーがありません")
exit()
lobbyの参加処理です
イニシャライズの処理はサーバーと同じなので飛ばしました。
フレンドが参加しているlobbyIDを取得してLobbyに参加した後、
RichPresenceにLobbyIDを付与しただけです。
# **サーバーの Steam ID を取得(ロビーのオーナー)**
host_id = sn.get_lobby_owner(lobby_id)
print(f"🎮 ロビーのホスト: {host_id}")
# **P2P セッションを確立**
sn.accept_p2p_session(host_id)
# **接続状態の監視**
connected = False
last_ping_time = time.time()
print("⏳ サーバーからの PING を待機中...")
while not connected:
buffer = ctypes.create_string_buffer(512)
sender_steam_id = ctypes.c_uint64()
if sn.receive_p2p_message(buffer, 512, ctypes.byref(sender_steam_id)):
data = json.loads(buffer.value.decode())
if data["message"] == "PING":
last_ping_time = time.time()
connected = True
print("✅ サーバーとの接続が確立しました!")
if time.time() - last_ping_time > 5:
print("❌ サーバーが応答しません。接続失敗。")
exit()
time.sleep(0.1)
先ほどLobbyの処理を書いた時、Pingを定期的に送信していましたね。
つまり接続が完了した状態を検知するなら、サーバーからPingが初めて届いたことを検知すればよいです。
またこの時セッションを確立します。サーバー側が定期的に確立する意味がない理由はここにあります。
ですが届かない場合が私の時はありましたのであのような処理になっています。
# **メッセージ送信&受信ループ**
while True:
message = input("メッセージ: ")
data = int(input("付属数値: "))
if message.lower() == "exit":
break
# **JSON 形式で送信**
json_message = json.dumps({"message": message, "data": data})
sn.send_p2p_message(host_id, json_message.encode())
print(f"📨 {host_id} に送信: {json_message}")
# 受信ループ
buffer = ctypes.create_string_buffer(512)
sender_steam_id = ctypes.c_uint64()
while sn.receive_p2p_message(buffer, 512, ctypes.byref(sender_steam_id)):
data = json.loads(buffer.value.decode())
if data["message"] == "PING":
last_ping_time = time.time()
continue
print(f"📩 {sender_steam_id.value} から受信: {data}")
# **一定時間 PING を受け取らなかったら切断と判定**
if time.time() - last_ping_time > 5:
print("❌ サーバーが落ちました!")
break
time.sleep(0.1)
受信と送信を行います。
Json形式で送ったときのメリットを書くためにInputが二つあります。
messageをコマンドとして
message: Health, data:80みたいに書けば、サーバーとclientで体力を同期させることができます。
また一定時間ピングが届かなかったら、サーバーが落ちたと判定します
ただしこれは自分のPCに問題がある可能性もあるので
どちらかというと接続が切れたと書くほうが適切ですね
おわり
これでマルチプレイのロジックがわかったと思います。
あとは読者が独自に、Pygameとかで、これをもとに基底のライブラリを作成して
ゲームを作ってもらえればなと思います。
この後にあり得る問題としてはチートの処理です。
送信者が自身を偽装して、違うSteamIDを付与して送信する、
本来は存在していないはずの座標を送るなど
解決策としてはとにかくサーバー駆動にしてしまうのが速いと思います。
クライアントが操作できることはクライアントが現在可能な範囲のみと制約を付けるということです
ただしこれでもウォールハックなどは防げないです。
現在の多くのマルチゲームでも問題になっていることです。