RxAndroidを参考に、RxJava2.xに対応したScheduler
をSwingのUIスレッド用に作りました。
Swiftではありません。おつかれさまでした。
doOnSubscribe
この記事の狙い
- RxJavaを使ってる人が、RxJavaのSchedulerのコードを読みたくなる
- RxAndroidを使ってる人が、RxAndroidのSchedulerのコードを読みたくなる
がこの記事のゴールです。
RxSwingを使ってる人は、存在しなさそうです。
また、Swingの知識は不要です。
RxSwingは存在します、が
ReactiveXのプロジェクトの一つとしてRxSwingは存在します。
https://github.com/ReactiveX/RxSwing
しかし、RxAndroidやRxCocoaと違って、
ReactiveX for platforms and frameworks
には含まれていません。
http://reactivex.io/languages.html
また、RxJava1.xに対応したものはまだありません。
RxJava2.xに対応したPull Requestがありますが、放置されてる感じです。
https://github.com/ReactiveX/RxSwing/pull/63
公式のRxSwing2プロジェクトも枠はありますが、こちらのPull Requestも放置されてます。
https://github.com/ReactiveX/RxSwing2/pull/1
Swingがいつまで残るかというのもありますが、
おそらくSwingが終わるより先にRxJava3が出るでしょう。(適当なことを言っています。)
そんな早いサイクルの中でRxJava2対応したRxSwing2を出してメンテするの辛いと思うので、
リリースとそのためのレビューが躊躇されている、といったところでしょうか。
そんな中、もし最低限のサポートをするとしたら、UIを動かすためのスレッド制御でしょう。*1
UIスレッド
AndroidのUIスレッドのように、
SwingにもUIを動かすためのEvent Dispatch Thread(EDT)があります。
(ここでいうEventとはUIに関わるイベントのことです。)
ただし、AndroidはUIスレッドがメインスレッドですが、*2
SwingのEDTはJavaのメインスレッドとは別のスレッドです。
Rxシリーズでは、スレッド制御はScheduler
に任せます。
Rxシリーズでだいたい同じようなAPIでコードを書けるように、
各プラットフォームや言語でのスレッドの扱いをScheduler
内部に閉じ込めているわけです。
Androidの場合はLooperおよびHandlerが、iOSの場合はGCDが、
それぞれ使われているようですね。
https://github.com/ReactiveX/RxAndroid/blob/2.x/rxandroid/src/main/java/io/reactivex/android/schedulers/AndroidSchedulers.java
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Schedulers/MainScheduler.swift
Swingの場合はというと、幸いにもJavaのThread
やExecutor
を直接扱う必要は無く、
Timer
クラスとSwingUtilities
クラスが使われています。
https://github.com/ReactiveX/RxSwing/blob/0.x/src/main/java/rx/schedulers/SwingScheduler.java
RxJavaにおけるScheduler
public abstract Worker createWorker();
RxJavaにおいてScheduler
を作るというのは、何よりまず、以下のメソッドを実装することです。
public abstract Worker createWorker();
このWoker
は最低限、以下のメソッドと、Disposable
を実装する必要があります。
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
public interface Disposable {
void dispose();
boolean isDisposed();
}
Worker
の責務
そして、schedule()
メソッドに渡されたRunnable
について
- 指定の時間後に実行すること
- disposeされた場合は実行しないこと
を保証するのがWorker
の責務となります。
RxAndroidのScheduler
RxJavaでUIを動かしたい場合、
RxAndroidであればAndroidSchedulers.mainThread()
を使いますね。
これはHandlerScheduler
のインスタンスで、
メインスレッド用のHandlerをコンストラクタに渡して作られたものが使い回されます。
new HandlerScheduler(new Handler(Looper.getMainLooper()));
AndroidSchedulers.java
HandlerScheduler.java
Handlerの機能を使って、指定時間にコールバックを呼び出したり、コールバックを削除したりしています。
下のほうで抜粋しますが、待ちきれない人はコードを見て「なるほど」と言ってみてください。
RxSwing2のWorker
上述のように、RxJava2.xに対応したPRが公式レポジトリに2つありますが、
基本的に同じですので、RxSwing2にPRを出してるほうのScheduler
およびWorker
だけ見ます。
RxJava1.x用のものとの違いはほとんどありません。
しかし、1点だけ気になることがあります。
以下、RxSwing2のWorker
クラスの抜粋です。
https://github.com/UeliKurmann/RxSwing2/blob/8012ae881aa58cbb829433554489f9b83e6411ea/src/main/java/rx/schedulers/SwingScheduler.java
// Worker of RxSwing2 for RxJava2.x
private final CompositeDisposable innerSubscription = new CompositeDisposable();
@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
// 省略
// innerSubscription をそのまま返している
return innerSubscription;
}
@Override
public void dispose() {
innerSubscription.dispose();
}
@Override
public boolean isDisposed() {
return innerSubscription.isDisposed();
}
つまり、次の2つの処理が等しいことになります。
-
Worker
インスタンスへのdispose()
-
Worker
インスタンスのschedule()
が返すDisposable
インスタンスへのdispose()
そして、この使い方ならCompositeDisposable
ではなく、
Disposables.empty()
を使うべきではないかと思います。
CompositeDisposable
を使ったのは、単に、旧RxSwingの名残りなのかもしれません。
RxSwing1のWorker
RxJava1.xのことを忘れたひとのために、対応するクラスを並べておきます。
Version | クラス | メソッド |
---|---|---|
RxJava1.x | Subscription | unsubscribe() |
RxJava2.x | Disposable | dispose() |
以下、RxSwing1のWorker
クラスの抜粋です。
https://github.com/ReactiveX/RxSwing/blob/281ddb9500bea4ce8b44fccde907963712647ab4/src/main/java/rx/schedulers/SwingScheduler.java
// Worker of RxSwing for RxJava1.x
private final CompositeSubscription innerSubscription = new CompositeSubscription();
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
// 省略
final BooleanSubscription s = BooleanSubscription.create();
// 省略
innerSubscription.add(s);
// wrap for returning so it also removes it from the 'innerSubscription'
return Subscriptions.create(new Action0() {
@Override
public void call() {
timer.stop();
s.unsubscribe();
innerSubscription.remove(s);
}
});
}
@Override
public void unsubscribe() {
innerSubscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}
RxJava1.x用のRxSwingをRxSwing1と呼ぶことにしましょう。
RxSwing1では、上記のように2つのunsubscribe()
は別物となっています
-
Worker
インスタンスへのunsubscribe()
-
Worker
インスタンスのschedule()
が返すSubscription
インスタンスへのunsubscribe()
RxAndroidのWorker
例えば、RxJava2.x でWokerに要求されるものが変わった、ということはありえます。
何が正しいかを知るために、我らがJake神のRxAndroidを見てみましょう。
// Worker of RxAndroid for RxJava2.x
private volatile boolean disposed;
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
// 省略
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// 省略
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
このようにRxJava2.xに対応しているはずのRxAndroidでは、
以下の2つの処理は、等しくありません。
Worker
インスタンスへのdispose()
Worker
インスタンスのschedule()
が返すDisposable
インスタンスへのdispose()
というわけで、2つdispose()
が等しいのは、問題となる可能性がありそうです。
ただし、利用パターンを私は見つけられていません。
RxAndroidのユーザ数とJake神への信仰によらずに、正否を示せるとベストなんですが、
このあたりで力と興味が尽きました。
RxJava2.xに対応したRxSwingなScheduler
の実装例
というわけで、
「1つのインスタンスに対して、タスクごとにschedule()
を呼べて、個別にdispose()
できる」
そのようなWorker
を返すのがScheduler
の責務なのだと思います。
以上を踏まえて、RxJava2.xに対応したRxSwingなScheduler
の実装例です。
https://github.com/guignol/SwingBinding/blob/05913b51d9aa6daf8cb0aa3113c699f23879c77e/src/main/java/com/github/guignol/swing/rx/SwingScheduler.java
public class SwingScheduler extends Scheduler {
private static final SwingScheduler INSTANCE = new SwingScheduler();
public static SwingScheduler getInstance() {
return INSTANCE;
}
private SwingScheduler() {
}
@Override
public Disposable scheduleDirect(Runnable run) {
// TODO RxAndroidは何故これをオーバーライドしてるのか
// TODO このまま任せると、DisposableTaskでdisposeされる可能性がある?
return super.scheduleDirect(run);
}
@Override
public Worker createWorker() {
return new InnerSwingScheduler();
}
private static class InnerSwingScheduler extends Worker {
private final CompositeDisposable composite = new CompositeDisposable();
@Override
public Disposable schedule(Runnable original, long delayTime, TimeUnit unit) {
if (original == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
final long delay = Math.max(0, unit.toMillis(delayTime));
assertThatTheDelayIsValidForTheSwingTimer(delay);
final Disposable local;
if (delay == 0) {
local = Disposables.empty();
if (SwingUtilities.isEventDispatchThread()) {
// 即時実行
original.run();
} else {
SwingUtilities.invokeLater(() -> {
if (composite.isDisposed() || local.isDisposed()) {
return;
}
original.run();
composite.remove(local);
});
}
} else {
final Timer timer = new Timer((int) delay, null);
local = Disposables.fromRunnable(timer::stop);
timer.setRepeats(false);
timer.addActionListener(e -> {
if (composite.isDisposed() || local.isDisposed()) {
return;
}
original.run();
composite.remove(local);
});
timer.start();
}
composite.add(local);
// UeliKurmannのSwingSchedulerはWorkerのCompositeDisposableを返していて、タスク単位のdisposeができない
// AndroidのSchedulerもタスク単位のdisposeを提供しているので必要だと思うが、違いの出る利用パターンを見つけられていない
return local;
}
@Override
public void dispose() {
composite.dispose();
}
@Override
public boolean isDisposed() {
return composite.isDisposed();
}
private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
if (delay < 0 || delay > Integer.MAX_VALUE) {
throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
}
}
}
}
コードとしては小さなものですが、
RxAndroidという見本があり、RxSwingという実装例があり、
なかなか楽しいタスクでした。
doOnDispose
他にも、Schedulerに関して、気になってる謎がありますが、この記事は以上です。
おつかれさまでしたー。
註
*1
実際、RxAndroidはSchedulerを提供するライブラリです。
RxSwing2もそういう方針にすればリリースできそうですが。
*2
JVMで動いているわけじゃないのでアレですが、
Zygoteがアプリのプロセスをforkして最初に実行するのがActivtyThreadクラスのmain()メソッドで、
ここでUIスレッドとなるイベントループが回ります。
なので、実体としてもJavaのメインスレッドと同様のものです。
この辺については詳しくは『Androidを支える技術』を読みましょう。最高ですので是非。
https://www.amazon.co.jp/dp/4774187593