1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

BrokenPipeError の原因と解決

Posted at

0. 要約

今回のエラー(BrokenPipeError)は、クライアントにレスポンスを返す際に、リッスンソケットではなく
self.sock.accept() で得られた接続用ソケットを使うべきだったことが原因。

tcp_server.py
response_payload = {
    "status": status,
    "operation": operation,
    "response": response
}

- self.sock.sendall(...)
+ connection.sendall(...)

connection.sendall(f"Response: {response_payload}".encode('utf-8'))
print("Response sent.")

1. はじめに

TCP Serverを用いた簡易的なチャットアプリを作成途中、BrokenPipeError に遭遇した。ここでは、このエラーが発生したときの状況と、その対処について述べる。

2. 環境

  • OS: Ubuntu 24.04.1 LTS
  • WSL Version: 2
  • Host OS: Windows 11
  • Python: 3.12.3
  • 実行方法: python3 tcp_server.py による直接起動

3. エラー発生時のコードと通信仕様

ここでは、BrokenPipeError に遭遇したときのコードを示す。

前提として、tcp_client.py はあらかじめ設定されたルーム名とユーザー名を tcp_server.py に送信する構成である。サーバは受信した operation の値に基づいて次のいずれかの動作を行う:

  • チャットルームの作成
  • チャットルームへの参加
  • チャットルームからの退出

各操作後、サーバはクライアントに結果を応答として返す設計である。応答には次の情報が含まれている:

  • status: 0 あるいは 1 を取り、0 は成功を、1 はエラーを表す
  • operation: tcp_client.pyに含まれていたoperation情報をそのまま返す
  • response: 実行した動作とその結果に基づいて作成されたメッセージが含まれている

初めに、tcp_server.pyを示す。

tcp_server.py
import socket
import os
import time
from rooms import Rooms
from utils import Token

