0
0

More than 1 year has passed since last update.

Couchbase Lite機能解説:Sync Gatewayを介したCouchbase Serverとの同期〜構成例

Posted at

はじめに

ここでは、Couchbase Liteのレプリケーション構成例をサンプルコードによって紹介します。

概要

Couchbase Liteは、Sync Gatewayとの組み合わせにより、モバイルアプリケーションとデーターセンター/クラウド上のCouchbase Serverデータベースの間の双方向同期(レプリケーション)のためのAPIサポートを提供します。

Sync Gatewayと対話するモジュールは「レプリケーター」と呼ばれます。

アプリケーションはレプリケーター(クライアント)を実行します。レプリケーターはSync Gateway(サーバー)との接続を開始し、データベース変更のレプリケーションに参加して、ローカルデータベース(Couchbase Lite)とリモートデータベース(Couchbase Server)の同期を実現します。

エグゼキューター

Couchbase Liteは、複数のエグゼキューターを起動します。

エグゼキューターは、非同期コールバックを処理するために、スレッドのプールと、キューを管理します。

エグゼキューターによって処理されるCouchbase Lite APIコールを以下に示します。

  • Query.addChangeListener
  • MessageEndpointListerner.addChangeListener
  • LiveQuery.addChangeListener
  • AbstractReplicator.addDocumentReplicationListener
  • AbstractReplicator.addChangeListener
  • Database.addChangeListener
  • Database.addDocumentChangeListener
  • Database.addDatabaseChangeListener
  • Database.addChangeListener

Couchbase Liteは、独自の内部エグゼキューターを使用して非同期クライアントコードを実行する場合があります。これは小さなタスクには問題ありませんが、大きなタスク(計算にかなりの時間がかかるタスク、またはI / Oを実行するタスク)は、Couchbase Liteの処理をブロックする可能性があります。これが発生した場合、アプリケーションはRejectedExecutionExceptionで失敗し、大きなタスクを実行するための別のエグゼキューターを作成する必要がある場合があります。

この後紹介するサンプルでは、クライアントコードで個別のエグゼキューターを指定する方法を示しています。
クライアントコードエグゼキューターを用いることにより、配信順序とスレッド数について、アプリケーション独自のポリシーを適用することができます。

構成例

順序保証配信

以下のサンプルコードは、同期メッセージ配信の順序を保証し、スペースを節約します
リスナーは、(少なくともこのコードに関する限り)スレッドセーフである必要はありません。
リスナーは、エグゼキューターのスレッドでのみ実行され、次の通話が始まる前に、コールから復帰する必要があります。リスナーの実行にかかる時間によって、イベントは遅れて配信される場合があります。


public class InOrderExample {
    private static final ExecutorService IN_ORDER_EXEC = Executors.newSingleThreadExecutor();

    public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
        throws CouchbaseLiteException {
        ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
        config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
        config.setContinuous(false);

        Replicator repl = new Replicator(config);
        ListenerToken token = repl.addChangeListener(IN_ORDER_EXEC, listener::changed);

        repl.start();

        return repl;
    }
}

最大スループット

以下のサンプルコードは、スループットを最大化します。
CPUの可用性が許す限り迅速に変更通知を配信します。変更通知は、順不同で配信される場合があります。
リスナーは複数のスレッドから呼び出される可能性があるため、スレッドセーフである必要があります。
また、特定のリスナーが複数のスレッドで同時に実行されている可能性があるため、再入可能である必要があります。

通知がプロセッサを圧倒すると、プロセッサを待機している通知は、メモリとGCの影響を伴って、(Runnableではなく)スレッドとしてキューに入れられます。


public class MaxThroughputExample {
    private static final ExecutorService MAX_THROUGHPUT_EXEC = Executors.newCachedThreadPool();

    public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
        throws CouchbaseLiteException {
        ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
        config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
        config.setContinuous(false);

        Replicator repl = new Replicator(config);
        ListenerToken token = repl.addChangeListener(MAX_THROUGHPUT_EXEC, listener::changed);

        repl.start();

        return repl;
    }
}

コールバック構成

以下のサンプルコードは、Couchbase Liteレプリケーターのコールバックシステムの構成可能性を示しています。

更新は、順不同で配信される可能性があり、リスナーは、スレッドセーフで再入可能である必要があります
(ただし、SynchronousQueueを使用して渡されたタスクを正しく同期します)。
ここに示されているスレッドプールエグゼキューターは、CPUあたりのスレッド数のスイートスポット用に構成されています。
アプリケーションでは、この単一のエグゼキューターをアプリケーション全体で使用することによって、アプリ全体の適切なスレッドポリシーが確立されます。
「実行拒否」が発生した場合(RejectedExecutionHandlerがコールされた場合)、無制限のキューを持つバックアップエグゼキューターを遅延的に作成します。


public class PolicyExample {
    private static final int CPUS = Runtime.getRuntime().availableProcessors();

    private static ThreadPoolExecutor BACKUP_EXEC;

    private static final RejectedExecutionHandler BACKUP_EXECUTION
        = new RejectedExecutionHandler() {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            synchronized (this) {
                if (BACKUP_EXEC =  null) { BACKUP_EXEC = createBackupExecutor(); }
            }
            BACKUP_EXEC.execute(r);
        }
    };

    private static ThreadPoolExecutor createBackupExecutor() {
        ThreadPoolExecutor exec
            = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        exec.allowCoreThreadTimeOut(true);
        return exec;
    }

    private static final ThreadPoolExecutor STANDARD_EXEC
        = new ThreadPoolExecutor(CPUS + 1, 2 * CPUS + 1, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

    static { STANDARD_EXEC.setRejectedExecutionHandler(BACKUP_EXECUTION); }

    public Replicator runReplicator(Database db1, Database db2, ReplicatorChangeListener listener)
        throws CouchbaseLiteException {
        ReplicatorConfiguration config = new ReplicatorConfiguration(db1, new DatabaseEndpoint(db2));
        config.setReplicatorType(ReplicatorConfiguration.ReplicatorType.PUSH_AND_PULL);
        config.setContinuous(false);

        Replicator repl = new Replicator(config);
        ListenerToken token = repl.addChangeListener(STANDARD_EXEC, listener::changed);

        repl.start();

        return repl;
    }
}

関連情報

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0