9
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

asio (non-Boost) を使う

Last updated at Posted at 2023-02-05

boost から独立して header-only ライブラリーになっている asio を使う。

ここからソースを入手して include するだけです。

boost 時代に微妙にビルドが必要だったのが不要になりました。

c++23 以降の標準ライブラリとして準備しているらしい

Working Draft, C++ Extensions for Networking (N4734)

紛糾しているらしい

[C++] ExecutorとNetworking TSで起きていたこと

実験コード

vc2022

io_context 概念

タスクを asio::io_context に CompletionTokens を post して、別のところでで吸い出して実行する queue のようなもの。

io_service は io_context に改名された。

https://amedama1x1.hatenablog.com/entry/2016/08/20/222326

吸い出し方法はいろいろある

GUI のメインループと組み合わせやすいメソッドが揃っている。

run 系

ブロックする(新しい completion が post されるのを待機する)

poll 系

ブロックしない(新しい completion が post されるのを待たない)。

mainloop 非同期結果受け取り型

入力の結果(file stream, socket), 非同期イベント(timer, signal)などをメインスレッドで受け取るタイプ。

#include <asio.hpp>
#include <iostream>

int main() {
  // 1
  asio::io_context io;

  // 2
  asio::steady_timer timer(io);
  timer.expires_after(std::chrono::seconds(1));
  timer.async_wait(
      [](asio::error_code ec) { std::cout << "expired" << std::endl; });

  // 3
  std::cout << "start..." << std::endl;
  io.run();
  std::cout << "end" << std::endl;

  return 0;
}

task実行スレッドでタスクを実行するモデル

スレッドプールなどでタスクを実行させるタイプ。

strand

ひとつの io_context に対して複数のスレッドから run を実行することができる。
このときハンドラが同時に実行されないように mutex のような機能を提供する。

CompletionTokens

asio の非同期関数 async_xxx は最後の引数を CompletionTokens と言って、挙動をスイッチできる。

例えば asio::async_read の ReadToken 引数。

template 特殊化で挙動が変わる。

  • callback // 完了時コールバック
  • asio::detached // なげっぱなし。コールバックを省略する場合
  • asio::use_future // 返り値に future を受ける
  • asio::use_awaitable
  • asio::experimental::use_coro
  • asio::experimental::use_promise

Asynchronous Operations

timer

連続で使う場合は、expires_after 等で都度、時間を設定するべし

file 読み書き

path から作る
asio::stream_file

file descriptor

network

ipaddress

any

asio::ip::udp::v4() // 引数なし

文字列をパースする

auto addr = asio::ip::address_v4::from_string("127.0.0.1");

host名解決: asio::ip::tcp::resolver

callback 方式
    asio::io_service io;
    asio::ip::tcp::resolver resolver{io};
    asio::ip::tcp::socket socket{io};
    resolver.async_resolve(
        {"www.boost.org", "http"}, [&](auto const &, asio::ip::tcp::resolver::iterator endpoint) {
          std::cout << "resolve" << std::endl;
    });
   io.run();
future 方式

c++11

  • callback のネストを回避できる
  • io.run をスレッド実行するなどのブロック対策
    asio::io_context io;
    auto work = asio::make_work_guard(io); // 👈 io.run が仕事がなくても終わらないように
    std::thread thread{[&io] { // io.run と resolve_future.get がともにブロックするので、スレッドで実行
      io.run();
      std::cout << "io end" << std::endl;
    }};

    // dns
    asio::ip::tcp::resolver resolver{io};

    auto resolve_future =
        resolver.async_resolve({"www.boost.org", "http"}, asio::use_future);

    // resolve_future.wait();
    auto endpoint = resolve_future.get();
    std::cout << "resolved" << std::endl;

    work.reset();
    thread.join();

socket

// 空の endpoint で初期するのが必用? asio::ip::udp::endpoint 引数を省略するとうまくいかなかった
asio::ip::udp::socket udp_socket(io, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0));
// 送る
asio::ip::udp::endpoint ep(asio::ip::address::from_string("127.0.0.1"), 12345);
socket.async_send_to(asio::buffer(buffer), ep, asio::detached);

c++20 coroutine

c++20

coroutine memo

http get

を coroutine を使うように書き換えてみた。
callback の連鎖が co_await で平坦になってよさそう。

#include <asio.hpp>
#include <asio/co_spawn.hpp>
#include <asio/use_awaitable.hpp>
#include <iostream>

