Java
Android
Kotlin
RxJava
Retrofit

Retrofit2+ Rx2で通信クラス用のadapterを作った

More than 1 year has passed since last update.

はじめに

まだただのメモになっているのでわかりにくいところも多いかと思います。
適宜編集リクエストなどいただければと思います。

よくある書き方

RetrofitやRxなどの知識は一通りある前提です。
その辺を知りたい方はこちらの記事などが非常に参考になります
KotlinでRetrofitとRxを使ってAPIクライアントをサクッと実装する

class MyApiDatasore {
   fun <T> createService(service: Class<T>) =
                Retrofit.Builder()
                        .baseUrl("url")
                        .addConverterFactory(JacksonConverterFactory.create(ObjectMapper()))
                        .addCallAdapterFactory(RxCallAdapterFactory.create())
                        .build()
                        .create(service)!!
}
interface MyApiService {
    @Get("hogehoge")
    fun fetchHogeHoge()

}

呼び出し側

MyApiDatastore().createService(MyApiService::class.java)
                    .fetchHogeHoge()
                    .subscribeOn(Schedulers.newThread())       // バックグランドスレッドで実行し
                    .observeOn(AndroidSchedulers.mainThread()) // メインスレッドに通知する
                    .subscribe({
                        // onSuccess
                    }) { e ->
                        // onError

                    }

問題点

毎回これを書くのがめんどくさい。

  ...
  .subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread())

まとめてしまおう

この辺の記事を参考にしたり、ドキュメントを参考にしながら、上記を毎回行ってくれる自作のカスタムアダプターを作成しました。

**
 * RxJava adapter for Retrofit.
 * Always `subscribeOn(Schedulers.newThread())`
 *        `observeOn(AndroidSchedulers.mainThread())`
 */
private class RxThreadingCallAdapterFactory : CallAdapter.Factory() {

    companion object {
        fun create(): CallAdapter.Factory = RxThreadingCallAdapterFactory()
    }

    // 常に新しいスレッドで実行する
    private val original: RxJava2CallAdapterFactory = RxJava2CallAdapterFactory.createWithScheduler(Schedulers.newThread())

    override fun get(returnType: Type, annotations: Array<Annotation>, retrofit: Retrofit): CallAdapter<*, *> {
        // `original.get() == null` -> Retrofit throws IllegalArgumentException.
        return RxCallAdapterWrapper(original.get(returnType, annotations, retrofit)!!)
    }

    private class RxCallAdapterWrapper<R>(private val wrapped: CallAdapter<R, *>) : CallAdapter<R, Any> {
        override fun responseType(): Type = wrapped.responseType()

        // 常にメインスレッドに通知する
        override fun adapt(call: Call<R>) = wrapped.adapt(call)
                .let {
                    when (it) {
                        is Completable -> {
                            it.observeOn(AndroidSchedulers.mainThread())
                        }
                        is Observable<*> -> {
                            it.observeOn(AndroidSchedulers.mainThread())
                        }
                        is Single<*> -> {
                            it.observeOn(AndroidSchedulers.mainThread())
                        }
                        is Flowable<*> -> {
                            it.observeOn(AndroidSchedulers.mainThread())
                        }
                        is Maybe<*> -> {
                            it.observeOn(AndroidSchedulers.mainThread())
                        }
                        else -> errorIfDebug()
                    }
                }!!
    }
}

使い方

下記のようなretrofitのサービスを使うことで、毎回.subscribeを書くだけでバックグランドで処理が走りメインスレッドに通知する実装がかけることになる。

      Retrofit.Builder()
              .baseUrl("url")
              .addConverterFactory(JacksonConverterFactory.create(ObjectMapper()))
              .addCallAdapterFactory(RxThreadingCallAdapterFactory.create()) //ここでカスタムadapterを使用する
              .build()
              .create(service)!!