LoginSignup
40
12

More than 3 years have passed since last update.

RxKotlin入門を目指してRxについて知る

Last updated at Posted at 2019-12-15

本記事はぷりぷりあぷりけーしょんず Advent Calendar 2019 の15日目の記事です。

はじめに

Qiita初投稿です=(:A/ L)_
今年はいろんな技術に出会った一年でした。
その中で、記事が少なくて困りに困った技術の一つ、
RxKotlinについてまとめてみます。

出会ったRxKotlinコード

初めてRxKotlinに出会った時のソースコード。
Androidアプリにて、自分の発言がDBにリアルタイムに保存される機能の一部分。

MainViewModel.kt
   class MainViewModel @Inject constructor(...): BaseViewModel() {
{

// 省略

   private val user = BehaviorSubject.create<User>()
   private var speechToText: SpeechToTextRepository.SpeechToText? = null

init {

                //省略

   user.toFlowable(BackpressureStrategy.LATEST)
            .switchMap { user ->
                speechToTextRepository.start(user.language, 32000)
                    .retryWhen { error ->
                        error.flatMap {
                            Flowable.timer(
                                1000,
                                TimeUnit.MILLISECONDS
                            )
                        }
                    }
                    .doOnSuccess { speechToText = it }
                    .flatMapObservable { it.text }
                    .toFlowable(BackpressureStrategy.LATEST)
                    .flatMap { translationRepository.translate(it, user.language) }
                    .map { Message(it, user) }
            }.flatMap { messageRepository.addMessage(channelName, it).toFlowable<Unit>() }
            .subscribeBy(
                onError = { Timber.d(it) }
            )
            .addTo(compositeDisposable)
}

初めて見た時に、
なにFlowableって?
なにswitchMapって?
...その他多数疑問に陥りました。
そもそもRxの存在を知りませんというところからでした。

Rxとは?

Reactive Extentionの略。Microsoftが開発した概念。
GPSやライブストリーミングのようなデータが生成されるたびに送信されるデータに対し、受け取ったプログラムがその度処理を行って目的を達成するという考え方。リクエストを投げるような積極性はなく、受け身のような感じ?
一つのサーバーの非同期処理、イベント、時間に関係した処理を簡略化して行うためにLINQの形で、簡単に、宣言的に記述できるようになる。リアクティブプログラミングとも呼ばれている。

RxKotlinはRxJavaをKotlinで使いやすくするための軽量なライブラリなので、
RxKotlinよりも、割と資料が多いRxJavaから入門するのがおすすめ。

RxJavaはNetflix社がRxをJavaに導入したものを公開したライブラリ。
バージョン1.xと2.xがあり、1.xから2.xに上がった背景にはReactive Streamの適用がある。

Reactive Streamは、どのライブラリやフレームワークに関係なく、データストリームを非同期で扱えるようにするための共通の仕組みが作れるインターフェースを提供している。

Reactive Stream

Reactive Stream-5.jpg

インターフェース名 役割
Publisher データを生産し、通知をする役割(生産者)
Subscriber 通知をされたデータを受け取り、データ処理をする役割(消費者) 
SubScription データ数のリクエストおよび購読の解除等、Subscriptionのリクエストを設定でもち、Publisherに教える役割。
Processor PublisherとSubscriberの両方の性質をもつ。(両方を継承している)

上の図のように、
データが流れるように受け渡しされる。
ちなみに③⑤⑦受け取りたいデータをリクエストとあるが、
これはバックプレッシャーという機能があることで、
自分が処理できる能力に合わせて受け取るデータ数、タイミングを指定できる仕組み。
ReactiveStreamを使わないデータストリームは、バックプレッシャーがないので、データは多かろうが少なかろうがドンドン来る。(わんこそばみたい)
Reactive Stream-6.jpg

RxJavaを書いてみる

環境構築

Javaプロジェクトを作成し、MavenまたはGradleを入れて下の内容を追記する。

Maven

pom.xml

<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava -->
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.15</version>
</dependency>

Gradle

app/build.gradle

dependencies {
 implementation 'io.reactivex.rxjava2:rxjava:2.2.15'
}

インターフェース

RxJavaには、生産者と消費者の関係が大きく分けて2つある。

1.Reactive Streamに対応したインターフェース(バックプレッシャーあり)

インターフェース名
生産者 Flowable
消費者 Subscriber
生産者のデータ量リクエストや購読解除等 Subscription
public static void main(String[] args) throws Exception {
    //わんこそばの数を通知するFlowableの生成
   Flowable<String> flowable 
     = Flowable.create(new FlowableOnSubscribe<String> () {

      /* 購読通知 */
      @Override 
      public void subscribe(FlowableEmitter<String> emitter) throws Exception {

        // わんこそばの数をカウントするリスト
        int[] sobaCountData = {1, 2, 3, 4};

        for (String data: sobaCountDatas) {
          //購読解除している場合は処理をやめる
          if (emitter.isCancelled()) {
             return;
          }

          //データ通知
          emitter.onNext(data);
        }
        // 完了したことを通知する
        emitter.onComplete();
     }, BackpressureStrategy.BUFFER);//溢れたデータはバッファを取るようにする

    //main処理-----------------

    flowable
    // Subscriberの処理を別スレッドで行う
    .observeOn(Schedulers.computation())
    /* ①購読する */
    .subscribe(new Subscriber<String>() {
        // データ数のリクエストおよび購読の解除を行うオブジェクト
        private Subscription subscription;
        // 食べたそばのお椀の数
        private int count = 0;

        /* ②データ通知準備ができた通知を受け取った時の処理 */
        @Override
        public void onSubscribe(Subscription subscription) {
            //SubscriptionをSubscriber内で保持する
            this.subscription = subscription;
            // ③受け取るデータ数をリクエストする
            this.subscription.request(1L);
        }

        /* ④データを受け取った際の処理 */
        @Override 
        public void onNext(String data) {
            //受け取ったデータを出力する(掛け声を変える)
            if(data%2 != 0) {
                System.out.println(data + "杯目食べました!はい、ドンドン〜");
            } else {
                System.out.println(data + "杯目食べました!はい、じゃんじゃん〜");
            }
            // お椀の数を加える
            this.count += 1
            //次に受け取るデータ数をリクエストする
            this.subscription.request(1L);
        }

        /* ⑧完了を通知された際の処理 */
        @Override 
        public void onComplete() {
            System.out.println("あなたは" + this.count + "杯食べてお椀の蓋を閉めました!");
        }

        /* エラーを通知された場合の処理 */
        @Override 
        public void onError(Throwable error) {
            error.printStackTrace();
        }
    });

    //しばらく待つ
    Thread.sleep(500L);

   }

実行結果
1杯目食べました!はい、ドンドン〜
2杯目食べました!はい、じゃんじゃん〜
3杯目食べました!はい、ドンドン〜
4杯目食べました!はい、じゃんじゃん〜
あなたは4杯食べてお椀の蓋を閉めました!

2.ReactiveStream対応なしのインターフェース(バックプレッシャーなし)

インターフェース名
生産者 Observable
消費者 Observer
購読解除 Disposable
public static void main(String[] args) throws Exception {
    //わんこそばの数を通知するObservableの生成
   Observable<String> observable 
     = Observable.create(new ObservableOnSubscribe<String> () {

      /* 購読通知 */
      @Override 
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {

        // わんこそばの数をカウントするリスト
        int[] sobaCountData = {1, 2, 3, 4};

        for (String data: sobaCountDatas) {
          //購読解除している場合は処理をやめる
          if (emitter.isCancelled()) {
             return;
          }

          //データ通知
          emitter.onNext(data);
        }
        // 完了したことを通知する
        emitter.onComplete();
     }, BackpressureStrategy.BUFFER);//溢れたデータはバッファを取るようにする

    //main処理-----------------

    observable 
    // Subscriberの処理を別スレッドで行う
    .observeOn(Schedulers.computation())
    /* ①購読する */
    .subscribe(new Observer<String>() {

        // 食べたそばのお椀の数
        private int count = 0;

        /* ②データ通知準備ができた通知を受け取った時の処理 */
        @Override
        public void onSubscribe(Disposable disposable) {
            //なにもしない
        }

        /* ④データを受け取った際の処理 */
        @Override 
        public void onNext(String data) {
            //受け取ったデータを出力する(掛け声を変える)
            if(data%2 != 0) {
                System.out.println(data + "杯目食べました!はい、ドンドン〜");
            } else {
                System.out.println(data + "杯目食べました!はい、じゃんじゃん〜");
            }
            // お椀の数を加える
            this.count += 1
            //次に受け取るデータ数をリクエストする
            this.subscription.request(1L);
        }

        /* ⑧完了を通知された際の処理 */
        @Override 
        public void onComplete() {
            System.out.println("あなたは" + this.count + "杯食べてお椀の蓋を閉めました!");
        }

        /* エラーを通知された場合の処理 */
        @Override 
        public void onError(Throwable error) {
            error.printStackTrace();
        }
    });

    //しばらく待つ
    Thread.sleep(500L);

   }
実行結果
1杯目食べました!はい、ドンドン〜
2杯目食べました!はい、じゃんじゃん〜
3杯目食べました!はい、ドンドン〜
4杯目食べました!はい、じゃんじゃん〜
あなたは4杯食べてお椀の蓋を閉めました!

1.ReactiveStreamありとの違いは、バックプレッシャーが使われないところです。
ObserverクラスのonSubscribeメソッドの中で、消費者が受信するデータ量のコントロールが行われていないません。そして、購読解除の役割も果たしていたSubscriptionのポジションは、Disposableでまかなわれます。

今回はひとまとめに生産者や消費者の処理を書いていますが、
クラスでファイルを分けて記述することや、メソッドチェーンを用いた記述等、アプリの規模にあった書き方をすることができます。

また、今回は単純に流れてきたデータをSystem.out.printlnを用いて表示するだけでしたが、
複数の購読者にデータを同時配布したり(「Hot」「Cold」な生産者) 、
オペレータを使って、データの取捨選択(filter)や、変更処理(map)を行うことや、
配布タイミングや手順を変更することが可能です。
今回は書ききれないので省略します。

RxJava公式GitHub:
https://github.com/ReactiveX/RxJava

RxKotlinを書いてみる

やっと本題。

環境構築

Maven

pom.xml
<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.x.y</version>
</dependency>

Gradle

app/build.gradle
dependencies {
    implementation 'io.reactivex.rxjava2:rxjava:2.x.x'
    implementation 'io.reactivex.rxjava2:rxkotlin:2.x.x'
    implementation 'io.reactivex.rxjava2:rxandroid:2.x.x'
}

RxJavaで実行したわんこそばをRxKotlinに書き直す

RxJavaの2.ReactiveStream対応なしのインターフェース(バックプレッシャーなし)で実装した場合。

package com.example.osusi

import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {

    //そばの数
    val list = listOf(1, 2, 3, 4) //→listも拡張されたRxKotlinのインターフェースのため、ReactiveX仕様のメソッドを持つ

    list.toObservable()
        .subscribeBy(
            onNext = {
                //受け取ったデータを出力する(掛け声を変える)
                if(it%2 != 0) {
                    print(it.toString() + "杯目食べました!はい、ドンドン〜");
                } else {
                    print(it.toString() + "杯目食べました!はい、じゃんじゃん〜");
                }
            },
            onError =  { it.printStackTrace() },
            onComplete = { print("あなたはお椀を閉じました!")}
        )
}
実行結果
1杯目食べました!はい、ドンドン〜
2杯目食べました!はい、じゃんじゃん〜
3杯目食べました!はい、ドンドン〜
4杯目食べました!はい、じゃんじゃん〜
あなたはお椀の蓋を閉めました!

データの型にRxKotlinの拡張インターフェースを使用しているため、すでにObservable、Flowableで必要なメソッドが準備されている。

公式のGitHub:
https://github.com/ReactiveX/RxKotlin

最後に

最初は、なんだこれと思っていましたが、
RxJavaとRxKotlinをまとめている中で色々考えていたところ、
リアルタイム通知や、自動更新(投稿データがリアルに追加される)とかで活躍するんだろうなと思いました。
ReactiveExtensionの技術自体は2009年ごろからあるみたいですが、今後も新たなインターフェイスで登場したりするのかなと思います。
RxJavaもRxKotlinも奥が深いのでまとめきれませんでした。また、記事を書きたいなと思います。
ありがとうございました。

参考資料

ReactiveExtension概要:
https://www.atmarkit.co.jp/fdotnet/introrx/introrx_01/introrx_01_01.html

ReacticeX公式:
http://reactivex.io/

RxKotlinをつかってみたメモ書き:
https://qiita.com/pongi/items/6f222cdfdf7cb560d5cf

RxJavaリアクティブプログラミング(著:須田智之):
https://www.amazon.co.jp/dp/B06XGYSHCN/ref=dp-kindle-redirect?_encoding=UTF8&btkr=1

40
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
40
12