はじめに
ここでは、Couchbase Liteのレプリケーション構成例をサンプルコードによって紹介します。
概要
Couchbase Liteは、Sync Gatewayとの組み合わせにより、モバイルアプリケーションとデーターセンター/クラウド上のCouchbase Serverデータベースの間の双方向同期(レプリケーション)のためのAPIサポートを提供します。
Sync Gatewayと対話するモジュールは「レプリケーター」と呼ばれます。
アプリケーションはレプリケーター(クライアント)を実行します。レプリケーターはSync Gateway(サーバー)との接続を開始し、データベース変更のレプリケーションに参加して、ローカルデータベース(Couchbase Lite)とリモートデータベース(Couchbase Server)の同期を実現します。
エグゼキューター
Couchbase Liteは、複数のエグゼキューターを起動します。
エグゼキューターは、非同期コールバックを処理するために、スレッドのプールと、キューを管理します。
エグゼキューターによって処理されるCouchbase Lite APIコールを以下に示します。
- Query.addChangeListener
- MessageEndpointListerner.addChangeListener
- LiveQuery.addChangeListener
- AbstractReplicator.addDocumentReplicationListener
- AbstractReplicator.addChangeListener
- Database.addChangeListener
- Database.addDocumentChangeListener
- Database.addDatabaseChangeListener
- Database.addChangeListener
Couchbase Liteは、独自の内部エグゼキューターを使用して非同期クライアントコードを実行する場合があります。これは小さなタスクには問題ありませんが、大きなタスク(計算にかなりの時間がかかるタスク、またはI / Oを実行するタスク)は、Couchbase Liteの処理をブロックする可能性があります。これが発生した場合、アプリケーションはRejectedExecutionException
で失敗し、大きなタスクを実行するための別のエグゼキューターを作成する必要がある場合があります。
この後紹介するサンプルでは、クライアントコードで個別のエグゼキューターを指定する方法を示しています。
クライアントコードエグゼキューターを用いることにより、配信順序とスレッド数について、アプリケーション独自のポリシーを適用することができます。
構成例
順序保証配信
以下のサンプルコードは、同期メッセージ配信の順序を保証し、スペースを節約します
リスナーは、(少なくともこのコードに関する限り)スレッドセーフである必要はありません。
リスナーは、エグゼキューターのスレッドでのみ実行され、次の通話が始まる前に、コールから復帰する必要があります。リスナーの実行にかかる時間によって、イベントは遅れて配信される場合があります。
public class InOrderExample {
private static final ExecutorService IN_ORDER_EXEC = Executors.newSingleThreadExecutor();
public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
throws CouchbaseLiteException {
ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
config.setContinuous(false);
Replicator repl = new Replicator(config);
ListenerToken token = repl.addChangeListener(IN_ORDER_EXEC, listener::changed);
repl.start();
return repl;
}
}
最大スループット
以下のサンプルコードは、スループットを最大化します。
CPUの可用性が許す限り迅速に変更通知を配信します。変更通知は、順不同で配信される場合があります。
リスナーは複数のスレッドから呼び出される可能性があるため、スレッドセーフである必要があります。
また、特定のリスナーが複数のスレッドで同時に実行されている可能性があるため、再入可能である必要があります。
通知がプロセッサを圧倒すると、プロセッサを待機している通知は、メモリとGCの影響を伴って、(Runnableではなく)スレッドとしてキューに入れられます。
public class MaxThroughputExample {
private static final ExecutorService MAX_THROUGHPUT_EXEC = Executors.newCachedThreadPool();
public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
throws CouchbaseLiteException {
ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
config.setContinuous(false);
Replicator repl = new Replicator(config);
ListenerToken token = repl.addChangeListener(MAX_THROUGHPUT_EXEC, listener::changed);
repl.start();
return repl;
}
}
コールバック構成
以下のサンプルコードは、Couchbase Liteレプリケーターのコールバックシステムの構成可能性を示しています。
更新は、順不同で配信される可能性があり、リスナーは、スレッドセーフで再入可能である必要があります
(ただし、SynchronousQueue
を使用して渡されたタスクを正しく同期します)。
ここに示されているスレッドプールエグゼキューターは、CPUあたりのスレッド数のスイートスポット用に構成されています。
アプリケーションでは、この単一のエグゼキューターをアプリケーション全体で使用することによって、アプリ全体の適切なスレッドポリシーが確立されます。
「実行拒否」が発生した場合(RejectedExecutionHandlerがコールされた場合)、無制限のキューを持つバックアップエグゼキューターを遅延的に作成します。
public class PolicyExample {
private static final int CPUS = Runtime.getRuntime().availableProcessors();
private static ThreadPoolExecutor BACKUP_EXEC;
private static final RejectedExecutionHandler BACKUP_EXECUTION
= new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
synchronized (this) {
if (BACKUP_EXEC = null) { BACKUP_EXEC = createBackupExecutor(); }
}
BACKUP_EXEC.execute(r);
}
};
private static ThreadPoolExecutor createBackupExecutor() {
ThreadPoolExecutor exec
= new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
exec.allowCoreThreadTimeOut(true);
return exec;
}
private static final ThreadPoolExecutor STANDARD_EXEC
= new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
static { STANDARD_EXEC.setRejectedExecutionHandler(BACKUP_EXECUTION); }
public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
throws CouchbaseLiteException {
ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
config.setContinuous(false);
Replicator repl = new Replicator(config);
ListenerToken token = repl.addChangeListener(STANDARD_EXEC, listener::changed);
repl.start();
return repl;
}
}
関連情報