asio::awaitable<void> // 👈 asio の promise_type これを戻り値にすると中で co_await できる
co(const char* host, const char* service)
{
  auto executor = co_await asio::this_coro::executor; // 👈 ~~suspendしない~~ resume 時に io_context を得る
  asio::ip::tcp::resolver resolver(executor);
  asio::ip::tcp::resolver::results_type endpoints =
    co_await resolver.async_resolve(host, service, asio::use_awaitable); // 👈 最後の引数(completion token)に asio::use_awaitable を指定すると co_await できる版になる
  for (auto& p : endpoints) {
    std::cout << p.endpoint() << std::endl;
  }

  asio::ip::tcp::socket socket(executor);
  co_await asio::async_connect(socket, endpoints, asio::use_awaitable);

  asio::streambuf request;
  std::ostream request_stream(&request);
  request_stream << "GET / HTTP/1.0\r\n";
  request_stream << "Host: " << host << "\r\n";
  request_stream << "Accept: */*\r\n";
  request_stream << "Connection: close\r\n\r\n";
  std::cout << "GET..." << std::endl;
  co_await asio::async_write(socket, request, asio::use_awaitable);

  // Check that response is OK.
  asio::streambuf response;
  co_await asio::async_read_until(
    socket, response, "\r\n", asio::use_awaitable);
  {
    if (response.size() > 0) {
      std::cout << &response;
    }
    std::istream response_stream(&response);
    std::string http_version;
    response_stream >> http_version;
    unsigned int status_code;
    response_stream >> status_code;
    std::string status_message;
    std::getline(response_stream, status_message);
    if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
      std::cout << "Invalid response\n";
      co_return;
    }
    if (status_code != 200) {
      std::cout << "Response returned with status code ";
      std::cout << status_code << "\n";
      // co_return;
    }
  }

  // Process the response headers.
  asio::async_read_until(socket, response, "\r\n\r\n", asio::use_awaitable);
  std::istream response_stream(&response);
  {
    std::string header;
    while (std::getline(response_stream, header) && header != "\r")
      std::cout << header << "\n";
    std::cout << "\n";
  }
  if (response.size() > 0) {
    std::cout << &response;
  }

  // Start reading remaining data until EOF.
  while (true) {
    co_await asio::async_read(
      socket, response, asio::transfer_at_least(1), asio::use_awaitable);
    std::cout << &response;
  }
}

int
main(int argc, char** argv)
{
  asio::io_context io_context;

  asio::co_spawn(io_context, co("think-async.com", "http"), asio::detached); // 👈 coroutine を開始する

  io_context.run();

  return 0;
}

coroutine フレームワークが一通り揃っていて、
非同期関数をサポートしている。

suspend_always 的なものが見つからなかった。
なにもせずに suspend するには 0秒タイマーなどで代用する。

co_await asio::steady_timer(executor, asio::chrono::seconds(0))
        .async_wait(asio::use_awaitable);

std::function をスレッド実行して await する

を coroutine で書き直してみた。

もっと良い書き方はあると思うが、とりえず動いた。

#include <asio.hpp>
#include <asio/use_awaitable.hpp>
#include <iostream>
#include <thread>

// https://github.com/chriskohlhoff/asio/blob/master/asio/src/examples/cpp20/operations/callback_wrapper.cpp
template<typename T>
struct ThreadTask
{
  // スレッド上で task を実行して結果を得る
  // 結果を asio の継続(cb) に渡す(結果を io_context にエンキューする感じ?)
  template<typename Callback>
  static void Execute(const std::function<T()>& task, Callback cb)
  {
    std::thread([task, cb = std::move(cb)]() mutable {
      std::move(cb)(task());
    }).detach();
  }

  template<asio::completion_token_for<void(T)> CompletionToken>
  static auto AsyncThredTask(const std::function<T()>& task,
                             CompletionToken token)
  {
    auto init = [task](asio::completion_handler_for<void(T)> auto handler) {
      auto work = asio::make_work_guard(handler);

      Execute(task,
              [handler = std::move(handler),
               work = std::move(work)](T result) mutable {
                // Get the handler's associated allocator. If the handler
                // does not specify an allocator, use the recycling
                // allocator as the default.
                auto alloc = asio::get_associated_allocator(
                  handler, asio::recycling_allocator<void>());

                // Dispatch the completion handler through the handler's
                // associated executor, using the handler's associated
                // allocator.
                asio::post(
                  work.get_executor(),
                  asio::bind_allocator(
                    alloc, [handler = std::move(handler), result]() mutable {
                      std::move(handler)(result);
                    }));
              });
    };

    return asio::async_initiate<CompletionToken, void(T)>(init, token);
  }
};

template<typename F>
decltype(auto)
Launch(F f)
{
  using R = decltype(f());
  return ThreadTask<R>::AsyncThredTask(f, asio::use_awaitable);
}

asio::awaitable<void>
task()
{
  // auto io = co_await asio::this_coro::executor;
  auto n = co_await Launch([]() {
    std::cout << "[onThread] " << std::this_thread::get_id() << std::endl;
    return 456;
  });
  std::cout << "[co_await] " << std::this_thread::get_id() << ": " << n
            << std::endl;
}

int
main(int argc, char** argv)
{
  asio::io_context io;

  asio::co_spawn(io, task(), asio::detached);

  std::cout << "[main] run..." << std::endl;
  io.run();
  std::cout << "[main] done" << std::endl;

  return 0;
}

asio 問題なく動くのだけど、はまるとわけがわからん(asio のコードが難解)という問題がある。
io, socket, timer は既存の部品を呼ぶだけ。

document

9
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?