1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

新しくなったZigのstd.Ioを使って遊ぶ

1
Posted at

はじめに

Zig 0.15.1Writergateと称してI/O周りのAPIに抜本的に手が入り、Zig 0.16.0でついに念願の非同期APIが実装された。
ここでは、この非同期APIを使って遊んでみた記録。

Zig 0.15.1のWritergate

Zig 0.15.1のAPIはリリースノートの一文を超訳すると

  • Reader/Writeranytypeを受け入れるジェネリック関数として定義されていてイケてない
  • 下位ストリームから返されたエラーを透過的に引き継いでてイケてない
  • BufferedReader / BufferedWriterに依存してた
    https://ziglang.org/download/0.15.1/release-notes.html#Motivation

これらに抜本的に対策するためにstd.Ioインターフェースを用意され、Reader/Writerについて、このインターフェースを介するように変更された。

具体的には、例えばstd.fs.File.Readerの場合

旧API

pub const DeprecatedReader = std.io.GenericReader(File, ReadError, read);

新API

pub const Reader = struct {
    file: File,
    err: ?ReadError = null,
    mode: Reader.Mode = .positional,
    pos: u64 = 0,
    size: ?u64 = null,
    size_err: ?SizeError = null,
    seek_err: ?Reader.SeekError = null,
    interface: std.Io.Reader, // Io.Readerインターフェースを保持
    ....

Zig 0.16.0での変更

  • std.fsにあった型のほぼ全てがstd.Io下に移行された
    • std.fs.path名前空間くらいしか残ってない
  • 大半のファイルI/O APIがstd.Ioインターフェースを引数として要求されるようになった 1

具体的には、例えばstd.Io.Dir.closeだと

std.Io.Dir.close(dir: std.Io.Dir, io: std.Io) void { ... }

std.Ioプリミティブ

std.Ioを引数に渡す必要があるのはいいとして、どうやって初期化するのか?
その答えば、Zig 0.16.0で通称Juicy mainと呼ばれるmain()の引数にstd.process.Initが渡され、そこから所得できるようになっている。

pub const Init = struct {
    minimal: Minimal,
    arena: *std.heap.ArenaAllocator,
    gpa: Allocator,
    io: Io, // ココ
    environ_map: *Environ.Map,
    preopens: Preopens,
    ...

蛇足としてstd.process.Initの型を見るとわかるように、ArenaAllocatorAllocatorも初期化して渡されるようになり、
従来のmain()の先頭で、ArenaAllocatorを初期化が不要になった。
Allocatorの引き回しは変わらず必要ではあるが。

閑話休題。
Zig 0.16.0以降のスタイルとして、Allocatorに加えてIoも引き回すことにはなる。

このIoはビルドオプションによって切り替わる。
zig build-exeの以下のオプション

$ zig build-exe -h | grep single-thread
  -fsingle-threaded         Code assumes there is only one thread
  -fno-single-threaded      Code may not assume there is only one thread

もしくはstd,Build.Module

single_threaded: ?bool,

を指定する。
規定値( = null)の場合、multi-threaded IOとして初期化される。2

従来の引数なしのmain()の場合はどうするのかというと、
std.Io.Threaded.init(...)で直接初期化。
その上でfn io(t: *Threaded) Ioを呼んで取得する。

またIoプリミティブのファミリーとして、std.Io.Eventedも追加されている。
std.Io.Eventedを使うことでN:Mスレッド(グリーンスレッド)が実現できるとのこと。
Zig 0.16.0のリリースノートによると、そこそこテストは通してはいるがWork in progressとのこと。
https://ziglang.org/download/0.16.0/release-notes.html#toc-IO-as-an-Interface

ユニットテストの場合

std.testing.ioから取得できる。

build.zigにおけるビルド構成の場合

std.Build.Graphioフィールドを持っている。
具体的には、以下のように取得:

pub fn build(b: *std.Build) void {
    const io = b.graph.io;
    ...
}

std.Ioタスク

Zig 0.16.0で提供されるIoタスクは以下

  • Future
    • 関数ベースの非同期タスク
    • std.Io.async()またはstd.Io.concurrent()の発行で生成される
    • await()の発行で実行し、終了するまで待機する(スレッドを明け渡す)
  • Group
    • async(...)もしくはconcurrent(...)割り当てられた非同期タスクをawait()の発行で実行し、全てのタスクの完了まで待機する
    • タスクグループのキャンセルもcancel()によって行える
    • JavascriptPromise.all()を思い浮かべるとわかりやすいかも
  • Select
    • async(...)もしくはconcurrent(...)割り当てられた非同期タスクをawait()の発行で実行し、いずれか1つのタスクの完了まで待機する
    • タスクグループのキャンセルもcancel()によって行える
    • awaitMany()も用意されており、待機するタスク数を指定できる
      • 0でもOK (完了通知を受けた分を拾うだけで待機はしない)
    • JavascriptPromise.any()を思い浮かべるとわかりやすいかも
  • Batch
    • 並行的にタスクを実行する低レイヤーAPI
    • 扱いが難しく、まだ把握しきれてないので割愛

これらに加えて、ランダムシードの生成や時刻に関するAPIなども置かれてかなり巨大な代物となっている。

asyncとconcurrent

タスクの生成関数としてasync(...)concurrent(...)が提供されている。
これらの違いは

  • async スケジューリングの調整を目的としており、async(...)の呼び出しでは必ずしも実行されることは保証されてない
    • std.Ioの規定値であるstd.Io.Threadedでは、std.Io.Threaded.async_limitで決定されている
      • std.process.Initから提供されるこの値の規定値は論理CPU数-1
    • この値未満はスケジューリングされる
    • この値を超えると現スレッドで実行される
      • 回避するためにはタスク数 >= CPU数の場合にconcurrent()を代わりに呼び出す必要がある
  • concurrent concurrent(...)の呼び出しは実行も保証する
    • std.Io.Threadedでは、std.Io.Threaded.concurrent_limitで決定されている
      • std.process.Initから提供されるこの値の規定値は上限なし
    • この値を超えた場合、ConcurrencyUnavailableエラーが返される
      • 規定値は上限なしなので際限なくスレッドが作られる
    • stackful coroutineを構築するために内部的にヒープアロケーションが行われる
      • エラーを返す可能性がある

これは、std.Io.Threadedを使用する場合の全てのIOタスク(Group, Select等)に作用する。

使用例

std.Io.Selectを使用して擬似的にタスクのFire and forgetを実現してみた。

fn DetachedTaskReaper(comptime buffer_size: comptime_int) type {
    return struct {
        /// std.Io.Selectの非同期キューに使用されるバッファ
        buffer: [buffer_size]TaskResult = undefined,
        tasks: std.Io.Select(TaskResult),

        const Self = @This();

        pub fn create(io: std.Io, allocator: std.mem.Allocator) !*Self {
            // バッファサイズを静的にコンパイル時に決定させるためpinningさせて生成させる
            var self = try allocator.create(Self);
            self.* =  .{
                .tasks = std.Io.Select(TaskResult).init(io, &self.buffer),
            };

            return self;
        }

        pub fn deinit(self: *Self, allocator: std.mem.Allocator) void {
            // 破棄の際に完了で停滞しているタスクをキャンセル
            // その際結果は捨てる
            self.tasks.cancelDiscard();
            allocator.destroy(self);
        }

        /// 非同期タスクを作成する
        /// 所有権をの管理下に置くDetachedTaskReaper
        pub fn spawn(self: *Self, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) !void {
            // 展開先のunion型のvariantのフィールドに該当するリテラルを第一引数に指定する
            // std.Io.Selectはselectシステムコールにおけるread/write同様に複数種のリソースを同時に待ち受けできる
            // そのためconcurrent() / async()では展開先のフィールドの明示が必要
            // 
            // またこの型においてconcurrent()の代わりにasync()を使用することは推奨されない
            // これはasync()の呼び出しは起動を保証しないため
            // 例えば非同期キューが満杯の場合とか
            // これは最悪デッドロックを引き起こすしかねない
            return self.tasks.concurrent(.item, function, args);
        }

        /// 死神に徘徊させて実行の終わったタスクの魂を刈り取る
        pub fn tick(self: *Self) !void {
            // このバッファは結果の格納先に使用されるもの
            // またこのバッファに0を指定するとキューすら確認せずに早期リターンしてしまう
            // そのため必ず1以上を指定すること
            var buffer: [buffer_size]TaskResult = undefined;
            // 1 tick進める
            // awaitManyの第二引数の待機数に0を指定することで
            // キューに通知されたタスクのみを回収する
            const len = try self.tasks.awaitMany(&buffer, 0);

            for (buffer[0..len]) |result| {
                result.item catch |err| {
                    // 簡易的なエラーログ
                    std.log.err("Detached task has error: {s}", .{ @errorName(err) });
                };
            }
        }

        /// 結果を格納するための型
        /// ここでは1種類のため1つだけvariantを用意
        /// 分類が必要な場合は複数のvariantを用意し、
        /// async / concurrentコールの際に指定する
        const TaskResult = union(enum) { item: anyerror!void };
    };
}

使い方:

var reaper = try DetachedTaskReaper(32).create(init.io, init.gpa);
defer reaper.deinit(init.gpa);
// タスクの割り当て
try reaper.spawn(task_fn, .{ ... });

// イベントループを回す
while (true) {
    ...
    // イベントループ内で回収
    try reaper.tick();
    ...
}

std.Io.async(...)std.Io.concurrent(...)よって作成されるstd.Io.Futureが内部的にアロケーションしているため、
Futureを作っただけの 投げっぱなしジャーマン Fire and forgetをするとメモリリーク報告の洗礼を受ける。
すなわち戻り値のFutureを受け取り、await()を呼ぶ必要がある。

ここで視点を変えて、Futureの代わりにstd.Io.Selectでタスクを作り所有権をこの型に委ね、リソース解放を任せれば擬似的Fire and forgetが実現できそうと考えた。
実際別スレッドで標準入力をハンドリングする際にこのコードを使用した。
とりあえず、それとなくは期待通りに動いている。3

関数の色問題

例示したコードからも分かるように、Zigの非同期関数には他の言語で見られるようなasyncがキーワードとして付与されていない。
asyncをキーワードで付与する多くの言語は、関数を非同期の文脈に入れた場合、その呼び出し元も遡って非同期関数にする必要がある。
これは関数の色(https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/)問題として知られる。
この問題に対して、Zigstd.Ioを引き回し、明示的に引き渡すことで対処している。
実にZigらしい解決策だと思った。

補足

この遊びで得た知識をもとに std.Ioを使ったnng (https://nng.nanomsg.org)のラッパー書いてる。
機能はまだまだ不十分。

  1. std.Io.Dir.cwd()のように要求しないものもいる

  2. 具体的にはstd.Io.Threadedから生成されたIoが引き渡されている

  3. 正しくはstd.Io.async()で標準入力のハンドリングをキックさせたらメモリリーク報告の洗礼を受けて作らざるをえなくなったのが本音

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?