Retrofitの返り値はObservable
とSingle
などとはどっちがいいか
RetrofitとRxJava、RxKotlinを使用している場合はAPIの定義を
interface ApiService {
@GET("test")
fun test(): Observable<ResponseBody>
}
のように書くだろう。
しかし人によってはObservable
の代わりにSingle
やMaybe
、Flowable
を使う人もいる。
実際どっちの方がいいのか?
結論
いきなり結論から述べるが、基本的にはSingle
やMaybe
、Flowable
の方がいいと思う。
※2019/08/08現在。
ObservableとSingleの違いは?
もちろん色々とあるがここで自分が観点としたのは
Observable
はonNext
が呼ばれた回数だけデータが流れてくる。
Single
はonSuccess
が呼ばれた際にデータが1回だけ流れてくる。
つまり複数回データが流れ得る場合はObservable
、1回しかデータが流れ得ない場合はSingle
が適しているということになる。
サーバが複数回出力することがあるか?
結論を述べれば__ある__。
普通に出力を行なった場合は基本的には1回しか出力されないが、例えばPHPだとob_flush
などを駆使すれば逐次出力が可能となる。
参考に書くとこんな感じ。
<?php
header('Content-type: text/html; charset=utf-8');
for ($i = 0; $i < 10; ++$i)
{
echo $i . '<br />';
flush();
ob_flush();
sleep(1);
}
?>
ただし、ApacheやNginxなどの設定によってできないこともあるのでその場合はそっちもいじる必要がある。
なんにしてもサーバが複数回出力するという可能性はあるということだ。
実際にRetrofitで複数回出力を受け取ってみる
では、サーバが複数回出力してきた場合にどうなるか?
interface ApiService {
@GET("test")
fun test(): Observable<ResponseBody>
}
val client = OkHttpClient.Builder()
.connectTimeout(60L, TimeUnit.SECONDS)
.readTimeout(60L, TimeUnit.SECONDS)
.writeTimeout(60L, TimeUnit.SECONDS)
.build()
val retrofit = Retrofit.Builder()
.baseUrl("https://example.com/")
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
.client(client)
.build()
val service = retrofit.create(ApiService::class.java)
service.test()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// output etc.
}
結論として、すべてのデータが1回でまとめて流れてきた。
つまり、サーバが複数回出力しても全部終わるまで待ってデータを流す、という動きになっている。
ソースコードを追ってみる
バージョンによって変わるとは思うので明記しておく。
2019/08/08現在。
最新のmasterコミットの8c93b59dbc57841959f5237cb141ce0b3c18b778
でソースコードを見る。
通信の実行はこの辺でやってる。
155行目からのソースコード。
@Override public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
if (creationFailure != null) {
if (creationFailure instanceof IOException) {
throw (IOException) creationFailure;
} else if (creationFailure instanceof RuntimeException) {
throw (RuntimeException) creationFailure;
} else {
throw (Error) creationFailure;
}
}
call = rawCall;
if (call == null) {
try {
call = rawCall = createRawCall();
} catch (IOException | RuntimeException | Error e) {
throwIfFatal(e); // Do not assign a fatal error to creationFailure.
creationFailure = e;
throw e;
}
}
}
if (canceled) {
call.cancel();
}
return parseResponse(call.execute());
}
最後に呼び出しているparseResponse
まではサクサクと処理が実行される。
が、parseResponse
で処理時間がかかっている。
199行目からのソースコード。
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();
int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}
if (code == 204 || code == 205) {
rawBody.close();
return Response.success(null, rawResponse);
}
ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
try {
T body = responseConverter.convert(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}
parseResponse
の中でもT body = responseConverter.convert(catchingBody);
が特に時間がかかっている。
responseConverter
を確認するとretrofit2.BuiltInConverters$BufferingResponseBodyConverter
だった。
96行目からのソースコード。
static final class BufferingResponseBodyConverter
implements Converter<ResponseBody, ResponseBody> {
static final BufferingResponseBodyConverter INSTANCE = new BufferingResponseBodyConverter();
@Override public ResponseBody convert(ResponseBody value) throws IOException {
try {
// Buffer the entire body to avoid future I/O.
return Utils.buffer(value);
} finally {
value.close();
}
}
}
そこのコメントに書かれている通り以降I/Oを避けるためにレスポンスのボディ全体をバッファリングしてるとのことだ。
つまりここでレスポンスが全部返ってくるまで待っているためObservable
であったとしても一括でデータが流れてきたということである。
一応Utils.buffer
も見てみる。
314行目からのソースコード。
static ResponseBody buffer(final ResponseBody body) throws IOException {
Buffer buffer = new Buffer();
body.source().readAll(buffer);
return ResponseBody.create(body.contentType(), body.contentLength(), buffer);
}
というわけでbody.source().readAll
してるため全部のレスポンスを読み込むという動きになる。
総論
たとえサーバ側が複数回出力をする場合でも、Retrofitで全部のレスポンスを読み込むようになっているため、データは1回しか流れてこない。
データが1回しか流れてこないならSingle
などを使う方がシンプルにソースコードを書くことができる。
そのため、自分個人としてはSingle
などを使うのがいいのではないかと思う。
余談
StreamingResponseBodyConverter
?
ソースコードを追う過程でBufferingResponseBodyConverter
の親戚であろうStreamingResponseBodyConverter
というのがあった。
どうもアノテーションをつけることでこっちが使われるようになるらしい。
というわけでやってみる。
interface ApiService {
@Streaming
@GET("test")
fun test(): Observable<ResponseBody>
}
これなら複数回データが流れるのでは、と思ったが流れず1回だけだった。
ドキュメントを読むと変換を行わないだけ、と言っている。
確かにソースコードを読んでもそのまま返しているだけ。
87行目からのソースコード。
static final class StreamingResponseBodyConverter
implements Converter<ResponseBody, ResponseBody> {
static final StreamingResponseBodyConverter INSTANCE = new StreamingResponseBodyConverter();
@Override public ResponseBody convert(ResponseBody value) {
return value;
}
}
つまりは生のResponseBody
を渡ってくるだけ。
そのため自分でストリームの処理を行う必要がある。
余談になるのでざっくりとだが
service.test()
.subscribe {
val source = it.source()
while (!source.exhausted()) {
// source.readUtf8Line() etc.
}
}
のような感じでストリーム処理を行うことになる。
サーバでどうしても複数回出力させたい場合はこうすることで対応ができる。
しかし、結局1回しかデータが流れないのには変わらないのでやはりSingle
などでいいと思う。
総論
基本的にはSingle
やMaybe
、Flowable
を使おう。
では、よいプログラミングライフを!