2
Help us understand the problem. What are the problem?

More than 3 years have passed since last update.

posted at

updated at

RxAndroidとRxSwingのScheduler

RxAndroidを参考に、RxJava2.xに対応したSchedulerをSwingのUIスレッド用に作りました。
Swiftではありません。おつかれさまでした。

doOnSubscribe

この記事の狙い

  • RxJavaを使ってる人が、RxJavaのSchedulerのコードを読みたくなる
  • RxAndroidを使ってる人が、RxAndroidのSchedulerのコードを読みたくなる

がこの記事のゴールです。
RxSwingを使ってる人は、存在しなさそうです。
また、Swingの知識は不要です。

RxSwingは存在します、が

ReactiveXのプロジェクトの一つとしてRxSwingは存在します。
https://github.com/ReactiveX/RxSwing

しかし、RxAndroidやRxCocoaと違って、
ReactiveX for platforms and frameworksには含まれていません。
http://reactivex.io/languages.html

また、RxJava1.xに対応したものはまだありません。

RxJava2.xに対応したPull Requestがありますが、放置されてる感じです。
https://github.com/ReactiveX/RxSwing/pull/63

公式のRxSwing2プロジェクトも枠はありますが、こちらのPull Requestも放置されてます。
https://github.com/ReactiveX/RxSwing2/pull/1

Swingがいつまで残るかというのもありますが、
おそらくSwingが終わるより先にRxJava3が出るでしょう。(適当なことを言っています。)
そんな早いサイクルの中でRxJava2対応したRxSwing2を出してメンテするの辛いと思うので、
リリースとそのためのレビューが躊躇されている、といったところでしょうか。

そんな中、もし最低限のサポートをするとしたら、UIを動かすためのスレッド制御でしょう。*1

UIスレッド

AndroidのUIスレッドのように、
SwingにもUIを動かすためのEvent Dispatch Thread(EDT)があります。
(ここでいうEventとはUIに関わるイベントのことです。)
ただし、AndroidはUIスレッドがメインスレッドですが、*2
SwingのEDTはJavaのメインスレッドとは別のスレッドです。

Rxシリーズでは、スレッド制御はSchedulerに任せます。
Rxシリーズでだいたい同じようなAPIでコードを書けるように、
各プラットフォームや言語でのスレッドの扱いをScheduler内部に閉じ込めているわけです。
Androidの場合はLooperおよびHandlerが、iOSの場合はGCDが、
それぞれ使われているようですね。
https://github.com/ReactiveX/RxAndroid/blob/2.x/rxandroid/src/main/java/io/reactivex/android/schedulers/AndroidSchedulers.java
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Schedulers/MainScheduler.swift

Swingの場合はというと、幸いにもJavaのThreadExecutorを直接扱う必要は無く、
TimerクラスとSwingUtilitiesクラスが使われています。
https://github.com/ReactiveX/RxSwing/blob/0.x/src/main/java/rx/schedulers/SwingScheduler.java

RxJavaにおけるScheduler

public abstract Worker createWorker();

RxJavaにおいてSchedulerを作るというのは、何よりまず、以下のメソッドを実装することです。

public abstract Worker createWorker();

このWokerは最低限、以下のメソッドと、Disposableを実装する必要があります。

public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
public interface Disposable {
    void dispose();
    boolean isDisposed();
}

Workerの責務

そして、schedule()メソッドに渡されたRunnableについて

  • 指定の時間後に実行すること
  • disposeされた場合は実行しないこと

を保証するのがWorkerの責務となります。

RxAndroidのScheduler

RxJavaでUIを動かしたい場合、
RxAndroidであればAndroidSchedulers.mainThread()を使いますね。
これはHandlerSchedulerのインスタンスで、
メインスレッド用のHandlerをコンストラクタに渡して作られたものが使い回されます。

new HandlerScheduler(new Handler(Looper.getMainLooper()));

AndroidSchedulers.java
HandlerScheduler.java

Handlerの機能を使って、指定時間にコールバックを呼び出したり、コールバックを削除したりしています。
下のほうで抜粋しますが、待ちきれない人はコードを見て「なるほど」と言ってみてください。

RxSwing2のWorker

上述のように、RxJava2.xに対応したPRが公式レポジトリに2つありますが、
基本的に同じですので、RxSwing2にPRを出してるほうのSchedulerおよびWorkerだけ見ます。

RxJava1.x用のものとの違いはほとんどありません。
しかし、1点だけ気になることがあります。

以下、RxSwing2のWorkerクラスの抜粋です。
https://github.com/UeliKurmann/RxSwing2/blob/8012ae881aa58cbb829433554489f9b83e6411ea/src/main/java/rx/schedulers/SwingScheduler.java

        // Worker of RxSwing2 for RxJava2.x

        private final CompositeDisposable innerSubscription = new CompositeDisposable();

        @Override
        public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
            // 省略

            // innerSubscription をそのまま返している
            return innerSubscription;
        }

        @Override
        public void dispose() {
            innerSubscription.dispose();
        }

        @Override
        public boolean isDisposed() {
            return innerSubscription.isDisposed();
        }

つまり、次の2つの処理が等しいことになります。

  • Workerインスタンスへのdispose()
  • Workerインスタンスのschedule()が返すDisposableインスタンスへのdispose()

そして、この使い方ならCompositeDisposableではなく、
Disposables.empty()を使うべきではないかと思います。
CompositeDisposableを使ったのは、単に、旧RxSwingの名残りなのかもしれません。

RxSwing1のWorker

RxJava1.xのことを忘れたひとのために、対応するクラスを並べておきます。

Version クラス メソッド
RxJava1.x Subscription unsubscribe()
RxJava2.x Disposable dispose()

