[RxJS] 組合類型 Operators (1) - switchAll / concatAll / mergeAll / combineAll / startWith

今天要介紹的是「組合類型」的 operators,這類型的 operators 會根據指定的條件將來源 Observable 的資料進行「組合」,變成一條新的 Observable 資料流。

switchAll

還記得昨天文章提到的 switchMap 嗎?switchMap 可以將來源 Observable 事件的「資料」轉換成 Observable,switchAll 則非常相似,是將來源事件的「Observable」轉換成另一個 Observble。

乍聽之下感覺有點饒舌,直接來看一下程式碼:

const generateStream = round =>
  timer(0, 1000).pipe(
    map(data => `資料流 ${round}: ${data + 1}`),
    take(3)
  );

const source$ = new Subject();

const stream$ = source$.pipe(map(round => generateStream(round)));

stream$.pipe(switchAll())
  .subscribe(result => console.log(result));

// 第一次事件
source$.next(1);

// 第二次事件
setTimeout(() => {
  source$.next(2);
}, 4000);

// 第三次事件
setTimeout(() => {
  source$.next(3);
}, 5000);

// 資料流 1: 1
// 資料流 1: 2
// 資料流 1: 3 (資料流 1 結束)
// 資料流 2: 1 (第二次事件開始,產生資料流 2)
// 資料流 3: 1 (資料流 2 未結束,第三次事件就開始了,產生資料流 3)
// 資料流 3: 2
// 資料流 3: 3

程式碼:https://stackblitz.com/edit/mastering-rxjs-operator-switchall

上面的程式中,我們建立一個 source$ 的 Subject 以便我們在想要的時間控制事件發生,而 stream$ 則是將每次 source$ 的事件轉換成另外一個 Observable 物件,由於是使用 map 因此只是單純得到一個 Observable 物件而已,不會做任何的訂閱。

之後我們再將這條「將資料轉換成 Observable 物件」的 Observable 透過 switchAll 串在一起,switchAll 會建立一條資料流,當每次收到一個 Observable 時,switchAll 就會幫我們訂閱這個 Observable,當這個 Observable 有新事件時,就讓事件發生在 switchAll 自行建立的資料流上;而訂閱的 Observable 還沒結束同時卻收到下一個要訂閱的 Observable 時,就會將上一個 Observable 退訂閱。

這種 Observable 的事件值也是 Observable 的情境,稱為 Observable of Observable。

彈珠圖:

switchAllswitchMap 的差別在於:

switchMap 會將 callback function 呼叫回傳的 Observable 進行訂閱,因此 Observable 的來源是從 callback function 轉換過來的:

