defer示例和详细解析
defer
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
private ObservableSource<? extends Integer> getSource() { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { System.out.println("发射----->" + Thread.currentThread().getName() + "--:" + 1); emitter.onNext(1); emitter.onComplete(); } }); }
private Observer<? super Integer> getObserver() { return new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); }
@Override public void onNext(Integer integer) { System.out.println("接收----->" + integer); }
@Override public void onError(Throwable e) {
}
@Override public void onComplete() { System.out.println("onComplete"); } }; } public void testDefer() { Observable<Integer> source = Observable .defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return getSource(); } }); source .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver());
source .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
|
1 2 3 4 5 6 7 8
| System.out: onSubscribe //先订阅 System.out: onSubscribe System.out: 发射 System.out: 发射 System.out: 接收 System.out: onComplete System.out: 接收 System.out: onComplete
|
分析
根据日志可以看出,直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个新的 Observable 。
Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。
我们点进去看下 defer , 其实也返回了一个 Observable 子类的对象 ObservableDefer
那么接下来我们之间看 subscribeActual 方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void subscribeActual(Observer<? super T> observer) { ObservableSource<? extends T> pub; try { pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied"); } catch (Throwable t) { Exceptions.throwIfFatal(t); EmptyDisposable.error(t, observer); return; }
pub.subscribe(observer); }
|
我们每次执行 source.subscribe(getObserver()) ;就会调用一次 subscribeActual方法,也就会通过 upplier.call() 返回我们新创建的被观察者,也就验证了 “直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个新的 Observable”。
我们再看下 pub.subscribe(observer); 这一行代码,pub 就是我们 getSourse() 时通过 create 创建的 Observable,这里的 pub.subscribe() 其实是执行了 ObservableCreate 的 subscribeActual 方法,接下来的流程就和 create 的订阅过程 一样了。