Observableの結合のこと
今年はブログ書くって言うたので書いてみます。
ネタは別のとこに書いたやつではあるけど。
前の記事でも触れていますが、昨年の秋ごろからAngularを触っています。 Angularでngrx使いつつです。
ことのなりゆき
(本題だけなら飛ばしてね)
年末にふとng-japanのSlackに以下の質問。
- ngrxのeffects内でstoreのデータを使いたい場面に出くわした。
withLatestFrom(this.store$.select(fromRoot.getSelectedId))
みたいにして繋げてしまうのはありなのか?
背景的には、選択中のカテゴリに属する記事を一覧表示。
その一覧から記事を削除したとき、表示中の一覧を更新したい。
更新用のAPI呼びたいけど、選択中のカテゴリIDがほしい。
みたいな、結構あるあるな状況ではないでしょうか?
もらった回答が、
withLatestFrom
でも良いけど、このケースならActionのpayloadにID含めて渡すのはどうか?
言われてみれば、確かに。。。
いざ実装してみると、payloadを次に渡してあげないといけなくて、どうするんだろう?
foo$: this.actions$.pipe( ofType(ActionName), map(action => action.payload), mergeMap(payload => { return this.api.fetchList().pipe( map(...) catchError(...) ) }), mergeMap( /*ここにpayloadがほしい */ ...
forkJoin
を使ってまとめたらとやりたいことはできたのですが、テストが通らない。。。
なんで・・・?
理由は、結合方法にも何パターンかあって、それぞれの動きが異なるから。
この辺りちゃんと理解せずやったのでハマりました。
結合方法
Observableを結合するには、何パターンか方法があります。
- zip
- combineLatest
- forkJoin
- withLatestFrom
ざっくりこの4つ。
こんなObservableがあるとして・・・
const observable1 = Rx.Observable.interval(1000).map(x => x).take(5) const observable2 = Rx.Observable.interval(2000).map(y => y + 1).take(5)
それぞれsubscribe
すると、
observable1.subscribe(x => console.log(x)) // 0 - 1秒間隔で // 1 // 2 // 3 // 4 observable2.subscribe(y => console.log(y)) // 1 - 2秒間隔で // 2 // 3 // 4 // 5
こんな感じでobservable1
は1秒間隔で0〜4
、observable2
は1〜5
が出力されます。
これを使ってそれぞれの違いを見ていきます。
zip
まずはzip
。
Observableのそれぞれの値から順に値が計算されるObservableを返す。
訳はChromeの翻訳。
意味を汲み取りましょう。
Rx.Observable.zip(observable1, observable2).subscribe(value => console.log('zip', value)) // 出力間隔を見るためのカウンター const counter = 0 const interval = setInterval(() => { console.log(counter++) if (counter === 10) clearInterval(interval) })
コンソールの出力は以下。[x, y]
の形になっています。
0 1 zip [0, 1] 2 3 zip [1, 2] 4 5 zip [2, 3] 6 7 zip [3, 4] 8 9 zip [4, 5]
普通にsubscribe
したやつが、配列になっただけのような感じです。
間隔に注目してみると、observable1
も2秒間隔になっています。
zip
は、observable2
を待って、1ペアずつ出力されている!
combineLatest
次、combineLatest
。
Observableのそれぞれの最新値から計算されるObservableを返す。
Rx.Observable.combineLatest(observable1, observable2).subscribe(value => console.log('combineLatest', value))
これを同じように出力してみます。
0 combineLatest [1, 1] 1 combineLatest [2, 1] 2 combineLatest [3, 1] combineLatest [3, 2] 3 combineLatest [4, 2] 4 combineLatest [4, 3] 5 6 combineLatest [4, 4] 7 8 combineLatest [4, 5] 9
zip
よりも出力回数が増えています。
zip
はobservable1
とobservable2
が揃って出力されていたのに対して、combineLatest
はお構いなしに、その時の最新の値を出してきます。
ところどころ2連続で出力があるところはobservable2
の間隔。
observable1
が先に終わるので、最後の出力回数が減っているのも分かりますね。
forkJoin
次、forkJoin
。
全てのObservableが完了するのを待って、最後の値を返す。
Rx.Observable.forkJoin(observable1, observable2).subscripbe(value => console.log('forkJoin', value))
同じく出力します。
0 1 2 3 4 5 6 7 8 9 forkJoin [4, 5]
0〜9の間の出力がなくなりました。
observable1
とobservable2
の両方が完了したタイミングで出力されています。
withLatestFrom
withLatestFrom
も見ておきましょう。
ソースObservableが値を発行するたびに、その値と他の入力Observablesからの最新の値を使用して式を計算し、次にその式の出力を発行します。
これは前の3つとは少し違っています。
今回はobservable2
にobservable1
を入れて使ってみます。
observable2.withLatestFrom(observable1, (y, x) => y * x)
.subscribe(value => console.log('withLatestFrom', value))
出力は
0 1 withLatestFrom 1 -- 1 * 1 2 3 withLatestFrom 6 -- 2 * 3 4 5 withLatestFrom 12 -- 3 * 4 6 7 withLatestFrom 16 -- 4 * 4 8 9 withLatestFrom 20 -- 5 * 4
2秒間隔で、observable2 * observable1
が行われていますね。
まとめ
ユニットテストでforkJoin
が上手くいかなかったのは、おそらく完了を待ち続けていたのかな?
値が揃えばOKなら、zip
。
常に最新値でやってほしいならcombineLatest
。
1つのObservable
に別のObservable
を急に入れたいみたいな場合はwithLatestFrom
。
今回のも似てるようで全然違う挙動をするので、使い分けできるようにならないとなーと思いました。
mergeMap
、switchMap
あたりも怪しい・・・。