ファイルやネットワークなどリソースの変化など POSIX の IO や特定時間の経過などをイベントとしてあつかえるようにするライブラリー。
epoll やなんかがやることを隠蔽してくれるので低レベルな処理を書きたくないひとにオススメ。
公式ドキュメントはとにかく長いけど、次だけ読めば問題ない。その他は必要なときに必要なところをつまみ読む感じで。って WHAT TO READ WHEN IN A HURRY に書いてある。
- ANATOMY OF A WATCHER
- EXAMPLE PROGRAM
- GLOBAL FUNCTIONS
- WATCHER TYPES の ev_io と ev_timer の項
libev は libevent を参考に作られていて、スピードが改善されているらしい。
それ以外にも C++ インターフェイスが整備されていてグローバル変数を使わずに書けたりするので特に理由がないかぎり libev でいいと思う。
歴史的な経緯は libevent の次のページに書いてあるから興味あればみてみるとよいかも。
ざっくり次の問題点を解決するために libevent が作られて、その高速化で libev という流れらしい。
- select, poll, epoll, kqueue, evports, /dev/poll など似たようなのがいっぱいあってどれを使えばいいのかわかりづらい
- 環境ごとに使えるものに差がある
- ファイルディスクリプターなどの低級な概念をそのまま扱いたくない
以降、公式ドキュメントをざざっとまとめてみた。
event loop
libev はイベントループの一種ということらしい。普段待機でイベントが起きたら処理、をぐるぐる繰り返す。
libev は同時にひとつ以上のイベントループを管理できる。
- イベントループ作成
- イベントループにイベント種類とコールバック関数を対にしたものを追加
という流れ。
watcher
libev には何らかのイベントの発火を監視する単位として watcher という概念がある。けど、一般的にイベントハンドラーと呼ばれるものと同じ。微妙に違うかもしれないけど気にしない。
次の型のどれかを実体化したものを watcher として扱う。
型名 | 説明 |
---|---|
ev_io | ファイルディスクリプター |
ev_stat | ファイルの状態、 Linux inotify を使っている |
ev_async | 自前イベント、 Linux eventfd を使っている |
ev_signal | 同期シグナル、 Linux signalfd を使っている |
ev_timer | 3 秒後など相対的な時間 |
ev_periodic | 日曜日の 24:00 など絶対的な時間 |
ev_child | プロセスの状態 |
ev_idle | 他に動作するwatcherがない場合に発火 |
ev_embed | epoll, kqueue など使用するバックエンドの指定 |
ev_prepare | watcher動作の直前に実行する処理の指定 |
ev_check | watcher動作の直後に実行する処理の指定 |
ev_fork | fork の限定サポート |
watcher が実体化した際に使用するメモリーは自分で管理する必要がある。
ある watcher に対応するイベントを発生させなくても、強制的に動作させることもできる。
状態遷移
watcher は次のうち、どれかの状態になる。
- initialized
- イベントループに登録される前の状態
- initialized になるにはイベントが発火した際に動作するコールバック関数の設定が必須
- active / running / active
- イベント発火を待っている最中の状態
- ドキュメントに書かれている方法を除いて、読み書き・移動・解放できない
- pending
- active で監視対象のイベントが発火された後の状態
- stopped になるか、コールバック関数が実行される直前までは pending
- ev_io とか繰り返し使用できる watcher は pending からまた active に移行できる
- 1 回限りの ev_timer は動作後 pending になるけど active には移行できない
- 読み書きはできるけど、イベントループに紐付いたままなので移動・解放・再使用はできない
- stopped
- libev が自動的に stopped にする
- 同時に pending にもなったりする
- 明示的に stopped にできる
- 明示的に stopped にすると強制的に pending を解除する
- なんかの拍子でコールバック関数が呼ばれることがなくなるから watcher 解放前には明示的に停止させたほうがよいとのこと
- stopped は initialized と同じなので再使用・移動・変更が可能
- libev が自動的に stopped にする
優先度
watcher には優先度をつけられる。具体的な優先度の範囲は定数 EV_MINPRI ~ EV_MAXPRI まで。
基本的に watcher に設定された優先度が高い方からコールバック関数が実行されていく。
で、 libev は優先度を取り扱うときの方法として次の 2 つを用意しているらしい。
lock-out model
高い優先度を持つものが低い優先度のものを締め出す = lock-out
- 一番高い優先度の watcher だけコールバック関数を実行できる
- それ以外の優先度の watcher は何もできない
この model は kernel の実装がアレで非効率とか書かれてる。
- イベント A, B が発生
- イベント A を監視している watcher のうち優先度が最高のものを駆動
- イベント A を監視している watcher のうち次の優先度のものを駆動
- 以下同様
only-for-ordering
優先度が高い watcher から順にコールバック関数を実行していくだけ = only-for-ordering
- 一番高い優先度の watcer がコールバック関数を実行
- その次に高い優先度の watcher がコールバック関数を実行
- 以下発火したイベントに対応する watcher が存在すれば優先度順に実行されていく
何がうれしいかというと、たとえば ev_io watcher でデータを受けとって ev_timer watcher でタイムアウトしてないかどうかを見張るときなんかに次のイヤなパターンが発生する可能性がある。
- プログラムが高負荷時で読みにいけないときにデータを受信する
- 読めないままタイマーが動作する
- データが届いているのにタイムアウトになってしまう
この場合はタイマーの優先度を低くしておけば先にデータを確認してくれるようにできる。
model の使い分け
libev では ev_idle だけが lock-out model でその他が only-for-ordering model を使うらしい。
ev_idle が発火するタイミングは ev_idle より優先度の高い watcher がいない場合。
で、全体を lock-out model で処理したい場合は ev_idle watcher で全部受けとって内部で切り分けるということをすればいいらしい。けど意味あんのかこれ。
C++ bindings
C 言語用のライブラリーだけど C++ 用のインターフェイスも同梱している。
- ev++.h を include することで使用可能。
- 例外は watcher のコールバック関数からのみ throw 可能。
- ev 名前空間の下にいろいろ定義されてる
優先度変更
優先度をいじるインターフェイスがないので自分で書いてみた。
template<typename T> constexpr inline T pointer_cast(void* const p) noexcept {
return static_cast<T>(p);
}
template<typename T> constexpr inline T pointer_cast(const void* const p) noexcept {
return static_cast<T>(p);
}
/*
* ev::TYPE priority accessors
* getter
* int priority(ev_TYPE* watcher);
* setter
* void priority(ev_TYPE* watcher, int priority);
* */
template <typename T> inline constexpr int priority(const T& watcher) noexcept;
template <typename T> inline void priority(T& watcher, const int priority) noexcept;
#define DEFINE_PRIORITY_GETTER(t) \
template <> inline constexpr int priority(const ev::t& watcher) noexcept { \
return ev_priority(pointer_cast<const ev_ ## t*>(&watcher)); \
}
#define DEFINE_PRIORITY_SETTER(t) \
template <> inline void priority(ev::t& watcher, const int priority) noexcept { \
ev_set_priority(pointer_cast<ev_ ## t*>(&watcher), priority); \
}
#define DEFINE_PRIORITY_ACCESSOR(t) \
DEFINE_PRIORITY_GETTER(t) \
DEFINE_PRIORITY_SETTER(t)
DEFINE_PRIORITY_ACCESSOR(io)
DEFINE_PRIORITY_ACCESSOR(timer)
io と timer 以外に欲しければ適当に定義しておくんなさい。
サンプル
全部 C++ で書いてみたもの。
tail -f もどき
/*
* tailike.cpp
* tail -f with libev
*
* g++ -std=c++11 -Wall -Wextra -pedantic -lev tailike.cpp
*
* written by janus_wel<janus.wel.3@gmail.com>
*
* These codes are licensed under the MIT license
* http://opensource.org/licenses/MIT
* */
#include <fcntl.h> // open
#include <string.h> // strerror
#include <unistd.h> // read
#include <ev++.h>
#include <array>
#include <iostream>
#include <stdexcept>
#include <string>
constexpr std::size_t BUFFER_SIZE = 4096;
class file_descriptor_t {
private:
int file_descriptor_;
public:
explicit file_descriptor_t(const char* const file_name)
: file_descriptor_(open(file_name, O_RDONLY)) {
if (file_descriptor_ == -1) {
throw std::runtime_error(strerror(errno));
}
}
explicit file_descriptor_t(const std::string& file_name)
: file_descriptor_t(file_name.c_str()) { }
~file_descriptor_t() noexcept {
close(file_descriptor_);
}
inline int raw() const noexcept {
return file_descriptor_;
}
inline operator int() const noexcept {
return this->raw();
}
};
class callback_t {
public:
typedef std::array<char, BUFFER_SIZE> buffer_t;
private:
buffer_t buffer_;
file_descriptor_t file_descriptor_;
public:
callback_t(const char* file_path)
: file_descriptor_(file_path) { }
void read(ev::stat& w, int) {
if (!w.attr.st_nlink) {
return;
}
const auto read_bytes = ::read(file_descriptor_, buffer_.data(), buffer_.size() - 1);
if (read_bytes == -1) {
// error occured when read
return;
}
if (read_bytes == 0) {
// fd is closed?
return;
}
buffer_[read_bytes] = '\0';
std::cout << buffer_.data() << std::flush;
}
};
int main(const int argc, const char* const argv[]) {
if (argc < 2) {
std::cerr
<< "specify file to tail"
<< std::endl;
}
const char* const file_path = argv[1];
try {
callback_t callback(file_path);
ev::default_loop loop;
ev::stat watcher(loop);
watcher.set<callback_t, &callback_t::read>(&callback);
watcher.set(file_path, 0);
watcher.start();
loop.run(0);
return 0;
}
catch (const std::runtime_error& e) {
std::cerr
<< e.what()
<< std::endl;
return 1;
}
}
timer
/*
* timer.cpp
* timer sample of libev
*
* g++ -std=c++11 -Wall -Wextra -pedantic -lev timer.cpp
*
* written by janus_wel<janus.wel.3@gmail.com>
*
* These codes are licensed under the MIT license
* http://opensource.org/licenses/MIT
* */
#include <ev++.h>
#include <iostream>
class callback_t {
private:
double timeout_;
public:
callback_t(const double& timeout)
: timeout_(timeout) { }
void awake(ev::timer& timer, int) noexcept {
std::cout
<< "timer: " << timeout_ << "sec"
<< std::endl;
timer.set(timeout_);
timer.start();
}
};
int main(const int, const char* const []) {
// timer settings
constexpr double timeout1 = 0.500000;
constexpr double timeout2 = 0.700000;
// loop
ev::default_loop loop;
// generate and setup timers
ev::timer timer1(loop);
callback_t callback1(timeout1);
timer1.set<callback_t, &callback_t::awake>(&callback1);
timer1.set(timeout1);
timer1.start();
ev::timer timer2(loop);
callback_t callback2(timeout2);
timer2.set<callback_t, &callback_t::awake>(&callback2);
timer2.set(timeout2);
timer2.start();
// run
loop.run(0);
return 0;
}
echo server
/*
* echo-server.cpp
* echo server with libev
*
* g++ -std=c++11 -Wall -Wextra -pedantic -lev timer.cpp
*
* telnet localhost 40713
*
* written by janus_wel<janus.wel.3@gmail.com>
*
* These codes are licensed under the MIT license
* http://opensource.org/licenses/MIT
* */
#include <ev++.h>
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <array>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
constexpr unsigned int BUFFER_SIZE = 4096;
constexpr unsigned short int SERVER_PORT = 40713;
constexpr int NUMOF_BACKLOG = 16;
constexpr int PROTOCOL = 0;
template<typename T> constexpr inline T pointer_cast(void* const p) noexcept {
return static_cast<T>(p);
}
template<typename T> constexpr inline T pointer_cast(const void* const p) noexcept {
return static_cast<T>(p);
}
class socket_t {
private:
const sa_family_t family_;
const int socket_;
private:
static constexpr int ERROR_CODE = -1;
public:
socket_t(const sa_family_t family, const int type, const int protocol)
: family_(family),
socket_(socket(family, type, protocol)) {
if (socket_ == socket_t::ERROR_CODE) {
throw std::runtime_error(strerror(errno));
}
}
~socket_t() noexcept {
close(socket_);
}
inline int raw() const noexcept {
return socket_;
}
inline operator int() const noexcept {
return this->raw();
}
void bind(const uint16_t port) {
sockaddr_in socket_address;
socket_address.sin_family = family_;
socket_address.sin_addr.s_addr = INADDR_ANY;
socket_address.sin_port = htons(port);
const auto result = ::bind(
socket_,
pointer_cast<sockaddr*>(&socket_address),
sizeof(socket_address));
if (result == socket_t::ERROR_CODE) {
throw std::runtime_error(strerror(errno));
}
}
void listen(const int backlog) {
const auto result = ::listen(socket_, backlog);
if (result == socket_t::ERROR_CODE) {
throw std::runtime_error(strerror(errno));
}
}
};
class connection_t;
typedef std::map<int, std::unique_ptr<connection_t>> connections_t;
connections_t connections;
class connection_t {
private:
unsigned int id_;
int descriptor_;
std::array<char, BUFFER_SIZE> buffer;
ev::io read_watcher;
ev::io write_watcher;
ssize_t received_bytes;
public:
connection_t(const unsigned int id, const int descriptor)
: id_(id), descriptor_(descriptor) {
read_watcher.set<connection_t, &connection_t::read>(this);
read_watcher.set(descriptor, ev::READ);
write_watcher.set<connection_t, &connection_t::write>(this);
write_watcher.set(descriptor, ev::WRITE);
}
void start() noexcept {
read_watcher.start();
}
void close() noexcept {
::close(this->descriptor_);
connections.erase(connections.find(this->id_));
std::cout
<< connections.size() << " connections active"
<< std::endl;
}
void read(ev::io& w, int) {
std::cout
<< "read()"
<< std::endl;
ssize_t result = recv(w.fd, buffer.data(), buffer.size(), 0);
std::cout
<< std::setw(4) << " " << result << " bytes received"
<< std::endl;
if (result == -1) {
if (errno == EAGAIN) {
return;
}
throw std::runtime_error(strerror(errno));
}
if (result == 0) {
std::cout
<< std::setw(4) << " " << "disconnected by peer"
<< std::endl;
w.stop();
this->close();
return;
}
received_bytes += result;
//w.stop();
write_watcher.start();
}
void write(ev::io& w, int) {
std::cout
<< "write()"
<< std::endl;
ssize_t sent_bytes = 0;
while (sent_bytes < received_bytes) {
ssize_t result = send(
w.fd,
&buffer[sent_bytes],
received_bytes - sent_bytes,
0);
std::cout
<< std::setw(4) << " " << result << " bytes sent"
<< std::endl;
if (result == -1) {
if (errno == EAGAIN) {
return;
}
throw std::runtime_error(strerror(errno));
}
sent_bytes += result;
}
received_bytes = 0;
w.stop();
//this->close();
}
};
class host_t {
private:
std::string hostname_;
std::string ip_address_;
private:
static constexpr int SUCCESS_CODE = 0;
public:
const char* hostname() const noexcept {
return hostname_.c_str();
}
const char* ip_address() const noexcept {
return ip_address_.c_str();
}
public:
explicit host_t(const char* hostname, const sa_family_t family) {
addrinfo* address_info;
addrinfo hints;
hints.ai_family = family;
const auto result = getaddrinfo(hostname, nullptr, &hints, &address_info);
if (result != host_t::SUCCESS_CODE) {
throw std::runtime_error(gai_strerror(result));
}
hostname_ = address_info->ai_canonname;
ip_address_ = inet_ntoa(pointer_cast<sockaddr_in*>(address_info->ai_addr)->sin_addr);
freeaddrinfo(address_info);
}
explicit host_t(const sockaddr_in& socket_address) {
std::array<char, NI_MAXHOST> buffer;
const auto result = getnameinfo(
pointer_cast<const sockaddr*>(&socket_address), sizeof(socket_address),
buffer.data(), buffer.size(),
nullptr, 0,
NI_NAMEREQD);
if (result != host_t::SUCCESS_CODE) {
throw std::runtime_error(gai_strerror(result));
}
hostname_ = buffer.data();
ip_address_ = inet_ntoa(socket_address.sin_addr);
}
};
class callback_t {
private:
unsigned int count_;
public:
callback_t() : count_(0) { }
void operator() (ev::io& w, int) {
sockaddr_in peer_socket_address;
socklen_t peer_socket_address_length = sizeof(peer_socket_address);
const auto descriptor = accept(
w.fd,
pointer_cast<sockaddr*>(&peer_socket_address),
&peer_socket_address_length);
if (descriptor == -1) {
throw std::runtime_error(strerror(errno));
}
host_t host(peer_socket_address);
std::cout
<< host.hostname() << " " << host.ip_address()
<< std::endl;
const auto id = ++count_;
connections[id] = connections_t::mapped_type(new connection_t(id, descriptor));;
connections[id]->start();
std::cout
<< connections.size() << " connections active"
<< std::endl;
}
};
int main(const int, const char* const []) {
socket_t socket(AF_INET, SOCK_STREAM, PROTOCOL);
socket.bind(SERVER_PORT);
socket.listen(NUMOF_BACKLOG);
ev::default_loop loop;
ev::io watcher(loop);
callback_t callback;
watcher.set(&callback);
watcher.set(socket, ev::READ);
watcher.start();
loop.run(0);
return 0;
}