はじめに
こんにちは!
先日開催された「Google Cloud Next Tokyo '25」に参加した際、
Agonesを使ったゲームのデモを見て、「自分でも構築してみたい!」と思いました。
これまで多くのGoogle Cloudのサービスを触ってきましたが、Google Kubernetes Engine (GKE) をほとんど触ったことがなかったことをふと思い出しました。
そこで今回は、GKEを触ることもかねて、PythonのPygameで作成したシンプルなシューティングゲームを、GKEとAgonesを使ってオンラインで複数人が同時プレイできるようにしてみました。
システムアーキテクチャ
今回構築したシステムの全体像は以下のようになっています。
- Client: プレイヤーが手元で操作するゲームクライアント。Pygameで作成しています。
- Allocator Server: プレイヤーがゲームを開始しようとすると、まずこのサーバーに接続します。こちらはAgonesに対し、「今すぐプレイできるGame Serverを1つください」とリクエストを送る役割を担います。
-
Game Server: 実際にゲームロジックが動作するサーバー。Agonesの
Fleet
という単位で管理されており、必要に応じて自動的にスケールします。 - Status Server: 現在、何人のプレイヤーがゲームをプレイしているかといった、ゲーム全体の統計情報を提供
実装
まず、Client、Allocator Server、Game Server、Status Serverを実装していきます。
Client
Clientは、ゲームの描画、ユーザー入力の受付、そしてStatus Serverとの通信を担当します。
サーバーの割り当てリクエスト
ゲームを開始する前に、クライアントはまずAllocator Server
にHTTPリクエストを送り、接続すべきGame Server
のIPアドレスとポート番号を取得します。
割り当てリクエストのプログラム
def allocate_server(allocator_url):
"""
アロケーターサービスにHTTP POSTリクエストを送信して、
利用可能なゲームサーバーのIPアドレスとポートを取得します。
"""
print(f"アロケーターサービスに接続中: {allocator_url}...")
try:
# アロケーターにPOSTリクエストを送信 (タイムアウトは15秒)
response = requests.post(allocator_url, timeout=15)
# ステータスコードが200番台でない場合は例外を発生させる
response.raise_for_status()
# レスポンスをJSONとしてパース
data = response.json()
# ステータスが 'Allocated' (割り当て済み) かどうかを確認
if data.get("status") == "Allocated":
print(f"サーバーが割り当てられました: {data.get('server_name')}")
return data.get("address"), data.get("port")
else:
print(f"割り当てに失敗しました: {data.get('message')}")
return None, None
except requests.exceptions.RequestException as e:
print(f"アロケーターへの接続中にエラーが発生しました: {e}")
return None, None
サーバーとの通信
サーバー情報を取得したら、TCPソケットを使ってGame Serverに接続し、ゲームループを開始します。ループの中では、ユーザーのキー入力をサーバーに送信し、サーバーからは現在のゲーム状態(プレイヤーの位置、敵の位置、現在のプレイ人数など)をJSON形式で受け取って画面に描画します。
サーバー通信のプログラム
# プレイヤーのスプライトやUI要素を初期化
player_sprite = Player()
player_mini_img = pygame.transform.scale(player_sprite.image, (25, 20))
player_mini_img.set_colorkey(BLACK)
running = True
last_game_state = None
# ゲームループを開始
while running:
# フレームレートを60に制限
clock.tick(60)
# サーバーからゲーム状態を受信
try:
if not socket_file:
break
# ソケットから1行読み込む (改行コードまで)
line = socket_file.readline()
if not line:
print("サーバーから切断されました。")
break
game_state = json.loads(line)
except (ConnectionResetError, json.JSONDecodeError, socket.timeout, ConnectionAbortedError) as e:
print(f"データの受信中にエラーが発生しました: {e}")
break
# ゲームの状態に応じて画面を更新
if game_state.get('game_state') != last_game_state:
if game_state.get('game_state') == 'start':
# スタート画面を表示
player_count = game_state.get("total_players", 0)
if show_start_screen(screen, player_count):
# ゲーム開始をサーバーに通知
client_socket.sendall((json.dumps({"start_game": True, "keys": []}) + '\n').encode('utf-8'))
elif game_state.get('game_state') == 'game_over':
# ゲームオーバー画面を表示
if show_go_screen(screen, game_state['score']):
# リプレイをサーバーに通知
client_socket.sendall((json.dumps({"start_game": True, "keys": []}) + '\n').encode('utf-8'))
# サーバーがゲームをリセットするのを待つ
while running:
try:
line = socket_file.readline()
if not line:
running = False
break
new_state = json.loads(line)
if new_state.get('game_state') == 'playing':
game_state = new_state
break
except (ConnectionResetError, json.JSONDecodeError, socket.timeout, ConnectionAbortedError) as e:
print(f"ゲームリセット待機中にエラー: {e}")
running = False
break
last_game_state = game_state.get('game_state')
# ユーザーの入力を処理
keys_pressed = []
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
# スペースキーが押されたら射撃
if event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
keys_pressed.append('space')
# 左右の矢印キーの長押しを検出
keys = pygame.key.get_pressed()
if keys[pygame.K_LEFT]:
keys_pressed.append('left')
if keys[pygame.K_RIGHT]:
keys_pressed.append('right')
# サーバーに入力情報を送信
if keys_pressed:
try:
client_socket.sendall((json.dumps({"keys": keys_pressed}) + '\n').encode('utf-8'))
except (ConnectionResetError, BrokenPipeError):
print("サーバーとの接続が切れました。")
break
# ゲームがプレイ中の場合、画面を描画
if game_state.get('game_state') == 'playing':
all_sprites = pygame.sprite.Group()
# サーバーから受け取った情報で各スプライトの状態を更新
player_sprite.update_state(game_state['player'])
all_sprites.add(player_sprite)
for enemy_state in game_state['enemies']:
all_sprites.add(Enemy(enemy_state))
for bullet_state in game_state['bullets']:
all_sprites.add(Bullet(bullet_state))
# 画面を黒で塗りつぶし、全スプライトを描画
screen.fill(BLACK)
all_sprites.draw(screen)
# スコアや残機、現在のプレイ人数のUIを描画
draw_text(screen, f"Score: {game_state['score']}", 22, SCREEN_WIDTH / 2, 10)
draw_lives(screen, SCREEN_WIDTH - 100, 5, game_state['player']['lives'], player_mini_img)
total_players = game_state.get("total_players", 0)
player_count_text = f"Total Players: {total_players}" if total_players != -1 else "Players: N/A"
draw_text(screen, player_count_text, 18, 80, 10)
# 画面を更新
pygame.display.flip()
Game Server
Game Serverは、Agonesの管理下で動作する専用のゲームサーバーです。
Agones SDKとの連携
Agonesは、各Game ServerコンテナにサイドカーとしてSDKサーバーを設置します。Game Serverは、このサイドカーにHTTPリクエストを送ることで、自身の状態(Ready
, Allocated
など)をAgonesに通知します。
Agones SDKのプログラム
# Agones SDKが有効化されているか環境変数で確認
AGONES_SDK_ENABLED = "AGONES_SDK_HTTP_PORT" in os.environ
# Agones SDKサイドカーのエンドポイント
AGONES_ENDPOINT = f'http://localhost:{os.environ.get("AGONES_SDK_HTTP_PORT")}'
def agones_request(method, path, body=None):
"""Agones SDKサイドカーにリクエストを送信するヘルパー関数(リトライ機能付き)"""
if not AGONES_SDK_ENABLED:
return
url = f"{AGONES_ENDPOINT}{path}"
# 接続エラー時のリトライを設定
retry_strategy = Retry(
total=5,
backoff_factor=0.5,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT"],
connect=5
)
adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("http://", adapter)
try:
# 設定したセッションでリクエストを送信
if method == 'POST':
response = http.post(url, json=body if body is not None else {}, timeout=2.0)
elif method == 'PUT':
response = http.put(url, json=body if body is not None else {}, timeout=2.0)
elif method == 'GET':
response = http.get(url, timeout=2.0)
else:
return None
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Agonesへのリクエストに失敗しました (Path: {path}): {e}")
return None
def agones_ready():
"""サーバーがクライアント接続を受け入れ準備完了状態であることをAgonesに通知"""
agones_request('POST', '/ready')
def agones_health():
"""Agonesにヘルスチェックピングを送信"""
agones_request('POST', '/health', body={})
def agones_shutdown():
"""サーバーがシャットダウンすることをAgonesに通知"""
agones_request('POST', '/shutdown')
def main():
# ポーリング用のスレッドやAgones SDKの初期化
polling_thread = threading.Thread(target=update_total_players_periodically)
polling_thread.daemon = True
polling_thread.start()
if AGONES_SDK_ENABLED:
print("Agones SDKが有効です。ヘルスチェックを開始します。")
# ヘルスチェックをバックグラウンドで定期的に実行するスレッドを開始
agones_health_thread = threading.Thread(target=agones_health_check_thread)
agones_health_thread.daemon = True
agones_health_thread.start()
agones_set_label("server-type", "shooting-game")
agones_set_player_capacity(1)
# TCPソケットのセットアップ
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setblocking(False)
server_socket.bind(('0.0.0.0', int(os.environ.get("PORT", 7654))))
server_socket.listen(1)
print(f"サーバーがポート {server_socket.getsockname()[1]} で待機中...")
# サーバーの準備ができたことをAgonesに通知
if AGONES_SDK_ENABLED:
print("AgonesにサーバーがReady状態であることを通知します。")
agones_ready()
try:
# メインのゲームループ
while True:
# クライアントが接続したら
if conn:
if AGONES_SDK_ENABLED:
# Agonesにプレイヤーが接続したことを通知
agones_player_connect(player_id)
finally:
# クリーンアップ処理
if conn:
conn.close()
server_socket.close()
if AGONES_SDK_ENABLED:
if player_id:
# Agonesにプレイヤーが切断したことを通知
agones_player_disconnect(player_id)
# サーバーのシャットダウンをAgonesに通知
print("Agones SDKをシャットダウンします。")
agones_shutdown()
print("サーバーをシャットダウンしました。")
if __name__ == "__main__":
main()
定期的にヘルスチェックを送信するスレッドも実行し、Agonesにサーバーが正常であることを伝え続けます。
Allocator Server
このサーバーは、クライアントからのリクエストに応じて、AgonesにGameServerAllocation
というカスタムリソースの作成を依頼し、AgonesはReady
状態のGame Serverを1つ探し、その情報を返却します。
Allocator Serverのプログラム
custom_api = kubernetes.client.CustomObjectsApi()
@app.route('/allocate', methods=['POST'])
def allocate_game_server():
"""GameServerAllocationを作成し、割り当てられたサーバー情報を返す"""
# GameServerAllocationのマニフェストを定義
gsa_body = {
"apiVersion": f"{AGONES_ALLOCATION_GROUP}/{AGONES_ALLOCATION_VERSION}",
"kind": "GameServerAllocation",
"spec": {
"required": {
"matchLabels": {
"game": "shooting-game"
}
}
}
}
try:
# Kubernetes APIを呼び出してGameServerAllocationを作成
gsa_response = custom_api.create_namespaced_custom_object(
group=AGONES_ALLOCATION_GROUP,
version=AGONES_ALLOCATION_VERSION,
namespace=NAMESPACE,
plural=AGONES_ALLOCATION_PLURAL,
body=gsa_body
)
# レスポンスからステータスを取得
response_status = gsa_response.get("status", {})
server_state = response_status.get("state")
# 割り当て成功
if server_state == "Allocated":
address = response_status.get("address")
port = response_status.get("ports", [{}])[0].get("port")
server_name = response_status.get("gameServerName")
print(f"サーバー {server_name} の割り当てに成功: {address}:{port}")
# クライアントに必要な情報をJSONで返す
return jsonify({
"status": server_state,
"address": address,
"port": port,
"server_name": server_name
}), 200
elif server_state == "Unallocated":
print("割り当て失敗: Ready状態のサーバーがありません。")
return jsonify({"status": "Unallocated", "message": "No ready servers available."}),
else:
# その他の予期せぬステータス
print(f"予期せぬステータスで割り当て失敗: {server_state}")
return jsonify({"status": "Error", "message": f"Unexpected status: {server_state}"}), 500
except kubernetes.client.ApiException as e:
print(f"Kubernetes APIエラー: {e.reason} ({e.status})")
return jsonify({"status": "Error", "message": e.reason}), e.status
except Exception as e:
print(f"予期せぬエラーが発生: {e}")
return jsonify({"status": "Error", "message": "An unexpected error occurred."}),
Status Server
このサーバーは、Kubernetes APIを直接叩いて、現在Allocated
(プレイヤーが接続中)状態のGameServerの数を数え、それを全体のプレイヤー数として返します。
Status Serverのプログラム
custom_api = kubernetes.client.CustomObjectsApi()
@app.route('/status', methods=['GET'])
def get_server_status():
"""アクティブなGameServerの数を数えて返す"""
total_players = 0
try:
# 指定されたネームスペース内のすべてのGameServerオブジェクトをリストアップ
gs_list = custom_api.list_namespaced_custom_object(
group=AGONES_GROUP,
version=AGONES_VERSION,
namespace=NAMESPACE,
plural=AGONES_PLURAL
)
# GameServerをループして 'Allocated' 状態のものを数える
for gs in gs_list.get('items', []):
server_state = gs.get('status', {}).get('state')
if server_state == 'Allocated':
total_players += 1
return jsonify({"total_players": total_players})
except Exception as e:
print(f"Kubernetes APIのクエリ中にエラー: {e}")
return jsonify({"error": str(e)}), 500
GKEへのデプロイ準備
アプリケーションをGKEにデプロイするために、Dockerfile
と、Kubernetesリソースを定義するYAML
ファイルを作成します。
Dockerfile
以下はDockerfileです。サーバーごとに差はなく、同じ内容です。
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
#サーバーごとに違うpythonファイルを指定。
COPY allocator_server.py .
ENV PYTHONUNBUFFERED 1
CMD ["python", "allocator_server.py"]
KubernetesとAgonesの設定
Game Serverのテンプレートとオートスケールの設定
Game Serverのデプロイには、AgonesのFleet
を使ってテンプレートを作成します。また、FleetAutoscaler
も使うことで、「常に2台のサーバーを待機状態に保つ」といった柔軟なスケーリングが可能になります。
Game ServerのYAML
apiVersion: agones.dev/v1
kind: Fleet
metadata:
name: shooting-game-fleet
spec:
replicas: 2
template:
metadata:
labels:
game: shooting-game
spec:
ports:
- name: default
portPolicy: Dynamic
containerPort: 7654
template:
spec:
containers:
- name: shooting-game
# 使用するコンテナイメージ
image: asia-northeast1-docker.pkg.dev/[YOUR_PROJECT_ID]/[REPOSITORY_NAME]/game-server:[TAG]
---
apiVersion: autoscaling.agones.dev/v1
kind: FleetAutoscaler
metadata:
name: shooting-game-fleet-autoscaler
spec:
fleetName: shooting-game-fleet
policy:
type: Buffer
buffer:
bufferSize: 2
minReplicas: 2
maxReplicas: 10
Allocator ServerとStatus Serverの設定
Allocator ServerとStatus Serverは、KubernetesのDeployment
とService
としてデプロイします。ただし、AgonesやKubernetesのAPIを操作する必要があるため、ServiceAccount
, Role
, RoleBinding
を作成し、適切な権限を付与します。
Allocator ServerのYAML
# サービスアカウント
apiVersion: v1
kind: ServiceAccount
metadata:
name: allocator-sa
---
# ロール
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: allocator-role
rules:
- apiGroups: ["allocation.agones.dev"]
resources: ["gameserverallocations"]
verbs: ["create"]
---
# 3.ロールの割り当て
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: allocator-rb
subjects:
- kind: ServiceAccount
name: allocator-sa
roleRef:
kind: Role
name: allocator-role
apiGroup: rbac.authorization.k8s.io
---
# デプロイ
apiVersion: apps/v1
kind: Deployment
metadata:
name: allocator-deployment
labels:
app: allocator-service
spec:
replicas: 1
selector:
matchLabels:
app: allocator-service
template:
metadata:
labels:
app: allocator-service
spec:
serviceAccountName: allocator-sa
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- status-server
topologyKey: "kubernetes.io/hostname"
- labelSelector:
matchExpressions:
- key: game
operator: In
values:
- shooting-game
topologyKey: "kubernetes.io/hostname"
containers:
- name: allocator-service
# 使用するコンテナイメージ
image: asia-northeast1-docker.pkg.dev/[YOUR_PROJECT_ID]/[REPOSITORY_NAME]/allocation-server:[TAG]
ports:
- containerPort: 8080
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
---
# サービス
apiVersion: v1
kind: Service
metadata:
name: allocator-service
labels:
app: allocator-service
spec:
type: LoadBalancer
selector:
app: allocator-service
ports:
- protocol: TCP
port: 80
targetPort: 8080
Status ServerのYAML
# サービスアカウント
apiVersion: v1
kind: ServiceAccount
metadata:
name: status-server-sa
---
# ロール
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: status-server-role
rules:
- apiGroups: ["agones.dev"]
resources: ["gameservers"]
verbs: ["get", "list"]
---
# ロールの割り当て
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: status-server-rb
subjects:
- kind: ServiceAccount
name: status-server-sa
roleRef:
kind: Role
name: status-server-role
apiGroup: rbac.authorization.k8s.io
---
# デプロイ
apiVersion: apps/v1
kind: Deployment
metadata:
name: status-server-deployment
labels:
app: status-server
spec:
replicas: 1
selector:
matchLabels:
app: status-server
template:
metadata:
labels:
app: status-server
spec:
serviceAccountName: status-server-sa
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: game
operator: In
values:
- shooting-game
topologyKey: "kubernetes.io/hostname"
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- allocator-service
topologyKey: "kubernetes.io/hostname"
containers:
- name: status-server
# 使用するコンテナイメージ
image: asia-northeast1-docker.pkg.dev/[YOUR_PROJECT_ID]/[REPOSITORY_NAME]/status-server:[TAG]
ports:
- containerPort: 8080
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
---
# サービス
apiVersion: v1
kind: Service
metadata:
name: status-service
labels:
app: status-server
spec:
selector:
app: status-server
ports:
- protocol: TCP
port: 80 # The port the service will be available on
targetPort: 8080 # The port the container is listening on
---
構築手順
では、実際にGKEクラスタの作成からアプリケーションのデプロイまでしていきます。構築では基本的にCloud Shellを使用しました。
1. GKEクラスタの作成
まず、AgonesをインストールするためのGKEクラスタを作成します。
コンソールからAutopilotモードで作成を行います。
Cloud Shellでクラスタにアクセスできるように以下のコマンドを実行。
# kubectlが新しいクラスタを指すように設定
gcloud container clusters get-credentials $CLUSTER_NAME --region=$REGION
※今回、TCPの7000-8000ポートを使用するので、使用するVPCへのファイアウォールルール追加を忘れないでください。
2. Agonesのインストール
次に、作成したクラスタにAgonesをインストールします。
# Helmリポジトリを追加
helm repo add agones https://agones.dev/chart/stable
helm repo update
# Agonesをインストール
helm install my-agones agones/agones --namespace agones-system --create-namespace
# 状態確認
kubectl get pods --namespace agones-system
3. コンテナイメージのビルドとプッシュ
次に、3つのサーバー(Game Server, Allocator Server, Status Server)のコンテナイメージをビルドし、Google Artifact Registryにプッシュします。
# 変数を設定
export GCP_PROJECT=egi-study
export REPO_NAME=shooting-games
export GAME_IMAGE_NAME=game-server
export STATUS_IMAGE_NAME=status-server
export ALLOCATION_IMAGE_NAME=allocation-server
export IMAGE_TAG=v1
export AR_LOCATION=asia-northeast1 # 例: 東京
# Artifact Registryリポジトリを作成(初回のみ)
gcloud artifacts repositories create "${REPO_NAME}" \
--repository-format=docker \
--location="${AR_LOCATION}"
# Dockerに認証を設定
gcloud auth configure-docker "${AR_LOCATION}-docker.pkg.dev"
# イメージをビルド
docker build -t "${AR_LOCATION}-docker.pkg.dev/${GCP_PROJECT}/${REPO_NAME}/${GAME_IMAGE_NAME}:${IMAGE_TAG}" .
docker build -t "${AR_LOCATION}-docker.pkg.dev/${GCP_PROJECT}/${REPO_NAME}/${STATUS_IMAGE_NAME}:${IMAGE_TAG}" .
docker build -t "${AR_LOCATION}-docker.pkg.dev/${GCP_PROJECT}/${REPO_NAME}/${ALLOCATION_IMAGE_NAME}:${IMAGE_TAG}" .
# イメージをプッシュ
docker push "${AR_LOCATION}-docker.pkg.dev/${GCP_PROJECT}/${REPO_NAME}/${GAME_IMAGE_NAME}:${IMAGE_TAG}"
docker push "${AR_LOCATION}-docker.pkg.dev/${GCP_PROJECT}/${REPO_NAME}/${STATUS_IMAGE_NAME}:${IMAGE_TAG}"
docker push "${AR_LOCATION}-docker.pkg.dev/${GCP_PROJECT}/${REPO_NAME}/${ALLOCATION_IMAGE_NAME}:${IMAGE_TAG}"
4. Kubernetesへのデプロイ
コンテナイメージの準備ができたら、Kubernetesにデプロイします。
# yamlはGame Server、Status Server、Allocator Serverのもの
kubectl apply -f fleet.yaml -f status-deployment.yaml -f allocator-deployment.yaml
デプロイ後、各Podが正常に起動していることを確認します。
kubectl get pods
5. クライアントの実行
最後に、Pygameクライアントを実行してゲームサーバーに接続します。
まず、以下のコマンドでAlocator Serverの外部IPアドレスを取得します。
kubectl get service allocator-service
EXTERNAL-IP
列に表示されるIPアドレスをコピーし、以下のようにURLを.envに設定します。
ALLOCATOR_URL=http://[EXTERNAL_IP]/allocate
準備ができたら、クライアントを実行します。
# Clientのpythonファイルを実行
python client.py
動かしてみた
動作確認をしてみます。
現在のGame Serverの状態を確認すると、AgonesのFleetAutoscaler
により、指定したバッファ数である2台のゲームサーバーPodがReady
(準備完了)状態で待機していることがわかります。
この状態で、Clientを実行してみます。
ゲームが起動しました。
もちろん、普通に遊ぶことができます。
では再度Game Severの状態を確認すると、1台のサーバーの状態がAllocated
に変わっていることが確認できます。
では、もう1つ別のターミナルでClientを実行してみます。
ゲームが起動しました。ゲームプレイ人数が2人になっていることができますね。
では再度Game Severの状態を確認すると、2台のサーバーの状態がAllocated
に変わっていることが確認できます。
このように、AgonesがGame Serverを自動スケール様子が確認できました。
まとめ
今回は、Pygameで作成したゲームをGKEとAgonesを使って同時プレイできるようにしてみました。
イベントで見たデモでは、ゲームで得た情報をBigQueryに貯めて、AIに分析させたりといったことをしていたので、
今後はそういった機能にも挑戦していきたいと思っています。他にもオンラインゲームにあるマッチング機能も実装してみたいですね。
あと、GKE面白いと素直に感じました。
最後まで読んでいただきありがとうございました。