rx-java 介绍

示例

背压是在Observable处理管道中时,某些异步阶段无法足够快地处理值,因此需要一种方法来告诉上游生产者放慢速度。

需要背压的经典情况是生产者是热源时:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000);

在此示例中,主线程将向在后台线程上对其进行处理的最终消费者生产100万件商品。该方法可能会compute(int)花费一些时间,但Observable运算符链的开销也可能会增加处理项目的时间。但是,带有for循环的生产线程不知道这一点,并且会继续执行onNext。

在内部,异步运算符具有缓冲区来保存此类元素,直到可以对其进行处理为止。在经典Rx.NET和早期的RxJava中,这些缓冲区是无界的,这意味着它们可能会容纳该示例中几乎所有的100万个元素。例如,当程序中有10亿个元素或同一100万个序列出现1000次时,问题就开始出现,OutOfMemoryError并通常由于过度的GC开销而导致速度变慢。

与错误处理成为一流公民并通过onErrorXXX运算符(通过运算符)来处理错误的方式类似,背压是程序员必须考虑和处理(通过onBackpressureXXX运算符)的数据流的另一个属性。

除PublishSubject上述之外,还有其他一些不支持背压的运算符,主要是由于功能原因。例如,运算符interval会定期发出值,将其反向加压会导致相对于挂钟的时间发生偏移。

在现代RxJava中,大多数异步运算符现在都具有一个有界的内部缓冲区,就像observeOn上面一样,任何尝试溢出该缓冲区的操作都将使用终止整个序列MissingBackpressureException。每个运算符的文档中都有关于其反压行为的描述。

但是,背压在常规冷序中会更微妙地出现(不会也不应屈服MissingBackpressureException)。如果第一个示例被重写:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000);

没有错误,并且使用少量内存即可使所有操作顺利进行。这样做的原因是,许多源运算符可以按需“生成”值,因此运算符observeOn可以告诉range生成的observeOn缓冲区最多可以一次拥有这么多的值,而不会溢出。

该协商基于协同例程的计算机科学概念(我叫你,你叫我)。运算符通过调用其(inner 's)range以Producer接口实现的形式observeOn向其发送回调。作为回报,带有值的调用将告诉它允许产生(即,它)许多其他元素。然后,有责任在正确的时间以正确的值调用该方法,以保持数据畅通但不会溢出。SubscribersetProducerobserveOnProducer.request(n)rangeonNextobserveOnrequest

很少需要在最终用户中表达背压(因为它们相对于其直接上游是同步的,并且背压自然是由于调用堆栈阻塞而发生的),但可能更容易理解它的工作原理:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

在此,onStart实现指示range生成其第一个值,然后在中接收该值onNext。一旦compute(int)完成,将另一个值,然后从请求range。在的天真的实现中range,此类调用将递归调用onNext,StackOverflowError这当然是不可取的。

为避免这种情况,运营商使用所谓的蹦床逻辑来防止此类可重入的呼叫。用range的术语,它将记住在request(1)调用时有一个调用onNext(),一旦onNext()返回,它将进行onNext()下一回合并使用下一个整数值进行调用。因此,如果将两者交换,该示例仍然可以正常工作:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}

但是,这不适用于onStart。尽管Observable基础结构保证每次将最多Subscriber调用一次,但是对的调用request(1)可能会立即触发元素的发射。如果在request(1)需要调用之后具有初始化逻辑,则onNext可能会出现以下异常:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length + v);

        request(1);
    }

    // ...休息是一样的
});

在这种同步情况下,NullPointerException将在仍然执行时立即抛出a onStart。如果对的调用request(1)触发了onNext对其他线程的异步调用,并name在onNext竞赛中读取了将其写入onStart后的情况,则会发生更细微的错误request。

因此,应该在此onStart之前或之前进行所有字段初始化,然后调用request()last。request()在必要时,in运算符的实现可确保适当的事前发生关系(或换句话说,内存释放或完全隔离)。