0%

Rxjava2 - buffer示例和源码解析

buffer示例和源码解析

Buffer

收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Buffer 操作符将一个 Observable 变换为另一个,原来的 Observable 正常发射数据,变换产生的 Observable 发射这些数据的缓存集合。

注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

buffer(count, skip)
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<Integer> integers = new ArrayList<>();
for (int i = 0; i < 20; i++) {
integers.add(i);
}
Observable<Integer> observable = Observable.fromIterable(integers);

observable
.buffer(54)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("接收----->" + integers);
}
});
1
2
3
4
5
System.out: 接收----->[0, 1, 2, 3, 4]
System.out: 接收----->[4, 5, 6, 7, 8]
System.out: 接收----->[8, 9, 10, 11, 12]
System.out: 接收----->[12, 13, 14, 15, 16]
System.out: 接收----->[16, 17, 18, 19]

每个缓存里面包含 count 个元素,我们这里把数据列表 假设为List,索引为index。那么下一个缓存是以 index % skip == 0 开始,取 count 个元素。 我们从上面的打印日志中就可以看出,每个缓存的 开始元素索引和skip取余都为0。我们也可以通俗的理解为 先取 count 个元素,然后每隔 skip 个再取 count 个。

我们把skip为6看下输出结果:

1
2
3
4
5
6
7
8
// 0 % 6==0
ystem.out: 接收----->[0, 1, 2, 3, 4]
// 6 % 6==0
System.out: 接收----->[6, 7, 8, 9, 10]
// 12 % 6==0
System.out: 接收----->[12, 13, 14, 15, 16]
// 18 % 6==0
System.out: 接收----->[18, 19]
分析

我们看下源码:

先从 buffer(count) 点进去,他调用的是 buffer(count, skip), 只是参数skip 也是count, 而 buffer(count, skip) 调用的是 buffer(count, skip,bufferSupplier),那么我们看下 buffer(count, skip,bufferSupplier):

1
2
3
4
5
6
7
8
9
10
public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
// count 必须大于0
ObjectHelper.verifyPositive(count, "count");
// skip 必须大于0
ObjectHelper.verifyPositive(skip, "skip");
//bufferSupplier 不能为null
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
//装配 ObservableBuffer 对象返回
return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier));
}

我们知道,订阅的时候调用的 subscribe 方法,而 subscribe 调用是Observable的子类的 subscribeActual 方法,而这里Observable的子类就是 ObservableBuffer。

1
2
3
4
5
6
7
8
9
10
11
protected void subscribeActual(Observer<? super U> t) {
//第一种情况: 当skip 和 count相等,或者 buffer(count)时会走这里
if (skip == count) {
BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else {//第二种情况: 当skip 和 count不相等时
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
}
}

我们分两种情况看:

首先第一种:

创建了一个 BufferExactObserver 对象,然后执行 bes.createBuffer() :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
boolean createBuffer() {
//这里的 U 的类型是 List<Integer>
U b;
try {
//创建一个空的 ArrayList,bufferSupplier是ArrayListSupplier的对象,
//他的call执行了 return new ArrayList<Object>();
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "Empty buffer supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
buffer = null;
if (upstream == null) {
EmptyDisposable.error(t, downstream);
} else {
upstream.dispose();
downstream.onError(t);
}
return false;
}
// 把空的ArrayList赋给 buffer,buffer是一个List<Integer>类型的变量
buffer = b;
return true;
}

我们看到这里没有异常的话都返回true,所以一般都会执行 source.subscribe(bes),这里的source其实是我们示例里Observable.fromIterable(integers) 返回的 ObservableFromIterable对象,fromIterable 我们已经分析过。

我们知道 fromIterable 是遍历 Iterable,而每遍历一项数据都会执行 BufferExactObserver 的 onNext,所以我们直接看下onNext的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* fromIterable 遍历 Iterable,执行onNext
* @param t 每遍历一项数据 发射的Item
*/
public void onNext(T t) {
//buffer 上面创建的空的 ArrayList
U b = buffer;
if (b != null) {
// t 是fromIterable中返回的每一条item数据
b.add(t);
// size 记录存入缓存区的元素个数,当元素个数>=我们传进去的大小时,就发射一个count大小的缓冲区
//发射完后,元素个数赋值为0,重新创建一个新的缓存区覆盖原来的缓存区
if (++size >= count) {
//发射一个count大小的缓冲区
downstream.onNext(b);
size = 0;
createBuffer();
}
}
}

我们看到onNext 发射的都是count大小的缓冲,但是假如我们传的数据不是count倍数时,比如24条数据,缓存大小为5,其中20条分四个缓存发射了,那剩下的4就不会发射了吗?

我们从onNext中看到,不管有没有count个元素,都会放到缓存 b 中,而剩下4条会在onComplete()中发射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 1、缓存中还有元素,发射剩下的元素,执行onComplete
* 2、缓存中没有元素,直接执行onComplete
*/
public void onComplete() {
U b = buffer;
if (b != null) {
buffer = null;
// 这时的缓存区的元素就是剩下的4条元素(假如数据总数不是count的倍数)
if (!b.isEmpty()) {
downstream.onNext(b);
}
//发送完成后执行 onComplete
downstream.onComplete();
}
}

上面就是buffer(count)的整个流程。

第二种情况 (buffer(count, skip)):

从 subscribeActual 方法中我们看到第二种情况直接执行了下面代码:

source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));

