今日のゴール
- バルク操作の必要性と利点を理解する
- writeAll, takeAllの使い方をマスターする
- 大量データを効率的に処理するパターンを学ぶ
なぜバルク操作が必要か
個別操作を繰り返すと、オーバーヘッドが蓄積します。
| 方式 | 操作回数 | オーバーヘッド |
|---|---|---|
| 個別 | N回 | ロック取得 × N |
| バルク | 1回 | ロック取得 × 1 |
writeAll: 一括書き込み
複数タプルを一度に書き込みます。
let tuples = @[
toTuple(strVal("user"), strVal("alice"), intVal(25)),
toTuple(strVal("user"), strVal("bob"), intVal(30)),
toTuple(strVal("user"), strVal("charlie"), intVal(35))
]
let count = await ts.writeAllAsync(tuples)
echo "Wrote ", count, " tuples"
有効期限付きバルク書き込み
各タプルに異なるTTLを設定できます。
let tuples = @[tuple1, tuple2, tuple3]
let expirations = @[
now + 1000, # 1秒
now + 5000, # 5秒
now + 10000 # 10秒
]
await ts.writeAllAsync(tuples, expirations)
ポイント:
-
tuplesとexpirationsの長さは同じ - 期限なしは
0を指定
takeAll: 一括取り出し
パターンにマッチする全タプルを一度に取り出します。
# パターンにマッチする全タプルを取り出し
let jobs = await ts.takeAllAsync(toPattern(strVal("job"), nilValue()))
echo "Took ", jobs.len, " jobs"
リミット付き取り出し
取り出し件数を制限できます。
# 最大10件だけ取り出し
let batch = await ts.takeAllAsync(
toPattern(strVal("task"), nilValue()),
limit = 10
)
ポイント:
-
limitなしは全件取り出し - バッチ処理では適切なサイズを指定
バッチ処理パターン
パターン1: 定期バッチ
一定間隔でまとめて処理。
proc batchProcessor() {.async.} =
while true:
await sleepAsync(200) # 200msごと
let batch = await ts.takeAllAsync(
toPattern(strVal("log"), nilValue(), nilValue()),
limit = 20
)
if batch.len > 0:
# バッチ処理(ファイル書き込み、DB挿入など)
processBatch(batch)
パターン2: サイズベースバッチ
一定件数に達したら処理。
const BATCH_SIZE = 100
while true:
let batch = await ts.takeAllAsync(pattern, limit = BATCH_SIZE)
if batch.len == 0:
break
processBatch(batch)
実践例: ETLパイプライン
Extract → Transform → Load の各ステージをバルク操作で実装。
# Transform: バッチ変換
while true:
let batch = await raw.takeAllAsync(pattern, limit = 20)
if batch.len == 0: break
var transformed: seq[Tuple] = @[]
for record in batch:
transformed.add(transformRecord(record))
await transformedSpace.writeAllAsync(transformed)
ポイント:
- 各ステージは独立してスケール可能
- 失敗時は該当バッチだけ再処理
パフォーマンス比較
1000件の書き込み比較:
| 方式 | 処理時間 |
|---|---|
| 個別 write × 1000 | 約100ms |
| writeAll 1000件 | 約5ms |
💡 Tip: 10件以上をまとめて処理する場合は、バルク操作を検討しましょう。
まとめ
学んだこと
| トピック | ポイント |
|---|---|
| writeAllAsync | 複数タプルを一括書き込み |
| takeAllAsync | パターンマッチで一括取り出し |
| limit | 取り出し件数の制限 |
| 用途 | バッチ処理、ETL、大量データ移行 |
いつ使うか
| 場面 | 推奨操作 |
|---|---|
| 大量データ投入 | writeAll |
| バッチ処理 | takeAll + limit |
| 全件削除 | takeAll(limitなし) |
| ストリーム処理 | 個別 write/take |
演習問題
問題11-1: バッチインポート
CSVファイルの各行をタプルに変換し、100件ずつバッチでインポートする機能を実装してください。
ヒント
- CSVをパースして
seq[Tuple]を作成 - 100件ごとに
writeAllAsync - 進捗表示:
echo "Imported ", total, " records" - 最後の端数(100件未満)も忘れずに処理
問題11-2: 条件付き一括削除
特定の条件(例: 1時間以上前のログ)に合致するタプルを一括削除する機能を実装してください。
ヒント
-
readAllAsyncで全件取得 - Nimのfilter関数で条件に合うものを抽出
- 該当タプルを
takeAllAsyncで削除 - パターンマッチングだけでは時刻の範囲検索はできないため、取得後にフィルタリング
問題11-3: ETLパイプライン
raw → validated → processed の3段階ETLパイプラインを実装してください。
- raw: 生データ(バリデーション前)
- validated: バリデーション済み
- processed: 最終処理済み
ヒント
- 3つの名前付きスペースを作成
- 各ステージは
takeAll(limit=N)→ 変換 →writeAllのパターン - バリデーション失敗は別スペース(errors)に振り分け
- 各ステージを独立した
asyncプロシージャとして実装