1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

recvでブロックしたワーカスレッドを終了させるサンプルコード

Posted at

背景

最近、以下のようなアプリを作る機会があった

  • ワーカスレッドでソケット経由でデータを受信する
  • メインスレッドでユーザからのキー入力を受信する
  • 特定のキー入力によってプロセスを終了させる

C-c押下(SIGINT)でプロセス全体をおとしても問題なかったが、時間もあったので「ワーカスレッドの終了を確認したあとプロセス全体を終了させる」サンプルを作ってみた

課題

ワーカスレッドは recv関数 を使っており、一旦ブロックさせたら外部からデータを受信するまでブロックを続ける(ソケットの設定を変えたらノンブロッキングになるが、今回はしなかった)
「スレッドの終了を確認したあと」を満たすためには、ブロックした状態から抜け出す必要があるため、やり方を調べた

コード、ビルド方法、実行サンプル

動作確認までの方法を示す

コード

以下に今回作成したコードを示す(c++歴が浅いのでコードの汚さなどには目をつぶってください)

sample.cpp
#include <arpa/inet.h>
#include <cstring>
#include <iostream>
#include <netinet/in.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>

class Worker {
private:
  int sock_, efd_;
  std::thread th_;
  void ThreadFunc();

public:
  Worker(const char *ip, const int port);
  void Start();
  void Shutdown();
  void Join();
};

Worker::Worker(const char *ip, const int port) {
  std::cout << __PRETTY_FUNCTION__ << std::endl;

  if ((sock_ = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
    throw "socket() error";
  }

  struct sockaddr_in addr;
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  addr.sin_addr.s_addr = inet_addr(ip);
  if (bind(sock_, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
    throw "bind() error";
  }

  if ((efd_ = eventfd(0, 0)) == -1) {
    throw "eventfd() error";
  }

  std::cout << "  "
            << "Wait by " << ip << ":" << port << std::endl;
}

void Worker::ThreadFunc() {
  std::cout << __PRETTY_FUNCTION__ << std::endl;

  fd_set fds, rfds;
  FD_ZERO(&rfds);

  int max_fd = -1;
  FD_SET(sock_, &rfds);
  max_fd = (sock_ > max_fd ? sock_ : max_fd);
  FD_SET(efd_, &rfds);
  max_fd = (efd_ > max_fd ? efd_ : max_fd);

  while (1) {
    std::memcpy(&fds, &rfds, sizeof(fd_set));
    if (select(max_fd + 1, &fds, NULL, NULL, NULL) == -1) {
      throw "select() error";
    }

    if (FD_ISSET(sock_, &fds)) {
      char buf[256];
      memset(buf, 0, sizeof(buf));
      recv(sock_, buf, sizeof(buf), 0);
      std::cout << "  "
                << "recv from socket: " << buf << std::endl;
    }
    if (FD_ISSET(efd_, &fds)) {
      uint64_t num = 0;
      read(efd_, &num, sizeof(num));
      std::cout << "  "
                << "recv from eventfd" << std::endl;
      break;
    }
  }
}

void Worker::Start() {
  std::cout << __PRETTY_FUNCTION__ << std::endl;

  th_ = std::thread([this] { ThreadFunc(); });
}

void Worker::Shutdown() {
  std::cout << __PRETTY_FUNCTION__ << std::endl;

  uint64_t num = 1;
  if (write(efd_, &num, sizeof(num)) == -1) {
    std::cout << "efd_ = " << efd_ << std::endl;
    throw "write() error";
  }
}

void Worker::Join() {
  std::cout << __PRETTY_FUNCTION__ << std::endl;

  if (th_.joinable()) {
    th_.join();
  }

  std::cout << "end" << std::endl;
}

int main(int argc, char const *argv[]) {
  Worker worker("0.0.0.0", 10000);
  worker.Start();
  getchar();
  std::cout << "getchar" << std::endl;
  worker.Shutdown();
  worker.Join();
  return 0;
}

上記コードへデータを渡すための送信用コードを以下に示す。こちらはpythonで動作する

sender.py
#!/usr/bin/env python3
import socket
import sys


msg = "message"
if len(sys.argv) > 1:
    msg = sys.argv[1]
print("send : " + msg)

ip = "0.0.0.0"
port = 10000
addr = (ip, port)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg.encode(), addr)

ビルド方法

ファイルsample.cppが存在するディレクトリで以下を実行することでsample.exeが作成される

$ g++ sample.cpp -o sample.exe -pthread

実行手順

はじめにコンソールを2つ起動する。
コンソール1をsample.exe用、コンソール2をsender.py用とする

コンソール1上で下記コマンドを入力しsample.exeを起動する

$ ./sample.exe

コンソール2上で下記コマンドを入力しsender.pyを起動する
sender.pyは引数を与えると、その内容をsample.exeへ送信する

$ python sender.py <message>

最後にコンソール1上でEnterキーを押下するとsample.exeが終了する
以下に動作時のサンプルを示す
この例では左側をコンソール1,右側をコンソール2とする
asciicast

注意点

※個人的にハマった点のメモ

selectに与える第1引数

selectに与える第1引数は監視する複数のfdの最大値に1を足した値であることに注意
もし1を足しわすれていても、見た目上は動作してしまうので問題が発生していることに気づくのが遅れる可能性がある(経験談)

eventfdに書き込むデータサイズ

eventfdへ書き込むデータは、はじめは終了タイミングを伝えるだけなので1バイトのデータを送信すれば良いと思っていた
しかしここに以下一文が書かれていることを発見した

渡されたバッファーの大きさが 8 バイト未満の場合、 read(2) はエラー EINVAL で失敗する。

以上理由からwriteやreadで渡すデータはuint64_tの変数を渡すのが好ましいと思われる

参考ページ

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?