(本記事は今年の夏頃に某勉強会で発表した内容の焼き直しです。参加者の方が読むとかなりの部分重複があると思いますがご容赦下さい🙇♂️)
はじめに
旧Cloud Datastore(以降、旧Datastoreと記します)を長く使ってきましたが、そろそろFirestore in Datastore mode(以降Firestoreと記します)に真面目に取り組もうと重い腰を上げることにしました。
Firestoreの仕様はDatastoreとほぼ変わらないですが、主にトランザクション周りに大きな変更があります。
本記事ではFirestoreのトランザクションの挙動を実際にプログラムを動かしていろいろ検証してみます。
なお、本記事ではnative modeについては扱いません。
ただ、おそらくトランザクション周りの挙動はほぼ共通なのではと思っている(違っていたらごめんなさい)ので、参考にはなるかと思います。
基本的には公式ドキュメントに書かれている内容に沿って試します。
https://cloud.google.com/datastore/docs/concepts/transactions
プログラムにはGoのClient Libraryを利用します。
TL;DR
以降、若干長めのgdgdな記事になりますので、先に検証結果をまとめておきます。
- トランザクション内のデータ更新は、必ず全てが成功するか、全てが失敗する(Atomic)
- 旧Datastoreにあった25 Entity Groupsの制限は撤廃🎉
- トランザクション内でグローバルクエリが使える様に!🎉
- 最初にEntityをGetしたトランザクションがロックを確保し、他トランザクションの同じKeyのPutを含むコミットはその開放を待機
- トランザクション内で同じKeyでGet→Putしている場合、先行でGetしてロックを確保した他トランザクションがPut&コミットするとconcurrent transactionエラーとなる
- クエリはインデックスの範囲でロックを確保する
- トランザクション内でのGetやクエリは同じスナップショットを参照する。スナップショットはトランザクション開始時ではなく各Entityの参照時のもの
- Read-onlyトランザクションを使うとGETでロックがかからず他トランザクションはそのコミットを待機しない
Atomic
(公式ドキュメントより引用)
Each transaction is guaranteed to be atomic, meaning that transactions are never partially applied. Either all of the operations in the transaction are applied, or none of them are applied.
トランザクション内のデータ更新は、必ず全てが成功するか、全てが失敗します。
まず成功例を試してみます。
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"cloud.google.com/go/datastore"
)
var client *datastore.Client
func init() {
projectID := os.Getenv("STORE_PROJECT")
fmt.Println(projectID)
clt, err := datastore.NewClient(context.Background(), projectID)
if err != nil {
log.Fatal(err)
}
client = clt
}
// SampleModel is sample model.
type SampleModel struct {
Value int
CreatedAt time.Time
}
func main() {
ctx := context.Background()
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
key1 := datastore.NameKey("Sample", "sample1", nil)
entity1 := SampleModel{
Value: 1,
CreatedAt: time.Now(),
}
if _, err := tx.Put(key1, &entity1); err != nil {
return err
}
key2 := datastore.NameKey("Sample", "sample2", nil)
entity2 := SampleModel{
Value: 2,
CreatedAt: time.Now(),
}
if _, err := tx.Put(key2, &entity2); err != nil {
return err
}
return nil
}, datastore.MaxAttempts(1)); err != nil {
log.Fatal(err)
}
fmt.Printf("done")
}
ちなみに datastore.MaxAttempts(1)
というオプションを渡しているのはRunInTransactionのリトライ(デフォルト3回)を無効にする為です。リトライが発生すると検証結果がわかりにくくなるので。
成功しました。正しくデータが保存されました。
わざと最後に失敗してみます。
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
〜前回と同じ処理〜
return errors.New("わざとエラー")
}, datastore.MaxAttempts(1)); err != nil {
log.Fatal(err)
}
前回保存したデータを削除してから実行します。
実行結果:
2020/08/21 04:04:46 わざとエラー
exit status 1
期待通り1件も保存されませんでした。
500エンティティ更新
Transactions can query or lookup any number of entities. You can create, update, or delete up to 500 entities in each transaction.
↑に書かれている、登録・更新・削除は1トランザクション内で500までという制限を試してみます。
まずはちょうど500件
ctx := context.Background()
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
for i := 0; i < 500; i++ {
key := datastore.NameKey("Sample", fmt.Sprintf("sample%03d", i+1), nil)
entity := SampleModel{
Value: i + 1,
CreatedAt: time.Now(),
}
if _, err := tx.Put(key, &entity); err != nil {
return err
}
}
return nil
}, datastore.MaxAttempts(1)); err != nil {
log.Fatal(err)
}
fmt.Printf("done")
↓実行結果
成功しました!
旧Datastoreでは25エンティティグループ超過している為エラーとなるケースです。Firestoreすごい!\(^o^)/
次に501件Putしてみます。
for i := 0; i < 501; i++ {
↓実行結果
2020/08/22 14:29:17 rpc error: code = InvalidArgument desc = cannot write more than 500 entities in a single call
exit status 1
エラーになりました。
登録、更新、削除の合計が500を超えるとエラーになります。
これは結構ツライ制限ですね(´・ω・`)
ちなみにGAE/Go 1st genでappengine/datastoreパッケージ使うとこの制限はありません。
FirestoreそのものではなくClient Library及びAPIの制限かと思いますが、1st genからの移行の際には注意が必要そうです。
クエリ
旧Datastoreでは、トランザクション内では祖先クエリという特殊なクエリしか実行することが出来ませんでした。
FirestoreではEntityGroupの制約が撤廃された為、トランザクション内でグローバルクエリを実行することが可能となります。
func main() {
ctx := context.Background()
q := datastore.NewQuery("Sample").Limit(100)
total := 0
var cursor datastore.Cursor
for {
count := 0
var tmpCur datastore.Cursor
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) (err error) {
q := q.Transaction(tx)
if cursor.String() != "" {
q = q.Start(cursor)
}
it := client.Run(ctx, q)
keys := make([]*datastore.Key, 0, 100)
entities := make([]*SampleModel, 0, 100)
for {
entity := &SampleModel{}
key, err := it.Next(entity)
if err != nil {
if err == iterator.Done {
break
}
return err
}
// update
entity.CreatedAt = time.Now()
keys = append(keys, key)
entities = append(entities, entity)
}
if len(keys) > 0 {
tx.PutMulti(keys, entities)
}
tmpCur, err = it.Cursor()
if err != nil {
return err
}
count = len(keys)
return nil
}); err != nil {
log.Fatal(err)
}
total += count
cursor = tmpCur
if count < 100 {
break
}
}
fmt.Printf("done. total:%d\n", total)
}
グローバルクエリが問題なく実行できました!Firestoreスゴイ\(^o^)/
Locks
旧Datastoreではトランザクションに楽観的排他制御という方式が採用されていて、複数のトランザクションが同時に同じEntity Groupに対して操作を行った場合、片方(後からcommitした方)がエラーになります。
FirestoreではTransaction locksに書かれている通り、ロック方式が採用されているそうなので、試してみます。
Read-write transactions use reader/writer locks to enforce isolation and serializability. When two concurrent read-write transactions read or write the same data, the lock held by one transaction can delay the other transaction.
検証に使うコードは以下です。
func main() {
var kind = flag.String("kind", "Sample", "Kind name")
var sleepBefore = flag.Duration("sb", 0, "sleep duration before operation")
var sleepAfter = flag.Duration("sa", 0, "sleep duration after operation")
var keyName = flag.String("k", "test", "Key name")
var value = flag.Int("v", 1, "Value")
var readOnly = flag.Bool("r", false, "read-only")
var double = flag.Bool("d", false, "double")
var limit = flag.Int("l", 0, "limit")
flag.Parse()
operation := flag.Arg(0) // Get, Put, GetPut, Insert, Update, Upsert, GetUpdate, Query
ctx := context.Background()
log.Println("start")
opts := []datastore.TransactionOption{datastore.MaxAttempts(1)}
if *readOnly {
opts = append(opts, datastore.ReadOnly)
}
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
count := 1
if *double {
count = 2
}
for i := 0; i < count; i++ {
log.Printf("sleeping... %s\n", *sleepBefore)
time.Sleep(*sleepBefore)
key := datastore.NameKey(*kind, *keyName, nil)
switch operation {
case "Get":
log.Println("getting...")
var entity fstx.SampleModel
if err := tx.Get(key, &entity); err != nil {
return err
}
log.Printf("got %+v\n", entity)
case "Put":
log.Println("putting...")
entity := fstx.SampleModel{
Value: *value,
UpdatedAt: time.Now(),
}
if _, err := tx.Put(key, &entity); err != nil {
return err
}
case "GetPut":
log.Println("getting...")
var entity fstx.SampleModel
if err := tx.Get(key, &entity); err != nil && err != datastore.ErrNoSuchEntity {
return err
}
log.Printf("got %+v\n", entity)
log.Println("putting...")
entity.Value = *value
entity.UpdatedAt = time.Now()
if _, err := tx.Put(key, &entity); err != nil {
return err
}
case "Insert":
log.Println("inserting...")
entity := fstx.SampleModel{
Value: *value,
UpdatedAt: time.Now(),
}
if _, err := tx.Mutate(datastore.NewInsert(key, &entity)); err != nil {
return err
}
case "Update":
log.Println("updating...")
entity := fstx.SampleModel{
Value: *value,
UpdatedAt: time.Now(),
}
if _, err := tx.Mutate(datastore.NewUpdate(key, &entity)); err != nil {
return err
}
case "Upsert":
log.Println("upserting...")
entity := fstx.SampleModel{
Value: *value,
UpdatedAt: time.Now(),
}
if _, err := tx.Mutate(datastore.NewUpsert(key, &entity)); err != nil {
return err
}
case "GetUpdate":
var entity fstx.SampleModel
if err := tx.Get(key, &entity); err != nil && err != datastore.ErrNoSuchEntity {
return err
}
log.Printf("got %+v\n", entity)
entity.Value = *value
entity.UpdatedAt = time.Now()
log.Println("updating...")
if _, err := tx.Mutate(datastore.NewUpdate(key, &entity)); err != nil {
return err
}
case "Query":
keyA := datastore.NameKey(*kind, "sample001", nil)
keyC := datastore.NameKey(*kind, "sample010", nil)
q := datastore.NewQuery(*kind).
Filter("__key__ >=", keyA).
Filter("__key__ <=", keyC).
Transaction(tx)
log.Println("querying...")
var entities []*fstx.SampleModel
keys, err := client.GetAll(ctx, q, &entities)
if err != nil {
return err
}
for j, entity := range entities {
log.Printf("%d: %s %+v\n", j, keys[j], entity)
}
case "QueryAll":
q := datastore.NewQuery(*kind).Transaction(tx)
if *limit > 0 {
q = q.Limit(*limit)
}
log.Println("querying...")
var entities []*fstx.SampleModel
keys, err := client.GetAll(ctx, q, &entities)
if err != nil {
return err
}
for j, entity := range entities {
log.Printf("%d: %s %+v\n", j, keys[j], entity)
}
default:
return fmt.Errorf("illegal operation:%s", operation)
}
log.Printf("sleeping... %s\n", *sleepAfter)
time.Sleep(*sleepAfter)
}
log.Println("committing...")
return nil
}, opts...); err != nil {
log.Fatal(err)
}
log.Printf("done.")
}
Get、 Put、 Get&Put、Mutationの各種処理を複数トランザクションから並行に行ってみます。
Get vs Get
単純なGetを並行で実行します。(必要なエンティティは予めPutしておきます)
最初に実行するプロセス(プロセス1)でコミット前にsleepを入れて、後から実行するプロセス(プロセス2)は即コミットします。
結果
プロセス1
$ ./fstx -sa 20s Get
2020/09/10 15:20:43 start
2020/09/10 15:20:44 getting...
2020/09/10 15:20:44 got {Value:1 CreatedAt:2020-09-10 14:19:37.872402 +0900 JST}
2020/09/10 15:20:44 sleeping... 20s
2020/09/10 15:21:04 commiting...
2020/09/10 15:21:04 done.
プロセス2
$ ./fstx Get
2020/09/10 15:20:47 start
2020/09/10 15:20:48 getting...
2020/09/10 15:20:48 got {Value:1 CreatedAt:2020-09-10 14:19:37.872402 +0900 JST}
2020/09/10 15:20:48 sleeping... 0s
2020/09/10 15:20:48 commiting...
2020/09/10 15:20:48 done.
ロックはかかりませんでした。
プロセス2は即正常終了し、プロセス1もSleep後にすぐ正常終了しました。
ドキュメントに書かれているWhen two concurrent read-write transactions read or write the same data, the lock held by one transaction can delay the other transaction.
と矛盾している気がしますね🤔
プロセス2にもsleepを入れてプロセス1より後にコミットする様にしてみます。
結果
プロセス1
$ ./fstx -sa 20s Get
2020/09/10 15:24:20 start
2020/09/10 15:24:20 getting...
2020/09/10 15:24:20 got {Value:1 CreatedAt:2020-09-10 14:19:37.872402 +0900 JST}
2020/09/10 15:24:20 sleeping... 20s
2020/09/10 15:24:40 commiting...
2020/09/10 15:24:40 done.
プロセス2
$ ./fstx -sa 20s Get
2020/09/10 15:24:27 start
2020/09/10 15:24:28 getting...
2020/09/10 15:24:28 got {Value:1 CreatedAt:2020-09-10 14:19:37.872402 +0900 JST}
2020/09/10 15:24:28 sleeping... 20s
2020/09/10 15:24:48 commiting...
2020/09/10 15:24:48 done.
やはりロックはかかりませんでした。
Get vs Put
プロセス1でGetして、sleepしている間にプロセス2でPutします。
プロセス2は即コミットします。
結果
プロセス1
$ ./fstx -sa 20s Get
2020/09/10 16:17:38 start
2020/09/10 16:17:38 getting...
2020/09/10 16:17:38 got {Value:1 CreatedAt:2020-09-10 14:19:37.872402 +0900 JST}
2020/09/10 16:17:38 sleeping... 20s
2020/09/10 16:17:58 commiting...
2020/09/10 16:17:58 done.
プロセス2
$ ./fstx Put
2020/09/10 16:17:43 start
2020/09/10 16:17:43 putting...
2020/09/10 16:17:43 sleeping... 0s
2020/09/10 16:17:43 commiting...
2020/09/10 16:17:58 done.
プロセス2(Put)のコミットでロックがかかりました。
プロセス1のコミットが完了したらロックが開放されてPutも正常にコミットされました。
ロックのタイミングはPutではなくコミット時にかかるのですね。
というか、API仕様を見るとそもそもPut時はAPI呼び出しが行われておらず、Commitでまとめて送信される様です。
ちなみに旧データストアの場合ですと、このパターンはロックかからずどちらも正常終了となります。
Put vs Get
プロセス1でPutして、sleepしている間にプロセス2でGetします。
プロセス2は即コミットします。
結果
プロセス1
$ ./fstx -sa 20s Put
2020/09/10 16:26:54 start
2020/09/10 16:26:55 putting...
2020/09/10 16:26:55 sleeping... 20s
2020/09/10 16:27:15 commiting...
2020/09/10 16:27:15 done.
プロセス2
$ ./fstx Get
2020/09/10 16:26:59 start
2020/09/10 16:27:00 getting...
2020/09/10 16:27:00 got {Value:1 CreatedAt:2020-09-10 16:24:57.462099 +0900 JST}
2020/09/10 16:27:00 sleeping... 0s
2020/09/10 16:27:00 commiting...
2020/09/10 16:27:00 done.
プロセス2のGetはロックされませんでした。
プロセス2も20秒sleepしてみます。
結果
プロセス1
$ ./fstx -sa 20s Put
2020/09/11 05:09:08 start
2020/09/11 05:09:09 putting...
2020/09/11 05:09:09 sleeping... 20s
2020/09/11 05:09:29 commiting...
2020/09/11 05:09:36 done.
プロセス2
$ ./fstx -sa 20s Get
2020/09/11 05:09:15 start
2020/09/11 05:09:16 getting...
2020/09/11 05:09:16 got {Value:1 CreatedAt:2020-09-11 05:03:01.615712 +0900 JST}
2020/09/11 05:09:16 sleeping... 20s
2020/09/11 05:09:36 commiting...
2020/09/11 05:09:36 done.
プロセス1のコミットがプロセス2のコミット終了までブロックされました!
Putを含むコミットは、他トランザクションのGetがあるとそのコミットまでロックされる様です。
RDBの行ロック的なものをイメージしてましたが、根本的に違う概念な気がしてきました(・ω・)
ちなみに旧Datastoreだとロックされずにどちらも正常終了します。
Put vs Put
プロセス1でPutして、sleepしている間にプロセス2で上書きPutします。
プロセス2は即コミットします。
結果
プロセス1
$ ./fstx -sa 20s Put
2020/09/11 03:21:30 start
2020/09/11 03:21:31 putting...
2020/09/11 03:21:31 sleeping... 20s
2020/09/11 03:21:51 commiting...
2020/09/11 03:21:51 done.
プロセス2
$ ./fstx Put
2020/09/11 03:21:35 start
2020/09/11 03:21:36 putting...
2020/09/11 03:21:36 sleeping... 0s
2020/09/11 03:21:36 commiting...
2020/09/11 03:21:36 done.
なんと!どちらもロックされませんでした!
旧Datastoreでも試しましたが同様の挙動をしました。
結構長い期間Datastoreを使ってきましたが、お恥ずかしながらPut同士は衝突するものとばかり思ってました(´・ω・`)
Get&Put vs Get
プロセス1ではエンティティを一度GetしたのちPutしてsleep。
プロセス2はその間にGetして即コミットします。
結果
プロセス1
$ ./fstx -sa 20s GetPut
2020/09/11 04:14:49 start
2020/09/11 04:14:49 getting...
2020/09/11 04:14:49 got {Value:1 CreatedAt:2020-09-11 04:00:30.603741 +0900 JST}
2020/09/11 04:14:49 putting...
2020/09/11 04:14:49 sleeping... 20s
2020/09/11 04:15:09 commiting...
2020/09/11 04:15:10 done.
プロセス2
$ ./fstx Get
2020/09/11 04:14:52 start
2020/09/11 04:14:53 getting...
2020/09/11 04:14:53 got {Value:1 CreatedAt:2020-09-11 04:00:30.603741 +0900 JST}
2020/09/11 04:14:53 sleeping... 0s
2020/09/11 04:14:53 commiting...
2020/09/11 04:14:53 done.
どちらもロックされず成功しました。
Get vs Get&Put
プロセス1ではエンティティをGetしてsleep。
プロセス2はその間にGet&Putし、即コミットします。
結果
プロセス1
$ ./fstx -sa 20s Get
2020/09/11 04:18:39 start
2020/09/11 04:18:40 getting...
2020/09/11 04:18:40 got {Value:1 CreatedAt:2020-09-11 04:18:10.5811 +0900 JST}
2020/09/11 04:18:40 sleeping... 20s
2020/09/11 04:19:00 commiting...
2020/09/11 04:19:00 done.
プロセス2
$ ./fstx GetPut
2020/09/11 04:18:45 start
2020/09/11 04:18:45 getting...
2020/09/11 04:18:45 got {Value:1 CreatedAt:2020-09-11 04:18:10.5811 +0900 JST}
2020/09/11 04:18:45 putting...
2020/09/11 04:18:45 sleeping... 0s
2020/09/11 04:18:45 commiting...
2020/09/11 04:19:00 done.
プロセス2のコミットが、プロセス1のコミット完了までロックされました。
結果はどちらも成功しています。
Get&Put vs Put
プロセス1ではエンティティを一度GetしたのちPutしてsleep。
プロセス2はその間にPutして即コミットします。
結果
プロセス1
$ ./fstx -sa 20s GetPut
2020/09/11 04:21:44 start
2020/09/11 04:21:44 getting...
2020/09/11 04:21:44 got {Value:1 CreatedAt:2020-09-11 04:18:45.825498 +0900 JST}
2020/09/11 04:21:44 putting...
2020/09/11 04:21:44 sleeping... 20s
2020/09/11 04:22:04 commiting...
2020/09/11 04:22:05 done.
プロセス2
$ ./fstx Put
2020/09/11 04:21:49 start
2020/09/11 04:21:50 putting...
2020/09/11 04:21:50 sleeping... 0s
2020/09/11 04:21:50 commiting...
2020/09/11 04:22:05 done.
プロセス2のコミットが、プロセス1のコミット完了までロックされました。
結果はどちらも成功しています。
Put vs Get&Put
プロセス1ではエンティティをPutしてsleep。
プロセス2はその間にGet&Putし、即コミットします。
結果
プロセス1
$ ./fstx -sa 20s Put
2020/09/11 04:32:43 start
2020/09/11 04:32:44 putting...
2020/09/11 04:32:44 sleeping... 20s
2020/09/11 04:33:04 commiting...
2020/09/11 04:33:04 done.
プロセス2
$ ./fstx GetPut
2020/09/11 04:32:48 start
2020/09/11 04:32:48 getting...
2020/09/11 04:32:48 got {Value:1 CreatedAt:2020-09-11 04:31:50.387902 +0900 JST}
2020/09/11 04:32:48 putting...
2020/09/11 04:32:48 sleeping... 0s
2020/09/11 04:32:48 commiting...
2020/09/11 04:32:48 done.
Put vs Put同様ロックがかからず、どちらも成功しました。
プロセス2にもsleepを入れてプロセス1より後にコミットする様にしてみます。
さらに書き込む値も変えてみます。
結果
プロセス1
$ ./fstx -sa 20s -v 2 Put
2020/09/11 04:41:10 start
2020/09/11 04:41:11 putting...
2020/09/11 04:41:11 sleeping... 20s
2020/09/11 04:41:31 commiting...
2020/09/11 04:41:36 done.
プロセス2
$ ./fstx -sa 20s -v 3 GetPut
2020/09/11 04:41:16 start
2020/09/11 04:41:16 getting...
2020/09/11 04:41:16 got {Value:1 CreatedAt:2020-09-11 04:41:05.819933 +0900 JST}
2020/09/11 04:41:16 putting...
2020/09/11 04:41:16 sleeping... 20s
2020/09/11 04:41:36 commiting...
2020/09/11 04:41:36 done.
プロセス1のコミットがプロセス2のコミットまでロックされました。
結果はどちらも成功し、最終的な値はプロセス1の書き込んだValue:2
となりました。
ちなみに旧Datastoreだとプロセス1が即成功終了し、プロセス2がconcurrent transactionエラーとなります。
Get&Put vs Get&Put
プロセス1、2両方でGet&Putします。
プロセス1はsleepし、その間にプロセス2がGet&Put&コミットします。
結果
プロセス1
$ ./fstx -sa 20s GetPut
2020/09/11 09:14:44 start
2020/09/11 09:14:45 getting...
2020/09/11 09:14:45 got {Value:1 CreatedAt:2020-09-11 08:43:20.957081 +0900 JST}
2020/09/11 09:14:45 putting...
2020/09/11 09:14:45 sleeping... 20s
2020/09/11 09:15:05 commiting...
2020/09/11 09:15:05 done.
プロセス2
$ ./fstx GetPut
2020/09/11 09:14:50 start
2020/09/11 09:14:50 getting...
2020/09/11 09:14:50 got {Value:1 CreatedAt:2020-09-11 08:43:20.957081 +0900 JST}
2020/09/11 09:14:50 putting...
2020/09/11 09:14:50 sleeping... 0s
2020/09/11 09:14:50 commiting...
2020/09/11 09:15:05 datastore: concurrent transaction
プロセス2はプロセス1のコミット完了までロックし、concurrent transactionエラーとなりました。
旧Datastoreだとロックは発生せず、プロセス2は即時正常終了し、プロセス1がconcurrent transactionエラーとなります。
Firestoreになって優先されるトランザクションが「先にcommitした方」から「先にロック取得した方」に変わったのですね。
Mutation - Get、 Update、(エンティティが存在している状態での)Upsert
Get、Putと同じ組み合わせを検証しましたが、同じ挙動でした。
Mutation - Insert vs Insert
エンティティが存在しない状態からInsertを同時に行います。
結果
プロセス1
$ ./fstx -sa 20s Insert
2020/09/11 09:36:43 start
2020/09/11 09:36:44 inserting...
2020/09/11 09:36:44 sleeping... 20s
2020/09/11 09:37:04 commiting...
2020/09/11 09:37:04 rpc error: code = AlreadyExists desc = entity already exists: app: "b~kni-fs-tx-test"
path <
Element {
type: "Sample"
name: "test"
}
>
プロセス2
$ ./fstx Insert
2020/09/11 09:36:48 start
2020/09/11 09:36:49 inserting...
2020/09/11 09:36:49 sleeping... 0s
2020/09/11 09:36:49 commiting...
2020/09/11 09:36:49 done.
後から実行したプロセス2は即時成功終了し、プロセス1はコミット時にAlreadyExistsというエラーがおきました。
プロセス2にsleep 20秒を設定すると今度はプロセス2がAlreadyExistsになります。
ロックは発生せず、コミットは先勝になる様です。
Query & Put
予めKey名がA
, B
, C
のエンティティを用意しておきます。
プロセス1が A
<= key <= C
でクエリを実行し、10秒sleepします。
プロセス2が後からKey名AA
でエンティティをPutします。
結果
プロセス1
$ ./locks/locks -sa 10s Query
2020/09/11 17:10:47 start
2020/09/11 17:10:47 sleeping... 0s
2020/09/11 17:10:47 querying...
2020/09/11 17:10:48 0: &{Value:1 UpdatedAt:2020-09-11 16:22:50.11876 +0900 JST}
2020/09/11 17:10:48 1: &{Value:2 UpdatedAt:2020-09-11 16:22:59.032873 +0900 JST}
2020/09/11 17:10:48 2: &{Value:3 UpdatedAt:2020-09-11 16:23:08.504272 +0900 JST}
2020/09/11 17:10:48 sleeping... 10s
2020/09/11 17:10:58 committing...
2020/09/11 17:10:58 done.
プロセス2
$ ./locks/locks -k AA Put
2020/09/11 16:55:26 start
2020/09/11 16:55:27 sleeping... 0s
2020/09/11 16:55:27 putting...
2020/09/11 16:55:27 sleeping... 0s
2020/09/11 16:55:27 committing...
2020/09/11 16:55:42 done.
プロセス2はプロセス1の結果に含まれないエンティティをPutしたのにロックがかかりました。
つまり、クエリを実行した時点で、インデックスの範囲に対してロックが取得されている様です。
まとめ
- 最初にEntityをGetしたトランザクションがロックを確保し、他トランザクションの同じKeyのPutを含むコミットはその開放を待機する(ロック待機はPutではなくコミット時)
- クエリはインデックスの範囲でロックを確保する(たぶん)
- トランザクション内で同じKeyでGet→Putしている場合、先行でGetしてロックを確保した他トランザクションがPut&コミットするとconcurrent transactionとなる
- Putのみの場合は先行トランザクションのロックを待機するがそのまま上書き更新する
Isolation and consistency
トランザクション分離レベルを検証します。
ドキュメントには以下の様に記されています。
Datastore mode databases enforce serializable isolation. Data read or modified by a transaction cannot be concurrently modified.
分離レベルはRDBで言うところのSerializableとなっているとのことです。
Locksで使用したコードを再度使って検証します。
Put vs Get
Locksの検証で使用した単純なPut vs Getのケースで検証します。
予めValueプロパティの値を1にしておきます。
プロセス1がPutを行い20秒間sleep、その間にプロセス2がGetします。
結果
プロセス1
$ ./fstx -sa 20s -v 2 Put
2020/09/11 14:50:31 start
2020/09/11 14:50:32 putting...
2020/09/11 14:50:32 sleeping... 20s
2020/09/11 14:50:52 commiting...
2020/09/11 14:50:52 done.
プロセス2
$ ./fstx Get
2020/09/11 14:50:36 start
2020/09/11 14:50:36 getting...
2020/09/11 14:50:36 got {Value:1 CreatedAt:2020-09-11 14:50:14.517059 +0900 JST}
2020/09/11 14:50:36 sleeping... 0s
2020/09/11 14:50:36 commiting...
2020/09/11 14:50:36 done.
プロセス2は、プロセス1のPutした値ではなく、更新前の値を取得しています。
事前に対象エンティティを削除しておいた場合、プロセス2はno such entityエラーで終了します。
ダーティリードは発生せずREAD COMMITTED分離レベルは担保されていることが見て取れます。
Non-Repeatable Readも発生しなさそうです。
なぜならLocksの検証で確認した通り、トランザクションがGetした後に他トランザクションが更新しようとするとロックがかかる為です。
またQueryも先の検証の通り、インデックス範囲でロックを取得している為、ファントムリードも発生しません。
(訂正)
↑のNon-Repeatable Read及びファントムリードについての記述は齟齬がありました🙇♂️
Read Onlyトランザクションで参照している場合は、他トランザクションのPutはロックされません。
なので「ロックがかかる為」という理由はおかしいですね。
Non-Repeatable Read、ファントムリードが起きないのはたしかなのですが、後のRead-onlyトランザクション章で検証します。
遅延Get Vs Put
ドキュメントの下記記述を検証します。
Queries and lookups in a transaction see a consistent snapshot of the state of the database. This snapshot is guaranteed to contain the effect of all transactions and writes that completed prior to the beginning of the transaction.
予めValueプロパティの値を1にしておきます。
今まではエンティティ操作後にsleepしていましたが、今回プロセス1はまずsleepしてからGetを行います。
プロセス1のsleep中にプロセス2はPutを行います。
結果
プロセス1
$ ./fstx -sb 10s Get
2020/09/11 15:33:01 start
2020/09/11 15:33:02 sleeping... 10s
2020/09/11 15:33:12 getting...
2020/09/11 15:33:12 got {Value:99 CreatedAt:2020-09-11 15:33:06.947901 +0900 JST}
2020/09/11 15:33:12 sleeping... 0s
2020/09/11 15:33:12 commiting...
2020/09/11 15:33:12 done.
プロセス2
$ ./fstx -v 99 Put
2020/09/11 15:33:06 start
2020/09/11 15:33:06 sleeping... 0s
2020/09/11 15:33:06 putting...
2020/09/11 15:33:06 sleeping... 0s
2020/09/11 15:33:06 commiting...
2020/09/11 15:33:07 done.
Getでプロセス2のPutした値が取得されました。
ドキュメントは「トランザクション開始時のスナップショットを参照する」という様に読めますが、トランザクション内で最初にGetした時点の(エンティティ)スナップショットを参照している様に見えますね。
内部を想像するとその方がしっくりきますが(トランザクション開始時のスナップショットはかなり大変そう)、ドキュメント内容はちょっと齟齬がある様な・・・🤔
連続 Get vs Put
ちょっと複雑なケースを検証します。
- プロセス1がトランザクション開始
- プロセス2がエンティティA, B, CをPut
- プロセス1がエンティティAを取得
- プロセス3がエンティティB, CをPut
- プロセス1がエンティティBを取得
- プロセス4がエンティティCをPut
- プロセス1がエンティティBを取得
結果
プロセス1
$ ./getabc
2020/09/11 16:22:46 sleeping... 10s
2020/09/11 16:22:56 key:/Sample,A, value:{Value:1 UpdatedAt:2020-09-11 16:22:50.11876 +0900 JST}
2020/09/11 16:22:56 sleeping... 10s
2020/09/11 16:23:06 key:/Sample,B, value:{Value:2 UpdatedAt:2020-09-11 16:22:59.032873 +0900 JST}
2020/09/11 16:23:06 sleeping... 10s
2020/09/11 16:23:16 key:/Sample,C, value:{Value:3 UpdatedAt:2020-09-11 16:23:08.504272 +0900 JST}
done
プロセス2,3,4
$ ./putabc -v 1 A B C
2020/09/11 16:22:50 putting value:1 into entities [A B C]
done
$ ./putabc -v 2 B C
2020/09/11 16:22:59 putting value:2 into entities [B C]
done
$ ./putabc -v 3 C
2020/09/11 16:23:08 putting value:3 into entities [C]
done
やはり、それぞれのエンティティをGetしたタイミングのスナップショットを参照しています。
Read-only Transaction
(公式ドキュメントより引用)
Read-only transactions cannot modify entities, but in return, they do not contend with any other transactions and do not need to be retried. If you perform only reads in a regular, read-write transaction, then that transaction may contend with transaction that modify the same data.
通常トランザクション内でGetやクエリを使用すると、他トランザクションのPutのコミットはそれを待機する挙動になります。
時間のかかるトランザクション内で大量の範囲にロックのかかるクエリを使ったりすると、アプリケーション全体のスループットに大きな影響が出やすくなります。
Read-onlyトランザクションは、更新を許さない変わりに他トランザクションのPutをロックしません。
ここではRead-onlyトランザクションを使った場合の分離レベルについて検証してみます。
繰り返しGet vs Put
プロセス1はRead-onlyトランザクションを利用してSleepを挟んで2回同じKeyでGetします。
プロセス2はプロセス1のSleepの前にPut&コミットします。
結果
プロセス1
$ ./fstx -sa 10s -r -d Get
2020/12/19 07:27:26 start
2020/12/19 07:27:26 sleeping... 0s
2020/12/19 07:27:26 getting...
2020/12/19 07:27:26 got {Value:1 UpdatedAt:2020-12-18 09:34:20.470126 +0900 JST}
2020/12/19 07:27:26 sleeping... 10s
2020/12/19 07:27:36 sleeping... 0s
2020/12/19 07:27:36 getting...
2020/12/19 07:27:36 got {Value:1 UpdatedAt:2020-12-18 09:34:20.470126 +0900 JST}
2020/12/19 07:27:36 sleeping... 10s
2020/12/19 07:27:46 committing...
2020/12/19 07:27:46 done.
プロセス2
$ ./fstx Put
2020/12/19 07:27:29 start
2020/12/19 07:27:29 sleeping... 0s
2020/12/19 07:27:29 putting...
2020/12/19 07:27:29 sleeping... 0s
2020/12/19 07:27:29 committing...
2020/12/19 07:27:29 done.
プロセス2のPutは、Read-Writeトランザクションの様にプロセス1のコミットを待機していません。
プロセス1はプロセス2のPutを跨いで繰り返しGetしていますが、プロセス2のPut結果を参照せず同一スナップショットを参照しています。
Non-Repeatable Readがないことが確認できます。
繰り返しクエリ vs Put(update)
プロセス1はRead-onlyトランザクションを利用してSleepを挟んで2回クエリを実行します。
プロセス2はプロセス1のSleepの前にプロセス1の結果に含まれるKeyでPut&コミットします。
結果
プロセス1
$ ./fstx -sa 10s -r -d Query
2020/12/19 07:47:42 start
2020/12/19 07:47:43 sleeping... 0s
2020/12/19 07:47:43 querying...
2020/12/19 07:47:43 0: /Sample,sample001 &{Value:1 UpdatedAt:2020-12-18 09:35:39.055707 +0900 JST}
2020/12/19 07:47:43 1: /Sample,sample002 &{Value:2 UpdatedAt:2020-12-19 07:47:06.097679 +0900 JST}
2020/12/19 07:47:43 2: /Sample,sample003 &{Value:3 UpdatedAt:2020-12-19 07:46:56.47674 +0900 JST}
2020/12/19 07:47:43 3: /Sample,sample004 &{Value:4 UpdatedAt:2020-09-12 02:23:03.483699 +0900 JST}
2020/12/19 07:47:43 4: /Sample,sample005 &{Value:5 UpdatedAt:2020-09-12 02:23:03.483701 +0900 JST}
2020/12/19 07:47:43 5: /Sample,sample006 &{Value:6 UpdatedAt:2020-09-12 02:23:03.483704 +0900 JST}
2020/12/19 07:47:43 6: /Sample,sample007 &{Value:7 UpdatedAt:2020-09-12 02:23:03.483707 +0900 JST}
2020/12/19 07:47:43 7: /Sample,sample008 &{Value:8 UpdatedAt:2020-09-12 02:23:03.483709 +0900 JST}
2020/12/19 07:47:43 8: /Sample,sample009 &{Value:9 UpdatedAt:2020-09-12 02:23:03.483712 +0900 JST}
2020/12/19 07:47:43 9: /Sample,sample010 &{Value:10 UpdatedAt:2020-12-19 07:47:15.49936 +0900 JST}
2020/12/19 07:47:43 sleeping... 10s
2020/12/19 07:47:53 sleeping... 0s
2020/12/19 07:47:53 querying...
2020/12/19 07:47:53 0: /Sample,sample001 &{Value:1 UpdatedAt:2020-12-18 09:35:39.055707 +0900 JST}
2020/12/19 07:47:53 1: /Sample,sample002 &{Value:2 UpdatedAt:2020-12-19 07:47:06.097679 +0900 JST}
2020/12/19 07:47:53 2: /Sample,sample003 &{Value:3 UpdatedAt:2020-12-19 07:46:56.47674 +0900 JST}
2020/12/19 07:47:53 3: /Sample,sample004 &{Value:4 UpdatedAt:2020-09-12 02:23:03.483699 +0900 JST}
2020/12/19 07:47:53 4: /Sample,sample005 &{Value:5 UpdatedAt:2020-09-12 02:23:03.483701 +0900 JST}
2020/12/19 07:47:53 5: /Sample,sample006 &{Value:6 UpdatedAt:2020-09-12 02:23:03.483704 +0900 JST}
2020/12/19 07:47:53 6: /Sample,sample007 &{Value:7 UpdatedAt:2020-09-12 02:23:03.483707 +0900 JST}
2020/12/19 07:47:53 7: /Sample,sample008 &{Value:8 UpdatedAt:2020-09-12 02:23:03.483709 +0900 JST}
2020/12/19 07:47:53 8: /Sample,sample009 &{Value:9 UpdatedAt:2020-09-12 02:23:03.483712 +0900 JST}
2020/12/19 07:47:53 9: /Sample,sample010 &{Value:10 UpdatedAt:2020-12-19 07:47:15.49936 +0900 JST}
2020/12/19 07:47:53 sleeping... 10s
2020/12/19 07:48:03 committing...
2020/12/19 07:48:03 done.
プロセス2
$ ./fstx -k sample005 -v 55 Put
2020/12/19 07:47:44 start
2020/12/19 07:47:45 sleeping... 0s
2020/12/19 07:47:45 putting...
2020/12/19 07:47:45 sleeping... 0s
2020/12/19 07:47:45 committing...
2020/12/19 07:47:45 done.
こちらもプロセス2のPutは、プロセス1のコミットを待機していません。
プロセス1はプロセス2のPutを跨いで繰り返しクエリしていますが同一スナップショットを参照しています。
Non-Repeatable Readがないことが確認できます。
繰り返しクエリ vs Put(insert)
プロセス1はRead-onlyトランザクションを利用してSleepを挟んで2回クエリを実行します。
プロセス2はプロセス1のSleepの前にプロセス1のクエリ範囲に含まれる新規KeyでPut&コミットします。
結果
プロセス1
$ ./fstx -sa 10s -r -d Query
2020/12/19 07:55:21 start
2020/12/19 07:55:21 sleeping... 0s
2020/12/19 07:55:21 querying...
2020/12/19 07:55:22 0: /Sample,sample001 &{Value:1 UpdatedAt:2020-12-18 09:35:39.055707 +0900 JST}
2020/12/19 07:55:22 1: /Sample,sample002 &{Value:2 UpdatedAt:2020-12-19 07:47:06.097679 +0900 JST}
2020/12/19 07:55:22 2: /Sample,sample003 &{Value:3 UpdatedAt:2020-12-19 07:46:56.47674 +0900 JST}
2020/12/19 07:55:22 3: /Sample,sample004 &{Value:4 UpdatedAt:2020-09-12 02:23:03.483699 +0900 JST}
2020/12/19 07:55:22 4: /Sample,sample005 &{Value:55 UpdatedAt:2020-12-19 07:47:45.248149 +0900 JST}
2020/12/19 07:55:22 5: /Sample,sample006 &{Value:6 UpdatedAt:2020-09-12 02:23:03.483704 +0900 JST}
2020/12/19 07:55:22 6: /Sample,sample007 &{Value:7 UpdatedAt:2020-09-12 02:23:03.483707 +0900 JST}
2020/12/19 07:55:22 7: /Sample,sample008 &{Value:8 UpdatedAt:2020-09-12 02:23:03.483709 +0900 JST}
2020/12/19 07:55:22 8: /Sample,sample009 &{Value:9 UpdatedAt:2020-09-12 02:23:03.483712 +0900 JST}
2020/12/19 07:55:22 9: /Sample,sample010 &{Value:10 UpdatedAt:2020-12-19 07:47:15.49936 +0900 JST}
2020/12/19 07:55:22 sleeping... 10s
2020/12/19 07:55:32 sleeping... 0s
2020/12/19 07:55:32 querying...
2020/12/19 07:55:32 0: /Sample,sample001 &{Value:1 UpdatedAt:2020-12-18 09:35:39.055707 +0900 JST}
2020/12/19 07:55:32 1: /Sample,sample002 &{Value:2 UpdatedAt:2020-12-19 07:47:06.097679 +0900 JST}
2020/12/19 07:55:32 2: /Sample,sample003 &{Value:3 UpdatedAt:2020-12-19 07:46:56.47674 +0900 JST}
2020/12/19 07:55:32 3: /Sample,sample004 &{Value:4 UpdatedAt:2020-09-12 02:23:03.483699 +0900 JST}
2020/12/19 07:55:32 4: /Sample,sample005 &{Value:55 UpdatedAt:2020-12-19 07:47:45.248149 +0900 JST}
2020/12/19 07:55:32 5: /Sample,sample006 &{Value:6 UpdatedAt:2020-09-12 02:23:03.483704 +0900 JST}
2020/12/19 07:55:32 6: /Sample,sample007 &{Value:7 UpdatedAt:2020-09-12 02:23:03.483707 +0900 JST}
2020/12/19 07:55:32 7: /Sample,sample008 &{Value:8 UpdatedAt:2020-09-12 02:23:03.483709 +0900 JST}
2020/12/19 07:55:32 8: /Sample,sample009 &{Value:9 UpdatedAt:2020-09-12 02:23:03.483712 +0900 JST}
2020/12/19 07:55:32 9: /Sample,sample010 &{Value:10 UpdatedAt:2020-12-19 07:47:15.49936 +0900 JST}
2020/12/19 07:55:32 sleeping... 10s
2020/12/19 07:55:42 committing...
2020/12/19 07:55:42 done.
プロセス2
$ ./fstx -k sample0033 -v 33 Put
2020/12/19 07:55:23 start
2020/12/19 07:55:24 sleeping... 0s
2020/12/19 07:55:24 putting...
2020/12/19 07:55:24 sleeping... 0s
2020/12/19 07:55:24 committing...
2020/12/19 07:55:24 done.
こちらもプロセス2のPutは、プロセス1のコミットを待機していません。
プロセス1はプロセス2のPutを跨いで繰り返しクエリしていますがプロセス1のPutしたエンティティは現れません。
ファントムリードがないことが確認できます。
罠
Firestoreとクライアントライブラリを使った場合のハマりやすい罠について記述します。
Putしたエンティティを自分でGet
一般的なRDBの場合、トランザクション内でINSERT/UPDATEしたレコードをSELECTした場合、更新後のレコードを参照しますが、Firestore(旧Datastoreも同様)の場合、最初に参照した時点のスナップショットを参照し続けますので要注意です。
func main() {
ctx := context.Background()
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
key := datastore.NameKey("Sample", "sample001", nil)
var entity fstx.SampleModel
if err := tx.Get(key, &entity); err != nil {
return err
}
log.Printf("before:%+v", entity)
entity.Value++
entity.UpdatedAt = time.Now()
if _, err := tx.Put(key, &entity); err != nil {
return err
}
if err := tx.Get(key, &entity); err != nil {
return err
}
log.Printf("before:%+v", entity)
return nil
}, datastore.MaxAttempts(1)); err != nil {
log.Fatal(err)
}
log.Printf("done.")
}
結果
$ ./putget
2020/12/19 11:22:05 before:{Value:1 UpdatedAt:2020-12-18 09:35:39.055707 +0900 JST}
2020/12/19 11:22:05 before:{Value:1 UpdatedAt:2020-12-18 09:35:39.055707 +0900 JST}
2020/12/19 11:22:05 done.
2度目のGetにPutの更新は反映されませんでした。
RunInTransactionは冪等に
RunInTransactionは競合エラーでリトライされる為、処理を冪等にする必要があります。
クエリの章のサンプルコードを再掲します。
func main() {
ctx := context.Background()
q := datastore.NewQuery("Sample").Limit(100)
total := 0
var cursor datastore.Cursor
for {
count := 0
var tmpCur datastore.Cursor
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) (err error) {
q := q.Transaction(tx)
if cursor.String() != "" {
q = q.Start(cursor)
}
it := client.Run(ctx, q)
keys := make([]*datastore.Key, 0, 100)
entities := make([]*SampleModel, 0, 100)
for {
entity := &SampleModel{}
key, err := it.Next(entity)
if err != nil {
if err == iterator.Done {
break
}
return err
}
// update
entity.CreatedAt = time.Now()
keys = append(keys, key)
entities = append(entities, entity)
}
if len(keys) > 0 {
tx.PutMulti(keys, entities)
}
tmpCur, err = it.Cursor()
if err != nil {
return err
}
count = len(keys)
return nil
}); err != nil {
log.Fatal(err)
}
total += count
cursor = tmpCur
if count < 100 {
break
}
}
fmt.Printf("done. total:%d\n", total)
}
一度tmpCurという変数に入れてから、cursorに設定しています。
一見無駄な処理に見えますが何故こんなことをしているかと言うと、RunInTrasactionはconcurrent transactionエラーの場合にリトライが走る為、直接cursorに設定してしまうとリトライされた場合に処理がロールバックされたままカーソルだけ先に進んでしまうからです。
RunInTransactionに渡すコールバック関数は冪等にしておく必要があります。
またはMaxAttemtsオプションを1にすることでリトライを抑制できます。
トランザクション使ってない問題
GAE/Go 1st gen の appengine/datastoreパッケージの場合、GetやPutのAPIは共通で、context.Contextの状態によってトランザクション、非トランザクションが切り替わる仕様でした。
Client Libraryは少し仕様が変わって、トランザクションは*datastore.Transactionのメソッドとして切り出されています。
ある意味分かりやすくはなったのですが、今度はRunInTransactionの内部で非トランザクションなAPIを読んでしまうような間違いが起きやすいので注意です。
key := datastore.NameKey("Sample", "test", nil)
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
var entity fstx.SampleModel
if err := tx.Get(key, &entity); err != nil {
return err
}
entity.Value++
entity.UpdatedAt = time.Now()
if _, err := client.Put(ctx, key, &entity); err != nil {
return err
}
return nil
}, datastore.MaxAttempts(1)); err != nil {
log.Fatal(err)
}
↑はclient.Putを呼び出している為、自身のGetと競合してエラーとなります。
トランザクション内の強制終了
key := datastore.NameKey("Sample", "test", nil)
if _, err := client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
var entity fstx.SampleModel
if err := tx.Get(key, &entity); err != nil {
return err
}
panic("test")
}, datastore.MaxAttempts(1)); err != nil {
log.Fatal(err)
}
RunInTransactionがfalseを返すとトランザクションがロールバックされますが、panicの場合ロックを取得したまま処理が終了してしまいます。
そうなると一定時間ロックが開放されず他トランザクションが待たされることになるので極力panic起こさないようにする(もしくはrecoverしてerrorに変換する)ことをおすすめします。
ロック確保時間は一定ではなさそうですが、ドキュメントに Transactions expire after 270 seconds or if idle for 60 seconds.
とあるように数十秒程度でしょう。
まとめ
Firestoreになってトランザクション周りがかなり強化されました\(^o^)/
いろいろクセもありますが、うまく使えば幸せになれそうです。
本日の記事まとめておきます↓
- トランザクション内のデータ更新は、必ず全てが成功するか、全てが失敗する(Atomic)
- 旧Datastoreにあった25 Entity Groupsの制限は撤廃🎉
- ただしClient Libraryにはappengine APIにはない500 Entitiesという制限 (´・ω・`)
- トランザクション内でグローバルクエリが使える様に!🎉
- 最初にEntityをGetしたトランザクションがロックを確保し、他トランザクションの同じKeyのPutを含むコミットはその開放を待機
- トランザクション内で同じKeyでGet→Putしている場合、先行でGetしてロックを確保した他トランザクションがPut&コミットするとconcurrent transactionエラーとなる
- クエリはインデックスの範囲でロックを確保する
- トランザクション内でのGetやクエリは同じスナップショットを参照する。スナップショットはトランザクション開始時ではなく各Entityの参照時のもの
- Read-onlyトランザクションを使うとGETでロックがかからず他トランザクションはそのコミットを待機しない
参考記事
- Google Cloud Datastore API
-
Transaction Isolation in App Engine
- 旧Datastoreトランザクションの話