0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Ninda 11日目: バルク操作 〜大量データの効率的な処理〜

Last updated at Posted at 2025-12-10

今日のゴール

  • バルク操作の必要性と利点を理解する
  • writeAll, takeAllの使い方をマスターする
  • 大量データを効率的に処理するパターンを学ぶ

なぜバルク操作が必要か

個別操作を繰り返すと、オーバーヘッドが蓄積します。

方式 操作回数 オーバーヘッド
個別 N回 ロック取得 × N
バルク 1回 ロック取得 × 1

writeAll: 一括書き込み

複数タプルを一度に書き込みます。

let tuples = @[
  toTuple(strVal("user"), strVal("alice"), intVal(25)),
  toTuple(strVal("user"), strVal("bob"), intVal(30)),
  toTuple(strVal("user"), strVal("charlie"), intVal(35))
]

let count = await ts.writeAllAsync(tuples)
echo "Wrote ", count, " tuples"

有効期限付きバルク書き込み

各タプルに異なるTTLを設定できます。

let tuples = @[tuple1, tuple2, tuple3]
let expirations = @[
  now + 1000,   # 1秒
  now + 5000,   # 5秒
  now + 10000   # 10秒
]

await ts.writeAllAsync(tuples, expirations)

ポイント:

  • tuplesexpirationsの長さは同じ
  • 期限なしは0を指定

takeAll: 一括取り出し

パターンにマッチする全タプルを一度に取り出します。

# パターンにマッチする全タプルを取り出し
let jobs = await ts.takeAllAsync(toPattern(strVal("job"), nilValue()))
echo "Took ", jobs.len, " jobs"

リミット付き取り出し

取り出し件数を制限できます。

# 最大10件だけ取り出し
let batch = await ts.takeAllAsync(
  toPattern(strVal("task"), nilValue()),
  limit = 10
)

ポイント:

  • limitなしは全件取り出し
  • バッチ処理では適切なサイズを指定

バッチ処理パターン

パターン1: 定期バッチ

一定間隔でまとめて処理。

proc batchProcessor() {.async.} =
  while true:
    await sleepAsync(200)  # 200msごと

    let batch = await ts.takeAllAsync(
      toPattern(strVal("log"), nilValue(), nilValue()),
      limit = 20
    )

    if batch.len > 0:
      # バッチ処理(ファイル書き込み、DB挿入など)
      processBatch(batch)

パターン2: サイズベースバッチ

一定件数に達したら処理。

const BATCH_SIZE = 100

while true:
  let batch = await ts.takeAllAsync(pattern, limit = BATCH_SIZE)
  if batch.len == 0:
    break
  processBatch(batch)

実践例: ETLパイプライン

Extract → Transform → Load の各ステージをバルク操作で実装。

# Transform: バッチ変換
while true:
  let batch = await raw.takeAllAsync(pattern, limit = 20)
  if batch.len == 0: break

  var transformed: seq[Tuple] = @[]
  for record in batch:
    transformed.add(transformRecord(record))

  await transformedSpace.writeAllAsync(transformed)

ポイント:

  • 各ステージは独立してスケール可能
  • 失敗時は該当バッチだけ再処理

パフォーマンス比較

1000件の書き込み比較:

方式 処理時間
個別 write × 1000 約100ms
writeAll 1000件 約5ms

💡 Tip: 10件以上をまとめて処理する場合は、バルク操作を検討しましょう。


まとめ

学んだこと

トピック ポイント
writeAllAsync 複数タプルを一括書き込み
takeAllAsync パターンマッチで一括取り出し
limit 取り出し件数の制限
用途 バッチ処理、ETL、大量データ移行

いつ使うか

場面 推奨操作
大量データ投入 writeAll
バッチ処理 takeAll + limit
全件削除 takeAll(limitなし)
ストリーム処理 個別 write/take

演習問題

問題11-1: バッチインポート

CSVファイルの各行をタプルに変換し、100件ずつバッチでインポートする機能を実装してください。

ヒント
  • CSVをパースしてseq[Tuple]を作成
  • 100件ごとにwriteAllAsync
  • 進捗表示: echo "Imported ", total, " records"
  • 最後の端数(100件未満)も忘れずに処理

問題11-2: 条件付き一括削除

特定の条件(例: 1時間以上前のログ)に合致するタプルを一括削除する機能を実装してください。

ヒント
  • readAllAsyncで全件取得
  • Nimのfilter関数で条件に合うものを抽出
  • 該当タプルをtakeAllAsyncで削除
  • パターンマッチングだけでは時刻の範囲検索はできないため、取得後にフィルタリング

問題11-3: ETLパイプライン

raw → validated → processed の3段階ETLパイプラインを実装してください。

  • raw: 生データ(バリデーション前)
  • validated: バリデーション済み
  • processed: 最終処理済み
ヒント
  • 3つの名前付きスペースを作成
  • 各ステージは takeAll(limit=N) → 変換 → writeAll のパターン
  • バリデーション失敗は別スペース(errors)に振り分け
  • 各ステージを独立したasyncプロシージャとして実装

前回: 有効期限(TTL) | 目次 | 次回: 型安全API

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?