今日のゴール
- 3つの基本操作(write, read, take)を深く理解する
- ブロッキング操作と非ブロッキング操作の違いを把握する
- 操作の原子性について理解する
Lindaの3つの基本操作
タプルスペースの操作は、たった3つの基本操作で構成されています。
Write(書き込み)
特性
| 特性 | 説明 |
|---|---|
| 即座に完了 | 書き込みはブロックしない |
| 重複を許可 | 同じタプルを何度でも書き込める |
| 原子的 | 書き込みは他の操作と干渉しない |
基本的な使い方
# シンプルな書き込み
await ts.writeAsync(toTuple(strVal("message"), strVal("hello")))
# 数値を含むタプル
await ts.writeAsync(toTuple(strVal("count"), intVal(42)))
# 複合タプル(型、名前、年齢、アクティブ)
await ts.writeAsync(toTuple(
strVal("user"), strVal("alice"), intVal(25), boolVal(true)
))
重複書き込み
# 同じタプルを3回書き込むと、3つの別々のタプルになる
await ts.writeAsync(toTuple(strVal("item"), intVal(1)))
await ts.writeAsync(toTuple(strVal("item"), intVal(1)))
await ts.writeAsync(toTuple(strVal("item"), intVal(1)))
let all = await ts.readAllAsync(pattern)
echo all.len # 3 - 重複はマージされない
Read(読み取り)
特性
| 特性 | 説明 |
|---|---|
| 非破壊的 | タプルはスペースに残る |
| ブロッキング | マッチするタプルが見つかるまで待機 |
| 複数リーダー | 同じタプルを複数プロセスが読める |
ブロッキング vs 非ブロッキング
使い分け
# ブロッキング: タスクを待つワーカー
let job = await ts.readAsync(jobPattern) # 来るまで待つ
# 非ブロッキング: ポーリング
let maybeJob = await ts.tryReadAsync(jobPattern)
if maybeJob.isNone:
echo "No job available"
Take(取り出し)
特性
| 特性 | 説明 |
|---|---|
| 破壊的 | タプルはスペースから削除される |
| ブロッキング | マッチするタプルが見つかるまで待機 |
| 排他的 | 同じタプルは1プロセスだけが取得 |
readとtakeの違い
排他的取得の例
# タプルは1つだけ
await ts.writeAsync(toTuple(strVal("prize"), intVal(100)))
# 2つのワーカーが同時に取りに行く
# → 1つだけが成功し、もう1つはNoneを受け取る
💡 ポイント:
takeは分散システムにおける「一度だけ処理」を保証する基本メカニズムです。
操作の比較表
| 操作 | 破壊的 | ブロッキング | 複数取得 | 用途 |
|---|---|---|---|---|
write |
- | No | - | データ追加 |
read |
No | Yes | No | 参照、共有データ読み取り |
tryRead |
No | No | No | ポーリング |
readAll |
No | No | Yes | 全件参照 |
take |
Yes | Yes | No | タスク取得、排他処理 |
tryTake |
Yes | No | No | 非ブロッキングタスク取得 |
takeAll |
Yes | No | Yes | バッチ処理 |
原子性について
タプルスペースの各操作は原子的です。
# この操作は原子的
# 途中で他のプロセスがタプルを盗むことはない
let result = await ts.takeAsync(pattern)
複数の操作を原子的に行いたい場合は、バルク操作を使います(Day 11で詳説):
# 複数タプルを原子的に書き込み
await ts.writeAllAsync(@[tuple1, tuple2, tuple3])
実践例: シンプルなジョブキュー
3つの操作を組み合わせた典型的なパターン:
# プロデューサー: ジョブを投入
proc producer() {.async.} =
for i in 1..5:
await ts.writeAsync(toTuple(strVal("job"), intVal(i)))
# コンシューマー: ジョブを処理(排他的に取得)
proc consumer(id: string) {.async.} =
while true:
let job = await ts.tryTakeAsync(jobPattern)
if job.isSome:
echo id, " processing ", job.get()[1].intVal
ポイント:
-
write: プロデューサーがジョブを追加 -
tryTake: コンシューマーが排他的にジョブを取得 - 複数コンシューマーがいても、各ジョブは1回だけ処理される
まとめ
3つの基本操作
| 操作 | Linda名 | 特徴 |
|---|---|---|
write |
out | 追加、即座完了、重複可 |
read |
rd | 読取、ブロック、残る |
take |
in | 取出、ブロック、消える |
バリエーション
-
非ブロッキング版:
tryRead,tryTake- マッチなしならNone -
バッチ版:
readAll,takeAll- マッチする全タプルを処理
選び方の指針
共有データを参照したい → read
排他的に処理したい → take
待機したくない → tryXxx
全部まとめて → xxxAll
演習問題
問題4-1: カウンター
タプルスペースを使ったカウンターを実装してください。値を増やす操作は、現在値をtakeして+1した新しいタプルをwriteします。
問題4-2: 複数タイプのデータ
"config", "log", "metric"の3種類のタプルを混在させ、それぞれを別々に読み取るプログラムを書いてください。
問題4-3: FIFOの確認
10個のタプルを順番に書き込み、順番に取り出して、FIFOになっているか確認してください。
(ヒント: nindaは順序を保証しません)
💡 完全な実装例は GitHubリポジトリ を参照してください。
前回: 環境構築 | 目次 | 次回: パターンマッチング