以下、RxSwing1のWorkerクラスの抜粋です。
https://github.com/ReactiveX/RxSwing/blob/281ddb9500bea4ce8b44fccde907963712647ab4/src/main/java/rx/schedulers/SwingScheduler.java

        // Worker of RxSwing for RxJava1.x

        private final CompositeSubscription innerSubscription = new CompositeSubscription();

        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            // 省略

            final BooleanSubscription s = BooleanSubscription.create();

            // 省略

            innerSubscription.add(s);

            // wrap for returning so it also removes it from the 'innerSubscription'
            return Subscriptions.create(new Action0() {

                @Override
                public void call() {
                    timer.stop();
                    s.unsubscribe();
                    innerSubscription.remove(s);
                }
            });
        }

        @Override
        public void unsubscribe() {
            innerSubscription.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return innerSubscription.isUnsubscribed();
        }

RxJava1.x用のRxSwingをRxSwing1と呼ぶことにしましょう。
RxSwing1では、上記のように2つのunsubscribe()は別物となっています

  • Workerインスタンスへのunsubscribe()
  • Workerインスタンスのschedule()が返すSubscriptionインスタンスへのunsubscribe()

RxAndroidのWorker

例えば、RxJava2.x でWokerに要求されるものが変わった、ということはありえます。
何が正しいかを知るために、我らがJake神のRxAndroidを見てみましょう。

        // Worker of RxAndroid for RxJava2.x

        private volatile boolean disposed;

 @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

            // 省略

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // 省略

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

このようにRxJava2.xに対応しているはずのRxAndroidでは、
以下の2つの処理は、等しくありません。

  • Workerインスタンスへのdispose()
  • Workerインスタンスのschedule()が返すDisposableインスタンスへのdispose()

というわけで、2つdispose()が等しいのは、問題となる可能性がありそうです。
ただし、利用パターンを私は見つけられていません。
RxAndroidのユーザ数とJake神への信仰によらずに、正否を示せるとベストなんですが、
このあたりで力と興味が尽きました。

RxJava2.xに対応したRxSwingなSchedulerの実装例

というわけで、
「1つのインスタンスに対して、タスクごとにschedule()を呼べて、個別にdispose()できる」
そのようなWorkerを返すのがSchedulerの責務なのだと思います。

以上を踏まえて、RxJava2.xに対応したRxSwingなSchedulerの実装例です。
https://github.com/guignol/SwingBinding/blob/05913b51d9aa6daf8cb0aa3113c699f23879c77e/src/main/java/com/github/guignol/swing/rx/SwingScheduler.java

public class SwingScheduler extends Scheduler {

    private static final SwingScheduler INSTANCE = new SwingScheduler();

    public static SwingScheduler getInstance() {
        return INSTANCE;
    }

    private SwingScheduler() {
    }

    @Override
    public Disposable scheduleDirect(Runnable run) {
        // TODO RxAndroidは何故これをオーバーライドしてるのか
        // TODO このまま任せると、DisposableTaskでdisposeされる可能性がある?
        return super.scheduleDirect(run);
    }

    @Override
    public Worker createWorker() {
        return new InnerSwingScheduler();
    }

    private static class InnerSwingScheduler extends Worker {

        private final CompositeDisposable composite = new CompositeDisposable();

        @Override
        public Disposable schedule(Runnable original, long delayTime, TimeUnit unit) {
            if (original == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            final long delay = Math.max(0, unit.toMillis(delayTime));
            assertThatTheDelayIsValidForTheSwingTimer(delay);

            final Disposable local;
            if (delay == 0) {
                local = Disposables.empty();
                if (SwingUtilities.isEventDispatchThread()) {
                    // 即時実行
                    original.run();
                } else {
                    SwingUtilities.invokeLater(() -> {
                        if (composite.isDisposed() || local.isDisposed()) {
                            return;
                        }
                        original.run();
                        composite.remove(local);
                    });
                }
            } else {
                final Timer timer = new Timer((int) delay, null);
                local = Disposables.fromRunnable(timer::stop);
                timer.setRepeats(false);
                timer.addActionListener(e -> {
                    if (composite.isDisposed() || local.isDisposed()) {
                        return;
                    }
                    original.run();
                    composite.remove(local);
                });
                timer.start();
            }
            composite.add(local);

            // UeliKurmannのSwingSchedulerはWorkerのCompositeDisposableを返していて、タスク単位のdisposeができない
            // AndroidのSchedulerもタスク単位のdisposeを提供しているので必要だと思うが、違いの出る利用パターンを見つけられていない
            return local;
        }

        @Override
        public void dispose() {
            composite.dispose();
        }

        @Override
        public boolean isDisposed() {
            return composite.isDisposed();
        }

        private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
            if (delay < 0 || delay > Integer.MAX_VALUE) {
                throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
            }
        }
    }
}

コードとしては小さなものですが、
RxAndroidという見本があり、RxSwingという実装例があり、
なかなか楽しいタスクでした。

doOnDispose

他にも、Schedulerに関して、気になってる謎がありますが、この記事は以上です。
おつかれさまでしたー。

*1

実際、RxAndroidはSchedulerを提供するライブラリです。
RxSwing2もそういう方針にすればリリースできそうですが。

*2

JVMで動いているわけじゃないのでアレですが、
Zygoteがアプリのプロセスをforkして最初に実行するのがActivtyThreadクラスのmain()メソッドで、
ここでUIスレッドとなるイベントループが回ります。
なので、実体としてもJavaのメインスレッドと同様のものです。
この辺については詳しくは『Androidを支える技術』を読みましょう。最高ですので是非。
https://www.amazon.co.jp/dp/4774187593

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
2
Help us understand the problem. What are the problem?