0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

python×gRPCでのマルチプロセス処理の罠

Last updated at Posted at 2024-05-06

こんにちは!

Pythonでマルチプロセス処理を使用した際に躓いた経験があるため、その内容をこの記事で共有しようと思います。同じ問題で困っている方の助けになれば幸いです。

概要

  • pythonとgRPCでマルチプロセスを使用したエラーの再現
  • エラーの原因と解決策
  • 最後に

PythonとgRPCでマルチプロセスを使用したエラーの再現

PythonとgRPCでHello World

まずは公式クイックスタートに従ってPythonaとgRPCを用いたhello worldを実装します。
エラー再現のため環境はpython >= 3.9、grpcio >= 1.53で作成してください。

マルチプロセス実行を行うようにプログラムの書き換え

次にgreeter_server.py及びgreeter_client.pyを書き換えエラーを再現します。
書き換えた内容は以下です。

  • クライアント・サーバ間の疎通確認を20秒おきに送る設定の追加
  • サーバ側で子プロセスを作成し、300秒間待つプログラムの追加

プログラムは公式のサンプルを元に書き換えています

greeter_server.py
from concurrent import futures
import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc
import multiprocessing
import time

class Greeter(helloworld_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        p = None
        parent_conn, child_conn = multiprocessing.Pipe(False)
        def f():
        """サーバ側の処理を300s待たせる関数"""
            time.sleep(300)
        p = multiprocessing.Process(target=f,args=())
        p.start()
        p.join()
        return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name)


def serve():
    port = "50051"
    # 疎通確認を20秒おきに送る設定
    server_options = [
                    ("grpc.keepalive_time_ms", 20000),
                    ("grpc.keepalive_timeout_ms", 10000),
                    ("grpc.http2.min_ping_interval_without_data_ms", 5000),
                    ("grpc.max_connection_age_grace_ms", 5000),
                    ("grpc.http2.max_pings_without_data", 5),
                    ("grpc.keepalive_permit_without_calls", 1),
                    ]
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),options=server_options)
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port("[::]:" + port)
    server.start()
    print("Server started, listening on " + port)
    server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig()
    serve()
greeter_client.py
from __future__ import print_function

import logging

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    print("Will try to greet world ...")
    # 疎通確認を20秒おきに送る設定の追加
    channel_options = [
                    ("grpc.keepalive_time_ms", 8000),
                    ("grpc.keepalive_timeout_ms", 5000),
                    ("grpc.http2.max_pings_without_data", 5),
                    ("grpc.keepalive_permit_without_calls", 1),
                    ]
    with grpc.insecure_channel("localhost:50051", options=channel_options) as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(helloworld_pb2.HelloRequest(name="you"))
    print("Greeter client received: " + response.message)


if __name__ == "__main__":
    logging.basicConfig()
    run()

結果の確認

プログラムを変更した後にサーバを再起動し、クライアントを実行すると、以下のようなエラーが確認できます。

raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "ping timeout"
        debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-05-06T22:25:41.8447335+09:00", grpc_status:14, grpc_message:"ping timeout"}"

エラーの原因はと解決策は?

エラーの原因は?

原因は、エラー文にも書かれているように、サーバ側におくった疎通確認のためのpingが返ってこないことにあります。このエラーはマルチプロセス化を行わなかった場合は確認できていないため、マルチプロセス化の部分に問題があったと推測できます。
grpcでマルチプロセス実行を行う公式サンプルのドキュメントを確認すると原因が判明しました。以下は公式ドキュメントの一部です。

The library is implemented as a C extension, maintaining much of the state that drives the system in native code. As such, upon calling fork, any threads in a critical section may leave the state of the gRPC library invalid in the child process.

これを一部、日本語に翻訳すると”fork を呼び出すと、クリティカルセクションにあるスレッドは子プロセスで gRPC ライブラリの状態を無効にしてしまう可能性があります。”とのことです。
まさに今回の状況です。子プロセス作成の際には内部的にforkが呼び出されており、これが原因でgRPCの機能に不具合が生じてエラーが発生したようです。

解決策

解決策としては大きく以下の3つが考えられます。

  • マルチプロセスを行わない
  • エラーが発生していないバージョンを使用する(grpcio<1.53では問題なく動作していました)
  • マルチプロセスによるプロセス作成を工夫する

解決策は人によりけりだと思いますが、私は以下のような状況のため”プロセス作成の方法を工夫する”対応をとりたいと思います。

  • pythonで作ったシステムのスケーリングを行いたい
  • パッケージをアップデートしていきたい

実は、”プロセス作成の方法を工夫する”は公式でサンプルが提供されており、こちらになります。
forkでの不具合は、gRPCプロセスのインスタンスを作成する前にマルチプロセス化することで回避できるようです。
具体的にはサーバープロセスを複数立てて、適切に処理を割り振ることで並列処理を行っています。

最後に

今回のエラーに出くわして、”pythonは大規模システムには向いていない”という言葉を思いだしました。
ぱっと調べてみただけでも記事が見つかりました。
この言葉は、pythonはインタプリタ言語であるため実行速度が遅く大規模システムには向いていない、というような使われ方が通常よく耳にするので、今回のエラーと関係はないです。
しかし、世間で”pythonは大規模システムには向いていない”と言われているため、今回のようにライブラリの対応がされていない場合が多いのではないかと思いました。ライブラリに頼ることができないと、開発コストが高くなってしまうため、別のライブラリが充実した言語を使用したくなります。
結論、今回のエラーを経験して私の中では大規模システムの開発にpythonを選びたくない理由が一つ増えました。

以上、拙い文章でしたが最後までお読みいただきありがとうございました。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?