RxJS: How Higher Order Operators Simplify Code

Discover Angular’s RxJS HOOs: Demystify Higher Order Observables. Explore switchMap, exhaustMap, concatMap, and mergeMap benefits.

RxJS: How Higher Order Operators Simplify Code

If you’ve worked with Angular, you’ve probably encountered RxJS. Observables, overhead constructs, many arguments to the pipe method, and each argument is returned by a different function with a different number of arguments. There are intuitive functions like filter or map. The former explicitly filters values in the observable, and the latter changes these values. Such functions are called operators. And the deeper you go into RxJS, the more different operators you learn. And eventually, you get to observables of observables – Higher Order Observables. There are special operators to work with such flows – Higher Order Operators (HOO). They can flatten observables or, in other words, make them normal.

In this article, I will show you that there is nothing mythical about HOO, and I will tell you in which cases you need to use Higher Order Operators. Now you will think that this is a boring longread, but take your time. We will look at just 4 operators: switchMap, exhaustMap, concatMap and mergeMap.

switchMap

switchMap is by far the most popular of all. But why? For one simple reason – this operator gets rid of the race condition.

Let’s examine the code:

const searchInput = document.getElementById('search-input'); 
const search$ = fromEvent(searchInput, 'input').pipe( 
  map(event => event.target.value) 
); 
 
search$.subscribe((query) => { 
  http.get('/search', {params: {query}}).subcribe((result) => { 
    console.log(result); 
  }) 
})

In this code, we find the input that the user is interacting with and subscribe to the input event. That is, the search$ observable emits strings. Inside the subscription, we can see that for each string emit, a request is sent to the server and the service response is output to the console.

You can see at least two problems in this code:

  1. Race state. Usually, when searching for something, it is important for a user to see the result of exactly the last query. But this kind of code does not give us a guarantee that the last data printed to the console corresponds to the last string emitted in the search$ observable.
  2. Subscribe in subscribe and no unsubscribe. There is an excellent rule of thumb that can save you a lot of trouble — there should be an unsubscribed for every subscription. This will at least reduce the likelihood of memory leaks.

But let’s think about how search should work:

  1. When receiving a new string, check for active requests
  2. If there are no active requests, go to step 4
  3. Unsubscribe from previous requests
  4. Subscribe to a new request
  5. Output result to the console

This can be implemented in the following way:

const searchInput = document.getElementById('search-input'); 
 
const search$ = fromEvent(searchInput, 'input').pipe( 
  map(event => event.target.value) 
); 
 
 
let subscription; 
search$.subscribe((query) => { 
  if (subscription) { 
    subscription.unsubscribe(); 
  } 
 
  subscription = http.get('/search', {params: {query}}).subcribe((result) => { 
    console.log(result); 
  }) 
})

But what if I told you that the first 4 requirement clauses are done by the switchMap operator? Let's rewrite the code:

const searchInput = document.getElementById('search-input'); 
const search$ = fromEvent(searchInput, 'input').pipe( 
  map(event => event.target.value) 
); 
 
search$.pipe( 
  switchMap((query) => http.get('/search', {params: {query}})) 
).subscribe((result) => { 
  console.log(result); 
});

The switchMap ensures that we always get the results of the last observable and saves us from the race condition. And a nice bonus is that unsubscribing from the external subscription will automatically unsubscribe from the internal one. Profit!

Resume. switchMap can be used in cases when we care about the result of the last action, for example, when searching or implementing infinite scrolling. If all previous actions can be ignored, then we can safely take switchMap.

exhaustMap

exhaustMap is definitely the most unpopular of all. The reason is not completely clear to me, but it can also be used to implement search, but using a different approach.

const searchInput = document.getElementById('search-input'); 
const searchButton = document.getElementById('search-button'); 
 
const searchButtonClick$ = fromEvent(searchButton, 'click'); 
 
searchButtonClick$.subscribe((result) => { 
  const query = searchInput.value; 
  http.get('/search', { params: { query } } }).subscribe((result) => { 
    console.log(result); 
  }); 
});

In this code, the initiator of the query is the button click.

Actually, this code has the same problems as in the previous example, but here we have different requirements:

  1. Check if there are active requests by pressing the button
  2. If there is an active one, do nothing and wait for the next click
  3. If there is no active one, subscribe to the request execution
  4. Output the result to the console

Let’s implement it:

const searchInput = document.getElementById('search-input'); 
const searchButton = document.getElementById('search-button'); 
 
const searchButtonClick$ = fromEvent(searchButton, 'click'); 
 
let isRequestPending = false; 
searchButtonClick$.subscribe((result) => { 
  if (isRequestPending) { 
    return; 
  } 
 
  isRequestPending = true; 
  const query = searchInput.value; 
  http.get('/search', { params: { query } } }).subscribe((result) => { 
    isRequestPending = false; 
    console.log(result); 
  }); 
});