同样 subscribe 也是走了 fromIterable 的流程,和上面的一样。

我们直接看 BufferSkipObserver 的 onNext方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* fromIterable 遍历 Iterable,执行onNext
* @param t 每遍历一项数据 发射的Item
*/
public void onNext(T t) {
// index是遍历 Iterable的索引,判断index+1后和skip取余是否等于0
//假如取余等于0,重新创建一个缓存
if (index++ % skip == 0) {
//U 是 List<Integer>类型的
U b;
try {
//创建一个空的 ArrayList
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable e) {
buffers.clear();
upstream.dispose();
downstream.onError(e);
return;
}
//buffers 是ArrayDeque类型的对象,把空的ArrayList存入到 buffers中,当做一个缓存区
//当count > slip时,buffers中可能同时会有多个缓存区,这个时候缓存会有重叠的元素
//当count < slip时,buffers在某些情况下没有缓存区,这个时候会有间隙,也就是说部分数据会丢失
buffers.offer(b);
}
Iterator<U> it = buffers.iterator();
//遍历buffers
while (it.hasNext()) {
//获取 buffers 中每一个缓存ArrayList,把当前发射的Item存到缓存区
U b = it.next();
b.add(t);
//判断当前缓存区是否已满
if (count <= b.size()) {
//如果已满,则发射当前缓存区数据,删除当前缓存区
it.remove();
downstream.onNext(b);
}
}
}

同样的当最后数据不足一个缓存的大小时,会执行onComplete方法:

1
2
3
4
5
6
7
8
9
10
@Override
public void onComplete() {
//判断buffers是否存在缓存
while (!buffers.isEmpty()) {
//假如有 直接发射缓存区中的所有元素
downstream.onNext(buffers.poll());
}
//执行onComplete 回调
downstream.onComplete();
}
buffer(timespan, unit)
示例

示例1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable
//表示发射10条整数序列,每间隔 100ms发射一条
.interval(100, TimeUnit.MILLISECONDS).take(10)
.buffer(300, TimeUnit.MILLISECONDS)
.subscribe(new Observer<List<Long>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}