class TCPServer:
    def __init__(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.rooms = Rooms()
        server_address = "0.0.0.0"
        server_port = 9001
        
        self.dpath = "temp"
        
        if not os.path.exists(self.dpath):
            os.makedirs(self.dpath)
        
        print("Server is starting...")
        self.sock.bind((server_address, server_port))
        self.sock.listen(1)
        
    def recv_all(self, sock, size):
        buffer = b''
        
        while len(buffer) < size:
            part = sock.recv(size - len(buffer))
            
            if not part:
                raise ConnectionError("Error: Connection Error happened")
            
            buffer += part
            
        return buffer
    
    def debug_print(self, room_name_size, operation, state, payload_size, room_name, payload):
        print("=== Debug Info ===")
        print(f"ヘッダー解析:")
        print(f"room_len={room_name_size}")
        print(f"op={operation}")
        print(f"state={state}")
        print(f"payload_size={payload_size}\n")
        print(f"ボディ解析:")
        print(f"Room Name: {room_name}")
        print(f"Payload (username): {payload}")
        print("===================")
        
        
    def run(self):
        try:
            while True:
                connection, client_address = self.sock.accept()
                
                try:
                    # ヘッダーの受信
                    header = self.recv_all(connection, 32)                    
                    room_name_size = header[0]
                    operation = header[1]
                    state = header[2]
                    payload_size = int.from_bytes(header[3:32], 'big')
                    body_length = room_name_size + payload_size
                    
                    # ボディの受信
                    body = self.recv_all(connection, body_length)
                    room_name = body[:room_name_size].decode('utf-8')
                    payload = body[room_name_size:].decode('utf-8')

                    # デバッグ用の出力
                    self.debug_print(room_name_size, operation, state, payload_size, room_name, payload)
                    
                    """ 
                    operation is:
                    
                    1: Create
                    2: Join
                    3: Leave
                    """
                    status = 0 # 0: Success, 1: Error
                    response = None
                    
                    if operation == 1:
                        if self.rooms.create_room(room_name):
                            print(f"Room '{room_name}' created successfully.")
                            response = f"Room '{room_name}' created successfully."
                        else:
                            print(f"Room '{room_name}' already exists.")
                            response = f"Room '{room_name}' already exists."
                            status = 1
                            
                    elif operation == 2:
                        if self.rooms.join_room(room_name, payload):
                            print(f"User '{payload}' joined room '{room_name}'.")
                            response = f"User '{payload}' joined room '{room_name}'."
                        else:
                            print(f"Room '{room_name}' does not exist.")
                            response = f"Room '{room_name}' does not exist."
                            status = 1

                    elif operation == 3:
                        if self.rooms.leave_room(room_name, payload):
                            print(f"User '{payload}' left room '{room_name}'.")
                            response = f"User '{payload}' left room '{room_name}'."
                        else:
                            print(f"Room '{room_name}' does not exist.")
                            response = f"Room '{room_name}' does not exist."
                            status = 1
                            
                    response_payload = {
                        "status": status,
                        "operation": operation,
                        "response": response
                    }
                    
                    self.sock.sendall(f"Response: {response_payload}".encode('utf-8'))
                    print("Response sent.")


                finally:
                    connection.close()
        
        except KeyboardInterrupt:
            print("Server is shutting down...")
        finally:
            self.sock.close()
            
if __name__ == "__main__":
    server = TCPServer()
    server.run()

次に、tcp_client.pyを示す。

tcp_client.pyimport socket
import sys

class TCPClient:
    def __init__(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_address = '0.0.0.0'
        self.server_port = 9001
        
        try:
            self.sock.connect((self.server_address, self.server_port))
        except socket.error as e:
            print(f"Connection error: {e}")
            self.sock.close()
            sys.exit(1)
    
    
    def send_data(self, room_name, payload):
        try:
            room_name_bytes = room_name.encode('utf-8')
            payload_bytes = payload.encode('utf-8')

            room_name_size = len(room_name_bytes)
            payload_size = len(payload_bytes)

            header = bytearray(32)
            header[0] = room_name_size
            header[1] = 1  # Operation: 1=Create
            header[2] = 0  # State: 0=Request
            header[3:32] = payload_size.to_bytes(29, 'big')

            body = room_name_bytes + payload_bytes

            self.sock.sendall(header + body)
            print("Sent!")
            
            # 受信処理
            response = self.sock.recv(2048).decode('utf-8')
            print("Server Response:", response)
            
            
        finally:
            self.sock.close()

if __name__ == "__main__":
    client = TCPClient()
    room_name = "testroom"
    payload = "alice"  # プレーン文字列
    client.send_data(room_name, payload)

4. 実行結果

前項で示したコードを実行した結果を下記に示す。

tcp_client.pyの実行結果
Sent!
Server Response:
tcp_server.pyの実行結果
Server is starting...
=== Debug Info ===
ヘッダー解析:
room_len=8
op=1
state=0
payload_size=5

ボディ解析:
Room Name: testroom
Payload (username): alice
===================
Room 'testroom' created successfully.
Traceback (most recent call last):
  File "/mnt/c/Users/sfuji/dev_workspace/portfolios/chat_app/tcp_server.py", line 128, in <module>
    server.run()
  File "/mnt/c/Users/sfuji/dev_workspace/portfolios/chat_app/tcp_server.py", line 114, in run
    self.sock.sendall(f"Response: {response_payload}".encode('utf-8'))
BrokenPipeError: [Errno 32] Broken pipe

5. 発生したエラーとその原因

前項で示した通り、tcp_server.py でエラーが発生し、tcp_client.py 側でもサーバーからの応答がなく通信は終了された。

実行結果から、クライアント側からサーバ側への情報の送信は成功していて、また operation に基づく動作も実行されていることがわかる。

tcp_server.py での実行結果
Server is starting...
=== Debug Info ===
ヘッダー解析:
room_len=8
op=1
state=0
payload_size=5

ボディ解析:
Room Name: testroom
Payload (username): alice
===================
Room 'testroom' created successfully.

以上より、サーバーからクライアント側への送信の時に問題が発生していることがわかった。

6. 原因の詳細と修正方法

今回のエラーは、サーバーがクライアントへの応答を送信する際に、self.sock.sendall(...) を使用していことが原因だった。

ここで使用していた self.sock は、クライアントと通信するためのソケットではなく、接続待ち受け用のリッスンソケットであった。そのため、クライアントが接続を終えた後に送信しようとすると BrokenPipeError が発生する。

正しくは、connection, _ = self.sock.accept() によって得られた connection オブジェクトを通じて通信を行う必要

tcp_server.py
response_payload = {
    "status": status,
    "operation": operation,
    "response": response
}

- self.sock.sendall(...)
+ connection.sendall(...)

connection.sendall(f"Response: {response_payload}".encode('utf-8'))
print("Response sent.")

7. 修正後の実行結果

修正後の実行結果を下記に示す。

初めに、tcp_server.py を示す。

tcp_server.py
Server is starting...
=== Debug Info ===
ヘッダー解析:
room_len=8
op=1
state=0
payload_size=5

ボディ解析:
Room Name: testroom
Payload (username): alice
===================
Room 'testroom' created successfully.
Response sent.

次に tcp_client.py を示す。

tcp_client.py
Sent!
Server Response: Response: {'status': 0, 'operation': 1, 'response': "Room 'testroom' created successfully."}

実行結果より、想定通りに動作することを確認できた。

8. 最後に

本記事では、チャットアプリの作成時に遭遇した BrokenPipeError の発生と対処までの一連の流れについてまとめました。

ソケット通信に入門したばかりなので、エラー以外に色々と突っ込みどころがあるかもしれません。コメントで教えてください。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?