如何在Java 9中使用Flow API实现反应式流?

自Java 9以来,Flow API是对反应流规范的官方支持。它是Iterator Observer 模式的组合。的流API是一个互操作规范,而不是最终用户API等RxJava

Flow API包含四个基本接口:

  • 订阅服务器订阅服务器订阅发布服务器以进行回调。

  • 发布者发布者将数据项流发布给注册的订阅者。

  • 订阅发布者和订阅者之间的链接。

  • 处理器处理器位于发布者和订阅者之间,并将一个流转换为另一流。

在下面的示例中,我们创建了一个基本订阅服务器,该订阅服务器请求一个数据对象,将其打印并再请求一个。我们可以使用Java(SubmissionPublisher)提供的发布者实现来完成我们的会话。

示例

import java.util.concurrent.Flow;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;

class MySubscriber<T>implements Flow.Subscriber<T> {
   private Flow.Subscription subscription;
   @Override   public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      this.subscription.request(1);
   }
   @Override   public void onNext(T item) {
      System.out.println(item);
      subscription.request(1);
   }
   @Override   public void onError(Throwable throwable) {
      throwable.printStackTrace();
   }
   @Override   public void onComplete() {
      System.out.println("Done");
   }
}// main classpublic class FlowTest {
   public static void main(String args[]) {
      List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
      SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
      publisher.subscribe(new MySubscriber<>());
      items.forEach(s -> {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         publisher.submit(s);
      });
      publisher.close();
   }
}

输出结果

1
2
3
4
5
6
7
8
910
Done