@Override
public void onNext(List<Long> longs) {
System.out.println(longs);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
1
2
3
4
5
6
7
08-29 14:34:32.359 8656-8656/com.zwb.rxjavademo I/System.out: onSubscribe
08-29 14:34:32.661 8656-8729/com.zwb.rxjavademo I/System.out: [0, 1, 2]
08-29 14:34:32.960 8656-8729/com.zwb.rxjavademo I/System.out: [3, 4]
08-29 14:34:33.261 8656-8729/com.zwb.rxjavademo I/System.out: [5, 6, 7]
//中间隔了200ms
08-29 14:34:33.361 8656-8730/com.zwb.rxjavademo I/System.out: [8, 9]
08-29 14:34:33.361 8656-8730/com.zwb.rxjavademo I/System.out: onComplete

打印日志的第四行和第五行才隔了200ms,而不是我们设置的300ms,这个我们下面会分析

分析

我们看下源码:
从 buffer 点击去,最后执行的是

1
2
3
4
5
6
7
8
9
10
11
public final <U extends Collection<? super T>> Observable<U> buffer(
long timespan, TimeUnit unit,
Scheduler scheduler, int count,
Callable<U> bufferSupplier,
boolean restartTimerOnMaxSize) {
//判断数据的合法性
//。。。。。。

//装配 ObservableBufferTimed 对象返回
return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize));
}

我们知道真正订阅是在ObservableBufferTimed类的subscribeActual方法中,我们看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected void subscribeActual(Observer<? super U> t) {
//第一种情况,buffer(timespan, unit) 会走这里,也就是示例1
if (timespan == timeskip && maxSize == Integer.MAX_VALUE) {
//我们暂且把这行称为 【代码1】
source.subscribe(new BufferExactUnboundedObserver<T, U>(
new SerializedObserver<U>(t),
bufferSupplier, timespan, unit, scheduler));
return;
}
//第二种情况
Scheduler.Worker w = scheduler.createWorker();

if (timespan == timeskip) {
source.subscribe(new BufferExactBoundedObserver<T, U>(
new SerializedObserver<U>(t),
bufferSupplier,
timespan, unit, maxSize, restartTimerOnMaxSize, w
));
return;
}
//第三种情况
source.subscribe(new BufferSkipBoundedObserver<T, U>(
new SerializedObserver<U>(t),
bufferSupplier, timespan, timeskip, unit, w));

}

我们这里只分析第一种情况:

【代码1】就一行代码,但是这行代码其实很复杂的。

source(被观察者) 是执行 interval 时创建的 ObservableInterval 对象 ,而执行 subscribe 时会执行 ObservableInterval 的 subscribeActual,而 ObservableInterval 的 subscribeActual 默认会在computation线程每隔 100ms(示例1中设置的) 发送一条整数序列,也就是执行一次 onNext (注意:这里的onNext不会走 我们的示例的中onNext)。

下面是 ObservableInterval 的部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void subscribeActual(Observer<? super Long> observer) {
IntervalObserver is = new IntervalObserver(observer);
//这行先标记下 标记1
observer.onSubscribe(is);

Scheduler sch = scheduler;

if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
//标记2
downstream.onNext(count++);
}
}

interval参考 interval 示例和详细解析

我们从这里没看到什么时候执行onComplete的方法,也就说明了Interval是无限发射数据的,但是我们上面的日志打印了onComplete方法,其实是take中 执行了onComplete方法。

我们回过头再看【代码1】里面创建了两个类:BufferExactUnboundedObserver 和 SerializedObserver,这两个类把我们自己创建的观察者 Observer 又包装了两次。

1
2
3
source.subscribe(new BufferExactUnboundedObserver<T, U>(
new SerializedObserver<U>(t),
bufferSupplier, timespan, unit, scheduler));

但是在传到 ObservableInterval 的 subscribeActual方法中的是 BufferExactUnboundedObserver的对象,所以上面的 标记1(onSubscribe)和 标记2(onNext)会执行 BufferExactUnboundedObserver 的 onSubscribe和onNext方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
//同样的U 是 List<Integer>类型的
U b;
try {
//创建一个空的 ArrayList
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
dispose();
EmptyDisposable.error(e, downstream);
return;
}
// 把空的ArrayList赋给 buffer,buffer是一个List<Integer>类型的变量
buffer = b;
// downstream 是SerializedObserver的对象,执行他的onSubscribe
// 标记3
downstream.onSubscribe(this);
if (!cancelled) {
// 这个地方很重要,里面也比较复杂,涉及到 scheduler 的知识点
// 简单点理解,就是执行一个定时任务,最后回调是 run 方法中
Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
if (!timer.compareAndSet(null, task)) {
task.dispose();
}
}
}
}

