rx-java onBackpressureXXX运算子

示例

大多数开发人员在应用程序失败时会遇到背压,MissingBackpressureException并且异常通常指向observeOn运算符。实际原因通常是对的非背压使用PublishSubject,timer()或者interval()是通过创建的自定义运算符create()。

有几种处理此类情况的方法。

增加缓冲区大小

有时,此类溢出是由于突发来源而发生的。突然,用户点击屏幕的速度过快observeOn,Android的默认16元素内部缓冲区溢出。

现在,RxJava最新版本中的大多数对背压敏感的运算符都允许程序员指定其内部缓冲区的大小。相关参数通常被称为bufferSize,prefetch或capacityHint。给定介绍中的溢出示例,我们可以增加的缓冲区大小observeOn以为所有值留出足够的空间。

PublishSubject<Integer> source = PublishSubject.create();

source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

但是请注意,通常这可能只是临时解决方案,因为如果源过度生成预测的缓冲区大小,则仍然会发生溢出。在这种情况下,可以使用以下运算符之一。

使用标准运算符批量/跳过值

如果可以分批更有效地处理源数据,则可以MissingBackpressureException使用标准批处理运算符之一(按大小和/或时间)来减少源数据的可能性。

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> { 
          list.parallelStream().map(e -> e * e).first();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

如果某些值可以忽略,一个可以使用的采样(以时间或其他可观察)和节流运营商(throttleFirst,throttleLast,throttleWithTimeout)。

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

请注意,这些运算符仅会降低下游的价值接收率,因此它们可能仍会导致MissingBackpressureException。

onBackpressureBuffer()

这个无参数形式的运算符在上游源和下游运算符之间重新引入了一个无界缓冲区。不受限制意味着只要JVM不会耗尽内存,它就可以处理几乎所有来自突发性源的数据。

 Observable.range(1, 1_000_000)
           .onBackpressureBuffer()
           .observeOn(Schedulers.computation(), 8)
           .subscribe(e -> { }, Throwable::printStackTrace);

在这个例子中,observeOn去与一个非常低的缓冲区大小尚没有MissingBackpressureException因为onBackpressureBuffer吸收了全国各地的小它批100万个价值观和手observeOn。

但是请注意,onBackpressureBuffer它以无限制的方式消耗其源,即不对其施加任何背压。其结果是,甚至range将完全实现诸如背压支撑源。

还有4个额外的重载 onBackpressureBuffer

onBackpressureBuffer(int capacity)

这是一个有界版本,BufferOverflowError在其缓冲区达到给定容量的情况下发出信号。

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

随着越来越多的运算符现在允许设置其缓冲区大小,该运算符的相关性正在降低。对于其余的部分,这提供了一个机会,即通过使用onBackpressureBuffer大于默认值的数字来“扩展其内部缓冲区” 。

onBackpressureBuffer(int容量,Action0 onOverflow)

如果发生溢出,此重载将调用(共享)操作。它的用处非常有限,因为除了当前调用堆栈以外,没有提供有关溢出的其他信息。

onBackpressureBuffer(int容量,Action0 onOverflow,BackpressureOverflow.Strategy策略)

这种过载实际上更有用,因为它让我们定义了在达到容量后该怎么做。该BackpressureOverflow.Strategy实际上是一个接口,但类BackpressureOverflow报价4个代表典型的行动是实现静态字段:

  • ON_OVERFLOW_ERROR:这是前两个重载的默认行为,表示 BufferOverflowException

  • ON_OVERFLOW_DEFAULT:目前与 ON_OVERFLOW_ERROR

  • ON_OVERFLOW_DROP_LATEST :如果发生溢出,则将仅忽略当前值,并且在下游请求后仅传递旧值。

  • ON_OVERFLOW_DROP_OLDEST :将最早的元素拖放到缓冲区中,然后将当前值添加到缓冲区中。

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BufferOverflowStrategy.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

请注意,最后两个策略在丢失元素时会导致流中的不连续。此外,它们不会发出信号BufferOverflowException。

onBackpressureDrop()

每当下游未准备好接收值时,此运算符将从序列中删除该elemenet。可以将其视为onBackpressureBuffer具有策略的0容量ON_OVERFLOW_DROP_LATEST。

当可以安全地忽略来源中的值(例如鼠标移动或当前GPS位置信号)时,此运算符非常有用,因为以后还会有更多最新值。

 component.mouseMoves()
 .onBackpressureDrop()
 .observeOn(Schedulers.computation(), 1)
 .subscribe(event -> compute(event.x, event.y));

与源运算符结合使用可能会很有用interval()。例如,如果要执行一些定期的后台任务,但每次迭代的持续时间可能长于周期,则可以放掉多余的间隔通知,因为稍后会有更多通知:

 Observable.interval(1, TimeUnit.MINUTES)
 .onBackpressureDrop()
 .observeOn(Schedulers.io())
 .doOnNext(e -> networkCall.doStuff())
 .subscribe(v -> { }, Throwable::printStackTrace);

此运算符存在一个重载:onBackpressureDrop(Action1<? super T> onDrop)  在调用(共享)操作的情况下,将删除值。此变体允许清除值本身(例如,释放关联的资源)。

onBackpressureLatest()

最终运算符仅保留最新值,并实际上覆盖较早的未交付值。可以将其视为onBackpressureBuffer容量为1且策略为的的变体ON_OVERFLOW_DROP_OLDEST。

onBackpressureDrop与之不同的是,如果下游碰巧落后,总会有可供消费的价值。在某些类似遥测的情况下,这可能很有用,在这种情况下,数据可能会以某种突发模式出现,但只有最新的数据才有意义。

例如,如果用户在屏幕上单击很多,我们仍然希望对其最新输入做出反应。

component.mouseClicks()
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

采用onBackpressureDrop在这种情况下会导致在最后点击被丢弃和离开用户不知道为什么没有执行业务逻辑的情况。