24
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

C#Advent Calendar 2019

Day 12

追記型ストレージFasterLogについて

Last updated at Posted at 2019-12-11

始めに

Microsoftは、ローカルで使えるストレージライブラリのFASTERというものを開発している。
このFASTER、最初はキーバリューストアであるFasterKVのみ提供していた。しかし、バージョン2019.10.31.1より、FasterLogという、データの追記に特化した機能が追加された。
少し触ってみると、今自分が実現したい機能を丁度良くカバーしてそうなので、使い方や注意点を書いておく。

FasterKVは論文も発表されているので、特徴や理論的な裏付け等が知りたければそちらも参照のこと。

今回紹介するFasterLogは、FasterKVで使用しているストレージ機能のベース部分を利用した、追記と範囲検索に特化したものとなる。

なお、FasterLogは今もなおインターフェイスの更新が行われており、この記事で書かれていることと微妙に差異があるかもしれないので注意が必要。
この記事では1.9.5をベースに解説を行う。

特徴

  • データはディスクに保存される
  • 扱えるデータ型はバイト配列のみ
  • 以下の操作が可能
    • 追記
    • データの範囲検索
    • 過去のある時点までのレコード削除
  • 特定レコードの削除は不可
  • 途中にデータを挿入することは不可

使い方

プロジェクトへの導入

普通にnugetパッケージとして公開されているので、PackageReference等で追加すればOK。

他に、Microsoft.FASTER.Serverや、Microsoft.FASTER.Clientも存在するが、これは FasterKVをクライアントサーバー形式で利用したい場合に使うフレームワーク なので、今回は導入しない。

ストレージデバイスインスタンスを作成する

まず、以下のコードでログを格納するディスク領域を作成する。

// using FASTER.core;
// 独自ストレージを使用する場合、FASTER.core.IDeviceを実装したインスタンスを代わりに生成する
// ファイルパスのみ必須。
// 基本的に同じフォルダに複数ファイルを作る設計なので、複数のログファイルを扱いたい場合、別フォルダに分けるのが良い
IDevice logDevice = Devices.CreateLogDevice("[格納するファイルのパス]");

なお、deleteOnCloseというオプション引数が使用可能だが、これを使用すると、後でデータを開こうとした時に挙動がおかしくなるので、検証またはテスト目的以外では設定しないこと。
また、オプション引数でpreallocateFile=trueとすると、後でFasterLogを生成する時に最初から2^SegmentSizeBitsのファイルを確保してから、そこに書き込むという動作になる。
パフォーマンスを重視する場合はtrueにするのも手だが、その場合突然大きなファイルが作成されることになるため、注意が必要となる。

ここで生成したIDeviceは、必ずプログラム終了時にlogDevice.Dispose()するか、またはusingで囲むこと。

他のIDeviceの実装として、LocalMemoryDeviceというものもあるが、コンストラクタにConsole.Write等が仕込まれている等、テスト目的で作ったような印象なので、リファレンス実装あるいはテストのためのデバイスと思った方が良い。

また、Azure Storage(page blob)を使うIDeviceの実装もある。ただし、こちらの性能等は未検証

FasterLogインスタンスを生成する

ストレージデバイスインスタンスを生成したら、以下のようにして、FasterLogインスタンスを生成する。

// using FASTER.core;
// LogDeviceのみ必須
var fls = new FasterLogSettings()
{
    LogDevice = logDevice
};
using(var fl = new FasterLog(fls))
{
    // 処理
    // flインスタンスはプロセス内で使い回すこと
}

FasterLogSettingsのその他項目について

FasterLogSettingsのその他項目は以下のようになる。

  • PageSizeBits
    • 格納データページの基本単位
    • 各レコードは、メタデータと実データが必ず同じページに収まるように格納される
      • つまり、一レコード当たりの最大サイズがこの値に依存して決定される
      • ファイルの空間効率にも影響するので、サイズ設定は慎重に
    • 単位は何ビット使用するかなので、例えば8を指定したら2^8 = 256(bytes)となる
    • 初期値は22(2^22 ≒ 4MB)
  • MemorySizeBits
    • オンメモリに乗せる最大データ容量
    • 単位はPageSizeBitsと同じ
    • 初期値は23(2^23 ≒ 8MB)
    • 最低でもPageSizeBits+1必要で、下回るとFasterLogインスタンス生成時にエラーが出る
  • SegmentSizeBits
    • オンメモリに乗らないデータを格納するファイルのサイズ
    • 初期値は30(=1GB)
  • LogCommitManager
  • LogCommitFile
    • 1.9.5時点では使われていない
    • 出力先を変えたい場合、LogCommitManagerのインスタンスを作る必要がある
  • GetMemory
    • データ読出しの時に呼ばれるコールバック関数
  • LogChecksum

