rx-java PublishSubject

示例

PublishSubject只向观察者发送那些在订阅时间之后由源Observable发出的对象。

一个简单的PublishSubject例子:

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1订阅...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2订阅...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

输出:

sub1订阅...
sub1 -> 0
sub1 -> 1
sub2订阅...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

在上面的示例中,aPublishSubject订阅了一个Observable类似于时钟的,并items(Long)每500毫秒发出一次。从输出中可以看出,PublishSubject从源(clock)到其订户(sub1和sub2)的价位传递。

APublishSubject可以在没有任何观察者的情况下立即开始发射物品,这冒着一个或多个物品丢失的风险,直到观察者可以晒黑为止。

createClock(); //为简洁起见,移动了3行。与上述示例相同

Thread.sleep(5000); // 在首次订阅之前引入延迟

sub1andsub2(); //为简洁起见,移动了6行。与上述示例相同

输出:

sub1订阅...
sub1 -> 10
sub1 -> 11
sub2订阅...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

请注意,sub1发出的值从开始10。引入的5秒延迟导致物品丢失。这些不能复制。这实质上是让PublishSubject一个Hot Observable。

另外,请注意,如果观察者PublishSubject在发出n个项目后订阅了,则该n个项目无法为该观察者再现。

下面是大理石图 PublishSubject

在调用源PublishSubject之前的任何时间点,都会向所有已订阅的对象发射项目。onCompletedObservable

如果源Observable由于错误而终止,PublishSubject则不会将任何项目发送给后续的观察者,而只会传递来自源Observable的错误通知。

用例
假设您要创建一个应用程序,该应用程序将监视某个公司的股票价格并将其转发给所有要求它的客户。

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
stockWatcher = getWatcherInstance(); // 得到主题
Subscription steve = stockWatcher.subscribe(i -> System.out.println("看着史蒂夫 " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

在上述示例用例中,PublishSubject充当了将值从服务器传递给预订的watcher所有客户端的桥梁。