背景
最近、以下のようなアプリを作る機会があった
- ワーカスレッドでソケット経由でデータを受信する
- メインスレッドでユーザからのキー入力を受信する
- 特定のキー入力によってプロセスを終了させる
C-c押下(SIGINT)でプロセス全体をおとしても問題なかったが、時間もあったので「ワーカスレッドの終了を確認したあとプロセス全体を終了させる」サンプルを作ってみた
課題
ワーカスレッドは recv関数 を使っており、一旦ブロックさせたら外部からデータを受信するまでブロックを続ける(ソケットの設定を変えたらノンブロッキングになるが、今回はしなかった)
「スレッドの終了を確認したあと」を満たすためには、ブロックした状態から抜け出す必要があるため、やり方を調べた
コード、ビルド方法、実行サンプル
動作確認までの方法を示す
コード
以下に今回作成したコードを示す(c++歴が浅いのでコードの汚さなどには目をつぶってください)
#include <arpa/inet.h>
#include <cstring>
#include <iostream>
#include <netinet/in.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <thread>
#include <unistd.h>
class Worker {
private:
int sock_, efd_;
std::thread th_;
void ThreadFunc();
public:
Worker(const char *ip, const int port);
void Start();
void Shutdown();
void Join();
};
Worker::Worker(const char *ip, const int port) {
std::cout << __PRETTY_FUNCTION__ << std::endl;
if ((sock_ = socket(AF_INET, SOCK_DGRAM, 0)) == -1) {
throw "socket() error";
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
if (bind(sock_, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
throw "bind() error";
}
if ((efd_ = eventfd(0, 0)) == -1) {
throw "eventfd() error";
}
std::cout << " "
<< "Wait by " << ip << ":" << port << std::endl;
}
void Worker::ThreadFunc() {
std::cout << __PRETTY_FUNCTION__ << std::endl;
fd_set fds, rfds;
FD_ZERO(&rfds);
int max_fd = -1;
FD_SET(sock_, &rfds);
max_fd = (sock_ > max_fd ? sock_ : max_fd);
FD_SET(efd_, &rfds);
max_fd = (efd_ > max_fd ? efd_ : max_fd);
while (1) {
std::memcpy(&fds, &rfds, sizeof(fd_set));
if (select(max_fd + 1, &fds, NULL, NULL, NULL) == -1) {
throw "select() error";
}
if (FD_ISSET(sock_, &fds)) {
char buf[256];
memset(buf, 0, sizeof(buf));
recv(sock_, buf, sizeof(buf), 0);
std::cout << " "
<< "recv from socket: " << buf << std::endl;
}
if (FD_ISSET(efd_, &fds)) {
uint64_t num = 0;
read(efd_, &num, sizeof(num));
std::cout << " "
<< "recv from eventfd" << std::endl;
break;
}
}
}
void Worker::Start() {
std::cout << __PRETTY_FUNCTION__ << std::endl;
th_ = std::thread([this] { ThreadFunc(); });
}
void Worker::Shutdown() {
std::cout << __PRETTY_FUNCTION__ << std::endl;
uint64_t num = 1;
if (write(efd_, &num, sizeof(num)) == -1) {
std::cout << "efd_ = " << efd_ << std::endl;
throw "write() error";
}
}
void Worker::Join() {
std::cout << __PRETTY_FUNCTION__ << std::endl;
if (th_.joinable()) {
th_.join();
}
std::cout << "end" << std::endl;
}
int main(int argc, char const *argv[]) {
Worker worker("0.0.0.0", 10000);
worker.Start();
getchar();
std::cout << "getchar" << std::endl;
worker.Shutdown();
worker.Join();
return 0;
}
上記コードへデータを渡すための送信用コードを以下に示す。こちらはpythonで動作する
#!/usr/bin/env python3
import socket
import sys
msg = "message"
if len(sys.argv) > 1:
msg = sys.argv[1]
print("send : " + msg)
ip = "0.0.0.0"
port = 10000
addr = (ip, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg.encode(), addr)
ビルド方法
ファイルsample.cppが存在するディレクトリで以下を実行することでsample.exeが作成される
$ g++ sample.cpp -o sample.exe -pthread
実行手順
はじめにコンソールを2つ起動する。
コンソール1をsample.exe用、コンソール2をsender.py用とする
コンソール1上で下記コマンドを入力しsample.exeを起動する
$ ./sample.exe
コンソール2上で下記コマンドを入力しsender.pyを起動する
sender.pyは引数を与えると、その内容をsample.exeへ送信する
$ python sender.py <message>
最後にコンソール1上でEnterキーを押下するとsample.exeが終了する
以下に動作時のサンプルを示す
この例では左側をコンソール1,右側をコンソール2とする
注意点
※個人的にハマった点のメモ
selectに与える第1引数
selectに与える第1引数は監視する複数のfdの最大値に1を足した値であることに注意
もし1を足しわすれていても、見た目上は動作してしまうので問題が発生していることに気づくのが遅れる可能性がある(経験談)
eventfdに書き込むデータサイズ
eventfdへ書き込むデータは、はじめは終了タイミングを伝えるだけなので1バイトのデータを送信すれば良いと思っていた
しかしここに以下一文が書かれていることを発見した
渡されたバッファーの大きさが 8 バイト未満の場合、 read(2) はエラー EINVAL で失敗する。
以上理由からwriteやreadで渡すデータはuint64_t
の変数を渡すのが好ましいと思われる
参考ページ
-
c++ - Terminate thread c++11 blocked on r ead - Stack Overflow
- このページの質問者も私と同じような問題に悩んでいた。
- いくつか回答があったうちこの回答をみて今回の方針を決めた
-
selectを使う:Geekなぺーじ
- 「select socket c」で検索して出てきたページ。かなり参考にさせて頂いた。
-
Man page of SELECT
- selectについて書かれている日本語のページ
-
Man page of EVENTFD
- eventfdについて書かれている日本語のページ