Python ソケット通信 Hands‑On
この資料では、Python の socket
モジュールを用いて TCP/UDP 通信を実装しながら、ネットワーク基礎と実践的なプログラミング技法を学びます。各 Lab のコードは英語コメント、解説は日本語で記載しています。
目次
- はじめに
- 環境準備
- Lab 1: TCP Echo サーバ & クライアント
- Lab 2: マルチスレッド TCP サーバ
- Lab 3: UDP 通信
- Lab 4: 文字列とバイナリの扱い
- Lab 5: JSON メッセージの送受信
- Lab 6: シンプルチャット (select 利用)
- トラブルシューティング
- 練習課題
- 参考リンク
1. はじめに
- ソケットとは: OS が提供するネットワーク I/O のエンドポイント。プロセス間のデータ送受信を抽象化。
-
TCP と UDP:
- TCP: コネクション型、信頼性保証、順序保証、ストリーム指向。
- UDP: コネクションレス、信頼性無し、小さいレイテンシ、データグラム指向。
-
Python
socket
モジュール: C の BSD ソケット API を薄くラップ。socket.socket(family, type, proto)
で生成。
2. 環境準備
# Python 3.9 以上を推奨
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install --upgrade pip
3. Lab 1: TCP Echo サーバ & クライアント
3.1 サーバ (server.py)
import socket # BSD ソケット API を利用するための標準ライブラリ
# =======================
# 設定パラメータ
# =======================
HOST = "0.0.0.0" # すべてのネットワークインターフェースで待ち受ける(INADDR_ANY)
PORT = 5000 # 任意のユーザー用ポート(1024–49151)
# =======================
# サーバーメインループ
# =======================
# with 文を使うことで、ブロックを抜けた際にソケットを自動的に close できる
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# ① ソケットを (HOST, PORT) にバインド
# OS はこのポートへの TCP パケットを当プロセスに転送する
s.bind((HOST, PORT))
# ② ソケットをリスニング(受信待ち)状態にする
# backlog(接続待ちキューの長さ)はデフォルト値
s.listen()
print(f"[*] {HOST}:{PORT} で接続待ち受け開始")
# ③ クライアントからの接続を永続的に受け付けるループ
while True:
# accept() は TCP 3‑way ハンドシェイク完了までブロック
# 戻り値: conn = クライアント専用ソケット, addr = (IP, port)
conn, addr = s.accept()
print(f"[+] TCP 3‑way ハンドシェイク完了。クライアント {addr} と接続確立")
# with でクライアントソケットを管理し、ブロック終了時に close
print(f"[+] with でクライアントソケットを管理し、ブロック終了時に close")
with conn:
# ④ このクライアントとの送受信ループ
while True:
# recv() で最大 1024 バイト受信
# 相手が接続を閉じると b''(長さ 0)を返す
data = conn.recv(1024)
if not data:
print(f"[-] クライアント {addr} が切断")
break
# 受信データを表示(UTF‑8 文字列化できると仮定)
print(f"[*] 受信 ({len(data)} バイト): {data.decode()}")
# sendall() で同じデータをそのまま送り返す(エコーサーバ)
# sendall は送信完了まで内部で再送を行い、部分送信を隠蔽
conn.sendall(data)
3.2 クライアント (client.py)
import socket # BSDソケットAPIを利用する標準ライブラリ
# =======================
# 接続先サーバの情報
# =======================
HOST = "127.0.0.1" # サーバのIPアドレス(ローカルホスト)
PORT = 5000 # サーバのポート番号(エコーサーバ側と合わせる)
# =======================
# クライアント処理
# =======================
# with 文により、ブロックを抜けると自動でソケットを close
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# ① サーバへ接続要求を送る(3‑way ハンドシェイク)
s.connect((HOST, PORT))
print("Type messages (Ctrl+C で終了)")
try:
# ② 入力 → 送信 → 受信 → 表示 を繰り返す無限ループ
while True:
# 標準入力から1行取得し、バイト列へエンコード(UTF‑8)
msg = input(">>> ").encode()
# ③ 送信: すべてのバイトが送られるまでブロック
s.sendall(msg)
# ④ 受信: サーバから最大1024バイト受け取る
data = s.recv(1024)
# recv() が b'' を返す場合、サーバが切断したことを示す
if not data:
print("[!] サーバとの接続が切断されました")
break
# ⑤ サーバからの反響(エコー)を表示
print(f"Echo: {data.decode()}")
except KeyboardInterrupt:
# Ctrl+C でループを抜けた際のハンドリング
print("\n[!] Exiting")
3.3 手順
- ターミナル A で
python server.py
を実行。 - ターミナル B で
python client.py
を実行し、文字列を入力。 - サーバが受信→同じ内容を返し、クライアントが表示。
3.4 学びのポイント
-
bind
とlisten
の役割。 -
recv
が 0 バイトを返すとソケットはクローズ。
4. Lab 2: マルチスレッド TCP サーバ
複数クライアント同時接続に対応します。threading
を使用。
import socket # BSDソケットAPIを使うための標準ライブラリ
import threading # クライアントごとにスレッドを立てるための標準ライブラリ
# =======================
# 設定パラメータ
# =======================
HOST = "0.0.0.0" # すべてのネットワークインターフェースで待ち受け
PORT = 15000 # 任意のTCPポート(エコー用)
# =======================
# クライアント処理関数
# =======================
def handle_client(conn, addr):
"""1クライアントとの送受信を担当するスレッド関数."""
print(f"[+] 新規スレッド開始: {addr}")
# with で自動的にクライアントソケットを close できるようにする
with conn:
while True:
data = conn.recv(1024) # 最大1024バイト受信(ブロッキング)
if not data: # b'' ならクライアントが切断
break
print(f"[*] 送信 ({len(data)} バイト): {data.decode()}")
conn.sendall(data) # 受信データをそのまま送り返す(エコー)
print(f"[-] クライアント切断: {addr}")
# =======================
# サーバメインループ
# =======================
# with でリスニングソケットを管理(スクリプト終了時に自動close)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT)) # ① ソケットをIP/PORTにバインド
s.listen() # ② リスニング状態へ移行
print(f"[*] {HOST}:{PORT} で待ち受け開始")
while True:
# ③ 接続要求を受け付ける(完了までブロック)
conn, addr = s.accept()
# ④ 各クライアントを専用スレッドで処理(daemon=True で主スレッド終了時に自動終了)
threading.Thread(target=handle_client, args=(conn, addr), daemon=True).start()
# 標準出力にスレッドIDを表示
print(f"[+] スレッドID: {threading.get_ident()}")
# スレッド数を表示
print(f"[+] スレッド数: {threading.active_count()}")
# スレッドのリストを表示
print(f"[+] スレッドリスト: {threading.enumerate()}")
演習
- クライアントが送ったメッセージにタイムスタンプを付与して返す。
-
concurrent.futures.ThreadPoolExecutor
を使って書き換える。
5. Lab 3: UDP 通信
5.1 UDP サーバ (udp_server.py)
import socket # BSDソケットAPIを利用するための標準ライブラリ
# =======================
# 設定パラメータ
# =======================
HOST = "0.0.0.0" # すべてのネットワークインターフェースで待ち受け
PORT = 5002 # UDPポート番号(任意のユーザーポート)
# =======================
# UDPサーバ処理
# =======================
# with 文により、ブロックを抜けるとソケットが自動で close される
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
# ① IPアドレスとポートをソケットに紐づけ(バインド)
s.bind((HOST, PORT))
print(f"[*] UDPサーバ起動: {HOST}:{PORT}")
# ② パケット受信 → 送信を繰り返す無限ループ
while True:
# recvfrom() はデータと送信元アドレスのタプルを返す
# data: 受信バイト列
# addr: (クライアントIP, クライアントポート)
data, addr = s.recvfrom(1024) # 最大1024バイト受信(ブロッキング)
print(f"[+] 受信 {addr}: {data.decode()}")
# ③ そのまま送り返す(エコー)。UDPはコネクションレスなので sendto() で宛先を指定
s.sendto(data, addr)
5.2 UDP クライアント (udp_client.py)
import socket # BSDソケット API を利用する標準ライブラリ
# =======================
# 送信先サーバの情報
# =======================
HOST = "127.0.0.1" # サーバ(UDP エコーサーバ)の IP アドレス
PORT = 5002 # サーバが待ち受けている UDP ポート番号
# =======================
# UDP クライアント処理
# =======================
# with 文を使うことで、ブロックを抜けた際にソケットが自動で close される
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
while True:
# ① ユーザーから 1 行入力を受け取り、UTF‑8 でバイト列へ変換
msg = input(">>> ").encode()
# 空行(何も入力せず Enter)でループを終了する
if not msg:
break
# ② sendto(): UDP はコネクションレスなので送信先 (HOST, PORT) を毎回指定
s.sendto(msg, (HOST, PORT))
# ③ recvfrom(): サーバから最大 1024 バイト受信
# 戻り値は (受信データ bytes, 送信元アドレス tuple) だが、
# ここではアドレスは不要なので変数名を '_' で捨てている
data, _ = s.recvfrom(1024)
# ④ 受信データを UTF‑8 で文字列化して表示(エコー応答)
print(f"Echo: {data.decode()}")
メモ
-
connect()
は不要。毎回送信先を指定 - パケットロスや順序入れ替えに注意 (本 Lab では扱わない)
いったんTCP/UDPの比較 (Lab1, Lab2, Lab3)をしてみよう
比較観点 | Lab 1 単一接続 TCP |
Lab 2 マルチスレッド TCP |
Lab 3 UDP |
---|---|---|---|
プロトコル / ソケット種別 | TCP (SOCK_STREAM ) |
TCP (SOCK_STREAM ) |
UDP (SOCK_DGRAM ) |
接続モデル | コネクション型(3‑way ハンドシェイク) | コネクション型(3‑way ハンドシェイク) | コネクションレス |
並列処理 | なし(シングルスレッド) 同時に 1 クライアントのみ |
threading でクライアントごとにスレッド生成多数同時接続可 |
1 プロセスで複数クライアント対応(UDP パケットは順次処理) |
keep‑alive 時の資源 | 1 つの接続を保持 | 接続ごとにスレッド + ソケット | ソケットは 1 本のみ;状態はパケット単位 |
送受信 API |
recv /sendall (ストリーム) |
recv /sendall (ストリーム) |
recvfrom /sendto (データグラム) |
信頼性・順序保証 | OS が保証 | OS が保証 | なし(アプリ側で必要なら実装) |
コード構造 | 20 行弱・最小実装 | スレッド関数 + 生成で拡張 | 受信→即エコーだけのシンプル構造 |
用途イメージ | お試し/単体テスト | チャットや簡易 API 試作 | ブロードキャスト、リアルタイム通知、低レイテンシ通信 |
長所 | わかりやすい・デバッグ容易 | 同時接続に強い・TCP の利点継続 | 軽量・手続きが少ない・待ち受けが 1 本 |
短所 | 同時接続不可 | スレッドが増えるとコンテキストスイッチ負荷 | パケットロス/順序乱れの可能性 |
- Lab 1 は「TCP + 単一接続」で最小限のサーバを学ぶ入門ステップ。
- Lab 2 は同じ TCP でもスレッド化し、実用的な“並列”エコーサーバを体験。
-
Lab 3 はプロトコルを UDP に切り替え、コネクションレス通信の特徴と API (
recvfrom
/sendto
) を学ぶ。
これにより、TCP vs UDP と シングルスレッド vs マルチスレッド の二方向からソケット通信の設計上の選択肢を体験的に理解した。
6. Lab 4: 文字列とバイナリの扱い
- Python のソケットは バイト列 (
bytes
) を送受信。 -
str
↔bytes
変換はencode()
/decode()
。 - 文字化け回避のため UTF‑8 を明示。
msg_bytes = "こんにちは".encode("utf-8")
text = msg_bytes.decode("utf-8")
7. Lab 5: JSON メッセージの送受信
JSON を送るときは 長さを先頭に固定長で付ける と安全。
json_utils.py
import socket # BSDソケット API(ここでは型ヒント用/関数呼び出し側で使用)
import json # Python 標準の JSON 直列化ライブラリ
import struct # バイト列⇔整数などの変換を行うユーティリティ
HEADER_LEN = 4 # 先頭に付加する “メッセージ長” のバイト数(4 バイト = 32 ビット)
# =========================================
# 送信用ユーティリティ:JSON + 長さプレフィックス
# =========================================
def send_json(sock: socket.socket, obj):
"""
obj を JSON 文字列へ変換し、長さ情報(4 バイト)を付けて送信する。
送信フォーマット:
[先頭 4 バイト: メッセージ長 (Big‑Endian)] + [JSON 本体]
"""
# 1) Python オブジェクト → JSON 文字列 → UTF‑8 バイト列へエンコード
data = json.dumps(obj).encode()
# 2) struct.pack("!I", len(data)) で
# 32ビット符号なし整数をネットワークバイトオーダ(Big‑Endian)に変換
length_prefix = struct.pack("!I", len(data))
# 3) sendall() は内部で繰り返し送信し、全バイト送り切るまでブロックする
sock.sendall(length_prefix + data)
# =========================================
# 受信用ユーティリティ:JSON + 長さプレフィックス
# =========================================
def recv_json(sock: socket.socket):
"""
長さプレフィックス付きの JSON メッセージを受信して Python オブジェクトへ戻す。
サーバ/クライアント両方で再利用可能。
"""
# 1) まず固定長 4 バイトを読み取り、メッセージ全体の長さを取得
raw_len = sock.recv(HEADER_LEN)
if not raw_len: # 相手が切断した場合は None を返す
return None
# 2) 4 バイト → 整数へ変換('!I' = Big‑Endian Unsigned Int)
msg_len = struct.unpack("!I", raw_len)[0]
# 3) 本体部分を msg_len バイト受信
# ※ 実運用では部分受信に備えてループで受信するのが望ましい
data = sock.recv(msg_len)
# 4) JSON 文字列 → Python オブジェクトへデコードして返却
return json.loads(data.decode())
server_json.py
import socket, json_utils as ju
HOST, PORT = "0.0.0.0", 6000 # ポートは被らない番号に
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen()
print(f"[*] JSON サーバ起動: {HOST}:{PORT}")
conn, addr = s.accept()
print(f"[+] {addr} と接続")
with conn:
while True:
msg = ju.recv_json(conn)
if msg is None:
break
print("受信:", msg)
ju.send_json(conn, {"echo": msg}) # そのまま返す
client_json.py
import socket, json_utils as ju
HOST, PORT = "127.0.0.1", 6000
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
print("Ctrl+C で終了")
try:
while True:
text = input(">>> ")
ju.send_json(s, {"msg": text})
res = ju.recv_json(s)
print("Echo:", res)
except KeyboardInterrupt:
pass
実行方法
# ターミナルA
python server_json.py
# ターミナルB
python client_json.py
8. Lab 6: シンプルチャット (select 利用)
複数クライアントを select
で multiplex。
import socket, select # BSDソケットAPIとI/O多重化 select() を利用
# =======================
# 設定パラメータ
# =======================
HOST, PORT = "0.0.0.0", 5003 # すべてのNICで待ち受けるチャットサーバ
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# SO_REUSEADDR を有効にすると TIME_WAIT 状態のポートをすぐ再利用できる
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# サーバ用ソケットを IP/ポートにバインドしリスニング状態へ
server.bind((HOST, PORT))
server.listen()
# =======================
# 多重化用のソケット集合
# =======================
sockets = {server} # 監視対象ソケット集合(最初はサーバソケットのみ)
clients = {} # クライアントソケット → アドレス のマップ
print("[*] Chat server started")
# =======================
# メインループ
# =======================
while True:
# select() で読み込み可能なソケットを検出
# readable: データ受信可能 or 接続要求あり
readable, _, _ = select.select(sockets, [], [])
for sock in readable:
# ───────────────────────────────────────────
# ① 新規接続要求(サーバソケットにイベント)
# ───────────────────────────────────────────
if sock is server:
conn, addr = server.accept() # TCP 3‑way ハンドシェイク完了
sockets.add(conn) # 監視対象に追加
clients[conn] = addr # アドレスを記録
print(f"[+] {addr} joined")
# ───────────────────────────────────────────
# ② 既存クライアントからのデータ受信
# ───────────────────────────────────────────
else:
data = sock.recv(1024) # 最大 1024 バイト受信
if not data:
# recv() が空 ⇒ クライアント切断
sockets.remove(sock) # 監視対象から除外
print(f"[-] {clients[sock]} disconnected")
del clients[sock] # 辞書から削除
9. トラブルシューティング
症状 | 原因候補 | 解決策 |
---|---|---|
OSError: [Errno 98] Address already in use |
前回のプロセスが残存 | プロセスを kill する / SO_REUSEADDR
|
クライアントから接続できない | ファイアウォール / ポート誤り | ポート開放、IP 確認 |
UnicodeDecodeError | 不一致の encoding |
encode("utf-8") / decode("utf-8") を統一 |
10. 練習課題
- Lab 2 のサーバを プロセスプール (
multiprocessing
) で再実装。 - UDP サーバを改良し、簡易時刻同期 (サーバ時刻を返信) を実装。
- Lab 6 のチャットに ニックネーム機能 を追加。
- JSON 長さプレフィックスの代わりに 改行区切りプロトコル を作成。
11. 参考リンク
- Python 公式ドキュメント: https://docs.python.org/3/library/socket.html
- ビーコム TCP/IP Illustrated, Vol. 1 (解説書)
- IETF RFC 793 (TCP), RFC 768 (UDP)