はじめに
ここでは、Couchbase Liteのレプリケーターについて解説します。
なお、Couchbase Mobileについては、Couchbase Mobileアプリケーション開発へのロードマップに記事をまとめている他、(これらの記事を元に構成した)以下の電子書籍を無償で頒布しています。
また、Couchbase Mobileは、Couchbase LiteとCouchbase Serverとのデータ同期機能を提供します。Couchbase Serverの存在意義、機能詳細、利用方法等については、拙著NoSQLドキュメント指向データベースCouchbase Serverファーストステップガイド(インプレスR&D刊)や、NoSQL/JSONデータベースCouchbase Server理解・活用へのロードマップにまとめてある記事をご参考ください。
レプリケーター解説
概要
Couchbase Liteは、Sync Gatewayとの組み合わせにより、モバイルアプリケーションとデーターセンター/クラウド上のCouchbase Serverデータベースとの間で、データの双方向同期(「レプリケーション」)を行うことができます。
ここでは、「レプリケーション」のためのAPIについて解説します。
Sync Gatewayと対話するモジュールは「レプリケーター」と呼ばれます。
アプリケーションはレプリケーター(クライアント)を実行します。レプリケーターはSync Gateway(サーバー)との接続を開始し、データベース変更のレプリケーションに参加して、ローカルデータベース(Couchbase Lite)とリモートデータベース(Couchbase Server)の同期を実現します。
初期化と起動
Couchbase Serverと同期するCouchbase Liteデータベースインスタンスごとにレプリケーターを構成して初期化します。
次のコードは、構成と初期化のプロセスを示しています。
// ローカルDBと、リモートDBのエンドポイントを指定してReplicatorConfigurationを生成
final ReplicatorConfiguration thisConfig
= new ReplicatorConfiguration(
thisDB,
URLEndpoint(URI("wss://listener.com:8954")));
// レプリケーションのタイプを(プッシュアンドプルに)指定
thisConfig.setReplicatorType(
ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
// ベーシック認証利用
final BasicAuthenticator thisAuth
= new BasicAuthenticator(
"Our Username",
"Our PasswordValue"));
thisConfig.setAuthenticator(thisAuth)
// レプリケーターを生成
// ※ GCされないためには、リファレンスを保持する必要があります。
final Replicator thisReplicator = new Replicator(thisConfig);
// レプリケーターを開始
thisReplicator.start();
レプリケーションのモニタリング
チェンジリスナーとreplication.status.activity
プロパティの組み合わせを使用して、レプリケーションのステータスを監視できます。これにより、たとえば、レプリケーションがアクティブにデータを転送しているのかどうかを知ることができます。
また、ドキュメントの変更を監視するためにリスターを利用することもできます。
チェンジリスナー
リスナーを使用して、変更を監視し、同期の進行状況を通知します。レプリケーターはいつでもリスナーを追加および変更でき、登録された時点からの変更を報告します。
Replicator
クラスを使用して、Replicator
へのコールバックとしてチェンジリスナーを追加します(addChangeListener())
—例12を参照してください。その後、状態の変化が非同期で通知されます。
リスナーの削除
後でリスナーを削除できるように、トークンを保存することを忘れないでください。
removeChangeListener(ListenerToken token)
を使用して変更リスナーを削除できます。
レプリケーターステータス
レプリケーターには、レプリケーターの状態を確認するためのアクセサーが備わっています。
次のAPIが利用可能です。
-
getActivityLevel()
: 後述のレプリケーションステータスを参照 getProgress()
-
completed
: 完了した変更の総数 -
total
: 処理される変更の総数 -
getError()
: 現在のエラー(ある場合)
利用例:getError
ListenerToken thisListener =
new thisReplicator.addChangeListener(change -> {
final CouchbaseLiteException err =
change.getStatus().getError();
if (err != null) {
Log.i(TAG, "Error code :: " + err.getCode(), e);
}
});
利用例:getActivityLevel()
、getStatus().getProgess().getTotal()
Log.i(TAG, "The Replicator is currently " +
thisReplicator.getStatus().getActivityLevel());
Log.i(TAG, "The Replicator has processed " + t);
if (thisReplicator.getStatus().getActivityLevel() ==
Replicator.ActivityLevel.BUSY) {
Log.i(TAG, "Replication Processing");
Log.i(TAG, "It has completed " +
thisReplicator.getStatus().getProgess().getTotal() +
" changes");
}
レプリケーションステータス
以下は、APIで報告されるさまざまな状態またはアクティビティレベルとそれぞれの意味を示しています。
-
STOPPED
レプリケーションが終了、あるいは致命的なエラーが発生しています。 -
OFFLINE
リモートホストに到達できないため、レプリケータはオフラインです。 -
CONNECTING
レプリケーターはリモートホストに接続しています。 -
IDLE
レプリケーションは、サーバーから利用可能なすべての変更に追いついている状態。IDLE
は継続的レプリケーションの場合に使用されるステータスです。 -
BUSY
レプリケーションはアクティブにデータを転送しています。
レプリケーションステータスとアプリのライフサイクル
Couchbase Mobileレプリケーションは、リモートシステムまたはアプリケーションが接続を終了しない限り、アプリが終了するまで実行を継続します。
Android OSが警告なしにアプリケーションを強制終了する可能性があることに留意する必要があります。レプリケーションプロセスに干渉する可能性のあるソケット接続がOSによって閉じられるのを防ぐために、レプリケーションプロセスが役に立たなくなった(たとえば、SUSPENDED
またはIDE¥LE
)場合には、レプリケーターを明示的に停止する必要があります。
ドキュメント変更監視
レプリケーション中にドキュメントの更新を登録することを選択できます。
次のコードは、変数によって参照されるレプリケータによって実行されるドキュメントレプリケーションを監視するリスナーを登録します。送受信された各ドキュメントのドキュメントIDを出力します。
ListenerToken token = replicator.addDocumentReplicationListener(replication -> {
Log.i(TAG, "Replication type: " + replication.isPush( ? "Push" : "Pull"));
for (ReplicatedDocument document : replication.getDocuments()) {
Log.i(TAG, "Doc ID: " + document.getID());
CouchbaseLiteException err = document.getError();
if (err != null) {
// There was an error
Log.e(TAG, "Error replicating document: ", err);
return;
}
if (document.flags().contains(DocumentFlag.DocumentFlagsDeleted)) {
Log.i(TAG, "Successfully replicated a deleted document");
}
}
});
replicator.start();
次のコードは、前の例のトークンを使用してリスナーを停止する方法を示しています。
replicator.removeChangeListener(token);
ドキュメントアクセス権取り消し
Sync Gatewayでドキュメントへのアクセス権が取り消されると、ドキュメントレプリケーションリスナーはAccessRemoved
フラグがtrue
に設定された通知を送信します。その後、データベースからドキュメントを削除します。
プッシュ保留中のドキュメント
次のような、プッシュ保留中のドキュメントが待機しているかどうかを確認することのできるAPIが用意されています。
-
Replicator.getPendingDocumentIds()
メソッドは、ローカルで変更されているが、まだサーバーにプッシュされていないドキュメントIDのリストを返します。これは、プッシュ同期の進行状況を追跡したり、アプリがエンドユーザーにステータスを視覚的に示したり、いつ安全に終了できるかを判断したりするのに役立ちます。 -
Replicator.isDocumentPending(docId)
メソッドは、個々のドキュメントがプッシュを保留しているかどうかをすばやく確認するために利用することができます。
以下に利用例を示します。
private void onStatusChanged(
@NonNull final Set<String> pendingDocs,
@NonNull final Replicator.Status status) {
// ... sample onStatusChanged function
//
Log.i(TAG,
"Replicator activity level is " + status.getActivityLevel().toString());
// iterate and report-on previously
// retrieved pending docids 'list'
for (Iterator<String> itr = pendingDocs.iterator(); itr.hasNext(); ) {
final String docId = itr.next();
try {
if (!replicator.isDocumentPending(docId)) { continue; }
itr.remove();
Log.i(TAG, "Doc ID " + docId + " has been pushed");
}
catch (CouchbaseLiteException e) {
Log.w(TAG, "isDocumentPending failed", e); }
}
}
レプリケーション停止
レプリケーションの停止はstop()
を使用して行われます。
これは非同期操作でため、必ずしも即時ではありません。アプリは、後続の操作を試行する前に、この潜在的な遅延を考慮する必要があります。
thisReplicator.stop();
レプリケーションが停止すると、アクティブな変更リスナーはすべて停止します。
ネットワークエラー発生時の挙動
レプリケーターがネットワークエラーを検出した場合、エラーのタイプ(永続的または一時的)に応じてその状態を更新し、適切なHTTPエラーコードを返します。
恒久的なネットワークエラーの場合
(たとえば、「404 not found」、または「401 unauthorized」のような) 恒久的なネットワークエラーの場合、レプリケーターはステータスをSTOPPED
に設定します。そして、setContinuous
が真または偽であるかどうかにかかわらずレプリケーターは停止します。
回復可能なエラーまたは一時的なエラーの場合
レプリケーターはステータスをOFFLINE
に設定します。
さらに、以下のようにsetContinuous
設定の違いによって、挙動が異なります。
-
setContinuous=true
:接続を無期限に再試行します -
setContinuous=false
:一定回数、接続を再試行します。
次のエラーコードは、レプリケーターによって一時的なものと見なされるため、接続の再試行が行われます。
- 408: Request Timeout
- 429: Too Many Requests
- 500: Internal Server Error
- 502: Bad Gateway
- 503: Service Unavailable
- 504: Gateway Timeout
- 1001: DNS resolution error
通信プロトコル
Couchbase Mobileは、データを送信するための通信プロトコルとしてWebSocketを使用します。
デフォルトでは、WebSocketプロトコルは通信データを圧縮し、速度と帯域幅の使用率を最適化します。
圧縮レベルはSync Gatewayで設定され、構成プロパティ(replicator_compression
)で調整できます。
ログ設定
レプリケーションに関連するログ出力レベルを設定することができます。
Database.setLogLevel(LogDomain.REPLICATOR, LogLevel.VERBOSE);
詳細は下記のドキュメントを参照してください。
プログラミング上の注意
バックグラウンドスレッドの使用
他のネットワークまたはファイルI/Oアクティビティと同様、Couchbase LiteアクティビティはUIスレッドではなく、常にバックグラウンドスレッドを使用する必要があります。
開発時等に利用可能な設定
チェックポイント開始
レプリケーターは、チェックポイントを使用して、ターゲットデータベースに送信されたドキュメントを追跡します。
この機能は通常の運用において、アプリケーション開発者が意識する必要はありません。
ただし、レプリケーションを強制的にゼロから再開する場合は、レプリケーターを開始するときにチェックポイントリセット引数を使用することができます。
チェックポイントをリセットした場合、Couchbase Liteは、過去に行われたレプリケーションで対象データベースの一部またはすべてが既にレプリケートされている場合でも、データベース全体をレプリケートします。
以下の例では、start
メソッドのリセットオプションをtrue
に設定しています。
デフォルト('false')は、引数として明示的に使用する必要はありません。
if (resetCheckpointRequired) {
replicator.start(true);
}
Kotlin開発
FlowsとLiveDataの使用
Android Kotlin開発者は、FlowsとLiveDataを利用してレプリケーターを監視できます。
val replState: LiveData<ReplicatorActivityLevel> = argRepl.replicatorChangesFlow()
.map { it.status.activityLevel }
.asLiveData()
関連情報