/**
* 这里是把interval 每发射的数据存到 buffer里面
* @param t t在这里是 interval 每隔100ms 发射的整数
*/
@Override
public void onNext(T t) {
synchronized (this) {
U b = buffer;
if (b == null) {
return;
}
b.add(t);
}
}

scheduler参考 scheduler的详细解析

我们看到在 onSubscribe 中执行一个定时任务,最后回调是 run 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public void run() {
// 存放下一个时间段的数据
U next;
try {
next = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null buffer");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(e);
dispose();
return;
}
// 存放当前数据,,
U current;
synchronized (this) {
//把buffer中的数据存进去
current = buffer;
if (current != null) {
//然后buffer存放下个时间段的数据
buffer = next;
}
}
if (current == null) {
DisposableHelper.dispose(timer);
return;
}
// 执行父类的 fastPathEmit,里面执行了accept,也就是下面的方法
fastPathEmit(current, false, this);
}

/**
* @param a
* @param v 就是上面的 current,也就是要发射的一个缓存区
*/
@Override
public void accept(Observer<? super U> a, U v) {
//downstream 是SerializedObserver的对象,执行他的onNext
//标记4
downstream.onNext(v);
}

我们从上面代码中看到,标注3 *和 *标注4 的 onSubscribe 和 onNext 方法是 SerializedObserver中的,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void onSubscribe(@NonNull Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
// 这里才执行了 我们自己创建的 Observer 的 onSubscribe 回调
downstream.onSubscribe(this);
}
}

/**
*
* @param t 这里的 t 是上面创建的一组数据缓存
*/
@Override
public void onNext(@NonNull T t) {
//.......
//我们只看这行代码,发射数据缓存
downstream.onNext(t);

}

我们看到最后给我们自己创建的观察者发射数据是在 SerializedObserver 中,包括onSubscribe、onComplete的回调

但是整个流程还没完,我们打印的日志里 第四条和第五条的时间间隔不是我们设置的300ms,这是怎么回事呢

我们知道其实上面的代码中我们看到了两个线程在执行,一个是interval的线程隔100ms发射数据给buffer,另个一是buffer的线程每个300ms发射数据缓存 ,但是这俩个线程是异步的,
也就是说interval(100, TimeUnit.MILLISECONDS).take(10)发射的10条数据完成后会直接执行onComplete(),而onComplete()把剩下的数据缓存全部发射,并且onComplete()也不需要等待300ms后在执行。

这里take的onComplete()最终会调用 BufferExactUnboundedObserver 的 onComplete方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void onComplete() {
U b;
synchronized (this) {
b = buffer;
//清空 buffer
buffer = null;
}
if (b != null) {
queue.offer(b);
// done设置为true后执行downstream(SerializedObserver) 的onComplete方法,
// 因为SerializedObserver的onComplete方法才是正在执行我们创建的观察者的onComplete的方法
done = true;
if (enter()) {
//1、执行BufferExactUnboundedObserver 的 accept
//2、执行SerializedObserver的onComplete
QueueDrainHelper.drainLoop(queue, downstream, false, null, this);
}
}
// 结束buffer的定时任务
DisposableHelper.dispose(timer);
}

这样buffer(timespan, unit) 的整个流程就分析完了。上面示例分析加了interval 和 take ,所以会有些复杂。
下面看下简单的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i=0;i<10;i++){
emitter.onNext(i);
Thread.sleep(100);
}
emitter.onComplete();
}
})
.buffer(300, TimeUnit.MILLISECONDS)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}

@Override
public void onNext(List<Integer> longs) {
System.out.println(longs);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
1
2
3
4
5
6
I/System.out: onSubscribe
I/System.out: [0, 1, 2]
I/System.out: [3, 4, 5]
I/System.out: [6, 7, 8]
I/System.out: [9]
I/System.out: onComplete

时序图如下:

时序图