Reactive Extensions いいよ Reactive Extensions。
Reactive Extensions を Java に移植したのが reactive4java。
.NET じゃないので Linq やラムダ式は使えない(いやスクリプトとしてなら使える)けど、並列処理を直列に書けるヨロコビは味わえます。
Android でも使えます。
Android の端末の方位を通知する処理を書いてみました(仕事で必要だったので)。
import hu.akarnokd.reactive4java.base.Func1;
import hu.akarnokd.reactive4java.base.Functions;
import hu.akarnokd.reactive4java.reactive.Observable;
import hu.akarnokd.reactive4java.reactive.Observer;
import hu.akarnokd.reactive4java.reactive.Reactive;
public final class SensorFunctions {
/**
* 端末の確度を Observable として取得する
*
* ※端末は縦向き前提です。横向きには対応していません。
* port from sato-c / sensorcheck https://github.com/sato-c/sensorcheck
*/
static public Observable<Float> getRotationAsObservable(final Context context, final int rate) {
return Reactive.createWithCloseable(new Func1<Observer<? super Float>, Closeable>() {
@Override
public Closeable invoke(final Observer<? super Float> observer) {
final AtomicBoolean _stop = new AtomicBoolean(false); // 停止フラグ
final SensorManager accelMan = (SensorManager)context.getSystemService(Context.SENSOR_SERVICE);
final SensorManager magMan = (SensorManager)context.getSystemService(Context.SENSOR_SERVICE);
final SensorEventListener listener = new SensorEventListener() {
final static private int _MATRIX_SIZE = 16;
final static private int _XYZ_AXIS = 3;
private boolean _sensorReady = false;
private float[] _R = new float[_MATRIX_SIZE];
private float[] _I = new float[_MATRIX_SIZE];
private float[] _magnetic = null;
private float[] _accel = null;
private float[] _orient = new float[_XYZ_AXIS];
/*
* http://developer.android.com/reference/android/hardware/SensorEvent.html#values
*/
@Override
public void onSensorChanged(SensorEvent event) {
// 停止されていたら読み飛ばす
if (_stop.get()) {
return;
}
switch ( event.sensor.getType() ) {
case Sensor.TYPE_ACCELEROMETER:
_accel = event.values.clone();
break;
case Sensor.TYPE_MAGNETIC_FIELD:
_sensorReady = true;
_magnetic = event.values.clone();
break;
}
if ( _sensorReady && _magnetic != null && _accel != null ) {
_sensorReady = false;
SensorManager.getRotationMatrix(_R, _I, _accel, _magnetic);
SensorManager.getOrientation(_R, _orient);
if ( _orient[0] < 0 ) {
_orient[0] = (float) (Math.toDegrees(_orient[0]) + 360.0f);
} else {
_orient[0] = (float) (Math.toDegrees(_orient[0]));
}
// 発火
observer.next(_orient[0]);
}
}
@Override
public void onAccuracyChanged(Sensor sensor, int accuracy) {
}
};
Sensor accelSensor = getSensorOnce(accelMan, Sensor.TYPE_ACCELEROMETER);
if (accelSensor == null) {
observer.error(new InvalidParameterException("Sensor.TYPE_ACCELEROMETER not found."));
return Functions.EMPTY_CLOSEABLE;
}
Sensor magSensor = getSensorOnce(accelMan, Sensor.TYPE_MAGNETIC_FIELD);
if (magSensor == null) {
observer.error(new InvalidParameterException("Sensor.TYPE_MAGNETIC_FIELD not found."));
return Functions.EMPTY_CLOSEABLE;
}
accelMan.registerListener(listener, accelSensor, rate);
magMan.registerListener(listener, magSensor, rate);
return new Closeable() {
@Override
public void close() throws IOException {
// 受信はすぐに止まらないのでフラグで読み飛ばす
if (_stop.get()) {
return;
}
_stop.set(true);
magMan.unregisterListener(listener);
accelMan.unregisterListener(listener);
}
};
}
private Sensor getSensorOnce(SensorManager sensorMan, int sensorType) {
List<Sensor> sensors = sensorMan.getSensorList(sensorType);
if (sensors.size() == 0) {
return null;
}
return sensors.get(0);
}
});
}
}
使うほうはこんな感じ。
Closeable closer =
SensorFunctions.getRotationAsObservable(
context, SensorManager.SENSOR_DELAY_NORMAL))
.register(new Observer<Float>() { // Listen 開始
@Override
public void next(Float value) {
// 角度が変更されると呼ばれる
Log.d(TAG, "next() called. - " + value);
}
@Override
public void finish() {
// 終了(Closeable.Close) すると呼ばれる
Log.d(TAG, "finish() called.");
}
@Override
public void error(Throwable err) {
// なんかエラー
Log.e(TAG, "error() called.", err);
}
});
Thread.sleep(3000); // 3秒待って
closer.Close(); // Listen 終了
.register()
すると、角度が変わる度に、 next()
が呼ばれ、角度が通知されます。
(処理自体は非同期で行われるので、上の例だと Sleep を挟まないとすぐに Listen 停止してしまいます。)
これ実際に実行すると、すごい勢いで next が呼ばれて大変なことになります。
これじゃデータ多すぎるよぉ・・・そんな時も Reactive Extensions は便利。
こうします。
Closeable closer =
ObservableBuilder.from( // メソッドチェーンできるおまじない
SensorFunctions.getRotationAsObservable(
context, SensorManager.SENSOR_DELAY_NORMAL)))
.sample(1, TimeUnit.SECONDS) // 1秒置きの最新値にフィルタ
.register(new Observer<Float>() { // Listen 開始
@Override
public void next(Float value) {
// 角度が変更されると呼ばれる
Log.d(TAG, "next() called. - " + value);
}
@Override
public void finish() {
// 終了(Closeable.Close) すると呼ばれる
Log.d(TAG, "finish() called.");
}
@Override
public void error(Throwable err) {
// なんかエラー
Log.e(TAG, "error() called.", err);
}
});
Thread.sleep(3000); // 3秒待って
closer.Close(); // Listen 終了
すばらしい! .sample(1, TimeUnit.SECONDS)
を挟むだけで、「1秒間隔での取得」に置き換えることができます。
他にもいろいろ便利な機能がある reactive4java ですが、とりまこんなもんで。
##追記 9.27
実はこのプログラム、Cold でしたー。というわけで、Hot についての記事を書きました。
##参考
- Reactive Extensionsで非同期処理を簡単に - Rx の一番わかりやすいスライドだと思います!
- neue cc - Rx
- Reactive Extensions - かずきのBlog@Hatena
- reactive4java
- TYPE_ORIENTATIONを使わずに方位角を取得 - プログラマー'sペイジ
- sato-c / sensorcheck - 方位を取得する処理を移植させて頂きました