この記事はサンプルアプリ作成編で、ScalarDBのサンプルアプリを実行してみます。
環境構築編はこちら
ScalarDBの操作
ScalarDBの操作用コンポーネントはStorageコンポーネントとTransactionコンポーネントの2つがあります。
Storageコンポーネント
このコンポーネントは以下の機能を持っています。
- アトミックなCRUD操作
- 逐次一貫性のサポート
- 原子性/線形性条件下でのデータ変更(Create/Update/Delete)
- レコードごとにユーザ定義のメタデータを含める
また、ここでのKeyの役割はCassandraのPartition Key,Clustering Keyとの対応で、Listやカンマ区切りの形で複数定義が可能です。
さらに、ScalarDBのStorageコンポーネントはCRUD操作のために4つの関数を用意しています。
- Get:1件のエントリーを取得する
- Put:1件のエントリーを挿入/更新する
- Delete:1件のエントリーを削除する
- Scan:特定のパーティション内のエントリーを複数件取得する
Transactionコンポーネント
このコンポーネントは、Storageコンポーネントの機能に加えて、マスターレスのトランザクション機能を提供しています。
トランザクション機能を使用する場合、まずTransactionServiceインスタンスを取得する必要があります。そしてTransactionServiceインスタンスに対してGet,Put,Delete,Scanの操作を実行した後、TransactionServiceインスタンスをコミットして操作内容をDBに反映させます。
サンプルアプリ
サンプルアプリでScalarDBの実際のアプリケーションの動作を確認します。
アプリの概要
- ユーザごとに口座を持つ
- 口座に指定した金額をチャージ
- 口座から別の口座へ支払い
- 口座の残高を確認
これらの機能を持つサンプルアプリを実装します。
スキーマ定義
KEYSPACE:emoney
TABLE:account
カラム
・ group_id:グループID(Partition Key)
・ id:ユーザID(Clustering Key)
・ balance:ユーザの所持金額
処理の流れ
チャージ
- 指定したユーザの情報を取得
- 取得したユーザ情報にチャージ額を追加
- 更新した情報を書き込み
以上の一連の動作を1つのトランザクション内で実行します。
支払い
- 送金元、送金先情報を取得
- 送金元の残額が送金額よりも多いことを確認
- 送金元から送金額を減額し、送金先に送金額を増額
- 更新した情報を書き込み
以上の一連の動作を1つのトランザクション内で実行します。
残高確認
- 全てのユーザ情報を取得
- ループ処理でユーザ情報を1件ずつ取り出す
- ユーザ情報から残高を取得し、コンソールに表示させる
環境構築
動作に必要なアプリケーションは既にインストール済として、gradleを使って実行用jarファイルを取得します。
1.Scalar DB Schema Toolsへのパスを設定する
・.bash_profileにSchema Toolsへのパス設定を追加する
export SCHEMATOOL=/home/(自身の環境のユーザ名)/scalardb/tools/schema
・設定変更を反映させる
$ source ~/.bash_profile
2.サンプルプロジェクト用ディレクトリの設定
・ディレクトリを作成して移動
$ mkdir ~/scalardb_sample
$ cd ~/scalardb_sample
・gradle初期化のコマンドを実行
$ gradle init --type java-application
3.gradle定義ファイル作成
$ vi build.gradle
# mainClassNameの指定を変更(18行目)
# 変更前
mainClassName = 'App'
# 変更後
mainClassName = 'sample.emoney.ElectronicMoneyMain'
# dependenciesにScalar DB定義を追加(22行目)
dependencies {
// This dependency is found on compile classpath of this component and consumers.
compile 'com.google.guava:guava:23.0'
compile group: 'com.scalar-labs', name: 'scalardb', version: '1.0.0'
// Use JUnit test framework
testCompile 'junit:junit:4.12'
4.gradleで実行用jarファイルを取得
$ gradle build
BUILD SUCCESSFUL
と表示されればOK
実装
1.スキーマファイル作成
$ vi emoney.sdbql
REPLICATION FACTOR 1;
CREATE NAMESPACE emoney;
CREATE TRANSACTION TABLE emoney.account (
group_id TEXT PARTITIONKEY,
id TEXT CLUSTERINGKEY,
balance INT,
);
2.Schema Toolを利用して、Casssandraにスキーマを作成
$ $SCHEMATOOL/loader emoney.sdbql
cqlshでスキーマの確認
$ cqlsh
キースペースにemoneyがいることを確認
cqlsh> DESCRIBE KEYSPACES ;
emoney system_auth coordinator system_traces
system_schema system system_distributed
テーブルにaccountがいることを確認
cqlsh> use emoney;
cqlsh:emoney> DESCRIBE TABLES ;
account
cqlshを終了
cqlsh:emoney> exit
コンストラクタと関数を記述してあるクラスファイルと、引数を受け取って関数を呼び出す実行用クラスファイルの2つを作成します。
ディレクトリとjavaファイルを作成
$ mkdir -p src/main/java/sample/emoney
$ cd src/main/java/sample/emoney
$ vi ElectronicMoney.java
package sample.emoney;
//インポート追加
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.scalar.database.config.DatabaseConfig;
import com.scalar.database.service.StorageModule;
import com.scalar.database.service.StorageService;
import com.scalar.database.service.TransactionModule;
import com.scalar.database.service.TransactionService;
import java.io.File;
import java.io.IOException;
import com.scalar.database.api.DistributedTransaction;
import com.scalar.database.api.Get;
import com.scalar.database.api.Put;
import com.scalar.database.api.Delete;
import com.scalar.database.api.Result;
import com.scalar.database.io.IntValue;
import com.scalar.database.io.Key;
import com.scalar.database.io.TextValue;
import com.scalar.database.exception.storage.ExecutionException;
import com.scalar.database.exception.transaction.CommitException;
import com.scalar.database.exception.transaction.CrudException;
import com.scalar.database.exception.transaction.UnknownTransactionStatusException;
import java.util.Optional;
import com.scalar.database.api.Scan;
import com.scalar.database.api.Scanner;
public class ElectronicMoney {
// クラス変数を定義
private final String NAMESPACE = "emoney";
private final String TABLE_NAME = "account";
private final String ID = "id";
private final String GROUP_ID = "group_id";
private final String BALANCE = "balance";
private final StorageService storageService;
private final TransactionService transactionService;
// コンストラクタを実装
public ElectronicMoney() throws IOException {
File prop_file = new File("/etc/scalar/database.properties");
DatabaseConfig config = new DatabaseConfig(prop_file);
Injector injector = Guice.createInjector(new StorageModule(config));
storageService = injector.getInstance(StorageService.class);
storageService.with(NAMESPACE, TABLE_NAME);
injector = Guice.createInjector(new TransactionModule(config));
transactionService = injector.getInstance(TransactionService.class);
transactionService.with(NAMESPACE, TABLE_NAME);
}
public void charge(String groupId, String id, int amount) throws CrudException, CommitException, UnknownTransact ionStatusException {
//トランザクション開始
DistributedTransaction tx = transactionService.start();
Key partitionKey = new Key(new TextValue(GROUP_ID, groupId));
Key clusteringKey = new Key(new TextValue(ID, id));
Get get = new Get(partitionKey, clusteringKey);
Optional<Result> result = tx.get(get);
int balance = amount;
if (result.isPresent()) {
int current = ((IntValue) result.get().getValue(BALANCE).get()).get();
balance += current;
}
// 残高を更新
Put put = new Put(partitionKey, clusteringKey).withValue(new IntValue(BALANCE, balance));
tx.put(put);
// トランザクションコミット
tx.commit();
}
public void pay(String groupId, String fromId, String toId, int amount) throws CrudException, CommitException, UnknownTransactionStatusException {
// トランザクション開始
DistributedTransaction tx = transactionService.start();
// 送金元、送金先のアカウント情報を取得
Key partitionKey = new Key(new TextValue(GROUP_ID, groupId));
Key fromKey = new Key(new TextValue(ID, fromId));
Key toKey = new Key(new TextValue(ID, toId));
Get fromGet = new Get(partitionKey, fromKey);
Get toGet = new Get(partitionKey, toKey);
Optional<Result> fromResult = tx.get(fromGet);
Optional<Result> toResult = tx.get(toGet);
if (!fromResult.isPresent()) {
throw new RuntimeException(fromId + " doesn't exist.");
}
if (!toResult.isPresent()) {
throw new RuntimeException(toId + " doesn't exist.");
}
int newFromBalance = ((IntValue) (fromResult.get().getValue(BALANCE).get())).get() - amount;
int newToBalance = ((IntValue) (toResult.get().getValue(BALANCE).get())).get() + amount;
if (newFromBalance < 0) {
throw new RuntimeException(fromId + " doesn't have enough balances.");
}
// 残高を更新
Put fromPut = new Put(partitionKey, fromKey).withValue(new IntValue(BALANCE, newFromBalance));
Put toPut = new Put(partitionKey, toKey).withValue(new IntValue(BALANCE, newToBalance));
tx.put(fromPut); tx.put(toPut);
// トランザクションコミット
tx.commit();
}
public void balances(String groupId) throws ExecutionException {
Key partitionKey = new Key(new TextValue(GROUP_ID, groupId));
Scan scan = new Scan(partitionKey);
Scanner scanner = storageService.scan(scan);
scanner.forEach(r -> {
r.getValue(ID).ifPresent(v -> System.out.print(((TextValue) v).getString().get()));
System.out.print(" : ");
r.getValue(BALANCE).ifPresent(v -> System.out.println(((IntValue) v).get()));
});
}
public void deleteUser(String groupId, String id) throws CrudException, CommitException, UnknownTransactionStatu sException {
// トランザクション開始
DistributedTransaction tx = transactionService.start();
Key partitionKey = new Key(new TextValue(GROUP_ID, groupId));
Key clusteringKey = new Key(new TextValue(ID, id));
Get get = new Get(partitionKey, clusteringKey);
Optional<Result> result = tx.get(get);
if (!result.isPresent()) {
tx.abort();
return;
}
// 残高を更新
Delete delete = new Delete(partitionKey, clusteringKey);
tx.delete(delete);
// トランザクションコミット
tx.commit();
}
public void close() {
storageService.close();
transactionService.close();
}
}
$ vi ElectronicMoneyMain.java
package sample.emoney;
public class ElectronicMoneyMain {
public static void main(String[] args) throws Exception {
String action = null;
int amount = 0;
String group = null;
String to = null;
String from = null;
for (int i = 0; i < args.length; ++i) {
if ("-action".equals(args[i])) {
action = args[++i];
} else if ("-group".equals(args[i])) {
group = args[++i];
} else if ("-amount".equals(args[i])) {
amount = Integer.parseInt(args[++i]);
} else if ("-to".equals(args[i])) {
to = args[++i];
} else if ("-from".equals(args[i])) {
from = args[++i];
} else if ("-help".equals(args[i])) {
printUsageAndExit();
}
}
ElectronicMoney eMoney = new ElectronicMoney();
if (action.equalsIgnoreCase("charge")) {
eMoney.charge(group, to, amount);
} else if (action.equalsIgnoreCase("pay")) {
if (from == null) {
printUsageAndExit();
}
eMoney.pay(group, from, to, amount);
} else if (action.equalsIgnoreCase("balances")) {
eMoney.balances(group);
} else if (action.equalsIgnoreCase("delete")) {
eMoney.deleteUser(group, to);
}
eMoney.close();
}
private static void printUsageAndExit() {
System.err.println(
"ElectronicMoneyMain -action charge/pay/balances/delete -group id -to id [-amount number (needed for charge/pay)] [-from id (needed for pay)]"
);
System.exit(1);
}
}
実行
1.チャージ
user1に1000チャージ
$ gradle run --args="-action charge -amount 1000 -group groupA -to user1"
user2に0チャージ
$ gradle run --args="-action charge -amount 0 -group groupA -to user2"
user1,user2の残高を確認
$ gradle run --args="-action balances -group groupA"
2.支払い
user1からuser2へ300支払い
$ gradle run --args="-action pay -amount 300 -group groupA -to user2 -from user1"
user1とuser2の残高を確認
$ gradle run --args="-action balances -group groupA"
3.ユーザ削除
user1を削除
$ gradle run --args="-action delete -group groupA -to user1"
user2の残高を確認し、user1が削除されていることを確認
$ gradle run --args="-action balances -group groupA"
4.トランザクション機能の確認
トランザクション機能が働いていることを確認するために、ElectronicMoney.java
を修正します。
$ vi ElectronicMoney.java
public void pay(String groupId, String fromId, String toId, int amount) throws CrudException, CommitException, UnknownTransactionStatusException {
// トランザクション開始
DistributedTransaction tx = transactionService.start();
// 送金元、送金先のアカウント情報を取得
Key partitionKey = new Key(new TextValue(GROUP_ID, groupId));
Key fromKey = new Key(new TextValue(ID, fromId));
Key toKey = new Key(new TextValue(ID, toId));
Get fromGet = new Get(partitionKey, fromKey);
Get toGet = new Get(partitionKey, toKey);
Optional<Result> fromResult = tx.get(fromGet);
Optional<Result> toResult = tx.get(toGet);
if (!fromResult.isPresent()) {
throw new RuntimeException(fromId + " doesn't exist.");
}
if (!toResult.isPresent()) {
throw new RuntimeException(toId + " doesn't exist.");
}
int newFromBalance = ((IntValue) (fromResult.get().getValue(BALANCE).get())).get() - amount;
int newToBalance = ((IntValue) (toResult.get().getValue(BALANCE).get())).get() + amount;
if (newFromBalance < 0) {
throw new RuntimeException(fromId + " doesn't have enough balances.");
}
// 残高を更新
Put fromPut = new Put(partitionKey, fromKey).withValue(new IntValue(BALANCE, newFromBalance));
// ----------------ここから追加行-----------------------
// 実験用に必ずエラーをthrowする文の追加
if (newFromBalance >= 0){
throw new RuntimeException("test error.");
}
// ----------------ここまで追加行-----------------------
// 送金先の残高は更新されない
Put toPut = new Put(partitionKey, toKey).withValue(new IntValue(BALANCE, newToBalance));
tx.put(fromPut); tx.put(toPut);
// トランザクションコミット
tx.commit();
}
user1に1000チャージ
$ gradle run --args="-action charge -amount 1000 -group groupA -to user1"
user2に0チャージ
$ gradle run --args="-action charge -amount 0 -group groupA -to user2"
実行前の残高を確認しておく
gradle run --args="-action balances -group groupA"
送金処理を実行してみる
gradle run --args="-action pay -amount 300 -group groupA -to user2 -from user1"
※エラーメッセージ("test error.")が表示されたことを確認したら、Ctrl+Cで終了する。
残高を確認する
gradle run --args="-action balances -group groupA"
トランザクション機能が働いているので、送金元の残高だけ減っていることもなく、実行前と同じ残高が表示されます。
サンプルアプリ実行は以上となります。お読みいただきありがとうございました。