rx-java 创建背压数据源

例子

通常在处理背压时,创建背压数据源是相对容易的任务,因为该库已经Observable为开发人员提供了处理背压的静态方法。我们可以区分两种工厂方法:基于下游需求返回和生成元素的冷“生成器”和通常桥接非反应性和/或非背压数据源并在其之上分层一些背压处理的热“推送器”他们。

只是

最基本的背压感知源是通过just以下方式创建的:

Observable.just(1).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }
   
    // 为简洁起见省略其余部分
}

由于我们明确不请求 in onStart,因此不会打印任何内容。just当有一个常量值我们想快速启动一个序列时,这很好。

不幸的是,just经常被误认为是一种动态计算要被Subscribers消耗的东西的方法:

int counter;

int computeValue() {
   return ++counter;
}

Observable<Integer> o = Observable.just(computeValue());

o.subscribe(System.out:println);
o.subscribe(System.out:println);

令某些人感到惊讶的是,这会打印 1 两次,而不是分别打印 1 和 2。如果调用被重写,它为什么会这样工作就很明显了:

int temp = computeValue();

Observable<Integer> o = Observable.just(temp);

在computeValue被称为主例程中的一部分,而不是响应用户订阅。

从可调用

人们真正需要的是方法fromCallable:

Observable<Integer> o = Observable.fromCallable(() -> computeValue());

此处computeValue仅在订阅者订阅时执行,并且为每个订阅者打印预期的 1 和 2。当然,fromCallable也正确支持背压,除非请求,否则不会发出计算值。但是请注意,计算确实会发生。如果计算本身应推迟到下游的实际要求,我们可以使用just同map:

Observable.just("This doesn't matter").map(ignored -> computeValue())...

just不会发出它的常量值,直到被请求时它被映射到 的结果computeValue,仍然单独为每个订阅者调用。

如果数据已经作为对象数组、对象列表或任何Iterable来源可用,则相应的from重载将处理此类来源的背压和发射:

 Observable.from(Arrays.asList(1, 2, 3, 4, 5)).subscribe(System.out::println);

为方便起见(并避免有关创建通用数组的警告),有 2 到 10 个参数重载到just内部委托给from.

这from(Iterable)也提供了一个有趣的机会。许多价值生成可以用状态机的形式表达。每个请求的元素都会触发状态转换和返回值的计算。

编写像Iterables这样的状态机有点复杂(但仍然比编写Observable使用它的an 容易)并且与 C# 不同,Java 没有编译器的任何支持来通过简单地编写经典外观的代码(使用yield return和yield break)来构建这样的状态机。一些库提供了一些帮助,例如 Google GuavaAbstractIterable和 IxJava和. 这些本身就值得一个完整的系列,所以让我们看看一些非常基本的来源,无限地重复一些恒定值:Ix.generate()Ix.forloop()Iterable

Iterable<Integer> iterable = () -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Integer next() {
        return 1;
    }
};

Observable.from(iterable).take(5).subscribe(System.out::println);

如果我们使用iterator经典的 for 循环,那将导致无限循环。因为我们Observable用它构建了一个,我们可以表达我们的意愿,只消耗它的前 5 个,然后停止请求任何东西。这就是在Observables内部进行懒惰评估和计算的真正威力。

create(SyncOnSubscribe)

有时候,要转换成reactive world的数据源本身就是同步(阻塞)和pull-like的,也就是我们必须调用someget或read方法来获取下一条数据。当然,可以将其转换为 ,Iterable但是当这些源与资源相关联时,如果下游在序列结束之前取消订阅,我们可能会泄漏这些资源。

为了处理这种情况,RxJava 有这个SyncOnSubscribe类。可以扩展它并实现其方法或使用其基于 lambda 的工厂方法之一来构建实例。

SyncOnSubscribe<Integer, InputStream> binaryReader = SyncOnSubscribe.createStateful(
     () -> new FileInputStream("data.bin"),
     (inputstream, output) -> {
         try {
             int byte = inputstream.read();
             if (byte < 0) {
                 output.onCompleted();
             } else {
                 output.onNext(byte);
             }
         } catch (IOException ex) {
             output.onError(ex);
         }
         return inputstream;
     },
     inputstream -> {
         try {
             inputstream.close();
         } catch (IOException ex) {
             RxJavaHooks.onError(ex);
         }
     } 
 );

 Observable<Integer> o = Observable.create(binaryReader);

一般SyncOnSubscribe使用3个回调。

第一个回调允许创建每个订阅者的状态,例如FileInputStream在示例中;该文件将对每个订阅者独立打开。