source$.pipe(switchMap(data => of(data));
                    // ^ 這個 callback function 是 swtichMap 訂閱的資料來源

switchAll 沒有這個 callback function,它的來源是從前一個資料流過來的,因此前一個資料流的「資料」必須是個 Observable:

source$.pipe(
  map(data => of(data)), // 這裡將資料轉換成 Observable
  switchAll() // 訂閱上一個 operator 傳給我的 Observable
);

因此 switchMap 適合用在明確知道下一步要使用哪個 Observable 的情境,由我們自行撰寫程式決定要轉換成什麼 Observable;而 switchAll 則適用在來源 Observable 不明確的情境,以上面的例子來說,stream$ 可能是別人寫好的程式碼,我們只需要知道它的「事件值是一個 Observable」(也就是 Observable of Observable) 需要訂閱,不需要知道背後的實作細節。

switchAllswitchMap 相同的地方在於,當收到新的 Observable 要訂閱時,都會退訂上一個 Observable,因此可以確保永遠都只有最後一個 Observable 正在執行!

concatAll

如同 switchMapswitchAll 的差別一樣,concatMapconcatAll 都會等待前一個 Observable 完成,在開始繼續新的 Observable 資料流訂閱,因此可以確保每個資料流都執行至完成:

const generateStream = round =>
  timer(0, 1000).pipe(
    map(data => `資料流 ${round}: ${data + 1}`),
    take(3)
  );

const source$ = new Subject();

const stream$ = source$.pipe(map(round => generateStream(round)));

stream$.pipe(concatAll())
  .subscribe(result => console.log(result));

source$.next(1);

setTimeout(() => {
  source$.next(2);
}, 4000);

setTimeout(() => {
  source$.next(3);
}, 5000);

// 資料流 1: 1
// 資料流 1: 2
// 資料流 1: 3
// 資料流 2: 1
// 資料流 2: 2 (此時第三次事件已經發生了,但資料流 2 還沒結束,等待中)
// 資料流 2: 3
// 資料流 3: 1 (資料流 2 結束後,才讓資料流 3 開始)
// 資料流 3: 2
// 資料流 3: 3

程式碼:https://stackblitz.com/edit/mastering-rxjs-operator-concatall

彈珠圖:

mergeAll

如同 switchMapswitchAll 的差別一樣,mergeMapmergeAll 在得到新的資料流後會直接訂閱,且不退訂之前的資料流,因此所有資料流會依照各自發生的時間直接的發生在 mergeAll 建立的資料流上:

const generateStream = round =>
  timer(0, 1000).pipe(
    map(data => `資料流 ${round}: ${data + 1}`),
    take(3)
  );

const source$ = new Subject();

const stream$ = source$.pipe(map(round => generateStream(round)));

stream$.pipe(mergeAll())
  .subscribe(result => console.log(result));

source$.next(1);

setTimeout(() => {
  source$.next(2);
}, 2000);

setTimeout(() => {
  source$.next(3);
}, 3000);

// 資料流 1: 1
// 資料流 1: 2 
// 資料流 2: 1 (第二次事件發生,產生資料流 2,原來資料流不退訂)
// 資料流 1: 3 (原來的資料流 1 在此結束)
// 資料流 3: 1 (第三次事件發生,產生資料流 3,原來的資料流不退訂)
// 資料流 2: 2
// 資料流 3: 2
// 資料流 2: 3
// 資料流 3: 3

程式碼:https://stackblitz.com/edit/mastering-rxjs-operator-mergeall

彈珠圖:

由於 mergeAll 會同時訂閱多個 Observable,當要訂閱的 Observable 數量龐大時容易產生效能的問題,因此 mergeAll 還可以帶入一個 concurrent 參數,代表同時最多可以訂閱幾個 Observable,預設值為正無限大。

source$.pipe(mergeAll(5));
// 對於傳入的 Observable,最多只會訂閱 5 個,其餘的依然必須排隊等待
source$.pipe(mergeAll(1));
// 效果等同於使用 concatAll()

combineAll

combineAllcombineLateset 非常類似,都是把資料流的資料組合在一起,規則是每當有資料流發生新事件值時,將這個事件值和其他資料流最後一次的事件值組合起來。

combineLatest 需要明確指定要組合哪些 Observable,而 combineAll 則適用在來源不明確的 Observable of Observable 的情境;另外因為來源並不明確,因此必須等到整個 Observable 結束,明確知道所有要組合的 Observable 後,才會進行相關動作。

const generateStream = round =>
  timer(0, 1000).pipe(
    map(data => `資料流 ${round}: ${data + 1}`),
    take(3)
  );

const source$ = new Subject();

const stream$ = source$.pipe(map(round => generateStream(round)));

stream$.pipe(combineAll())
  .subscribe(result => console.log(result));

source$.next(1);

setTimeout(() => {
  source$.next(2);
  // 結束資料流,不然 combineAll 會持續等待到結束
  source$.complete();
}, 3000);

// (等候 3 秒,到 source$ 結束)
// ["資料流 1: 1", "資料流 2: 1"]
// ["資料流 1: 2", "資料流 2: 1"]
// ["資料流 1: 2", "資料流 2: 2"]
// ["資料流 1: 3", "資料流 2: 2"]
// ["資料流 1: 3", "資料流 2: 3"]

彈珠圖:

startWith

startWith 會在一個 Observable 內加上一個起始值,也就是訂閱產生時會立刻最先收到的一個值,例如在前兩天練習 pairwise 時,會因為第一個事件值沒有「上一個事件值」被忽略,因此改用 scan 來解決,但也可以使用 startWith,會更加簡單:

interval(1000).pipe(
  take(6),
  map(data => data + 1),
  startWith(0), // 給予初始事件值
  pairwise() // 再搭配 pairwise 時,就能讓原始 Observable 的第一個事件有搭配資料可用
).subscribe(result => {
  console.log(result);
});
// [0, 1]
// [1, 2]
// [2, 3]
// [3, 4]
// [4, 5]
// [5, 6]

彈珠圖:

---1---2---3---4---5---6|
startWith(0)
0--1---2---3---4---5---6|

有些時候在訂閱 Observable 時需要立刻有一個預設值來處理時,startWith 就是很好用的 operator 囉!

本日小節

  • xxxAll 系列和 xxxMap 系列處理「上一個」資料流的方式一樣,差別在於 xxxAll 是使用別人傳給我們的 Observable of Observable,而 xxxMap 必須自行撰寫轉換成 Observable 的規則 。
  • cominbeAllcombineLatest 行為也非常類似,combineAll 的資料來源是 Observable of Observable,而 combineLatest 則必須明確指定要組合哪寫 Observables。
  • startWith 適合用在一些希望訂閱 Observable 時就能夠有第一筆預設值的情境。

相關資源