生成されるファイル

FasterLogをnewした時点で、以下のファイルが生成される

  • ログセグメントデータ: [CreateLogDeviceで指定したファイル].[0始まり数字]
    • データ本体が格納される
    • preallocateFile=trueの場合、2^[SegmentSizeBits]bytesのサイズをアロケートしようとするので注意(デフォルト1GB)
    • ログデータが2^[SegmentSizeBits]を超えるたびに、新しくセグメントファイルを作成する
  • トランザクションファイル: デフォルトは log-commits/commit.0.0
    • 論理的なアドレスの開始点、終点が書き込まれる
    • サイズは概ね一定
    • 複数のログデータで共有するとおかしなことになるので、同じディレクトリに複数ログデータを作成する場合は、デフォルト値から変更した方が良い
    • DeviceLogCommitCheckpointManager を作る場合、log-commitsディレクトリのベースパスを指定することができる

データを追加する

データの追加は以下のように行う

// FasterLog fl;
// byte[] data;
// ReadOnlySpan<byte>でも可
// Enqueueの時点ではまだ永続化はされない
long recordAddress = fl.Enqueue(data);
// ここで返ってくるaddress値は、WaitForCommit系のAPIでuntilAddressとして使用する
// コミットデータの永続化
fl.Commit(true);

Commitした時点でディスクに書き出される。
戻り値として、追加したデータの論理アドレスが取得できる。

例ではEnqueueを使用したが、これは内部的にEnqueueを成功するまで繰り返すという挙動のため、タイミングが悪いと時間がかかる場合もある。
一定回数追加を試みて、だめならエラーを出すか後でやり直すという挙動をしたい場合、bool FasterLog.TryEnqueue(byte[] data, out var logicalAddress)というAPIを使用する。
Enqueueも実は内部的にTryEnqueueを使っている

データを読み出す

データの読み出しは、C#8.0とそれより前でやり方が異なる。

共通事項

下記のように、FasterLog.Scan([開始アドレス], [終端アドレス])を使用する。

// FasterLog fl;
// fl.Scan([論理開始アドレス], [論理終端アドレス])という風にして指定する
using(FastLogScanIterator iter = fl.Scan(fl.CommittedBeginAddress, fl.CommittedUntilAddress))
{
    // enumeratorで走査
}

ここでいう開始アドレスと終端は、コミット済みの全てのログを見るならFasterLog.CommittedBeginAddressFasterLog.CommittedUntilAddressをそれぞれ指定すると良い。
全てのログを見たくない場合は、終端アドレスに、[開始アドレス] + [適当なバイト数]を指定する。

走査中に、現在見ているログの論理アドレスを知りたい場合は、FastLogScanIterator.CurrentAddress、次のエントリのアドレスを知る場合は、FastLogScanIterator.NextAddressを使用する。

