3
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AndroidでRxJava(RxAndroid)による非同期処理

Last updated at Posted at 2018-12-21

はじめに

前回以下の記事でサンプルアプリを作成した話をしました。
AndroidでRxJava(RxAndroid) + RxBus + Data Binding + roomのサンプル(Java1.7ベース)。

今回はその中でRxJavaを使用してAPIから非同期にデータを取得する実装を解説してみます。
なお、動作環境などは上記の前回記事を参照願います。

Androidアプリによる非同期処理の必要性

Web上に存在するAPIからデータを取得しようとすると、
通信状況やデータ量によっては処理に時間がかかる場合があります。
この間、画面に対する操作(キャンセルなど)が受け付けられないとUXが悪くなるので、
時間がかかる可能性がある処理は非同期処理でやるのが一般的です。

非同期処理を実現するためのライブラリ

Androidではこの非同期処理を実現するライブラリがいくつかあります。

今回はRxJavaを使いましたが、
LiveDataなど他のライブラリの使用を検討しても良いでしょう。
ちなみに私がRxJavaを選んだ理由は、
使用例が多く事例が多いと思ったことと、
LiveDataに比べれば"枯れている"と思ったため、です。

前提条件

今回はReactive ProgrammingやRxJavaについては解説しません。
詳しくはググるといろいろヒットしますのでそちらにて…。

またとりあえず動くのに最低限必要な解説しかしませんので、
業務用に使う際はくれぐれも要確認で、ご利用願います。

RxJavaによる非同期処理の実現と説明。

解説するソースはこちらより参照できます。

GitHub SampleAndroidApps

API実行側の処理(監視される側Observer)

該当のソースはこちら

    Observable.create(new ObservableOnSubscribe<GithubRepositoryApiResponseEntity>() {
        @Override
        public void subscribe(ObservableEmitter<GithubRepositoryApiResponseEntity> emitter) throws Exception {
    
            // 非同期の処理をする
            Request request = new Request.Builder()
                    .url(url)
                    .get()
                    .build();
    
            OkHttpClient client = new OkHttpClient();
            try {
                Response res = client.newCall(request).execute();
                String responseBodyStr = res.body().string();
                // GSONでjsonをオブジェクト化する
                Gson gson = new Gson();
                GithubRepositoryApiResponseEntity responceData = (GithubRepositoryApiResponseEntity) gson.fromJson(responseBodyStr, GithubRepositoryApiResponseEntity.class);
                emitter.onNext(responceData);
                emitter.onComplete();
            } catch (Exception e) {
                e.printStackTrace();
                emitter.onError(e);
            } finally {
                loading = false;
            }
        }
    })
    .observeOn(Schedulers.newThread())  // observeOn以降(レスポンス)も新しいスレッドでやる
    .subscribeOn(Schedulers.newThread())       // 新しいスレッドで非同期処理でやる
    .subscribe(observer);             // 実行する

Observable.createにObservableOnSubscribe型の無名クラスを渡します。
ObservableOnSubscribeにはStreamに流す(Observerが受け取る結果)のオブジェクトの型を指定します。
subscribeメソッドに非同期にしたい処理を書きます。
(今回はGitHubのRepositoryを取得するApiを叩く処理)

emitter.onNextにStreamに流す(Observerが受け取る結果)を指定します。
その後、忘れずにemitter.onComplete();を呼びます。
異常系はemitter.onError(Exception);で。

こうしてObservable.createした結果をobserveOnに渡して、
処理するスレッドを指定します。
AndroidSchedulers.mainThread()を指定するとメインスレッドで動作します。
Schedulers.``newThread``()を指定すると新しいスレッド(メインスレッドを邪魔しない)で動作します。

最後にsubscribeメソッドでobserver(監視する側)を指定して登録します。

API実行結果を受け取った側の処理(監視する側Observer)

該当のソースはこちら

    private Observer observer = new Observer<GithubRepositoryApiResponseEntity>() {
        GithubRepositoryApiCompleteEventEntity eventResult = new GithubRepositoryApiCompleteEventEntity();
    
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(GithubRepositoryApiResponseEntity response) {
            eventResult.setResult(false);
    
            List<GithubRepositoryApiResponseEntity.Repository> repositoryList = response.getItems();
            ArrayList<GithubRepositoryListItemEntity> data = new ArrayList<>();
            for (GithubRepositoryApiResponseEntity.Repository repository : repositoryList) {
                GithubRepositoryListItemEntity listItem = new GithubRepositoryListItemEntity();
                listItem.setFullName(repository.getFull_name());
                data.add(listItem);
            }
    
            eventResult.setListItems(data);
            if (response.getItems() != null && response.getItems().size() > 0) {
                eventResult.setResult(true);
            }
    
            // イベントを呼び出す
            RxBus.getInstance().send(eventResult);
        }
    
        @Override
        public void onError(Throwable e) {
            // イベントを呼び出す
            RxBus.getInstance().send(eventResult);
        }
    
        @Override
        public void onComplete() {
    
        }
    };

まず、以下のRxBusについては次回解説します。
RxBus.getInstance().send(eventResult);

最初にObserver型のインスタンスを定義します。
TはObservableから流れてくるオブジェクトの型を指定します。

onSubscribeはデータ準備が完了したら実行されるメソッド。
引数のDisposableを使用して通知解除が可能
(このタイミングで解除する意義があるのかは不明…)

onNextはデータが流れてくると実行されるメソッド。
今回はAPIの結果を1回だけ受け取るのみ。
このサンプルでは表示用のEntityクラスに入れ替えてますが、
GsonとData Bindingをセットで定義できれば不要かなぁと思います。
(そこまで致命的な影響ではないと思ったのでそのままにしました)

onErrorは異常時に呼び出されるメソッド。
このサンプルでは特に処理してないので、
必要があれば何かしましょう。

onCompleteはデータを流すのが終わった時に実行されるメソッド。

以上、こんな感じでした。

ライフサイクルについて

ViewModelは永続化している(Applicationのサブクラスにインスタンスを保持している)ので、
基本的にはライフサイクルによる影響はないと思っています。
ですが、View(ActivityやFragment)が絡んでくると影響があります。

おわりに

というわけでRxJavaによる非同期処理の簡単な解説でした。
もっとガッツリいろんなことにRxJavaを使おうとすると大変そうですね…。
とりあえずは今のレガシーなコードから、
非同期処理を置き換えられるのは実現できそうです。

次回はRxBusによるイベント通知を説明してみたいと思います。

3
8
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?