boost から独立して header-only ライブラリーになっている asio を使う。
ここからソースを入手して include するだけです。
boost 時代に微妙にビルドが必要だったのが不要になりました。
実験コード
vc2022
io_context 概念
タスクを asio::io_context に CompletionTokens を post して、別のところでで吸い出して実行する queue のようなもの。
io_service は io_context に改名された。
吸い出し方法はいろいろある
GUI のメインループと組み合わせやすいメソッドが揃っている。
run 系
ブロックする(新しい completion が post されるのを待機する)
- asio::io_context::run すべてのタスクが終わるまで
- asio::io_context::run_one 一個処理されるまで待つ
- asio::io_context::run_for timeout 付き
- asio::io_context::run_one_for
poll 系
ブロックしない(新しい completion が post されるのを待たない)。
- asio::io_context::poll そのときあるものを処理する。待たない
- asio::io_context::poll_one そのときあるものを一個だけ処理する。待たない
mainloop 非同期結果受け取り型
入力の結果(file stream, socket), 非同期イベント(timer, signal)などをメインスレッドで受け取るタイプ。
#include <asio.hpp>
#include <iostream>
int main() {
// 1
asio::io_context io;
// 2
asio::steady_timer timer(io);
timer.expires_after(std::chrono::seconds(1));
timer.async_wait(
[](asio::error_code ec) { std::cout << "expired" << std::endl; });
// 3
std::cout << "start..." << std::endl;
io.run();
std::cout << "end" << std::endl;
return 0;
}
task実行スレッドでタスクを実行するモデル
スレッドプールなどでタスクを実行させるタイプ。
strand
ひとつの io_context に対して複数のスレッドから run を実行することができる。
このときハンドラが同時に実行されないように mutex のような機能を提供する。
CompletionTokens
asio の非同期関数 async_xxx
は最後の引数を CompletionTokens
と言って、挙動をスイッチできる。
例えば asio::async_read の ReadToken 引数。
template 特殊化で挙動が変わる。
- callback // 完了時コールバック
- asio::detached // なげっぱなし。コールバックを省略する場合
- asio::use_future // 返り値に future を受ける
- asio::use_awaitable
- asio::experimental::use_coro
- asio::experimental::use_promise
Asynchronous Operations
timer
連続で使う場合は、expires_after 等で都度、時間を設定するべし
file 読み書き
path から作る
asio::stream_file
file descriptor
network
ipaddress
any
asio::ip::udp::v4() // 引数なし
文字列をパースする
auto addr = asio::ip::address_v4::from_string("127.0.0.1");
host名解決: asio::ip::tcp::resolver
callback 方式
asio::io_service io;
asio::ip::tcp::resolver resolver{io};
asio::ip::tcp::socket socket{io};
resolver.async_resolve(
{"www.boost.org", "http"}, [&](auto const &, asio::ip::tcp::resolver::iterator endpoint) {
std::cout << "resolve" << std::endl;
});
io.run();
future 方式
c++11
- callback のネストを回避できる
- io.run をスレッド実行するなどのブロック対策
asio::io_context io;
auto work = asio::make_work_guard(io); // 👈 io.run が仕事がなくても終わらないように
std::thread thread{[&io] { // io.run と resolve_future.get がともにブロックするので、スレッドで実行
io.run();
std::cout << "io end" << std::endl;
}};
// dns
asio::ip::tcp::resolver resolver{io};
auto resolve_future =
resolver.async_resolve({"www.boost.org", "http"}, asio::use_future);
// resolve_future.wait();
auto endpoint = resolve_future.get();
std::cout << "resolved" << std::endl;
work.reset();
thread.join();
socket
// 空の endpoint で初期するのが必用? asio::ip::udp::endpoint 引数を省略するとうまくいかなかった
asio::ip::udp::socket udp_socket(io, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0));
// 送る
asio::ip::udp::endpoint ep(asio::ip::address::from_string("127.0.0.1"), 12345);
socket.async_send_to(asio::buffer(buffer), ep, asio::detached);
c++20 coroutine
c++20
coroutine memo
http get
を coroutine を使うように書き換えてみた。
callback の連鎖が co_await で平坦になってよさそう。
#include <asio.hpp>
#include <asio/co_spawn.hpp>
#include <asio/use_awaitable.hpp>
#include <iostream>
asio::awaitable<void> // 👈 asio の promise_type これを戻り値にすると中で co_await できる
co(const char* host, const char* service)
{
auto executor = co_await asio::this_coro::executor; // 👈 ~~suspendしない~~ resume 時に io_context を得る
asio::ip::tcp::resolver resolver(executor);
asio::ip::tcp::resolver::results_type endpoints =
co_await resolver.async_resolve(host, service, asio::use_awaitable); // 👈 最後の引数(completion token)に asio::use_awaitable を指定すると co_await できる版になる
for (auto& p : endpoints) {
std::cout << p.endpoint() << std::endl;
}
asio::ip::tcp::socket socket(executor);
co_await asio::async_connect(socket, endpoints, asio::use_awaitable);
asio::streambuf request;
std::ostream request_stream(&request);
request_stream << "GET / HTTP/1.0\r\n";
request_stream << "Host: " << host << "\r\n";
request_stream << "Accept: */*\r\n";
request_stream << "Connection: close\r\n\r\n";
std::cout << "GET..." << std::endl;
co_await asio::async_write(socket, request, asio::use_awaitable);
// Check that response is OK.
asio::streambuf response;
co_await asio::async_read_until(
socket, response, "\r\n", asio::use_awaitable);
{
if (response.size() > 0) {
std::cout << &response;
}
std::istream response_stream(&response);
std::string http_version;
response_stream >> http_version;
unsigned int status_code;
response_stream >> status_code;
std::string status_message;
std::getline(response_stream, status_message);
if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
std::cout << "Invalid response\n";
co_return;
}
if (status_code != 200) {
std::cout << "Response returned with status code ";
std::cout << status_code << "\n";
// co_return;
}
}
// Process the response headers.
asio::async_read_until(socket, response, "\r\n\r\n", asio::use_awaitable);
std::istream response_stream(&response);
{
std::string header;
while (std::getline(response_stream, header) && header != "\r")
std::cout << header << "\n";
std::cout << "\n";
}
if (response.size() > 0) {
std::cout << &response;
}
// Start reading remaining data until EOF.
while (true) {
co_await asio::async_read(
socket, response, asio::transfer_at_least(1), asio::use_awaitable);
std::cout << &response;
}
}
int
main(int argc, char** argv)
{
asio::io_context io_context;
asio::co_spawn(io_context, co("think-async.com", "http"), asio::detached); // 👈 coroutine を開始する
io_context.run();
return 0;
}
coroutine フレームワークが一通り揃っていて、
非同期関数をサポートしている。
suspend_always 的なものが見つからなかった。
なにもせずに suspend するには 0秒タイマーなどで代用する。
co_await asio::steady_timer(executor, asio::chrono::seconds(0))
.async_wait(asio::use_awaitable);
std::function をスレッド実行して await する
を coroutine で書き直してみた。
もっと良い書き方はあると思うが、とりえず動いた。
#include <asio.hpp>
#include <asio/use_awaitable.hpp>
#include <iostream>
#include <thread>
// https://github.com/chriskohlhoff/asio/blob/master/asio/src/examples/cpp20/operations/callback_wrapper.cpp
template<typename T>
struct ThreadTask
{
// スレッド上で task を実行して結果を得る
// 結果を asio の継続(cb) に渡す(結果を io_context にエンキューする感じ?)
template<typename Callback>
static void Execute(const std::function<T()>& task, Callback cb)
{
std::thread([task, cb = std::move(cb)]() mutable {
std::move(cb)(task());
}).detach();
}
template<asio::completion_token_for<void(T)> CompletionToken>
static auto AsyncThredTask(const std::function<T()>& task,
CompletionToken token)
{
auto init = [task](asio::completion_handler_for<void(T)> auto handler) {
auto work = asio::make_work_guard(handler);
Execute(task,
[handler = std::move(handler),
work = std::move(work)](T result) mutable {
// Get the handler's associated allocator. If the handler
// does not specify an allocator, use the recycling
// allocator as the default.
auto alloc = asio::get_associated_allocator(
handler, asio::recycling_allocator<void>());
// Dispatch the completion handler through the handler's
// associated executor, using the handler's associated
// allocator.
asio::post(
work.get_executor(),
asio::bind_allocator(
alloc, [handler = std::move(handler), result]() mutable {
std::move(handler)(result);
}));
});
};
return asio::async_initiate<CompletionToken, void(T)>(init, token);
}
};
template<typename F>
decltype(auto)
Launch(F f)
{
using R = decltype(f());
return ThreadTask<R>::AsyncThredTask(f, asio::use_awaitable);
}
asio::awaitable<void>
task()
{
// auto io = co_await asio::this_coro::executor;
auto n = co_await Launch([]() {
std::cout << "[onThread] " << std::this_thread::get_id() << std::endl;
return 456;
});
std::cout << "[co_await] " << std::this_thread::get_id() << ": " << n
<< std::endl;
}
int
main(int argc, char** argv)
{
asio::io_context io;
asio::co_spawn(io, task(), asio::detached);
std::cout << "[main] run..." << std::endl;
io.run();
std::cout << "[main] done" << std::endl;
return 0;
}
asio 問題なく動くのだけど、はまるとわけがわからん(asio のコードが難解)という問題がある。
io, socket, timer は既存の部品を呼ぶだけ。
document