调度程序是有关处理单元的RxJava抽象。调度程序可以由Executor服务支持,但是您可以实现自己的调度程序实现。
AScheduler应该满足此要求:
应该顺序处理未延迟的任务(FIFO顺序)
任务可以延迟
Scheduler可以在某些运算符(例如:)中将A用作参数delay,或与subscribeOn/observeOn方法一起使用。
对于某些运算符,Scheduler将使用来处理特定运算符的任务。例如,delay将安排一个延迟任务,该任务将发出下一个值。这Scheduler将保留并在以后执行。
该subscribeOn可每使用一次Observable。它将定义Scheduler订阅的代码将在其中执行。
每个observeOn可以多次使用Observable。这将限定其中Scheduler将用于执行所定义的所有的任务之后的observeOn方法。observeOn将帮助您执行线程跳跃。
在特定的调度程序上
// 该lambda将在Schedulers.io()中执行 Observable.fromCallable(() -> Thread.currentThread().getName()) .subscribeOn(Schedulers.io()) .subscribe(System.out::println);
使用特定的Scheduler进行observeOn
Observable.fromCallable(() -> "Thread -> " + Thread.currentThread().getName()) // 下一个任务将在io调度程序中执行 .observeOn(Schedulers.io()) .map(str -> str + " -> " + Thread.currentThread().getName()) // 下一个任务将在计算调度程序中执行 .observeOn(Schedulers.computation()) .map(str -> str + " -> " + Thread.currentThread().getName()) // 下一个任务将在io调度程序中执行 .observeOn(Schedulers.newThread()) .subscribe(str -> System.out.println(str + " -> " + Thread.currentThread().getName()));
用运算符指定特定的调度程序
一些运算符可以将Scheduleras作为参数。
Observable.just(1) // 延迟运算符的onNext方法将在新线程中执行 .delay(1, TimeUnit.SECONDS, Schedulers.newThread()) .subscribe(System.out::println);
发布给订阅者:
TestScheduler testScheduler = Schedulers.test(); EventBus sut = new DefaultEventBus(testScheduler); TestSubscriber<Event> subscriber = new TestSubscriber<Event>(); sut.get().subscribe(subscriber); sut.publish(event); testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
线程池:
this.poolName = schedulerFig.getIoSchedulerName(); final int poolSize = schedulerFig.getMaxIoThreads(); final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize); final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize ); this.scheduler = Schedulers.from(threadPool);
Web套接字可观察:
final Subscription subscribe = socket.webSocketObservable() .subscribeOn(Schedulers.io()) .doOnNext(new Action1<RxEvent>() { @Override public void call(RxEvent rxEvent) { System.out.println("Event: " + rxEvent); } }) .subscribe();