Are there drawbacks to using/mutating a variable from a custom operator closure in RxJS? I realize it violates the "pure" function principle and that you can use scan for this simple example, but I'm asking specifically for tangible technical issues with the underlying pattern below:

const custom = () => {

  let state = 0; 

  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Usage
const obs = interval(1000).pipe(custom())

obs.subscribe()
  • In which case would you need it? In your case, it looks like just doing next * (next + 1) would work
    – Axnyff
    Commented Aug 25, 2018 at 19:12
  • I made the example simple just to demonstrate the pattern. A real use case would be when you have internal state that needs to be used across multiple steps in the operator chain.
    – JeffD23
    Commented Aug 25, 2018 at 19:21
  • Wouldn't it be possible with scan?
    – Axnyff
    Commented Aug 25, 2018 at 19:23
  • Not quite, I almost put that in the question because I knew it would come up. The "state" above is internal to the operator (never emitted to the subscriber). Scan would require the data to be emitted by the observable afaik.
    – JeffD23
    Commented Aug 25, 2018 at 19:33

2 Answers

There are at least two problems with the way you've stored state within your custom operator.

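The first problem is that the operator is no longer referentially transparent. That is, if a call to custom() is replaced with the value that call returned earlier, the behaviour changes. In the snippet below, one operator instance is applied to two separate sources: the first use logs 0 and 2, but the second use continues from the state left behind by the first and logs 2 and 6: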

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

The second problem - as mentioned in the other answer - is that different subscriptions will receive different values in their next notifications, as the state within the operator is shared.

For example, if the source observable is synchronous, consecutive subscriptions will see different values:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const source = range(1, 2).pipe(custom());
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>

However, it is possible to write an operator very similar to your custom operator and have it behave correctly in all circumstances. To do so, it's necessary to ensure that any state within the operator is per-subscription.

A pipeable operator is just a function that takes an observable and returns an observable, so you could use defer to ensure that your state is per-subscription, like this:

const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  return source => defer(() => {
    let state = 0; 
    return source.pipe(
      map(next => state * next),
      tap(_ => state += 1)
    );
  }).pipe(share());
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));

const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
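
The difference between operator-level and per-subscription state can also be seen without the library. The following is a minimal, dependency-free sketch, not RxJS itself: an "observable" is modelled as a plain function that pushes values to a callback, and the names fromArray, sharedStateOp, perSubscriptionOp and collect are hypothetical helpers invented for this illustration. It only models where the state variable is created, which is the part that defer changes.

```javascript
// Toy model, NOT RxJS: an "observable" is a function that takes an observer
// callback and synchronously pushes values to it when called ("subscribed").
const fromArray = (values) => (observer) => values.forEach((v) => observer(v));

// State is created once, when the operator is created, so every
// subscription made through this operator instance shares it.
const sharedStateOp = () => {
  let state = 0;
  return (source) => (observer) =>
    source((value) => {
      observer(state * value);
      state += 1;
    });
};

// State is created inside the subscribe function, so each subscription
// starts from zero. This is the role defer plays in the real operator.
const perSubscriptionOp = () => (source) => (observer) => {
  let state = 0;
  source((value) => {
    observer(state * value);
    state += 1;
  });
};

// Subscribes once and returns everything that was emitted.
const collect = (observable) => {
  const out = [];
  observable((v) => out.push(v));
  return out;
};

const shared = sharedStateOp()(fromArray([1, 2]));
const sharedRuns = [collect(shared), collect(shared)];

const fresh = perSubscriptionOp()(fromArray([1, 2]));
const freshRuns = [collect(fresh), collect(fresh)];

console.log(JSON.stringify(sharedRuns)); // [[0,2],[2,6]]
console.log(JSON.stringify(freshRuns));  // [[0,2],[0,2]]
```

In the shared version the second subscription picks up where the first left off. In the per-subscription version both subscriptions see the same values, which matches what the defer-based operator is meant to guarantee.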

  • Amazing. I had my fingers crossed you would answer :) I came to realize the issue with sync Observables after experimenting some more, but the defer explanation is super helpful. Thank you!
    – JeffD23
    Commented Aug 26, 2018 at 2:34
  • No worries. Your question was timely; it just so happens that I've almost finished writing a short article on this topic. I'll most likely publish it next week.
    – cartant
    Commented Aug 26, 2018 at 2:37

As you already stated, you lose some of the advantages of pure functions. In this particular case, you run the risk of late subscribers getting different streams of data than you may expect (depends on what you are doing in your real case vs. in this constructed one).

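For instance, with the late subscribers below, stream 'A' sees 0 and 1. Stream 'B' sees only 1 (it misses 0 because obs is already active for the 'A' subscriber, and share() does not replay earlier values). Stream 'C' subscribes after 'A' and 'B' have completed, so share() resubscribes to the interval and it counts from 0 again, as it did for 'A'. The state variable is not reset, though, so 'C' sees 0, 3 and 8 rather than the 0, 1 and 4 that a fresh operator would produce.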

const { interval, pipe } = Rx;
const { take, map, tap, share  } = RxOperators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Late subscribers can get different streams
const obs = interval(500).pipe(custom())
const sub1 = obs.pipe(take(2)).subscribe((x) => console.log('A', x))
setTimeout(() => obs.pipe(take(1)).subscribe((x) => console.log('B', x)), 500)
setTimeout(() => obs.pipe(take(3)).subscribe((x) => console.log('C', x)), 3000)
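
The values each subscriber receives can be checked by replaying the arithmetic by hand. This is only a sketch of the bookkeeping, not a run of the RxJS code: trace is a hypothetical helper, and it assumes that 'B' subscribes just after the first interval tick and that share() resubscribes to the source once 'A' and 'B' have completed.

```javascript
// Replays map(next => state * next) followed by tap(() => state += 1)
// over a list of source values, starting from a given state.
// `trace` is a hypothetical helper, not part of RxJS.
const trace = (startState, sourceValues) => {
  let state = startState;
  const emitted = sourceValues.map((next) => {
    const result = state * next;
    state += 1;
    return result;
  });
  return { emitted, endState: state };
};

// A's run: interval emits 0 and 1 before take(2) completes.
// B shares this run and only receives the second value.
const runA = trace(0, [0, 1]);

// C's run: interval counts from 0 again, but the closure state carries over.
const runC = trace(runA.endState, [0, 1, 2]);

// What C would see if state were reset for each subscription.
const runCFresh = trace(0, [0, 1, 2]);

console.log(runA.emitted.join(", "));      // 0, 1
console.log(runC.emitted.join(", "));      // 0, 3, 8
console.log(runCFresh.emitted.join(", ")); // 0, 1, 4
```

So 'C' restarts the source in the same way 'A' did, but because the state has already been incremented twice, its values diverge from the second emission onward.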

Whether this is acceptable or expected behavior will depend on your use case. While it is good to try to use pure functions for all of their advantages, sometimes that isn't practical or appropriate.

  • Thank you, this motivated me to experiment with more sync and async examples, which clarified some of the issues you pointed out :)
    – JeffD23
    Commented Aug 26, 2018 at 2:37
