目的
・初めてRxJava/Kotlinでプログラミングをしていく開発メンバー用の導入資料を作りたい。
・間違いがあったら指摘してほしい。
対象となる人
・JoinしたプロジェクトにRxが導入されていたけどよくわからない人。
・Rxってよく言われているからとりあえず書いてみたい人。
・難しいことは考えず、とりあえず書き始めるために必要な知識だけ書きたい人。
書くこと
・Rxの書き方
書かないこと
・FRP, RPについて
・ストリームという概念の詳しい説明
結局Rxを導入するとどんな感じになるの
いろいろな場所でいろんなことが書かれているのですが、プログラムを書くだけであれば一旦以下のような認識を持っていれば十分だと思ってます。
1.どんどんイベントが発生してデータが流れている領域があるよ。この領域のことをストリームと呼んでいるよ。
2.このストリームを監視して、特定の条件を満たすタイミングでコールバックを呼びたいんだよ。
サンプルの説明
例えばこんなストリームがあったとします。
val stream = Observable.create<SensorEvent> { subscriber ->
val obj = object : SensorEventListener {
override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) {
}
// 端末のセンサーが変更されたときに呼ばれるイベント
override fun onSensorChanged(event: SensorEvent?) {
if (event == null) return
// もし加速度の変更だったらストリームにデータを流す
if (event.sensor.type == Sensor.TYPE_ACCELEROMETER) subscriber.onNext(event)
}
}
sensorManager.registerListener(obj, sensor, SensorManager.SENSOR_DELAY_NORMAL)
}!!
上記ソースコード上のコメントにあるように、端末の加速度センサーが反応したらストリーム上にデータが流れる、そんなストリームです。
例えば、1回上下に振ってみた場合、ストリームは以下のような感じになります。
(丸の中は加速度を表しています。)
図のように時系列に沿ったデータが流れている状態をイメージしてください。
この時、「3回振ったら何かイベントが発生してほしいな」と思ったら、
このストリームを監視して"隣り合ったデータの加速度符号が逆転している"組が"3つあった場合"に通知を飛ばしてもらえれば良いわけです。
例えば以下のように書いてみます。
stream.delay(1000, TimeUnit.MILLISECONDS) // あんまりにも早く判定されてしまうと、画面遷移でonStartが走らないパターンもあり得るので、1000msディレイを入れておく。
.map { t: SensorEvent -> XEvent(t.timestamp, t.values[0]) } // センサーイベントからx座標のイベントに変換
.filter { xEvent -> Math.abs(xEvent.x) > TIME_THRESHOLD } // ある一定の加速度を持つShake以外は破棄する
.buffer(2, 1) // 2組とする
.filter { buf -> buf[0].x * buf[1].x < 0 } // 加速度の符号が逆転しているものをとる。
.map { buf -> buf[1].timestamp / 1000000000f }// イベントを秒に変換
.buffer(SHAKE_COUNT, 1) // 直近三回にまとめる
.filter { buf -> buf[SHAKE_COUNT - 1] - buf[0] < SHAKES_PERIOD } // 1秒以内に揺れがあった場合のみ取得
.throttleFirst(SHAKES_PERIOD.toLong(), TimeUnit.SECONDS)
.subscribe() // 通知
色々書いていますが、要するに設定した様々なフィルターをピタゴラスイッチ的に潜り抜けた最後のストリームをObserverパターン的な通知で飛ばします。
こんな感じでストリームを操作していくコーディング方法がRxでのプログラミングです。(だと思ってます。)
これをストリームを使わないで実装すると以下のようになると思います。
/***********************************************************************
* 保持している状態
***********************************************************************/
// 最後の座標
var mLastX = -1.0f
var mLastY = -1.0f
var mLastZ = -1.0f
//
var mShakeCount = 0
var mLastShake = 0L
var mLastForce = 0L
var mLastTime = 0L
override fun onSensorChanged(event: SensorEvent?) {
// そもそも加速センサーのイベントでなければ終了
if (event?.sensor?.type != Sensor.TYPE_ACCELEROMETER) {
return
}
// 最後に動かしてから500ミリ秒経過していたら、連続していないのでカウントを0に。
val now = System.currentTimeMillis()
if (now - mLastForce > SHAKE_TIMEOUT) {
mShakeCount = 0
}
// 最後に振ってから100ミリ秒以内の振りは無効にしたい
if (now - mLastTime <= TIME_THRESHOLD) {
return
}
// 端末の加速度から前回の加速度=if ( speed > 350 )を引いたものをMath.absで絶対値にする。
// それを経過時間で割ったものに10000を掛けて10秒間でどれだけ加速したかを求める
val diff = now - mLastTime
val dTotal = event.values[0] + event.values[1] + event.values[2]
val dLastTotal = mLastX + mLastY + mLastZ
val speed = Math.abs(dTotal - dLastTotal) / diff * 10000
// 350より大きい速度で、振られたのが3回目(以上)でかつ、
// 最後にシェイクを検知してから100ミリ秒以上経っていたら通知を実施
if (speed > FORCE_THRESHOLD) {
if (isNotify(now)) {
mLastShake = now
mShakeCount = 0
// 画面遷移
router.goTo()
}
mLastForce = now
}
// 状態の保存
mLastTime = now
mLastX = event.values[0]
mLastY = event.values[1]
mLastZ = event.values[2]
}
上記のような書き方だと
・ 変数(状態)が多くて管理が大変。
・ 処理が何回目かによってプログラムの挙動が変わるので一見読みづらい。
・ 長い。
といった問題点が見受けられます。
そんなわけで、Rxを利用するとリーダブルで管理しやすいです。
次の話
【2.実装編】に続く。