Share some Observable love (how share() works in Observable + angular2)

Summary

I’m still relatively new to RxJs (5). They have some good documentation (more in progress) but a lot of it is just experience and understanding the mental model. This blog is about the .share() operator and how it impacts angular2 especially regarding the async pipe. The share() operator allows multiple streams to be watched/subscribed but share the same underlying stream so that you only have 1 execution stack (think http calls over the wire where you may have many consumers but only want a single call to be made for the many consumers).

Marbles

First, for common RxJS/stream visuals .. check out RxMarbles. It’s a REALLY good way of visualizing stream operations (which is how I tend to think of Observables … streams to me seems a more natural word for me). That said … no marbles for the share operator….

ASCII Art (lo-tech marbles)

Since there’s no fancy RxMarbles, I’ll resort to lo-tech ASCII diagrams for description here (also for a great intro to streams check out this gist). Out of the box, each subscriber will get its own reference when subscribing to a stream

(stream that has 1, 2, 5 values streaming down)

Subscriber 1:
-----1-----2-----5-----|->

Subscriber 2:
-----1-----2-----5-----|->

The above is a basic example where there is 1 stream and 2 subscribers. Each subscriber will get a copy of the stream and process independently. That allows each to act independently and unaware that anyone else is doing anything, including any ‘side effects’ on the stream like map processing or other operations.

While this is great for keeping consumers consistent and independent, it’s pretty terrible for performance if you are doing things in the stream processing that involve network, heavy latency, or performance intensive computations. This is where .share() comes in. If we take the above diagram and apply .share() to it, it becomes

(stream that has 1, 2, 5 values streaming down)

                         Subscriber 1:
                     /-------|->
                    /
-----1-----2-----5-|
                    \
                     \-------|->
                         Subscriber 2:

If instead of numbers being sent on the stream we think of each number as an operation on the stream to OBTAIN the numbers (such as a map, perhaps deriving some deep mathematical computation like prime number generation) it starts to make more sense in understanding why we want to use share().

Example/Plunker

I have an angular2 plunker that illustrates things. In the plunker I am making an http call with some latency (anywhere from 1-2seconds). I have 2 calls in my angular2 service

    /**
     * Function will return an observable with the data requested. This can be shared across
     * subscribers and will not cause extra http traffic.
     */
    getDataShared(postNum: number): Observable {
        let calls = 0;
 
        return this.http
                   .get('http://jsonplaceholder.typicode.com/posts/' + postNum)
                   .do(
                     () => {
                       console.log("side effect on shared");
                     }
                   )
                   .map(
                      (res) => {
                        let json = res.json();
                        calls++;
                        json.networkCalls = calls;
                        console.log(JSON.stringify(json));
                        return json;
                      })
                   .share();
 
    }
    /**
     * Function will return an observable with the data requested.
     * There is no share operator used here so each subscriber will result in the entire stream firing.
     */
    getDataNotShared(postNum: number): Observable {
        let calls = 0;
 
        return this.http
                   .get('http://jsonplaceholder.typicode.com/posts/' + postNum)
                   .do(
                     () => {
                       console.log("side effect on not shared");
                     }
                   )
                   .map(
                      (res) => {
                        let json = res.json();
                        calls++;
                        json.networkCalls = calls;
                        console.log(JSON.stringify(json));
                        return json;
                      });
 
    }

I also have bindings via the async pipe in angular2

 <table class="table">
   <tr><td>UserID:</td><td>{{(dataServiceObservableNotShared | async)?.userId}}</td></tr>
   <tr><td>ID:</td><td>{{(dataServiceObservableNotShared | async)?.id}}</td></tr>
   <tr><td>Title:</td><td>{{(dataServiceObservableNotShared | async)?.title}}</td></tr>
   <tr><td>Body:</td><td>{{(dataServiceObservableNotShared | async)?.body}}</td></tr>
 </table>
 <table class="table">
   <tr><td>UserID:</td><td>{{(dataServiceObservableShared | async)?.userId}}</td></tr>
   <tr><td>ID:</td><td>{{(dataServiceObservableShared | async)?.id}}</td></tr>
   <tr><td>Title:</td><td>{{(dataServiceObservableShared | async)?.title}}</td></tr>
   <tr><td>Body:</td><td>{{(dataServiceObservableShared | async)?.body}}</td></tr>
 </table>

If you run this example and look at the browser console what you’ll see is that for the NON shared stream the stream reaches all the way back to the originator of the data, the http call, for EACH subscriber (the subscriber is the binding in angular2 via the async pipe).

side effect on not shared
{"userId":1,"id":3,"title":"ea molestias quasi exercitationem repellat qui ipsa sit aut","body":"et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut","networkCalls":1}
side effect on not shared
{"userId":1,"id":3,"title":"ea molestias quasi exercitationem repellat qui ipsa sit aut","body":"et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut","networkCalls":2}
side effect on not shared
{"userId":1,"id":3,"title":"ea molestias quasi exercitationem repellat qui ipsa sit aut","body":"et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut","networkCalls":3}
side effect on not shared
{"userId":1,"id":3,"title":"ea molestias quasi exercitationem repellat qui ipsa sit aut","body":"et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut","networkCalls":4}

You can also see that the ‘side effect’ processing is called 4 unique times. While this may be great in some cases, it is clearly NOT good when talking over high latency operations, such as http, or with code that is CPU intensive. What we’d prefer to do is share the stream across all subscribers and re-use the upstream processing – only consume the end result. This is what the .share() operator does.

side effect on shared
{"userId":1,"id":3,"title":"ea molestias quasi exercitationem repellat qui ipsa sit aut","body":"et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut","networkCalls":1}

You can also notice the very real side-effect of the 2 different stream processing types by looking at the latency generated when binding the UI. Since there are 4 different http calls being made in the non-shared example and only a single call in the shared example the UI itself demonstrates how dramatic the binding time is. Check out the below animation

share operator on stream processing

You can see I put the post number ‘3’ into the non-shared stream and each area of the UI where I have an async pipe doing the binding is binding individually and slowly. This is because there is a network call taking place for EACH async pipe. Whereas the shared stream makes 1 call and the ENTIRE UI binds in 1 operation afterwards. You can also see the console logs indicate that my ‘side effect’ is taking place multiple times for the non-shared stream whereas for the shared stream you notice 1 ‘side-effect’ taking place.

Conclusion

The share() operator isn’t applicable in all cases, but for angular2 UI binding when utilizing the async pipe I believe it’ll be used on MOST cases where a single underlying stream is used. ESPECIALLY for angular services that make http calls.

I hope this helped you understand the share() operator in RxJS and also how it impacts performance for any consumer especially angular2 and the async pipe.

Thanks for reading!

Leave a Reply

Your email address will not be published. Required fields are marked *