buffer示例和源码解析
Buffer 收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
Buffer 操作符将一个 Observable 变换为另一个,原来的 Observable 正常发射数据,变换产生的 Observable 发射这些数据的缓存集合。
注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。
buffer(count, skip) 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 List<Integer > integers = new ArrayList<>(); for (int i = 0 ; i < 20 ; i++) { integers.add (i); } Observable<Integer > observable = Observable.fromIterable(integers); observable .buffer(5 ,4 ) .subscribe(new Consumer<List<Integer >>() { @Override public void accept(List<Integer > integers) throws Exception { System .out .println("接收----->" + integers); } });
1 2 3 4 5 System.out : 接收----->[0 , 1 , 2 , 3 , 4 ] System.out : 接收----->[4 , 5 , 6 , 7 , 8 ] System.out : 接收----->[8 , 9 , 10 , 11 , 12 ] System.out : 接收----->[12 , 13 , 14 , 15 , 16 ] System.out : 接收----->[16 , 17 , 18 , 19 ]
每个缓存里面包含 count 个元素,我们这里把数据列表 假设为List,索引为index。那么下一个缓存是以 index % skip == 0 开始,取 count 个元素。 我们从上面的打印日志中就可以看出,每个缓存的 开始元素索引和skip取余都为0。我们也可以通俗的理解为 先取 count 个元素,然后每隔 skip 个再取 count 个。
我们把skip为6看下输出结果:
1 2 3 4 5 6 7 8 ystem.out : 接收----->[0 , 1 , 2 , 3 , 4 ] System.out : 接收----->[6 , 7 , 8 , 9 , 10 ] System.out : 接收----->[12 , 13 , 14 , 15 , 16 ] System.out : 接收----->[18 , 19 ]
分析 我们看下源码:
先从 buffer(count) 点进去,他调用的是 buffer(count, skip), 只是参数skip 也是count, 而 buffer(count, skip) 调用的是 buffer(count, skip,bufferSupplier),那么我们看下 buffer(count, skip,bufferSupplier):
1 2 3 4 5 6 7 8 9 10 public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) { ObjectHelper . verifyPositive(count , "count" ) ; ObjectHelper . verifyPositive(skip , "skip" ) ; ObjectHelper . requireNonNull(bufferSupplier , "bufferSupplier is null" ) ; return RxJavaPlugins . onAssembly(new ObservableBuffer<T, U>(this , count , skip , bufferSupplier ) ); }
我们知道,订阅的时候调用的 subscribe 方法,而 subscribe 调用是Observable的子类的 subscribeActual 方法,而这里Observable的子类就是 ObservableBuffer。
1 2 3 4 5 6 7 8 9 10 11 protected void subscribeActual(Observer<? super U> t) { if (skip == count ) { BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count , bufferSupplier); if (bes.createBuffer()) { source .subscribe(bes); } } else { source .subscribe(new BufferSkipObserver<T, U>(t, count , skip, bufferSupplier)); } }
我们分两种情况看:
首先第一种:
创建了一个 BufferExactObserver 对象,然后执行 bes.createBuffer() :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 boolean createBuffer() { U b; try { b = ObjectHelper . requireNonNull(bufferSupplier .call () , "Empty buffer supplied" ); } catch (Throwable t) { Exceptions . throwIfFatal(t ) ; buffer = null; if (upstream == null) { EmptyDisposable . error(t, downstream); } else { upstream.dispose() ; downstream.onError(t ) ; } return false ; } buffer = b; return true ; }
我们看到这里没有异常的话都返回true,所以一般都会执行 source.subscribe(bes),这里的source其实是我们示例里Observable.fromIterable(integers) 返回的 ObservableFromIterable对象,fromIterable 我们已经分析过。
我们知道 fromIterable 是遍历 Iterable,而每遍历一项数据都会执行 BufferExactObserver 的 onNext,所以我们直接看下onNext的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void onNext(T t) { U b = buffer; if (b != null ) { b.add(t); if (++size >= count ) { downstream.onNext(b); size = 0 ; createBuffer(); } } }
我们看到onNext 发射的都是count大小的缓冲,但是假如我们传的数据不是count倍数时,比如24条数据,缓存大小为5,其中20条分四个缓存发射了,那剩下的4就不会发射了吗?
我们从onNext中看到,不管有没有count个元素,都会放到缓存 b 中,而剩下4条会在onComplete()中发射
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void onComplete ( ) { U b = buffer; if (b != null ) { buffer = null ; if (!b.isEmpty()) { downstream.onNext(b); } downstream.onComplete(); } }
上面就是buffer(count)的整个流程。
第二种情况 (buffer(count, skip)):
从 subscribeActual 方法中我们看到第二种情况直接执行了下面代码:
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
同样 subscribe 也是走了 fromIterable 的流程,和上面的一样。
我们直接看 BufferSkipObserver 的 onNext方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public void onNext(T t ) { if (index++ % skip == 0 ) { U b; try { b = ObjectHelper . requireNonNull(bufferSupplier .call () , "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources." ); } catch (Throwable e) { buffers.clear() ; upstream.dispose() ; downstream.onError(e ) ; return; } buffers.offer(b); } Iterator<U> it = buffers.iterator() ; while (it.hasNext() ) { U b = it.next() ; b.add(t); if (count <= b.size() ) { it.remove() ; downstream.onNext(b ) ; } } }
同样的当最后数据不足一个缓存的大小时,会执行onComplete方法:
1 2 3 4 5 6 7 8 9 10 @Override public void onComplete() { while (!buffers.isEmpty() ) { downstream.onNext(buffers .poll () ); } downstream.onComplete() ; }
buffer(timespan, unit) 示例 示例1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Observable .interval (100 , TimeUnit.MILLISECONDS).take (10 ) .buffer (300 , TimeUnit.MILLISECONDS) .subscribe (new Observer<List<Long>>() { @Override public void onSubscribe(Disposable d) { System .out .println ("onSubscribe" ); } @Override public void onNext (List<Long> longs) { System .out .println (longs); } @Override public void onError (Throwable e) { } @Override public void onComplete() { System .out .println ("onComplete" ); } });
1 2 3 4 5 6 7 08 -29 14 :34 :32.359 8656 -8656 /com.zwb.rxjavademo I/System.out : onSubscribe08 -29 14 :34 :32.661 8656 -8729 /com.zwb.rxjavademo I/System.out : [0 , 1 , 2 ]08 -29 14 :34 :32.960 8656 -8729 /com.zwb.rxjavademo I/System.out : [3 , 4 ]08 -29 14 :34 :33.261 8656 -8729 /com.zwb.rxjavademo I/System.out : [5 , 6 , 7 ]08 -29 14 :34 :33.361 8656 -8730 /com.zwb.rxjavademo I/System.out : [8 , 9 ]08 -29 14 :34 :33.361 8656 -8730 /com.zwb.rxjavademo I/System.out : onComplete
打印日志的第四行和第五行才隔了200ms,而不是我们设置的300ms,这个我们下面会分析
分析 我们看下源码: 从 buffer 点击去,最后执行的是
1 2 3 4 5 6 7 8 9 10 11 public final <U extends Collection<? super T>> Observable<U> buffer( long timespan, TimeUnit unit , Scheduler scheduler, int count, Callable<U> bufferSupplier, boolean restartTimerOnMaxSize) { return RxJavaPlugins . onAssembly(new ObservableBufferTimed<T, U>(this , timespan , timespan , unit , scheduler , bufferSupplier , count , restartTimerOnMaxSize ) ); }
我们知道真正订阅是在ObservableBufferTimed类的subscribeActual方法中,我们看下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 protected void subscribeActual(Observer<? super U> t) { if (timespan == timeskip && maxSize == Integer.MAX_VALUE) { source.subscribe(new BufferExactUnboundedObserver <T, U>( new SerializedObserver <U>(t), bufferSupplier, timespan, unit, scheduler)); return ; } Scheduler.Worker w = scheduler.createWorker(); if (timespan == timeskip) { source.subscribe(new BufferExactBoundedObserver <T, U>( new SerializedObserver <U>(t), bufferSupplier, timespan, unit, maxSize, restartTimerOnMaxSize, w )); return ; } source.subscribe(new BufferSkipBoundedObserver <T, U>( new SerializedObserver <U>(t), bufferSupplier, timespan, timeskip, unit, w)); }
我们这里只分析第一种情况:
【代码1】就一行代码,但是这行代码其实很复杂的。
source(被观察者) 是执行 interval 时创建的 ObservableInterval 对象 ,而执行 subscribe 时会执行 ObservableInterval 的 subscribeActual,而 ObservableInterval 的 subscribeActual 默认会在computation线程每隔 100ms(示例1中设置的) 发送一条整数序列,也就是执行一次 onNext (注意:这里的onNext不会走 我们的示例的中onNext)。
下面是 ObservableInterval 的部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void subscribeActual(Observer<? super Long> observer ) { IntervalObserver is = new IntervalObserver(observer ) ; observer.onSubscribe(is ) ; Scheduler sch = scheduler; if (sch instanceof TrampolineScheduler) { Worker worker = sch.createWorker() ; is.setResource(worker ) ; worker.schedulePeriodically(is , initialDelay , period , unit ) ; } else { Disposable d = sch.schedulePeriodicallyDirect(is , initialDelay , period , unit ) ; is.setResource(d ) ; } } @Override public void run() { if (get() != DisposableHelper.DISPOSED) { downstream.onNext(count ++ ) ; } }
interval参考 interval 示例和详细解析
我们从这里没看到什么时候执行onComplete的方法,也就说明了Interval是无限发射数据的,但是我们上面的日志打印了onComplete方法,其实是take中 执行了onComplete方法。
我们回过头再看【代码1】 里面创建了两个类:BufferExactUnboundedObserver 和 SerializedObserver,这两个类把我们自己创建的观察者 Observer 又包装了两次。
1 2 3 source.subscribe(new BufferExactUnboundedObserver <T, U>( new SerializedObserver <U>(t), bufferSupplier, timespan, unit, scheduler));
但是在传到 ObservableInterval 的 subscribeActual方法中的是 BufferExactUnboundedObserver的对象,所以上面的 标记1 (onSubscribe)和 标记2 (onNext)会执行 BufferExactUnboundedObserver 的 onSubscribe和onNext方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Override public void onSubscribe(Disposable d ) { if (DisposableHelper . validate(this.upstream, d)) { this.upstream = d; U b; try { b = ObjectHelper . requireNonNull(bufferSupplier .call () , "The buffer supplied is null" ); } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; dispose() ; EmptyDisposable . error(e, downstream); return; } buffer = b; downstream.onSubscribe(this ) ; if (!cancelled) { Disposable task = scheduler.schedulePeriodicallyDirect(this , timespan , timespan , unit ) ; if (!timer.compareAndSet(null , task ) ) { task.dispose() ; } } } } @Override public void onNext(T t ) { synchronized (this) { U b = buffer; if (b == null) { return; } b.add(t); } }
scheduler参考 scheduler的详细解析
我们看到在 onSubscribe 中执行一个定时任务,最后回调是 run 方法中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public void run () { U next; try { next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer" ); } catch (Throwable e) { Exceptions.throwIfFatal(e); downstream.onError(e); dispose(); return ; } U current; synchronized (this ) { current = buffer; if (current != null ) { buffer = next; } } if (current == null ) { DisposableHelper.dispose(timer); return ; } fastPathEmit(current, false , this ); } @Override public void accept (Observer<? super U> a, U v) { downstream.onNext(v); }
我们从上面代码中看到,标注3 *和 * 标注4 的 onSubscribe 和 onNext 方法是 SerializedObserver中的,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void onSubscribe(@NonNull Disposable d) { if (DisposableHelper.validate(this .upstream, d)) { this .upstream = d; downstream.onSubscribe(this ); } } @Override public void onNext(@NonNull T t) { downstream.onNext(t); }
我们看到最后给我们自己创建的观察者发射数据是在 SerializedObserver 中,包括onSubscribe、onComplete的回调
但是整个流程还没完,我们打印的日志里 第四条和第五条的时间间隔不是我们设置的300ms,这是怎么回事呢
我们知道其实上面的代码中我们看到了两个线程在执行,一个是interval的线程隔100ms发射数据给buffer,另个一是buffer的线程每个300ms发射数据缓存 ,但是这俩个线程是异步的, 也就是说interval(100, TimeUnit.MILLISECONDS).take(10)
发射的10条数据完成后会直接执行onComplete(),而onComplete()把剩下的数据缓存全部发射,并且onComplete()也不需要等待300ms后在执行。
这里take的onComplete()最终会调用 BufferExactUnboundedObserver 的 onComplete方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public void onComplete() { U b; synchronized (this) { b = buffer; buffer = null; } if (b != null) { queue.offer(b); done = true ; if (enter() ) { QueueDrainHelper . drainLoop(queue , downstream , false , null , this ) ; } } DisposableHelper . dispose(timer); }
这样buffer(timespan, unit) 的整个流程就分析完了。上面示例分析加了interval 和 take ,所以会有些复杂。 下面看下简单的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Observable .create (new ObservableOnSubscribe<Integer >() { @Override public void subscribe(ObservableEmitter<Integer > emitter) throws Exception { for (int i=0 ;i<10 ;i++){ emitter.onNext(i); Thread.sleep(100 ); } emitter.onComplete(); } }) .buffer(300 , TimeUnit.MILLISECONDS) .subscribe(new Observer<List<Integer >>() { @Override public void onSubscribe(Disposable d) { System .out .println("onSubscribe"); } @Override public void onNext(List<Integer > longs) { System .out .println(longs); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System .out .println("onComplete"); } });
1 2 3 4 5 6 I/System.out : onSubscribe I/System.out : [0 , 1 , 2 ] I/System.out : [3 , 4 , 5 ] I/System.out : [6 , 7 , 8 ] I/System.out : [9 ] I/System.out : onComplete
时序图如下: