Two simple ways to manage a busy RxJS stream

RxJS provides some expressive operators for delaying or deferring the output of streams -- particularly useful when dealing with chatty streams that trigger costly side-effects, such as rendering. Below are two patterns for throttling streams, with examples of when each might be suitable.

Reduce and sample

This is one of my personal favourites: a close analogue of Flux-style reducers. The pattern consumes messages and incrementally reduces them into a single atom of state (via scan), then emits that state at a fixed interval.

const stream$ = Observable.of(  
  { key: "foo", value: 1 },
  { key: "bar", value: 2 },
  { key: "baz", value: 3 }
);

const initialState = {};

const state$ = stream$.scan(
  (state, message) =>
    // Copy into a fresh object rather than mutating the accumulator,
    // so each emission is a distinct snapshot of state
    Object.assign({}, state, { [message.key]: message.value }),
  initialState
);

state$.sampleTime(1000).subscribe(  
  // Logs `{ foo: 1, bar: 2, baz: 3 }`
  (state) => console.log(state)
);

This pattern is potentially lossy, as we discard the source messages in favour of reduced state, but this is often preferable in a Flux-style application where messages are transient.
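To make the lossiness concrete, here is a minimal sketch (plain JavaScript, no RxJS) of what happens when two updates to the same key arrive between samples -- the reduced state only retains the last one:

```javascript
// Two updates to the same key arrive within one sample window
const messages = [
  { key: "foo", value: 1 },
  { key: "foo", value: 2 }, // overwrites the previous "foo" update
];

// The same reduction that scan performs, collapsed over the window
const state = messages.reduce(
  (acc, message) =>
    Object.assign({}, acc, { [message.key]: message.value }),
  {}
);

console.log(state); // { foo: 2 } -- the intermediate update is gone
```

If downstream consumers only care about the latest value per key, this loss is exactly what we want.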

An example of how this pattern can be easily combined with React:

function render(state) {  
  ReactDOM.render(
    <MyApp state={state} />,
    document.querySelector("#root")
  );
}

state$  
  .sampleTime(1000)
  .subscribe(render);

In this example, we are able to decouple the rate at which we receive messages from stream$ and the rate at which we attempt to render. This is especially valuable in complex applications, where the cost of invoking React's reconciler is high.


Buffer and batch

As an alternative to reducing and sampling, we could simply buffer up messages and emit groups on an interval. This is a lossless approach, as all source messages will be output. For example:

const stream$ = Observable.of(  
  { key: "foo", value: 1 },
  { key: "bar", value: 2 },
  { key: "baz", value: 3 }
);

stream$.bufferTime(1000).subscribe(
  // Logs `[
  //   { key: "foo", value: 1 },
  //   { key: "bar", value: 2 },
  //   { key: "baz", value: 3 },
  // ]`
  (messages) => console.log(messages)
);

This could be appropriate where you need to integrate with an existing Flux implementation, or where the order and full content of the messages is important elsewhere in your application. In combination with Redux, we could write:

const store = createStore(...);

stream$
  .bufferTime(1000)
  .subscribe((messages) =>
    store.dispatch({ type: "RECEIVE_MESSAGES", messages })
  );

This would enable us to handle the action in multiple reducers, giving more fine-grained control over the way that data is processed in our application.
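As a hypothetical sketch (the reducer names and state shapes here are assumptions, not part of the example above), two reducers might consume the same RECEIVE_MESSAGES action in different ways:

```javascript
// Keeps the full, ordered history of messages
function messageLog(state = [], action) {
  switch (action.type) {
    case "RECEIVE_MESSAGES":
      return state.concat(action.messages);
    default:
      return state;
  }
}

// Collapses each batch into a key/value snapshot of the latest values
function latestValues(state = {}, action) {
  switch (action.type) {
    case "RECEIVE_MESSAGES":
      return action.messages.reduce(
        (acc, message) =>
          Object.assign({}, acc, { [message.key]: message.value }),
        state
      );
    default:
      return state;
  }
}
```

Both reducers would be combined via combineReducers, so a single dispatched batch updates the history and the snapshot at once.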


We use both of these patterns to good effect in our products at Football Radar, but which you choose will depend on the specific requirements of your application. We'd love to hear how you're solving problems like these.