rx-java 基本范例

示例

调度程序是有关处理单元的RxJava抽象。调度程序可以由Executor服务支持,但是您可以实现自己的调度程序实现。

AScheduler应该满足此要求:

  • 应该顺序处理未延迟的任务(FIFO顺序)

  • 任务可以延迟

Scheduler可以在某些运算符(例如:)中将A用作参数delay,或与subscribeOn/observeOn方法一起使用。

对于某些运算符,Scheduler将使用来处理特定运算符的任务。例如,delay将安排一个延迟任务,该任务将发出下一个值。这Scheduler将保留并在以后执行。

该subscribeOn可每使用一次Observable。它将定义Scheduler订阅的代码将在其中执行。

每个observeOn可以多次使用Observable。这将限定其中Scheduler将用于执行所定义的所有的任务之后的observeOn方法。observeOn将帮助您执行线程跳跃。

在特定的调度程序上

// 该lambda将在Schedulers.io()中执行
Observable.fromCallable(() -> Thread.currentThread().getName())
          .subscribeOn(Schedulers.io())
          .subscribe(System.out::println);

使用特定的Scheduler进行observeOn

Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName())
         // 下一个任务将在io调度程序中执行
         .observeOn(Schedulers.io())
         .map(str -> str + " -> " + Thread.currentThread().getName())
          // 下一个任务将在计算调度程序中执行
         .observeOn(Schedulers.computation())
         .map(str -> str + " -> " + Thread.currentThread().getName())
         // 下一个任务将在io调度程序中执行
         .observeOn(Schedulers.newThread())
         .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));

用运算符指定特定的调度程序

一些运算符可以将Scheduleras作为参数。

Observable.just(1)
          // 延迟运算符的onNext方法将在新线程中执行
          .delay(1, TimeUnit.SECONDS, Schedulers.newThread())
          .subscribe(System.out::println);

发布给订阅者:

TestScheduler testScheduler = Schedulers.test();
EventBus sut = new DefaultEventBus(testScheduler);
TestSubscriber<Event> subscriber = new TestSubscriber<Event>();
sut.get().subscribe(subscriber);
sut.publish(event);
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

线程池:

this.poolName = schedulerFig.getIoSchedulerName();
final int poolSize = schedulerFig.getMaxIoThreads();
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
this.scheduler = Schedulers.from(threadPool);

Web套接字可观察:

final Subscription subscribe = socket.webSocketObservable()
        .subscribeOn(Schedulers.io())
        .doOnNext(new Action1<RxEvent>() {
            @Override
            public void call(RxEvent rxEvent) {
                System.out.println("Event: " + rxEvent);
            }
        })
        .subscribe();