9和Java的 支持,以创建无流:通过引入几个接口发行,认购人,认购,并SubmissionPublisher 类,它实现了发布 接口。每个接口都可以扮演与响应 流原理相对应的角色。
我们可以使用用户 接口来订阅正在被一公布的数据出版商。我们需要实现Subscriber 接口,并提供抽象方法的实现。
onComplete():当Publisher对象完成其角色时,已调用此方法。
onError():当Publisher中发生问题并通知给订阅服务器时,已调用此方法。
onNext():每当发布者有要通知所有订阅者的新信息时,就会调用此方法。
onSubscribe():发布者添加订户时已调用此方法。
import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.stream.IntStream; public class SubscriberImplTest { public static class Subscriber implements Flow.Subscriber<Integer> { private Flow.Subscription subscription; private boolean isDone; @Override public void onSubscribe(Flow.Subscription subscription) { System.out.println("Subscribed"); this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(Integer item) { System.out.println("Processing " + item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.println("Processing done"); isDone = true; } } public static void main(String args[]) throws InterruptedException { SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(); Subscriber subscriber = new Subscriber(); publisher.subscribe(subscriber); IntStream intData = IntStream.rangeClosed(1, 10); intData.forEach(publisher::submit); publisher.close(); while(!subscriber.isDone) { Thread.sleep(10); } System.out.println("Done"); } }
输出结果
Subscribed Processing 1 Processing 2 Processing 3 Processing 4 Processing 5 Processing 6 Processing 7 Processing 8 Processing 9 Processing 10 Processing done Done