As in the first case, we needed one more variable outside the observable. This adds complexity to the code and if we want to change it we will have to rewrite everything. Well, as you have already guessed, exhaustMap comes to our rescue.

const searchInput = document.getElementById('search-input'); 
const searchButton = document.getElementById('search-button'); 
 
const searchInput$ = fromEvent(searchButton, 'click'); 
 
searchInput$.pipe( 
  exhaustMap(() => { 
    const query = searchInput.value; 
    return http.get('/search', { params: { query } } 
  }) 
).subscribe((result) => { 
  console.log(result); 
});

Resume. exhaustMap should be used in cases where, when an observable subscription is active, the others can be ignored, as in the case of a keystroke search or, for example, when skipping animation start events when playing an animation.

mergeMap

mergeMap is an operator that merges all internal observables into a single output observable. This means that the inner observables can terminate in any order, and their results will be merged. And that's the simplest explanation I could squeeze out of myself.

Let’s move on to examples:

entityId$.subscribe((id) => { 
  entityService.get(id).subscribe(() => { 
    entityStore.upsertItem(entity); 
  }); 
});

In this code, we see entityId$ is an observable of strings with the ID of some entity. Here we have to request entity data from the server for each ID and add or update this entity in the store. Actually, this is what our code does and there is nothing to solve here. But there are problems, and in this case, they are absolutely identical to the previous ones. Let's try to complicate the task and introduce a limit of 3 requests at one moment of time.

We solve it:

const CONCURRENT_REQUESTS = 3; 
let activeRequests = 0; 
const queue = []; 
 
function processNext() { 
  if (queue.length === 0 || activeRequests >= CONCURRENT_REQUESTS) { 
    return; 
  } 
 
  const id = queue.shift(); 
  activeRequests++; 
 
  entityService.get(id).subscribe(entity => { 
    entityStore.upsertItem(entity); 
    activeRequests--; 
 
    processNext(); 
  }); 
} 
 
entityId$.subscribe(id => { 
  queue.push(id); 
  processNext(); 
});

I didn’t even try to test the code to see if it worked because I wrote it directly in the text editor. The code became difficult to read. The processNext function has several side effects inside. And there are additional variables outside the observable and function. Putting it all together is quite difficult.

These are precisely the kind of tasks that mergeMap solves. Let's rewrite the first example using this operator:

entityId$.pipe( 
  mergeMap((id) => entityService.get(id)) 
).subscribe((entity) => { 
  entityStore.upsertItem(entity); 
});

In this code, mergeMap subscribes to each observable returned by entityService.get(id) and outputs their values in one single observable.

Okay, but what about the limit of 3 requests at one time? It turns out that mergeMap already knows how to do everything. The second argument to mergeMap takes a number that just sets up the competitiveness.

entityId$.pipe( 
  mergeMap((id) => entityService.get(id), 3)) 
).subscribe((entity) => { 
  entityStore.upsertItem(entity); 
});

That’s it!

Resume. mergeMap is great when you want to perform parallel actions and merge their results. However, you have to be careful as there can be many active queries if the source observable emits values too fast.

concatMap

concatMap is the last higher-order operator. The key difference is that concatMap maintains the order of execution. It will wait for one internal observable to complete before moving on to the next.

To practically look at its use, we can take the previous example and change its requirements. It so happens that we are no longer satisfied with disordered requests, and we want to execute them one by one instead of in parallel. That is, the competitiveness should become equal to one.

entityId$.pipe( 
  mergeMap((id) => entityService.get(id), 1)) 
).subscribe((entity) => { 
  entityStore.upsertItem(entity); 
});

But mergeMap with concreteness 1 does the same thing as concatMap! Literally. This is perfectly visible in the source code operator.

That is, the use of mergeMap with concurrency 1 is such a frequent case that it was put into a separate operator.

Resume. concatMap is great for situations where the order of execution is important. If you want to process a sequence of actions without parallel processing, this is your choice.

Conclusion

Higher-order operators are a powerful tool in the arsenal of every developer working with reactive programming. They provide flexibility and elegance when handling complex data observables and allow us to shorten code, making it more readable and maintainable.

  • The switchMap is an excellent choice when we only care about the last emit result, such as in real-time search.
  • exhaustMap is ideal for cases where we need to ignore new observables until the current one terminates.
  • mergeMap allows multiple incoming observables to be processed in parallel, but can lead to overloading if the number of concurrent observables is not controlled.
  • concatMap guarantees the order of processing by executing each internal observable sequentially.

When used correctly, these operators can handle many reactive tasks, be it event observables from the user interface, HTTP requests, or even complex animation sequences.

However, the keyword here is correct usage. Always analyze the requirements of your application and choose the right operator carefully. This will help you avoid unwanted side effects and create reactive solutions that can scale and be easily maintained.

Reactive programming offers many tools, and among them, higher-order operators hold a special place. By taking time to study and understand their peculiarities, you can significantly improve the quality and efficiency of your code.

Read more