Posted at

Retrofitの返り値は`Observable`と`Single`などとはどっちがいいか


Retrofitの返り値はObservableSingleなどとはどっちがいいか

RetrofitとRxJava、RxKotlinを使用している場合はAPIの定義を


ApiService.kt

interface ApiService {

@GET("test")
fun test(): Observable<ResponseBody>
}


のように書くだろう。

しかし人によってはObservableの代わりにSingleMaybeFlowableを使う人もいる。

実際どっちの方がいいのか?


結論

いきなり結論から述べるが、基本的にはSingleMaybeFlowableの方がいいと思う。

※2019/08/08現在。


ObservableとSingleの違いは?

もちろん色々とあるがここで自分が観点としたのは

ObservableonNextが呼ばれた回数だけデータが流れてくる。

SingleonSuccessが呼ばれた際にデータが1回だけ流れてくる。

つまり複数回データが流れ得る場合はObservable、1回しかデータが流れ得ない場合はSingleが適しているということになる。


サーバが複数回出力することがあるか?

結論を述べればある

普通に出力を行なった場合は基本的には1回しか出力されないが、例えばPHPだとob_flushなどを駆使すれば逐次出力が可能となる。

参考に書くとこんな感じ。


flush.php

<?php

header('Content-type: text/html; charset=utf-8');
for ($i = 0; $i < 10; ++$i)
{
echo $i . '<br />';
flush();
ob_flush();
sleep(1);
}
?>

ただし、ApacheやNginxなどの設定によってできないこともあるのでその場合はそっちもいじる必要がある。

なんにしてもサーバが複数回出力するという可能性はあるということだ。


実際にRetrofitで複数回出力を受け取ってみる

では、サーバが複数回出力してきた場合にどうなるか?


ApiService.kt

interface ApiService {

@GET("test")
fun test(): Observable<ResponseBody>
}



Call.kt

val client = OkHttpClient.Builder()

.connectTimeout(60L, TimeUnit.SECONDS)
.readTimeout(60L, TimeUnit.SECONDS)
.writeTimeout(60L, TimeUnit.SECONDS)
.build()

val retrofit = Retrofit.Builder()
.baseUrl("https://example.com/")
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
.client(client)
.build()

val service = retrofit.create(ApiService::class.java)

service.test()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// output etc.
}


結論として、すべてのデータが1回でまとめて流れてきた。

つまり、サーバが複数回出力しても全部終わるまで待ってデータを流す、という動きになっている。


ソースコードを追ってみる

バージョンによって変わるとは思うので明記しておく。

2019/08/08現在。

最新のmasterコミットの8c93b59dbc57841959f5237cb141ce0b3c18b778でソースコードを見る。

通信の実行はこの辺でやってる。

155行目からのソースコード。


OkHttpCall.java

  @Override public Response<T> execute() throws IOException {

okhttp3.Call call;

synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;

if (creationFailure != null) {
if (creationFailure instanceof IOException) {
throw (IOException) creationFailure;
} else if (creationFailure instanceof RuntimeException) {
throw (RuntimeException) creationFailure;
} else {
throw (Error) creationFailure;
}
}

call = rawCall;
if (call == null) {
try {
call = rawCall = createRawCall();
} catch (IOException | RuntimeException | Error e) {
throwIfFatal(e); // Do not assign a fatal error to creationFailure.
creationFailure = e;
throw e;
}
}
}

if (canceled) {
call.cancel();
}

return parseResponse(call.execute());
}


最後に呼び出しているparseResponseまではサクサクと処理が実行される。

が、parseResponseで処理時間がかかっている。

199行目からのソースコード。


OkHttpCall.java

  Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {

ResponseBody rawBody = rawResponse.body();

// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();

int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}

if (code == 204 || code == 205) {
rawBody.close();
return Response.success(null, rawResponse);
}

ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
try {
T body = responseConverter.convert(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}


parseResponseの中でもT body = responseConverter.convert(catchingBody);が特に時間がかかっている。

responseConverterを確認するとretrofit2.BuiltInConverters$BufferingResponseBodyConverterだった。

96行目からのソースコード。


BuiltInConverters.java

  static final class BufferingResponseBodyConverter

implements Converter<ResponseBody, ResponseBody> {
static final BufferingResponseBodyConverter INSTANCE = new BufferingResponseBodyConverter();

@Override public ResponseBody convert(ResponseBody value) throws IOException {
try {
// Buffer the entire body to avoid future I/O.
return Utils.buffer(value);
} finally {
value.close();
}
}
}


そこのコメントに書かれている通り以降I/Oを避けるためにレスポンスのボディ全体をバッファリングしてるとのことだ。

つまりここでレスポンスが全部返ってくるまで待っているためObservableであったとしても一括でデータが流れてきたということである。

一応Utils.bufferも見てみる。

314行目からのソースコード。


Utils.java

  static ResponseBody buffer(final ResponseBody body) throws IOException {

Buffer buffer = new Buffer();
body.source().readAll(buffer);
return ResponseBody.create(body.contentType(), body.contentLength(), buffer);
}

というわけでbody.source().readAllしてるため全部のレスポンスを読み込むという動きになる。


総論

たとえサーバ側が複数回出力をする場合でも、Retrofitで全部のレスポンスを読み込むようになっているため、データは1回しか流れてこない。

データが1回しか流れてこないならSingleなどを使う方がシンプルにソースコードを書くことができる。

そのため、自分個人としてはSingleなどを使うのがいいのではないかと思う。


余談


StreamingResponseBodyConverter

ソースコードを追う過程でBufferingResponseBodyConverterの親戚であろうStreamingResponseBodyConverterというのがあった。

どうもアノテーションをつけることでこっちが使われるようになるらしい。

というわけでやってみる。


ApiService.kt

interface ApiService {

@Streaming
@GET("test")
fun test(): Observable<ResponseBody>
}


これなら複数回データが流れるのでは、と思ったが流れず1回だけだった。

ドキュメントを読むと変換を行わないだけ、と言っている。

確かにソースコードを読んでもそのまま返しているだけ。

87行目からのソースコード。


BuiltInConverters.java

  static final class StreamingResponseBodyConverter

implements Converter<ResponseBody, ResponseBody> {
static final StreamingResponseBodyConverter INSTANCE = new StreamingResponseBodyConverter();

@Override public ResponseBody convert(ResponseBody value) {
return value;
}
}


つまりは生のResponseBodyを渡ってくるだけ。

そのため自分でストリームの処理を行う必要がある。

余談になるのでざっくりとだが


Call.kt

service.test()

.subscribe {
val source = it.source()
while (!source.exhausted()) {
// source.readUtf8Line() etc.
}
}

のような感じでストリーム処理を行うことになる。

サーバでどうしても複数回出力させたい場合はこうすることで対応ができる。

しかし、結局1回しかデータが流れないのには変わらないのでやはりSingleなどでいいと思う。


総論

基本的にはSingleMaybeFlowableを使おう。

では、よいプログラミングライフを!