非同期(C#8.0)

IAsyncEnumerableを使用する。

// FasterLog fl;
// fl.Scan([論理開始アドレス], [論理終端アドレス])という風にして指定する
using(FastLogScanIterator iter = fl.Scan(fl.CommittedBeginAddress, fl.CommittedUntilAddress))
{
    // dataの型はbyte[]
    // lenはデータ長(bytes)
    // 2019.11.18では二つだけだが、最新ソースでは更に long currentAddress も追加になる模様
    // https://github.com/microsoft/FASTER/commit/bf657635374873958d96b31db1299b58ef9a17b1
    await foreach(var (data, len, currentAddress, nextAddress) in iter.GetAsyncEnumerable())
    {
        // データの参照
    }
}

FastLogScanIterator.GetAsyncEnumerable(CancellationToken ct = default)では、foreachの要素に単純にnewされたbyte[]を受け取るが、代わりにSystem.Buffers.MemoryPool<byte>のインスタンスを渡すと、メモリプール経由でバッファの確保を行うため、アロケーションが減らせる。
ただし、IMemoryOwner<byte>自体のアロケーションは避けられないため、ゼロではない。
また、使い終わったIMemoryOwner<byte>はDisposeを行わないと、メモリリークの原因になる。

非同期(C#7.x以前)

FastLogScanIterator.[GetNext,NextAddress,WaitAsync]等を駆使する

// FasterLog fl;
using(FastLogScanIterator iter = fl.Scan(fl.CommittedBeginAddress, fl.CommittedUntilAddress))
{
    // 終端ではnextAddressが-1になる
    while(iter.NextAddress >= 0)
    {
        await iter.WaitAsync();
        // データを取り出すまでループと待機を行う
        // 引数は、データ本体、データ長、現在のアドレスの三つ
        while(iter.GetNext(out var entry, out var length, out var currentAddress, out var nextAddress))
        {
            // データの参照
        }
    }
}

データの削除

データの削除には、FasterLog.TruncateUntil(long untilAddress)またはFasterLog.TruncateUntilPageStart(long untilAddress)を使用する。

TruncateUntil

TruncateUntilデータの開始アドレス(BeginAddress)から、指定されたアドレス直前までのデータを削除するという挙動である。
注意点として、レコード境界ではない中途半端なアドレスを指定すると、次回のScan時にエラーが出るという仕様がある。

回避するには、末尾アドレス(FasterLog.TailAddress)を指定するか、下記のようにScanの途中で得たNextAddressで、正確なレコード境界を取得して指定するというやり方がある

// FasterLog fl;
long untilAddress = 0;
using(var iter = fl.Scan(fl.BeginAddress, [終端]))
{
    await foreach(var x in iter.GetAsyncEnumerable())
    {
        // 処理
        untilAddress = iter.NextAddress;
    }
}
fl.TruncateUntil(untilAddress);
// 最後にCommitすると変更が反映される
fl.Commit(true);

より安全に、かつ大雑把に消したい場合は、後述のTruncateUntilPageStartを使用する

TruncateUntilPageStart

TruncateUntilPageStartデータの開始アドレスから、指定されたアドレスに紐づくページの直前までを消去するという挙動である。
レコードはページをまたぐことは仕様上ないため、TruncateUntilで起こったような問題は起きない。
ただし、正確な消去はできないため、大雑把にログローテーション等をしたい場合に使うと良いだろう

パフォーマンス上の注意点

追記、削除する時は、操作後にコミットを行い永続化する必要があるが、コミットはFasterLogの中で最もコストのかかる処理だという事を念頭に置いた方が良い。しかし、コミットしないとデータが永続化されないので、信頼性が下がる。悩ましいところである。
つまり、より高速にデータを処理したい場合は、信頼性を下げずにいかにコミット回数を減らすかということを考える。

ではどうすればいいか。以下のようなやり方を一例として示そうと思う。

Commitタスクを独立させる

箇条書きにすると以下のような動作になる

  • 追記タスクを並列化する
  • 追記タスクは、自分の書き込みが完了するまでWaitForCommitAsyncで待機する
    • 要件による
  • 追加で、一つだけひたすらコミットだけするタスクを作成する
  • 追記タスクは、コミットタスクに自分が追記したレコードのアドレスを渡す
  • コミットタスクは、現在のコミット済み終端アドレスと受け取ったレコードアドレスを比較して、未コミットと判断したら、コミットを行う

タスク間のデータ受け渡しは、System.Threading.Channelsが使えると思う。

追加されたデータが永続化されるまで待機する

さて、このやり方では、追記とコミットが別タスクで行われる状態になる。しかし、要件によっては、データ消失を可能な限り避けるため、自分で追加したデータが、確実に永続化されたかどうか確認する必要が出てくる。
そこで、FasterLog.WaitForCommitAsync(long address, CancellationToken ct)を使用する。
引数にはFasterLog.Enqueueで得たアドレスを指定する。
これを使うと、指定されたアドレスがコミットされたと判断されるまで(address <= CommittedUntilAddressになるまで)待機が発生する。

コード例

具体的には、少々長くなるが以下のようなコードになる。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using FASTER.core;
using System.IO;
using System.Linq;

namespace fasterlabs
{
    static class FasterLogCommitTest
    {
        static long EnqueueData(long value, FasterLog fl)
        {
            Span<long> data = stackalloc long[1];

            if (fl.TryEnqueue(System.Runtime.InteropServices.MemoryMarshal.AsBytes(data), out var logicalAddress))
            {
                return logicalAddress;
            }
            else
            {
                return -1;
            }
        }
        public static async ValueTask DoTest(int TaskNum)
        {
            const long TotalCount = 100000;
            // ensure using cleared data 
            if (File.Exists("logcommittest.log.0"))
            {
                File.Delete("logcommittest.log.0");
            }
            if (Directory.Exists("log-commits"))
            {
                Directory.Delete("log-commits");
            }
            var log = Devices.CreateLogDevice("logcommittest.log");
            var channel = Channel.CreateUnbounded<long>();
            using (var fl = new FasterLog(new FasterLogSettings() { LogDevice = log }))
            {
                var sw = new System.Diagnostics.Stopwatch();
                sw.Start();
                using (var csrc = new CancellationTokenSource(1000 * 240))
                {
                    await Task.WhenAll(
                        Task.WhenAll(Enumerable.Range(0, TaskNum).Select(async idx =>
                        {
                            // データを追加するタスク
                            long logicalAddress = 0;
                            try
                            {
                                for (int i = 0; i < TotalCount / TaskNum; i++)
                                {
                                    logicalAddress = EnqueueData(i + idx * TotalCount, fl);
                                    await channel.Writer.WriteAsync(logicalAddress, csrc.Token).ConfigureAwait(false);
                                    await fl.WaitForCommitAsync(logicalAddress, csrc.Token).ConfigureAwait(false);
                                    // Console.WriteLine($"{idx}, {i}, {logicalAddress}");
                                }
                            }
                            catch (Exception e)
                            {
                                Console.WriteLine($"producer error({idx}, {logicalAddress}, {fl.CommittedUntilAddress}, {fl.TailAddress}): {e}");
                            }
                            // Console.WriteLine($"exit producer({idx}, {sw.Elapsed})");
                        })).ContinueWith(t => channel.Writer.Complete()),
                        Task.Run(async () =>
                        {
                            // コミットする方のタスク
                            int commitCount = 0;
                            try
                            {
                                while (true)
                                {
                                    if (!await channel.Reader.WaitToReadAsync(csrc.Token).ConfigureAwait(false))
                                    {
                                        break;
                                    }
                                    while (channel.Reader.TryRead(out var untiladdr))
                                    {
                                        if (fl.CommittedUntilAddress != fl.TailAddress)
                                        {
                                            fl.Commit(true);
                                            // await fl.CommitAsync(csrc.Token).ConfigureAwait(false);
                                            commitCount++;
                                        }
                                    }
                                }
                            }
                            catch (Exception e)
                            {
                                Console.WriteLine($"consumer error:{e}");
                            }
                            Console.WriteLine($"exit consumer({commitCount})");
                        }).ContinueWith(t =>
                        {
                            if(fl.CommittedUntilAddress != fl.TailAddress)
                            {
                                Console.WriteLine($"last commit");
                                fl.Commit(true);
                            }
                        })
                    ).ConfigureAwait(false);
                    sw.Stop();
                    Console.WriteLine($"Multi({TotalCount}, {TaskNum}): {sw.Elapsed}, iops = {(TotalCount * 1000) / sw.Elapsed.TotalMilliseconds}");
                }
            }
            log.Close();
        }
    }
}

バージョン2019.11.18.1より前のバージョンでは、CommitAsyncするとWaitForCommitAsyncとスレッドプールの消費が競合して、デッドロック状態になる場合があるので注意すること。(該当github issue)

終りに

今回はログ向けストレージ機能を持つFasterLogを紹介した。実際制約もあるので、あらゆる場面で使用できるわけではないが、それでも他には無い特徴を持っているため、役に立つ場面では役に立つと思われる。
機会があれば、コミット回数や並列数を変えて性能テスト等を行ってみたい。

また、データ検索についてはComplete周り等、もう少し掘り下げられそうな所があるので、別記事にするか、あるいはこの記事に追記したい。

参考リンク

24
17
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
17

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?