typeof Diary

VimとかJSとか。

Observableの結合のこと

今年はブログ書くって言うたので書いてみます。
ネタは別のとこに書いたやつではあるけど。

前の記事でも触れていますが、昨年の秋ごろからAngularを触っています。 Angularでngrx使いつつです。

ことのなりゆき

(本題だけなら飛ばしてね)

年末にふとng-japanのSlackに以下の質問。

  • ngrxのeffects内でstoreのデータを使いたい場面に出くわした。
  • withLatestFrom(this.store$.select(fromRoot.getSelectedId))みたいにして繋げてしまうのはありなのか?

背景的には、選択中のカテゴリに属する記事を一覧表示。
その一覧から記事を削除したとき、表示中の一覧を更新したい。
更新用のAPI呼びたいけど、選択中のカテゴリIDがほしい。  みたいな、結構あるあるな状況ではないでしょうか?

もらった回答が、

  • withLatestFromでも良いけど、このケースならActionのpayloadにID含めて渡すのはどうか?

言われてみれば、確かに。。。

いざ実装してみると、payloadを次に渡してあげないといけなくて、どうするんだろう?

foo$: this.actions$.pipe(
  ofType(ActionName),
  map(action => action.payload),
  mergeMap(payload => {
    return this.api.fetchList().pipe(
      map(...)
      catchError(...)
    )
  }),
  mergeMap( /*ここにpayloadがほしい */
  ...

forkJoinを使ってまとめたらとやりたいことはできたのですが、テストが通らない。。。
なんで・・・?

理由は、結合方法にも何パターンかあって、それぞれの動きが異なるから。
この辺りちゃんと理解せずやったのでハマりました。

結合方法

Observableを結合するには、何パターンか方法があります。

  1. zip
  2. combineLatest
  3. forkJoin
  4. withLatestFrom

ざっくりこの4つ。

こんなObservableがあるとして・・・

const observable1 = Rx.Observable.interval(1000).map(x => x).take(5)
const observable2 = Rx.Observable.interval(2000).map(y => y + 1).take(5)

それぞれsubscribeすると、

observable1.subscribe(x => console.log(x))
// 0 - 1秒間隔で
// 1
// 2
// 3
// 4

observable2.subscribe(y => console.log(y))
// 1 - 2秒間隔で
// 2
// 3
// 4
// 5

こんな感じでobservable1は1秒間隔で0〜4observable21〜5が出力されます。
これを使ってそれぞれの違いを見ていきます。

zip

まずはzip

Observableのそれぞれの値から順に値が計算されるObservableを返す。

訳はChromeの翻訳。
意味を汲み取りましょう。

Rx.Observable.zip(observable1, observable2).subscribe(value => console.log('zip', value))

// 出力間隔を見るためのカウンター
const counter = 0
const interval = setInterval(() => {
  console.log(counter++)
  if (counter === 10) clearInterval(interval)
})

コンソールの出力は以下。[x, y]の形になっています。

0
1
zip [0, 1]
2
3
zip [1, 2]
4
5
zip [2, 3]
6
7
zip [3, 4]
8
9
zip [4, 5]

普通にsubscribeしたやつが、配列になっただけのような感じです。
間隔に注目してみると、observable1も2秒間隔になっています。

zipは、observable2を待って、1ペアずつ出力されている!

combineLatest

次、combineLatest

Observableのそれぞれの最新値から計算されるObservableを返す。

Rx.Observable.combineLatest(observable1, observable2).subscribe(value => console.log('combineLatest', value))

これを同じように出力してみます。

0
combineLatest [1, 1]
1
combineLatest [2, 1]
2
combineLatest [3, 1]
combineLatest [3, 2]
3
combineLatest [4, 2]
4
combineLatest [4, 3]
5
6
combineLatest [4, 4]
7
8
combineLatest [4, 5]
9

zipよりも出力回数が増えています。
zipobservable1observable2が揃って出力されていたのに対して、combineLatestはお構いなしに、その時の最新の値を出してきます。
ところどころ2連続で出力があるところはobservable2の間隔。
observable1が先に終わるので、最後の出力回数が減っているのも分かりますね。

forkJoin

次、forkJoin

全てのObservableが完了するのを待って、最後の値を返す。

Rx.Observable.forkJoin(observable1, observable2).subscripbe(value => console.log('forkJoin', value))

同じく出力します。

0
1
2
3
4
5
6
7
8
9
forkJoin [4, 5]

0〜9の間の出力がなくなりました。
observable1observable2の両方が完了したタイミングで出力されています。

withLatestFrom

withLatestFromも見ておきましょう。

ソースObservableが値を発行するたびに、その値と他の入力Observablesからの最新の値を使用して式を計算し、次にその式の出力を発行します。

これは前の3つとは少し違っています。
今回はobservable2observable1を入れて使ってみます。

observable2.withLatestFrom(observable1, (y, x) => y * x)
  .subscribe(value => console.log('withLatestFrom', value))

出力は

0
1
withLatestFrom 1 -- 1 * 1
2
3
withLatestFrom 6 -- 2 * 3
4
5
withLatestFrom 12 -- 3 * 4
6
7
withLatestFrom 16 -- 4 * 4
8
9
withLatestFrom 20 -- 5 * 4

2秒間隔で、observable2 * observable1が行われていますね。

まとめ

ユニットテストforkJoinが上手くいかなかったのは、おそらく完了を待ち続けていたのかな?
値が揃えばOKなら、zip
常に最新値でやってほしいならcombineLatest
1つのObservableに別のObservableを急に入れたいみたいな場合はwithLatestFrom

今回のも似てるようで全然違う挙動をするので、使い分けできるようにならないとなーと思いました。

mergeMapswitchMapあたりも怪しい・・・。