我们如何在Java 9中实现Subscriber接口?

9和Java的 支持,以创建无流:通过引入几个接口发行认购人认购,并SubmissionPublisher 类,它实现了发布 接口。每个接口都可以扮演与响应 原理相对应的角色。

我们可以使用用户 接口来订阅正在被一公布的数据出版商。我们需要实现Subscriber 接口,并提供抽象方法的实现。

Flow.Subscriber接口方法:

  • onComplete():当Publisher对象完成其角色时,已调用此方法。

  • onError():当Publisher中发生问题并通知给订阅服务器时,已调用此方法。

  • onNext():每当发布者有要通知所有订阅者的新信息时,就会调用此方法。

  • onSubscribe():发布者添加订户时已调用此方法。

示例

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;

public class SubscriberImplTest {
   public static class Subscriber implements Flow.Subscriber<Integer> {
      private Flow.Subscription subscription;
      private boolean isDone;
      
      @Override      public void onSubscribe(Flow.Subscription subscription) {
         System.out.println("Subscribed");
         this.subscription = subscription;
         this.subscription.request(1);
      }
      @Override      public void onNext(Integer item) {
         System.out.println("Processing " + item);
         this.subscription.request(1);
      }
      @Override      public void onError(Throwable throwable) {
         throwable.printStackTrace();
      }
      @Override      public void onComplete() {
         System.out.println("Processing done");
         isDone = true;
      }
   }
   public static void main(String args[]) throws InterruptedException {
      SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
      Subscriber subscriber = new Subscriber();
      publisher.subscribe(subscriber);
      IntStream intData = IntStream.rangeClosed(1, 10);
      intData.forEach(publisher::submit);
      publisher.close();
      while(!subscriber.isDone) {
         Thread.sleep(10);
      }
      System.out.println("Done");
   }
}

输出结果

Subscribed
Processing 1
Processing 2
Processing 3
Processing 4
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10
Processing done
Done