はじめに
プログラミング言語を巡って、常に新しいプログラミングパラダイムが生まれています。
ここでは、リアクティブプログラミングという新しいパラダイムがテーマになります(ちなみに、プログラミングパラダイムについての日本語のWikipediaでは、リアクティブプログラミングが、リストの最後に〜つまり、最も新しいものとして〜掲載されていました)。
さて、リアクティブプログラミングに関心を持った初学者が、リアクティブプログラミングを学ぶに際して、どういったアプローチが考えられるでしょうか?
手始めとして、まず概念的に理解することが考えられます。インターネットを「リアクティブプログラミング」というキーワードで検索すると、「リアクティブプログラミングとは(何か)」といった記事が、まず目につきます。
そして、実際にリアクティブプログラミングのフレームワークを使って、プログラミングすること、が次に思いつくのではないかと思います。
しかし、ここで多くの人が躓くことが想像できます。結局何を作れば良いのかがわからない、という状況になってしまいがちで、あるいは、フレームワークのチュートリアルを読み、数行のプログラムを書いてみたりはしたものの、それ以上先に進めないということもあるかもしれません。
ここでは、まずリアクティブフレームワークを用いて実装されたAPIを利用することで、リアクティブプログラミングを「体験」してみることにしたいと思います。どんな体験かといえば、(NoSQL)データベースにおけるデータ操作を題材にすることで、大規模データ取り扱い時の挙動という、リアクティブ(非ブロックAPI)と非リアクティブ(ブロックAPI)との違いを体感することを意図しています。
少し脱線になるかもしれませんが、過去には、WEBアプリケーション(やWEBブラウザ)というものが新しいものであった時代がありました。この時に、一度もWEBアプリケーションをユーザとして使ったことがない人が、WEBアプリケーションを開発したり、ましてやWEBアプリケーションの概念を学んだり、といったことがあったとしたら、それはかなり特殊な状況であることが分かるかと思います。
そういった意味でも、ここでのアプローチには、一定の意味があるのではないかと考えます。
体験のための素材の選択
リアクティブ(非ブロック)APIを提供しているデータベースであれば、何を使っても良いところですが、ここではドキュメント指向NoSQLデータベースである、Couchbase Serverを題材として用います。プログラミング言語としては、Javaを用いることにします。
体験の際の手法
リアクティブ(非ブロック)APIと、ブロック(非リアクティブ)APIの両方を使って、同じ処理を実行し、挙動の違いを観察します。
プログラム解説
基底クラス
リアクティブおよびブロックAPIの両方で共通する処理を基底クラスとして定義しました。
特にCouchbaseの知見がない人であって、データベースに接続し、RDBのテーブルに類するデータ格納先(Bucket
の中のCollection
)を取得していることはイメージしやすいかと思います。
この後で用いる、データの登録もこのクラスで実行しています(upsert
メソッドを用い、実行時常に新規追加あるいは上書きします)。今回は、データ取得の挙動の違いを見ることにフォーカスするため、このupsert
の処理については、旧来のブロックAPIを用いています。
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
public abstract class Base {
protected final Cluster cluster;
protected final Bucket bucket;
protected final Collection collection;
public static final String bucketName = "default";
public static final String userName = "Administrator";
public static final String userPass = "password";
public static final String seedNode = "127.0.0.1";
public int NUM_KEY = 20000;
protected List<String> keys = new ArrayList<String>(NUM_KEY);
protected List<Object> listResults = new ArrayList<Object>(NUM_KEY);
public Base() {
cluster = Cluster.connect(seedNode, userName, userPass);
bucket = cluster.bucket(bucketName);
collection = bucket.defaultCollection();
bucket.waitUntilReady(Duration.ofSeconds(30));
prepKeys();
prepDocuments();
}
private void prepKeys() {
for (int i = 0; i < NUM_KEY; i++) {
keys.add(String.valueOf(i));
}
}
private void prepDocuments() {
for (String key : keys) {
JsonObject content = JsonObject.create().put("key", String.valueOf(key));
collection.upsert(key, content);
}
}
private void disconnect() {
cluster.disconnect();
}
public void exec() {
long startTime = System.currentTimeMillis();
List<Object> listResults = get();
long endTime = System.currentTimeMillis();
System.out.println("先頭データ:" + listResults.get(0));
System.out.println("末尾データ:" + listResults.get(NUM_KEY -1));
System.out.println("取得サイズ:" + listResults.size());
System.out.println("経過時間(ミリ秒):" + (endTime - startTime));
disconnect();
}
protected abstract List<Object> get();
ブロックAPI使用クラス
基底クラスを継承し、アブストラクトメソッド(get
)をブロックAPIを用いて実装しています。
import java.util.List;
import com.couchbase.client.java.kv.GetResult;
public class Sequential extends Base {
@Override
protected List<Object> get() {
for (String key : keys) {
GetResult result = collection.get(String.valueOf(key));
listResults.add(result);
}
return listResults;
}
public static void main(String[] args) {
new Sequential().exec();
}
}
リアクティブAPI使用クラス
基底クラスを継承し、アブストラクトメソッド(get
)をリアクティブAPIを用いて実装しています。
import java.util.List;
import com.couchbase.client.java.ReactiveCollection;
import reactor.core.publisher.Flux;
public class Concurrent extends Base {
@Override
protected List<Object> get() {
ReactiveCollection reactiveCollection = collection.reactive();
Flux<Object> resultFlux = Flux.fromArray(keys.toArray())
.flatMap( k -> reactiveCollection.get(String.valueOf(k)));
List<Object> listResults = resultFlux.collectList().block();
return listResults;
}
public static void main(String[] args) {
new Concurrent().exec();
}
}
実行結果
以下は、それぞれのクラスの実行結果です。
同期API
先頭データ:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x165349f2729e0000, expiry=Optional.empty}
末尾データ:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x165349f307a30000, expiry=Optional.empty}
取得サイズ:20000
経過時間(ミリ秒):1201
非同期API
先頭データ:GetResult{content={"key":"0"}, flags=0x2000000, cas=0x16534a84e29c0000, expiry=Optional.empty}
末尾データ:GetResult{content={"key":"19999"}, flags=0x2000000, cas=0x16534a856f4e0000, expiry=Optional.empty}
取得サイズ:20000
経過時間(ミリ秒):302
MacラップトップにCouchbase Serverをインストールし、同じくラップトップ上でプログラムを実行していますが、私の環境では、ざっと4倍の速度の違いが見られました。
今回のケースでは、取得したデータをリストに納める際にブロック処理しているので、より非ブロック処理を生かすことができるユースケースであれば、さらに高い効率化が望めると思われます。
最後に
今回は、プログラミング内容にフォーカスして説明しました。
少しでも、興味をもたれた方は、今回掲載したソースコードを実際に実行して見られたり、ご自身で、さらに進んだデータ操作に挑んでみていただければ、幸甚です。
Couchbase Serverそのものの説明や導入については、本記事の範囲ではないため、割愛せざるを得ませんでしたが、手始めに動かしてみるということであれば、Couchbase Serverのインストールや管理コンソールからの操作は、拍子抜けするほど簡単ですので、ご関心に応じ、実行していただければと思います。