本記事はぷりぷりあぷりけーしょんず Advent Calendar 2019 の15日目の記事です。
はじめに
Qiita初投稿です=(:A/ L)_
今年はいろんな技術に出会った一年でした。
その中で、記事が少なくて困りに困った技術の一つ、
RxKotlinについてまとめてみます。
出会ったRxKotlinコード
初めてRxKotlinに出会った時のソースコード。
Androidアプリにて、自分の発言がDBにリアルタイムに保存される機能の一部分。
class MainViewModel @Inject constructor(...): BaseViewModel() {
{
// 省略
private val user = BehaviorSubject.create<User>()
private var speechToText: SpeechToTextRepository.SpeechToText? = null
init {
//省略
user.toFlowable(BackpressureStrategy.LATEST)
.switchMap { user ->
speechToTextRepository.start(user.language, 32000)
.retryWhen { error ->
error.flatMap {
Flowable.timer(
1000,
TimeUnit.MILLISECONDS
)
}
}
.doOnSuccess { speechToText = it }
.flatMapObservable { it.text }
.toFlowable(BackpressureStrategy.LATEST)
.flatMap { translationRepository.translate(it, user.language) }
.map { Message(it, user) }
}.flatMap { messageRepository.addMessage(channelName, it).toFlowable<Unit>() }
.subscribeBy(
onError = { Timber.d(it) }
)
.addTo(compositeDisposable)
}
初めて見た時に、
なにFlowableって?
なにswitchMapって?
...その他多数疑問に陥りました。
そもそもRxの存在を知りませんというところからでした。
#Rxとは?
Reactive Extentionの略。Microsoftが開発した概念。
GPSやライブストリーミングのようなデータが生成されるたびに送信されるデータに対し、受け取ったプログラムがその度処理を行って目的を達成するという考え方。リクエストを投げるような積極性はなく、受け身のような感じ?
一つのサーバーの非同期処理、イベント、時間に関係した処理を簡略化して行うためにLINQの形で、簡単に、宣言的に記述できるようになる。リアクティブプログラミングとも呼ばれている。
RxKotlinはRxJavaをKotlinで使いやすくするための軽量なライブラリなので、
RxKotlinよりも、割と資料が多いRxJavaから入門するのがおすすめ。
RxJavaはNetflix社がRxをJavaに導入したものを公開したライブラリ。
バージョン1.xと2.xがあり、1.xから2.xに上がった背景にはReactive Streamの適用がある。
Reactive Streamは、どのライブラリやフレームワークに関係なく、データストリームを非同期で扱えるようにするための共通の仕組みが作れるインターフェースを提供している。
Reactive Stream
インターフェース名 | 役割 |
---|---|
Publisher | データを生産し、通知をする役割(生産者) |
Subscriber | 通知をされたデータを受け取り、データ処理をする役割(消費者) |
SubScription | データ数のリクエストおよび購読の解除等、Subscriptionのリクエストを設定でもち、Publisherに教える役割。 |
Processor | PublisherとSubscriberの両方の性質をもつ。(両方を継承している) |
上の図のように、
データが流れるように受け渡しされる。
ちなみに③⑤⑦受け取りたいデータをリクエストとあるが、
これはバックプレッシャーという機能があることで、
自分が処理できる能力に合わせて受け取るデータ数、タイミングを指定できる仕組み。
ReactiveStreamを使わないデータストリームは、バックプレッシャーがないので、データは多かろうが少なかろうがドンドン来る。(わんこそばみたい)
RxJavaを書いてみる
##環境構築
Javaプロジェクトを作成し、MavenまたはGradleを入れて下の内容を追記する。
Maven
<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.15</version>
</dependency>
Gradle
dependencies {
implementation 'io.reactivex.rxjava2:rxjava:2.2.15'
}
インターフェース
RxJavaには、生産者と消費者の関係が大きく分けて2つある。
1.Reactive Streamに対応したインターフェース(バックプレッシャーあり)
インターフェース名 | |
---|---|
生産者 | Flowable |
消費者 | Subscriber |
生産者のデータ量リクエストや購読解除等 | Subscription |
public static void main(String[] args) throws Exception {
//わんこそばの数を通知するFlowableの生成
Flowable<String> flowable
= Flowable.create(new FlowableOnSubscribe<String> () {
/* 購読通知 */
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
// わんこそばの数をカウントするリスト
int[] sobaCountData = {1, 2, 3, 4};
for (String data: sobaCountDatas) {
//購読解除している場合は処理をやめる
if (emitter.isCancelled()) {
return;
}
//データ通知
emitter.onNext(data);
}
// 完了したことを通知する
emitter.onComplete();
}, BackpressureStrategy.BUFFER);//溢れたデータはバッファを取るようにする
//main処理-----------------
flowable
// Subscriberの処理を別スレッドで行う
.observeOn(Schedulers.computation())
/* ①購読する */
.subscribe(new Subscriber<String>() {
// データ数のリクエストおよび購読の解除を行うオブジェクト
private Subscription subscription;
// 食べたそばのお椀の数
private int count = 0;
/* ②データ通知準備ができた通知を受け取った時の処理 */
@Override
public void onSubscribe(Subscription subscription) {
//SubscriptionをSubscriber内で保持する
this.subscription = subscription;
// ③受け取るデータ数をリクエストする
this.subscription.request(1L);
}
/* ④データを受け取った際の処理 */
@Override
public void onNext(String data) {
//受け取ったデータを出力する(掛け声を変える)
if(data%2 != 0) {
System.out.println(data + "杯目食べました!はい、ドンドン〜");
} else {
System.out.println(data + "杯目食べました!はい、じゃんじゃん〜");
}
// お椀の数を加える
this.count += 1
//次に受け取るデータ数をリクエストする
this.subscription.request(1L);
}
/* ⑧完了を通知された際の処理 */
@Override
public void onComplete() {
System.out.println("あなたは" + this.count + "杯食べてお椀の蓋を閉めました!");
}
/* エラーを通知された場合の処理 */
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
//しばらく待つ
Thread.sleep(500L);
}
1杯目食べました!はい、ドンドン〜
2杯目食べました!はい、じゃんじゃん〜
3杯目食べました!はい、ドンドン〜
4杯目食べました!はい、じゃんじゃん〜
あなたは4杯食べてお椀の蓋を閉めました!
###2.ReactiveStream対応なしのインターフェース(バックプレッシャーなし)
インターフェース名 | |
---|---|
生産者 | Observable |
消費者 | Observer |
購読解除 | Disposable |
public static void main(String[] args) throws Exception {
//わんこそばの数を通知するObservableの生成
Observable<String> observable
= Observable.create(new ObservableOnSubscribe<String> () {
/* 購読通知 */
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// わんこそばの数をカウントするリスト
int[] sobaCountData = {1, 2, 3, 4};
for (String data: sobaCountDatas) {
//購読解除している場合は処理をやめる
if (emitter.isCancelled()) {
return;
}
//データ通知
emitter.onNext(data);
}
// 完了したことを通知する
emitter.onComplete();
}, BackpressureStrategy.BUFFER);//溢れたデータはバッファを取るようにする
//main処理-----------------
observable
// Subscriberの処理を別スレッドで行う
.observeOn(Schedulers.computation())
/* ①購読する */
.subscribe(new Observer<String>() {
// 食べたそばのお椀の数
private int count = 0;
/* ②データ通知準備ができた通知を受け取った時の処理 */
@Override
public void onSubscribe(Disposable disposable) {
//なにもしない
}
/* ④データを受け取った際の処理 */
@Override
public void onNext(String data) {
//受け取ったデータを出力する(掛け声を変える)
if(data%2 != 0) {
System.out.println(data + "杯目食べました!はい、ドンドン〜");
} else {
System.out.println(data + "杯目食べました!はい、じゃんじゃん〜");
}
// お椀の数を加える
this.count += 1
//次に受け取るデータ数をリクエストする
this.subscription.request(1L);
}
/* ⑧完了を通知された際の処理 */
@Override
public void onComplete() {
System.out.println("あなたは" + this.count + "杯食べてお椀の蓋を閉めました!");
}
/* エラーを通知された場合の処理 */
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
//しばらく待つ
Thread.sleep(500L);
}
1杯目食べました!はい、ドンドン〜
2杯目食べました!はい、じゃんじゃん〜
3杯目食べました!はい、ドンドン〜
4杯目食べました!はい、じゃんじゃん〜
あなたは4杯食べてお椀の蓋を閉めました!
1.ReactiveStreamありとの違いは、バックプレッシャーが使われないところです。
ObserverクラスのonSubscribeメソッドの中で、消費者が受信するデータ量のコントロールが行われていないません。そして、購読解除の役割も果たしていたSubscriptionのポジションは、Disposableでまかなわれます。
今回はひとまとめに生産者や消費者の処理を書いていますが、
クラスでファイルを分けて記述することや、メソッドチェーンを用いた記述等、アプリの規模にあった書き方をすることができます。
また、今回は単純に流れてきたデータをSystem.out.printlnを用いて表示するだけでしたが、
複数の購読者にデータを同時配布したり(「Hot」「Cold」な生産者) 、
オペレータを使って、データの取捨選択(filter)や、変更処理(map)を行うことや、
配布タイミングや手順を変更することが可能です。
今回は書ききれないので省略します。
RxJava公式GitHub:
https://github.com/ReactiveX/RxJava
#RxKotlinを書いてみる
やっと本題。
環境構築
Maven
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxkotlin</artifactId>
<version>2.x.y</version>
</dependency>
Gradle
dependencies {
implementation 'io.reactivex.rxjava2:rxjava:2.x.x'
implementation 'io.reactivex.rxjava2:rxkotlin:2.x.x'
implementation 'io.reactivex.rxjava2:rxandroid:2.x.x'
}
RxJavaで実行したわんこそばをRxKotlinに書き直す
RxJavaの**2.ReactiveStream対応なしのインターフェース(バックプレッシャーなし)**で実装した場合。
package com.example.osusi
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
fun main(args: Array<String>) {
//そばの数
val list = listOf(1, 2, 3, 4) //→listも拡張されたRxKotlinのインターフェースのため、ReactiveX仕様のメソッドを持つ
list.toObservable()
.subscribeBy(
onNext = {
//受け取ったデータを出力する(掛け声を変える)
if(it%2 != 0) {
print(it.toString() + "杯目食べました!はい、ドンドン〜");
} else {
print(it.toString() + "杯目食べました!はい、じゃんじゃん〜");
}
},
onError = { it.printStackTrace() },
onComplete = { print("あなたはお椀を閉じました!")}
)
}
1杯目食べました!はい、ドンドン〜
2杯目食べました!はい、じゃんじゃん〜
3杯目食べました!はい、ドンドン〜
4杯目食べました!はい、じゃんじゃん〜
あなたはお椀の蓋を閉めました!
データの型にRxKotlinの拡張インターフェースを使用しているため、すでにObservable、Flowableで必要なメソッドが準備されている。
公式のGitHub:
https://github.com/ReactiveX/RxKotlin
#最後に
最初は、なんだこれと思っていましたが、
RxJavaとRxKotlinをまとめている中で色々考えていたところ、
リアルタイム通知や、自動更新(投稿データがリアルに追加される)とかで活躍するんだろうなと思いました。
ReactiveExtensionの技術自体は2009年ごろからあるみたいですが、今後も新たなインターフェイスで登場したりするのかなと思います。
RxJavaもRxKotlinも奥が深いのでまとめきれませんでした。また、記事を書きたいなと思います。
ありがとうございました。
#参考資料
ReactiveExtension概要:
https://www.atmarkit.co.jp/fdotnet/introrx/introrx_01/introrx_01_01.html
ReacticeX公式:
http://reactivex.io/
RxKotlinをつかってみたメモ書き:
https://qiita.com/pongi/items/6f222cdfdf7cb560d5cf
RxJavaリアクティブプログラミング(著:須田智之):
https://www.amazon.co.jp/dp/B06XGYSHCN/ref=dp-kindle-redirect?_encoding=UTF8&btkr=1