序
昔はthreadとかがなくて(統一されてなくて)、select/pollを使って書かれたサーバーとかを良く見かけました。threadが統一され、Windowsが32bitになるとマルチスレッド全盛になってあまり見かけなくなり、nginxやnode.jsが出てきた辺りで一時期epollが持て囃されてた気がします。最近ではそんなepollの記事もメッキリ減りましたが、そろそろ私も少々使ってみるかとこんな記事を書いている次第です。
epollとは
pollの進化形で複数のfdのI/O待ちを同時に出来るAPI。パイプやソケット通信で良く使用されるもの。複数の通信相手からの入力にはスレッドを使うか非同期I/Oを使うが、後者に分類される技術。詳しくは以下から。
使ってみる
#include <iostream>
#include <exception>
#include <string>
#include <cstring>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
using namespace std;
#define THROW_EXCEPTION(s) throw runtime_error(string(s ": ") + strerror(errno))
struct auto_close_fd {
int fd;
operator int() {return fd;}
auto_close_fd(int fd): fd(fd) {}
~auto_close_fd() {if (fd != -1) close(fd);}
};
int main() {
try {
auto_close_fd epfd = epoll_create1(0);
if (epfd < 0) THROW_EXCEPTION("epoll_create1 failed");
epoll_event ev;
ev.data.fd = STDIN_FILENO;
ev.events = EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev) < 0) THROW_EXCEPTION("epoll_ctl failed");
epoll_event evs[1];
auto len = epoll_wait(epfd, evs, sizeof(evs)/sizeof(evs[0]), 10000);
if (len < 0) THROW_EXCEPTION("epoll_wait failed");
else if (len == 0) cout << "timeout(10 sec)" << endl;
else {
if (len != 1) THROW_EXCEPTION("epoll_wait returned multiple fds");
if (evs[0].data.fd != STDIN_FILENO) THROW_EXCEPTION("epoll_wait returned wrong fd");
char buffer[10];
auto read_size = read(evs[0].data.fd, buffer, sizeof(buffer));
if (read_size < 0) THROW_EXCEPTION("read failed");
cout << "read: \"" << string(buffer, read_size) << "\"" << endl;
}
}
catch(exception& s) {
cerr << s.what() << endl;
return 1;
}
return 0;
}
何のことはない、標準入力から10バイト読んで出力するだけのプログラムです。ただ、10秒以上入力がないと勝手に終わります。epoll_
で始まるapi自体の説明はどこか他所で読んでください。
普通にreadするだけではスレッドなどを使わない限り、同時に10秒待ったりとかはできません。epollを使うと、待ってる間にread可能な状態になったfdだけを知ることが出来ます。これを使うことにより、何も入力のないfdをreadしてブロックされることなく、スレッドも使わずに他の処理をすることが出来るようになります。今回は何もせずに10秒waitしていますが...。apiから明らかなように同時に複数のfdを待つ事ができますが、自明なのでその説明はしません。
簡易イベントループを書いてみる
#include <iostream>
#include <vector>
#include <functional>
#include <string>
#include <chrono>
#include <map>
#include <unordered_map>
#include <utility>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <unistd.h>
class file_descriptor {
int fd_;
public:
constexpr static int INVALID = -1;
file_descriptor(int fd = INVALID): fd_(fd) {}
file_descriptor(file_descriptor &&org): fd_(org.fd_) {org.fd_ = INVALID;}
file_descriptor(const file_descriptor &org) = delete;
file_descriptor &operator=(file_descriptor &&org) {
if (this != &org) {
this->~file_descriptor();
this->fd_ = org.fd_;
org.fd_ = INVALID;
}
return *this;
}
file_descriptor &operator=(const file_descriptor &org) = delete;
~file_descriptor() {
if (this->fd_ != INVALID)
close(this->fd_);
this->fd_ = INVALID;
}
operator int() {return this->fd_;}
};
class epoll {
file_descriptor fd_;
constexpr static size_t MAX_EVENTS = 5;
std::vector<epoll_event> events_;
epoll(const epoll &) = delete;
epoll(epoll &&) = delete;
auto &operator=(const epoll &) = delete;
auto &operator=(epoll &&) = delete;
int ctl(int fd, int op, uint32_t events) {
epoll_event ev;
ev.events = events;
ev.data.fd = fd;
return epoll_ctl(this->fd_, op, fd, &ev);
}
public:
epoll(): fd_(epoll_create1(0)), events_(MAX_EVENTS) {}
operator bool() {return static_cast<int>(this->fd_) != file_descriptor::INVALID;}
int ctl_add(int fd, uint32_t events){return ctl(fd, EPOLL_CTL_ADD, events);}
int ctl_mod(int fd, uint32_t events){return ctl(fd, EPOLL_CTL_MOD, events);}
int ctl_del(int fd, uint32_t events){return ctl(fd, EPOLL_CTL_DEL, events);}
int wait(int timeout) {
this->events_.resize(MAX_EVENTS);
auto len = epoll_wait(this->fd_, this->events_.data(), this->events_.size(), timeout);
this->events_.resize(len < 0 ? 0 : len);
return len;
}
const auto &events() const {return this->events_;}
};
class event_loop {
epoll ep_;
bool running_ = false;
file_descriptor fd_;
std::map<std::chrono::high_resolution_clock::time_point, std::function<void()>> time2fnc;
std::unordered_map<int, std::function<void(int)>> fd2func;
union buff8bytes {
char bytes[8];
uint64_t value;
};
void interrupt() {
buff8bytes buff;
buff.value = 1;
write(this->fd_, &buff.bytes[0], sizeof(buff.bytes));
// epoll_waitを止めるための特殊な書き込みなので結果を無視する
}
public:
event_loop(): fd_(eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE)) {
this->ep_.ctl_add(this->fd_, EPOLLIN);
}
auto set_timeout(std::function<void()> func, int delay_ms) {
using namespace std;
using namespace std::chrono;
auto t = high_resolution_clock::now() + milliseconds(delay_ms);
auto it = this->time2fnc.find(t);
while (it != this->time2fnc.end())
{
t += nanoseconds(1);
it = this->time2fnc.find(t);
}
this->time2fnc.insert(std::make_pair(t, func));
}
auto register_fd(int fd, std::function<void(int)> func) {
this->fd2func.insert(std::make_pair(fd, func));
this->ep_.ctl_add(fd, EPOLLIN);
}
auto loop() {
using namespace std;
using namespace std::chrono;
this->running_ = true;
while (this->running_) {
auto t = high_resolution_clock::now();
auto it = this->time2fnc.begin();
while (it != this->time2fnc.end()) {
if (it->first < t) {
auto func = it->second;
this->time2fnc.erase(it);
func();
} else {
break;
}
it = this->time2fnc.begin();
}
auto ms = (it == this->time2fnc.end()) ? -1 : duration_cast<milliseconds>(it->first - t).count();
auto len = this->ep_.wait(ms);
if (len > 0) {
for (const auto &e : this->ep_.events()) {
if (e.data.fd == this->fd_) {
buff8bytes buff;
read(e.data.fd, &buff.bytes[0], sizeof(buff.bytes));
// カウンタをクリアするための特殊な読み込みなので結果を無視する
} else {
this->fd2func[e.data.fd](e.data.fd);
}
}
}
}
}
void stop() {
this->running_ = false;
interrupt();
}
};
int main() {
using namespace std;
using namespace std::chrono;
event_loop loop;
function<void(int)> on_read_request = [&loop](int fd)->void {
constexpr int BUFF_SIZE = 10;
vector<char> buff(BUFF_SIZE);
auto len = read(fd, buff.data(), buff.size());
buff.resize(len < 0 ? 0 : len);
cout << "read(): ";
cout.write(buff.data(), buff.size());
cout << "\n" << endl;
if (string(buff.data(), buff.size()) == "stop\n") {
loop.stop();
}
};
loop.register_fd(STDIN_FILENO, on_read_request);
constexpr int WAIT_TIME_MS = 10000;
auto t = high_resolution_clock::now();
function<void()> on_timeout = [&loop,&on_timeout,&t]()->void {
cout << WAIT_TIME_MS << "ms!\n" << endl;
t = high_resolution_clock::now();
loop.set_timeout(on_timeout, WAIT_TIME_MS);
};
loop.set_timeout(on_timeout, WAIT_TIME_MS);
function<void()> on_draw = [&loop,&on_draw,&t]()->void {
static const char pre_line[] = {"\x1b[s\x1b[1A\r"};
static const char post_line[] = {"\x1b[u"};
static const char* rest_chars[] = {" ", "\u258F", "\u258E", "\u258D", "\u258C", "\u258B", "\u258A", "\u2589"};
auto t2 = high_resolution_clock::now();
auto ms = duration_cast<milliseconds>(t2-t).count();
constexpr int width100 = 80;
auto char8size = ms * width100 * 8 / WAIT_TIME_MS;
auto chars = char8size >> 3;
auto rest = char8size & ((1<<3)-1);
cout << pre_line;
for (int i = 0; i < chars; ++i)
cout << "\u2588";
cout << rest_chars[rest];
cout << ms << "[ms]";
cout << post_line;
cout.flush();
loop.set_timeout(on_draw, 0);
};
loop.set_timeout(on_draw, 0);
cout << endl << endl << "\x1b[1A";
cout.flush();
loop.loop();
}
標準入力から10バイトreadする間に、10秒間数える進捗表示をし、標準入力から"stop\n"を読み込めるまで続けるプログラムです。
epollを使ってevent loopを作成し、epoll_waitのタイムアウト設定を利用してsetTimeout機能を実現しています。実際に待っているのは前回同様標準入力と、epoll_waitを止めるための制御用特殊ファイルディスクリプタです。
待ち時間の間はタイマ0ms設定のsetTimeoutで無限に進捗表示をするようにしています。グラフ部分はUnicodeの特殊な文字で書いていて、同じ行を上書きするためにANSIエスケープシーケンスを直に使っています。なので端末やフォントによってはグラフが上手く表示できないものがあります。
まとめ
epoll(LT)を使って、簡易イベントループを書いてみました。入出力は同期I/Oのままだし、イベントキューもないので、なんちゃってイベントループですが、制御の雰囲気は伝わるかもしれません。
epollはETが機能的にpollと違う部分なのですが、今回は触れていません。気になる方は調べてみるといいかもです。あと昔epollが持て囃された理由は↓が詳しかったです。
古い(15年前)ので今も記事のとおりなのかは知りません…
おまけ ~ 本物のイベントループについて
V8エンジンなどで使用されているのは、libuvというマルチプラットフォームのライブラリです。
Linuxではepollが使われているそうです。今どきevent loopを真面目に実装するくらいならコチラを使いましょう。この記事でなんちゃってイベントループを書いたのは、あくまで筆者の学習のためです。