search
LoginSignup
0
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

プログラミング技術の変化で得られた知見・苦労話【PR】パソナテック Advent Calendar 2020 Day 6

posted at

updated at

リアクティブプログラミングをNoSQLで「体験」してみる(Couchbase Reactive APIに触れる)

はじめに

プログラミング言語を巡って、常に新しいプログラミングパラダイムが生まれています。
ここでは、リアクティブプログラミングという新しいパラダイムがテーマになります(ちなみに、プログラミングパラダイムについての日本語のWikipediaでは、リアクティブプログラミングが、リストの最後に〜つまり、最も新しいものとして〜掲載されていました)。

さて、リアクティブプログラミングに関心を持った初学者が、リアクティブプログラミングを学ぶに際して、どういったアプローチが考えられるでしょうか?

手始めとして、まず概念的に理解することが考えられます。インターネットを「リアクティブプログラミング」というキーワードで検索すると、「リアクティブプログラミングとは(何か)」といった記事が、まず目につきます。

そして、実際にリアクティブプログラミングのフレームワークを使って、プログラミングすること、が次に思いつくのではないかと思います。

しかし、ここで多くの人が躓くことが想像できます。結局何を作れば良いのかがわからない、という状況になってしまいがちで、あるいは、フレームワークのチュートリアルを読み、数行のプログラムを書いてみたりはしたものの、それ以上先に進めないということもあるかもしれません。

ここでは、まずリアクティブフレームワークを用いて実装されたAPIを利用することで、リアクティブプログラミングを「体験」してみることにしたいと思います。どんな体験かといえば、(NoSQL)データベースにおけるデータ操作を題材にすることで、大規模データ取り扱い時の挙動という、リアクティブ(非ブロックAPI)と非リアクティブ(ブロックAPI)との違いを体感することを意図しています。

少し脱線になるかもしれませんが、過去には、WEBアプリケーション(やWEBブラウザ)というものが新しいものであった時代がありました。この時に、一度もWEBアプリケーションをユーザとして使ったことがない人が、WEBアプリケーションを開発したり、ましてやWEBアプリケーションの概念を学んだり、といったことがあったとしたら、それはかなり特殊な状況であることが分かるかと思います。
そういった意味でも、ここでのアプローチには、一定の意味があるのではないかと考えます。

体験のための素材の選択

リアクティブ(非ブロック)APIを提供しているデータベースであれば、何を使っても良いところですが、ここではドキュメント指向NoSQLデータベースである、Couchbase Serverを題材として用います。プログラミング言語としては、Javaを用いることにします。

体験の際の手法

リアクティブ(非ブロック)APIと、ブロック(非リアクティブ)APIの両方を使って、同じ処理を実行し、挙動の違いを観察します。

プログラム解説

基底クラス

リアクティブおよびブロックAPIの両方で共通する処理を基底クラスとして定義しました。

特にCouchbaseの知見がない人であって、データベースに接続し、RDBのテーブルに類するデータ格納先(Bucketの中のCollection)を取得していることはイメージしやすいかと思います。

この後で用いる、データの登録もこのクラスで実行しています(upsertメソッドを用い、実行時常に新規追加あるいは上書きします)。今回は、データ取得の挙動の違いを見ることにフォーカスするため、このupsertの処理については、旧来のブロックAPIを用いています。

import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public abstract class Base {

    protected final Cluster cluster;
    protected final Bucket bucket;
    protected final Collection collection;

    public static final String bucketName = "default";
    public static final String userName = "Administrator";
    public static final String userPass = "password";
    public static final String seedNode = "127.0.0.1";

    public int NUM_KEY = 20000;
    protected List<String> keys = new ArrayList<String>(NUM_KEY);
    protected List<Object> listResults = new ArrayList<Object>(NUM_KEY);


    public Base() {
        cluster = Cluster.connect(seedNode, userName, userPass);
        bucket = cluster.bucket(bucketName);
        collection = bucket.defaultCollection();
        bucket.waitUntilReady(Duration.ofSeconds(30));
        prepKeys();
        prepDocuments();
    }

    private void prepKeys() {
        for (int i = 0; i < NUM_KEY; i++) {
            keys.add(String.valueOf(i));
        }
    }
    private void prepDocuments() {
        for (String key : keys) {
            JsonObject content = JsonObject.create().put("key", String.valueOf(key));
            collection.upsert(key, content);
        }
    }
    private void disconnect() {
        cluster.disconnect();
    }

    public void exec() {
        long startTime = System.currentTimeMillis();
        List<Object> listResults = get();
        long endTime = System.currentTimeMillis();

        System.out.println("先頭データ:" + listResults.get(0));
        System.out.println("末尾データ:" + listResults.get(NUM_KEY -1));
        System.out.println("取得サイズ:" + listResults.size());
        System.out.println("経過時間(ミリ秒):" + (endTime - startTime));
        disconnect();
    }

    protected abstract List<Object> get();

ブロックAPI使用クラス

基底クラスを継承し、アブストラクトメソッド(get)をブロックAPIを用いて実装しています。

import java.util.List;

import com.couchbase.client.java.kv.GetResult;

public class Sequential extends Base {

    @Override
    protected List<Object> get() {  
        for (String key : keys) {
            GetResult result = collection.get(String.valueOf(key));
            listResults.add(result);
        }
        return listResults;
    }

    public static void main(String[] args) {
        new Sequential().exec();
    }
}

リアクティブAPI使用クラス

基底クラスを継承し、アブストラクトメソッド(get)をリアクティブAPIを用いて実装しています。

import java.util.List;

import com.couchbase.client.java.ReactiveCollection;
import reactor.core.publisher.Flux;

public class Concurrent extends Base {

    @Override
    protected List<Object>  get() {
        ReactiveCollection reactiveCollection = collection.reactive();

        Flux<Object> resultFlux = Flux.fromArray(keys.toArray())
            .flatMap( k -> reactiveCollection.get(String.valueOf(k)));

        List<Object> listResults = resultFlux.collectList().block();
        return listResults;
    }

    public static void main(String[] args) {
        new Concurrent().exec();
    }
}

実行結果

以下は、それぞれのクラスの実行結果です。

同期API

先頭データ:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x165349f2729e0000, expiry=Optional.empty}
末尾データ:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x165349f307a30000, expiry=Optional.empty}
取得サイズ:20000
経過時間(ミリ秒):1201

非同期API

先頭データ:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x16534a84e29c0000, expiry=Optional.empty}
末尾データ:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x16534a856f4e0000, expiry=Optional.empty}
取得サイズ:20000
経過時間(ミリ秒):302

MacラップトップにCouchbase Serverをインストールし、同じくラップトップ上でプログラムを実行していますが、私の環境では、ざっと4倍の速度の違いが見られました。

今回のケースでは、取得したデータをリストに納める際にブロック処理しているので、より非ブロック処理を生かすことができるユースケースであれば、さらに高い効率化が望めると思われます。

最後に

今回は、プログラミング内容にフォーカスして説明しました。
少しでも、興味をもたれた方は、今回掲載したソースコードを実際に実行して見られたり、ご自身で、さらに進んだデータ操作に挑んでみていただければ、幸甚です。

Couchbase Serverそのものの説明や導入については、本記事の範囲ではないため、割愛せざるを得ませんでしたが、手始めに動かしてみるということであれば、Couchbase Serverのインストールや管理コンソールからの操作は、拍子抜けするほど簡単ですので、ご関心に応じ、実行していただければと思います。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
0
Help us understand the problem. What are the problem?