はじめに
Zig 0.15.1でWritergateと称してI/O周りのAPIに抜本的に手が入り、Zig 0.16.0でついに念願の非同期APIが実装された。
ここでは、この非同期APIを使って遊んでみた記録。
Zig 0.15.1のWritergate
Zig 0.15.1のAPIはリリースノートの一文を超訳すると
-
Reader/Writerがanytypeを受け入れるジェネリック関数として定義されていてイケてない - 下位ストリームから返されたエラーを透過的に引き継いでてイケてない
-
BufferedReader / BufferedWriterに依存してた
https://ziglang.org/download/0.15.1/release-notes.html#Motivation
これらに抜本的に対策するためにstd.Ioインターフェースを用意され、Reader/Writerについて、このインターフェースを介するように変更された。
具体的には、例えばstd.fs.File.Readerの場合
旧API
pub const DeprecatedReader = std.io.GenericReader(File, ReadError, read);
新API
pub const Reader = struct {
file: File,
err: ?ReadError = null,
mode: Reader.Mode = .positional,
pos: u64 = 0,
size: ?u64 = null,
size_err: ?SizeError = null,
seek_err: ?Reader.SeekError = null,
interface: std.Io.Reader, // Io.Readerインターフェースを保持
....
Zig 0.16.0での変更
-
std.fsにあった型のほぼ全てがstd.Io下に移行された-
std.fs.path名前空間くらいしか残ってない
-
- 大半のファイルI/O APIが
std.Ioインターフェースを引数として要求されるようになった 1
具体的には、例えばstd.Io.Dir.closeだと
std.Io.Dir.close(dir: std.Io.Dir, io: std.Io) void { ... }
std.Ioプリミティブ
std.Ioを引数に渡す必要があるのはいいとして、どうやって初期化するのか?
その答えば、Zig 0.16.0で通称Juicy mainと呼ばれるmain()の引数にstd.process.Initが渡され、そこから所得できるようになっている。
pub const Init = struct {
minimal: Minimal,
arena: *std.heap.ArenaAllocator,
gpa: Allocator,
io: Io, // ココ
environ_map: *Environ.Map,
preopens: Preopens,
...
蛇足としてstd.process.Initの型を見るとわかるように、ArenaAllocatorとAllocatorも初期化して渡されるようになり、
従来のmain()の先頭で、ArenaAllocatorを初期化が不要になった。
Allocatorの引き回しは変わらず必要ではあるが。
閑話休題。
Zig 0.16.0以降のスタイルとして、Allocatorに加えてIoも引き回すことにはなる。
このIoはビルドオプションによって切り替わる。
zig build-exeの以下のオプション
$ zig build-exe -h | grep single-thread
-fsingle-threaded Code assumes there is only one thread
-fno-single-threaded Code may not assume there is only one thread
もしくはstd,Build.Moduleの
single_threaded: ?bool,
を指定する。
規定値( = null)の場合、multi-threaded IOとして初期化される。2
従来の引数なしのmain()の場合はどうするのかというと、
std.Io.Threaded.init(...)で直接初期化。
その上でfn io(t: *Threaded) Ioを呼んで取得する。
またIoプリミティブのファミリーとして、std.Io.Eventedも追加されている。
std.Io.Eventedを使うことでN:Mスレッド(グリーンスレッド)が実現できるとのこと。
Zig 0.16.0のリリースノートによると、そこそこテストは通してはいるがWork in progressとのこと。
https://ziglang.org/download/0.16.0/release-notes.html#toc-IO-as-an-Interface
ユニットテストの場合
std.testing.ioから取得できる。
build.zigにおけるビルド構成の場合
std.Build.Graphがioフィールドを持っている。
具体的には、以下のように取得:
pub fn build(b: *std.Build) void {
const io = b.graph.io;
...
}
std.Ioタスク
Zig 0.16.0で提供されるIoタスクは以下
- Future
- 関数ベースの非同期タスク
-
std.Io.async()またはstd.Io.concurrent()の発行で生成される -
await()の発行で実行し、終了するまで待機する(スレッドを明け渡す)
- Group
-
async(...)もしくはconcurrent(...)割り当てられた非同期タスクをawait()の発行で実行し、全てのタスクの完了まで待機する - タスクグループのキャンセルも
cancel()によって行える -
JavascriptのPromise.all()を思い浮かべるとわかりやすいかも
-
- Select
-
async(...)もしくはconcurrent(...)割り当てられた非同期タスクをawait()の発行で実行し、いずれか1つのタスクの完了まで待機する - タスクグループのキャンセルも
cancel()によって行える -
awaitMany()も用意されており、待機するタスク数を指定できる- 0でもOK (完了通知を受けた分を拾うだけで待機はしない)
-
JavascriptのPromise.any()を思い浮かべるとわかりやすいかも
-
- Batch
- 並行的にタスクを実行する低レイヤーAPI
- 扱いが難しく、まだ把握しきれてないので割愛
これらに加えて、ランダムシードの生成や時刻に関するAPIなども置かれてかなり巨大な代物となっている。
asyncとconcurrent
タスクの生成関数としてasync(...)とconcurrent(...)が提供されている。
これらの違いは
- async スケジューリングの調整を目的としており、
async(...)の呼び出しでは必ずしも実行されることは保証されてない-
std.Ioの規定値であるstd.Io.Threadedでは、std.Io.Threaded.async_limitで決定されている-
std.process.Initから提供されるこの値の規定値は論理CPU数-1
-
- この値未満はスケジューリングされる
- この値を超えると現スレッドで実行される
- 回避するためにはタスク数 >= CPU数の場合に
concurrent()を代わりに呼び出す必要がある
- 回避するためにはタスク数 >= CPU数の場合に
-
- concurrent
concurrent(...)の呼び出しは実行も保証する-
std.Io.Threadedでは、std.Io.Threaded.concurrent_limitで決定されている-
std.process.Initから提供されるこの値の規定値は上限なし
-
- この値を超えた場合、
ConcurrencyUnavailableエラーが返される- 規定値は上限なしなので際限なくスレッドが作られる
-
stackful coroutineを構築するために内部的にヒープアロケーションが行われる- エラーを返す可能性がある
-
これは、std.Io.Threadedを使用する場合の全てのIOタスク(Group, Select等)に作用する。
使用例
std.Io.Selectを使用して擬似的にタスクのFire and forgetを実現してみた。
fn DetachedTaskReaper(comptime buffer_size: comptime_int) type {
return struct {
/// std.Io.Selectの非同期キューに使用されるバッファ
buffer: [buffer_size]TaskResult = undefined,
tasks: std.Io.Select(TaskResult),
const Self = @This();
pub fn create(io: std.Io, allocator: std.mem.Allocator) !*Self {
// バッファサイズを静的にコンパイル時に決定させるためpinningさせて生成させる
var self = try allocator.create(Self);
self.* = .{
.tasks = std.Io.Select(TaskResult).init(io, &self.buffer),
};
return self;
}
pub fn deinit(self: *Self, allocator: std.mem.Allocator) void {
// 破棄の際に完了で停滞しているタスクをキャンセル
// その際結果は捨てる
self.tasks.cancelDiscard();
allocator.destroy(self);
}
/// 非同期タスクを作成する
/// 所有権をの管理下に置くDetachedTaskReaper
pub fn spawn(self: *Self, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) !void {
// 展開先のunion型のvariantのフィールドに該当するリテラルを第一引数に指定する
// std.Io.Selectはselectシステムコールにおけるread/write同様に複数種のリソースを同時に待ち受けできる
// そのためconcurrent() / async()では展開先のフィールドの明示が必要
//
// またこの型においてconcurrent()の代わりにasync()を使用することは推奨されない
// これはasync()の呼び出しは起動を保証しないため
// 例えば非同期キューが満杯の場合とか
// これは最悪デッドロックを引き起こすしかねない
return self.tasks.concurrent(.item, function, args);
}
/// 死神に徘徊させて実行の終わったタスクの魂を刈り取る
pub fn tick(self: *Self) !void {
// このバッファは結果の格納先に使用されるもの
// またこのバッファに0を指定するとキューすら確認せずに早期リターンしてしまう
// そのため必ず1以上を指定すること
var buffer: [buffer_size]TaskResult = undefined;
// 1 tick進める
// awaitManyの第二引数の待機数に0を指定することで
// キューに通知されたタスクのみを回収する
const len = try self.tasks.awaitMany(&buffer, 0);
for (buffer[0..len]) |result| {
result.item catch |err| {
// 簡易的なエラーログ
std.log.err("Detached task has error: {s}", .{ @errorName(err) });
};
}
}
/// 結果を格納するための型
/// ここでは1種類のため1つだけvariantを用意
/// 分類が必要な場合は複数のvariantを用意し、
/// async / concurrentコールの際に指定する
const TaskResult = union(enum) { item: anyerror!void };
};
}
使い方:
var reaper = try DetachedTaskReaper(32).create(init.io, init.gpa);
defer reaper.deinit(init.gpa);
// タスクの割り当て
try reaper.spawn(task_fn, .{ ... });
// イベントループを回す
while (true) {
...
// イベントループ内で回収
try reaper.tick();
...
}
std.Io.async(...)やstd.Io.concurrent(...)よって作成されるstd.Io.Futureが内部的にアロケーションしているため、
Futureを作っただけの 投げっぱなしジャーマン Fire and forgetをするとメモリリーク報告の洗礼を受ける。
すなわち戻り値のFutureを受け取り、await()を呼ぶ必要がある。
ここで視点を変えて、Futureの代わりにstd.Io.Selectでタスクを作り所有権をこの型に委ね、リソース解放を任せれば擬似的Fire and forgetが実現できそうと考えた。
実際別スレッドで標準入力をハンドリングする際にこのコードを使用した。
とりあえず、それとなくは期待通りに動いている。3
関数の色問題
例示したコードからも分かるように、Zigの非同期関数には他の言語で見られるようなasyncがキーワードとして付与されていない。
asyncをキーワードで付与する多くの言語は、関数を非同期の文脈に入れた場合、その呼び出し元も遡って非同期関数にする必要がある。
これは関数の色(https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/)問題として知られる。
この問題に対して、Zigはstd.Ioを引き回し、明示的に引き渡すことで対処している。
実にZigらしい解決策だと思った。
補足
この遊びで得た知識をもとに std.Ioを使ったnng (https://nng.nanomsg.org)のラッパー書いてる。
機能はまだまだ不十分。