背压是在Observable处理管道中时,某些异步阶段无法足够快地处理值,因此需要一种方法来告诉上游生产者放慢速度。
需要背压的经典情况是生产者是热源时:
PublishSubject<Integer> source = PublishSubject.create(); source .observeOn(Schedulers.computation()) .subscribe(v -> compute(v), Throwable::printStackTrace); for (int i = 0; i < 1_000_000; i++) { source.onNext(i); } Thread.sleep(10_000);
在此示例中,主线程将向在后台线程上对其进行处理的最终消费者生产100万件商品。该方法可能会compute(int)花费一些时间,但Observable运算符链的开销也可能会增加处理项目的时间。但是,带有for循环的生产线程不知道这一点,并且会继续执行onNext。
在内部,异步运算符具有缓冲区来保存此类元素,直到可以对其进行处理为止。在经典Rx.NET和早期的RxJava中,这些缓冲区是无界的,这意味着它们可能会容纳该示例中几乎所有的100万个元素。例如,当程序中有10亿个元素或同一100万个序列出现1000次时,问题就开始出现,OutOfMemoryError并通常由于过度的GC开销而导致速度变慢。
与错误处理成为一流公民并通过onErrorXXX运算符(通过运算符)来处理错误的方式类似,背压是程序员必须考虑和处理(通过onBackpressureXXX运算符)的数据流的另一个属性。
除PublishSubject上述之外,还有其他一些不支持背压的运算符,主要是由于功能原因。例如,运算符interval会定期发出值,将其反向加压会导致相对于挂钟的时间发生偏移。
在现代RxJava中,大多数异步运算符现在都具有一个有界的内部缓冲区,就像observeOn上面一样,任何尝试溢出该缓冲区的操作都将使用终止整个序列MissingBackpressureException。每个运算符的文档中都有关于其反压行为的描述。
但是,背压在常规冷序中会更微妙地出现(不会也不应屈服MissingBackpressureException)。如果第一个示例被重写:
Observable.range(1, 1_000_000) .observeOn(Schedulers.computation()) .subscribe(v -> compute(v), Throwable::printStackTrace); Thread.sleep(10_000);
没有错误,并且使用少量内存即可使所有操作顺利进行。这样做的原因是,许多源运算符可以按需“生成”值,因此运算符observeOn可以告诉range生成的observeOn缓冲区最多可以一次拥有这么多的值,而不会溢出。
该协商基于协同例程的计算机科学概念(我叫你,你叫我)。运算符通过调用其(inner 's)range以Producer接口实现的形式observeOn向其发送回调。作为回报,带有值的调用将告诉它允许产生(即,它)许多其他元素。然后,有责任在正确的时间以正确的值调用该方法,以保持数据畅通但不会溢出。SubscribersetProducerobserveOnProducer.request(n)rangeonNextobserveOnrequest
很少需要在最终用户中表达背压(因为它们相对于其直接上游是同步的,并且背压自然是由于调用堆栈阻塞而发生的),但可能更容易理解它的工作原理:
Observable.range(1, 1_000_000) .subscribe(new Subscriber<Integer>() { @Override public void onStart() { request(1); } public void onNext(Integer v) { compute(v); request(1); } @Override public void onError(Throwable ex) { ex.printStackTrace(); } @Override public void onCompleted() { System.out.println("Done!"); } });
在此,onStart实现指示range生成其第一个值,然后在中接收该值onNext。一旦compute(int)完成,将另一个值,然后从请求range。在的天真的实现中range,此类调用将递归调用onNext,StackOverflowError这当然是不可取的。
为避免这种情况,运营商使用所谓的蹦床逻辑来防止此类可重入的呼叫。用range的术语,它将记住在request(1)调用时有一个调用onNext(),一旦onNext()返回,它将进行onNext()下一回合并使用下一个整数值进行调用。因此,如果将两者交换,该示例仍然可以正常工作:
@Override public void onNext(Integer v) { request(1); compute(v); }
但是,这不适用于onStart。尽管Observable基础结构保证每次将最多Subscriber调用一次,但是对的调用request(1)可能会立即触发元素的发射。如果在request(1)需要调用之后具有初始化逻辑,则onNext可能会出现以下异常:
Observable.range(1, 1_000_000) .subscribe(new Subscriber<Integer>() { String name; @Override public void onStart() { request(1); name = "RangeExample"; } @Override public void onNext(Integer v) { compute(name.length + v); request(1); } // ...休息是一样的 });
在这种同步情况下,NullPointerException将在仍然执行时立即抛出a onStart。如果对的调用request(1)触发了onNext对其他线程的异步调用,并name在onNext竞赛中读取了将其写入onStart后的情况,则会发生更细微的错误request。
因此,应该在此onStart之前或之前进行所有字段初始化,然后调用request()last。request()在必要时,in运算符的实现可确保适当的事前发生关系(或换句话说,内存释放或完全隔离)。