第二个回调接受这个状态对象并提供一个输出Observer,onXXX可以调用它的方法来发出值。此回调执行的次数与下游请求的次数相同。在每次调用时,它必须onNext最多调用一次,可选地后跟onError或onCompleted。在示例中,我们调用onCompleted()读取字节是否为负,指示文件结束,并onError在读取抛出IOException.

当下游取消订阅(关闭输入流)或前一个回调调用终端方法时,将调用最终回调;它允许释放资源。由于并非所有源都需要所有这些特性,SyncOnSubscribe让我们创建没有它们的实例的静态方法。

不幸的是,JVM 和其他库中的许多方法调用都会抛出受检异常,需要将其包装到try-catches 中,因为此类使用的功能接口不允许抛出受检异常。

当然,我们可以模仿其他典型的来源,例如无限范围:

SyncOnSubscribe.createStateful(
     () -> 0,
     (current, output) -> {
         output.onNext(current);
         return current + 1;
     },
     e -> { }
);

在此设置中,current开始于0并且下次调用 lambda 时,参数current现在保持1。

有一个SyncOnSubscribe被调用的变体AsyncOnSubscribe看起来非常相似,除了中间回调也采用表示来自下游的请求量的 long 值,并且回调应该生成Observable具有完全相同长度的一个。该源然后将所有这些连接Observable成一个序列。

 AsyncOnSubscribe.createStateful(
     () -> 0,
     (state, requested, output) -> {
         output.onNext(Observable.range(state, (int)requested));
         return state + 1;
     },
     e -> { }
 );

有一个关于这个类的有用性的持续(激烈的)讨论,通常不推荐,因为它经常打破关于它将如何实际发出这些生成的值以及它将如何响应,甚至它将接收什么样的请求值的预期更复杂的消费场景。

create(emitter)

有时,要包装到 an 中的源Observable已经是热的(例如鼠标移动)或冷的,但在其 API 中不可回压(例如异步网络回调)。

为了处理这种情况,最近版本的 RxJava 引入了create(emitter)工厂方法。它需要两个参数:

  • 将使用Emitter<T>每个传入订阅者的接口实例调用的回调,

  • 一个Emitter.BackpressureMode枚举,它要求开发人员指定要应用的背压行为。它具有通常的模式,类似于onBackpressureXXX除了发出信号之外,MissingBackpressureException或者干脆完全忽略它内部的这种溢出。

请注意,它目前不支持这些背压模式的附加参数。如果需要这些定制,使用NONE作为背压模式并onBackpressureXXX在结果上应用相关Observable是要走的路。

当想要与基于推送的源(例如 GUI 事件)进行交互时,使用它的第一个典型案例。这些 API 具有可以利用的某种形式的addListener/removeListener调用:

Observable.create(emitter -> {
    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    emitter.setCancellation(() -> 
        button.removeListener(al));

}, BackpressureMode.BUFFER);

使用起来Emitter相对简单;可以调用onNext,onError并onCompleted在其上,运营商自行处理背压和退订管理。此外,如果封装的 API 支持取消(例如示例中的侦听器移除),则可以使用setCancellation(或setSubscriptionfor Subscription-like 资源) 注册取消回调,该回调在下游取消订阅或调用onError/ 时onCompleted调用提供的Emitter实例。

这些方法一次只允许一个资源与发射器相关联,设置新资源会自动取消订阅旧资源。如果必须处理多个资源,请创建一个CompositeSubscription,将其与发射器相关联,然后向其CompositeSubscription自身添加更多资源:

Observable.create(emitter -> {
    CompositeSubscription cs = new CompositeSubscription();

    Worker worker = Schedulers.computation().createWorker();

    ActionListener al = e -> {
        emitter.onNext(e);
    };

    button.addActionListener(al);

    cs.add(worker);
    cs.add(Subscriptions.create(() -> 
        button.removeActionListener(al));

    emitter.setSubscription(cs);

}, BackpressureMode.BUFFER);

第二种情况通常涉及一些异步的、基于回调的 API,这些 API 必须转换为Observable.

Observable.create(emitter -> {
    
    someAPI.remoteCall(new Callback<Data>() {
        @Override
        public void onSuccess(Data data) {
            emitter.onNext(data);
            emitter.onCompleted();
        }

        @Override
        public void onFailure(Exception error) {
            emitter.onError(error);
        }
    });

}, BackpressureMode.LATEST);

在这种情况下,委托的工作方式相同。不幸的是,通常,这些经典的回调式 API 不支持取消,但如果支持,您可以像在前面的示例中一样设置取消(尽管可能采用更复杂的方式)。注意LATEST背压模式的使用;如果我们知道只有一个值,我们就不需要该BUFFER策略,因为它分配了一个永远不会被充分利用的默认 128 元素长缓冲区(根据需要增长)。