LoginSignup
0
0

More than 3 years have passed since last update.

リアクティブプログラミングをNoSQLで「体験」してみる(Couchbase Reactive APIに触れる)

Last updated at Posted at 2020-12-23

はじめに

プログラミング言語を巡って、常に新しいプログラミングパラダイムが生まれています。
ここでは、リアクティブプログラミングという新しいパラダイムがテーマになります(ちなみに、プログラミングパラダイムについての日本語のWikipediaでは、リアクティブプログラミングが、リストの最後に〜つまり、最も新しいものとして〜掲載されていました)。

さて、リアクティブプログラミングに関心を持った初学者が、リアクティブプログラミングを学ぶに際して、どういったアプローチが考えられるでしょうか?

手始めとして、まず概念的に理解することが考えられます。インターネットを「リアクティブプログラミング」というキーワードで検索すると、「リアクティブプログラミングとは(何か)」といった記事が、まず目につきます。

そして、実際にリアクティブプログラミングのフレームワークを使って、プログラミングすること、が次に思いつくのではないかと思います。

しかし、ここで多くの人が躓くことが想像できます。結局何を作れば良いのかがわからない、という状況になってしまいがちで、あるいは、フレームワークのチュートリアルを読み、数行のプログラムを書いてみたりはしたものの、それ以上先に進めないということもあるかもしれません。

ここでは、まずリアクティブフレームワークを用いて実装されたAPIを利用することで、リアクティブプログラミングを「体験」してみることにしたいと思います。どんな体験かといえば、(NoSQL)データベースにおけるデータ操作を題材にすることで、大規模データ取り扱い時の挙動という、リアクティブ(非ブロックAPI)と非リアクティブ(ブロックAPI)との違いを体感することを意図しています。

少し脱線になるかもしれませんが、過去には、WEBアプリケーション(やWEBブラウザ)というものが新しいものであった時代がありました。この時に、一度もWEBアプリケーションをユーザとして使ったことがない人が、WEBアプリケーションを開発したり、ましてやWEBアプリケーションの概念を学んだり、といったことがあったとしたら、それはかなり特殊な状況であることが分かるかと思います。
そういった意味でも、ここでのアプローチには、一定の意味があるのではないかと考えます。

体験のための素材の選択

リアクティブ(非ブロック)APIを提供しているデータベースであれば、何を使っても良いところですが、ここではドキュメント指向NoSQLデータベースである、Couchbase Serverを題材として用います。プログラミング言語としては、Javaを用いることにします。

体験の際の手法

リアクティブ(非ブロック)APIと、ブロック(非リアクティブ)APIの両方を使って、同じ処理を実行し、挙動の違いを観察します。

プログラム解説

基底クラス

リアクティブおよびブロックAPIの両方で共通する処理を基底クラスとして定義しました。

特にCouchbaseの知見がない人であって、データベースに接続し、RDBのテーブルに類するデータ格納先(Bucketの中のCollection)を取得していることはイメージしやすいかと思います。

この後で用いる、データの登録もこのクラスで実行しています(upsertメソッドを用い、実行時常に新規追加あるいは上書きします)。今回は、データ取得の挙動の違いを見ることにフォーカスするため、このupsertの処理については、旧来のブロックAPIを用いています。

import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public abstract class Base {

    protected final Cluster cluster;
    protected final Bucket bucket;
    protected final Collection collection;

    public static final String bucketName = "default";
    public static final String userName = "Administrator";
    public static final String userPass = "password";
    public static final String seedNode = "127.0.0.1";

    public int NUM_KEY = 20000;
    protected List<String> keys = new ArrayList<String>(NUM_KEY);
    protected List<Object> listResults = new ArrayList<Object>(NUM_KEY);


    public Base() {
        cluster = Cluster.connect(seedNode, userName, userPass);
        bucket = cluster.bucket(bucketName);
        collection = bucket.defaultCollection();
        bucket.waitUntilReady(Duration.ofSeconds(30));
        prepKeys();
        prepDocuments();
    }

    private void prepKeys() {
        for (int i = 0; i < NUM_KEY; i++) {
            keys.add(String.valueOf(i));
        }
    }
    private void prepDocuments() {
        for (String key : keys) {
            JsonObject content = JsonObject.create().put("key", String.valueOf(key));
            collection.upsert(key, content);
        }
    }
    private void disconnect() {
        cluster.disconnect();
    }

    public void exec() {
        long startTime = System.currentTimeMillis();
        List<Object> listResults = get();
        long endTime = System.currentTimeMillis();

        System.out.println("先頭データ:" + listResults.get(0));
        System.out.println("末尾データ:" + listResults.get(NUM_KEY -1));
        System.out.println("取得サイズ:" + listResults.size());
        System.out.println("経過時間(ミリ秒):" + (endTime - startTime));
        disconnect();
    }

    protected abstract List<Object> get();

ブロックAPI使用クラス

基底クラスを継承し、アブストラクトメソッド(get)をブロックAPIを用いて実装しています。

import java.util.List;

import com.couchbase.client.java.kv.GetResult;

public class Sequential extends Base {

    @Override
    protected List<Object> get() {  
        for (String key : keys) {
            GetResult result = collection.get(String.valueOf(key));
            listResults.add(result);
        }
        return listResults;
    }

    public static void main(String[] args) {
        new Sequential().exec();
    }
}

リアクティブAPI使用クラス

基底クラスを継承し、アブストラクトメソッド(get)をリアクティブAPIを用いて実装しています。

import java.util.List;

import com.couchbase.client.java.ReactiveCollection;
import reactor.core.publisher.Flux;

public class Concurrent extends Base {

    @Override
    protected List<Object>  get() {
        ReactiveCollection reactiveCollection = collection.reactive();

        Flux<Object> resultFlux = Flux.fromArray(keys.toArray())
            .flatMap( k -> reactiveCollection.get(String.valueOf(k)));

        List<Object> listResults = resultFlux.collectList().block();
        return listResults;
    }

    public static void main(String[] args) {
        new Concurrent().exec();
    }
}

実行結果

以下は、それぞれのクラスの実行結果です。

同期API

先頭データ:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x165349f2729e0000, expiry=Optional.empty}
末尾データ:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x165349f307a30000, expiry=Optional.empty}
取得サイズ:20000
経過時間(ミリ秒):1201

非同期API

先頭データ:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x16534a84e29c0000, expiry=Optional.empty}
末尾データ:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x16534a856f4e0000, expiry=Optional.empty}
取得サイズ:20000
経過時間(ミリ秒):302

MacラップトップにCouchbase Serverをインストールし、同じくラップトップ上でプログラムを実行していますが、私の環境では、ざっと4倍の速度の違いが見られました。

今回のケースでは、取得したデータをリストに納める際にブロック処理しているので、より非ブロック処理を生かすことができるユースケースであれば、さらに高い効率化が望めると思われます。

最後に

今回は、プログラミング内容にフォーカスして説明しました。
少しでも、興味をもたれた方は、今回掲載したソースコードを実際に実行して見られたり、ご自身で、さらに進んだデータ操作に挑んでみていただければ、幸甚です。

Couchbase Serverそのものの説明や導入については、本記事の範囲ではないため、割愛せざるを得ませんでしたが、手始めに動かしてみるということであれば、Couchbase Serverのインストールや管理コンソールからの操作は、拍子抜けするほど簡単ですので、ご関心に応じ、実行していただければと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0