当前位置: 代码迷 >> 综合 >> RxJava3源码实现-5-流控制
  详细解决方案

RxJava3源码实现-5-流控制

热度:81   发布时间:2023-10-09 04:45:08.0

目录

1、背景

2、sample()-定期采样「throttleFirst() + throttleLast()」

3、buffer()-批量处理

4、window()-批量处理

5、debounce()-抛弃频繁变动


 

1、背景

在异步处理的前提下, 我们的Observable.create()发布数据的数据有可能快于Observer处理的速度。这就导致 Observer来不及接收所有事件,从而导致Observer无法及时响应 / 处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM。

例如:点击按钮事件:连续过快的点击按钮10次,则只会造成点击2次的效果;

解释:因为点击速度太快了,所以按钮来不及响应。

在RxJava实现回压之前,生产者的生产速度超过消费者的消费速度一直很难解决。为此,RxJava发明了很多操作符来控制生产者的推送数据的量。从而避免消费者出现处理不了的问题。

2、sample()-定期采样「throttleFirst() + throttleLast()」

  1. sample()操作符会定期从上游Observable获取数据,就比如每隔一秒从上面取一次数据,但是这一秒中间的数据将会被丢弃;
  2. throttleLast()和sample()是一样效果,都会取每个时间段的最后一个值;
  3. throttleFirst()与throttleLast()相反,它会取每个时间段第一个值。

sample()有个重载函数可以传入一个Observable作为参数,动态改变监控时间的长短,就比如有些心跳,他们在白天和晚上监控的时间间隔是不一样的。

注意:如果你用main函数测试下面的代码的时候,需要加上Thread.sleep(),不然没有输出。

    /*** 1、interval()-从0开始,每一毫秒+1;* 2、sample()每隔1000毫秒从上游取一次值;* 预期结果:1000、2000、3000.......* 实际结果:1002、2000、3001、4000..... 和预期结果相符,每隔1000毫秒取一次值*/private static void testSample() {Observable.interval(1, TimeUnit.MILLISECONDS).sample(1,TimeUnit.SECONDS).take(10).subscribe(System.out::println);}/*** 1、throttleLast()和sample()是一样效果,都会取每个时间段的最后一个值;* 2、就比如:该例子,是从0开始递增的,但是第一次取的值,是等待一秒以后取的值。*/private static void testThrottleLast() {Observable.interval(1, TimeUnit.MILLISECONDS).throttleLast(1,TimeUnit.SECONDS).take(10).subscribe(System.out::println);}/*** 1、throttleFirst()与throttleLast()相反,它会取每个时间段第一个值。* 2、所以这里的结果应该是:0、1000、2000、3000.......*/private static void testThrottleFirst() {Observable.interval(1, TimeUnit.MILLISECONDS).throttleFirst(1,TimeUnit.SECONDS).take(10).subscribe(System.out::println);}public static void main(String[] args) {testSample();try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}

3、buffer()-批量处理

buffer()操作符会将指定数量的数据聚合成一个List然后发布给Observer。

实现原理:buffer()会不断接受上游的事件,并在内部对他们进行缓冲,直到缓冲区的大小(List<Integer> .size() )达到3,此时整个缓冲区(List<Integer>)会被推送下去。如果完成通知出现时,如果内部的缓冲区大小没达到3,依然会被推送至下游。

适用场景:适用buffer()操作符,可以将细颗粒度的事件替换为数量更少,但规模更大的批处理。例如:如果你想减少数据库的负载,那么就可以将每个事件单独存储的方案,替换为批量存储。

上面将smaple()函数会每隔一段事件从上游取一个数据;buffer()的一个重载版本,可以将某一个时间段的所有数据打包成List发布给下游,如下面的例子3。

    /*** buffer(3)* 输出结果:* [1, 2, 3]* [4, 5, 6]* [7, 8, 9]* [10]*/private static void testbuffer() {Observable.range(1, 10).buffer(3).subscribe((List<Integer> list) -> {System.out.println(list);});}/*** buffer(1,2)  每次跳过元素的数量* 输出结果:*      [1]*      [3]*      [5]*      [7]*      [9]*/private static void testbuffer1() {Observable.range(1, 10).buffer(1,2).subscribe((List<Integer> list) -> {System.out.println(list);});}/***  buffer(1,TimeUnit.SECONDS)  1秒内缓冲的元素转换成List。*  输出结果:*  [0, 1, 2]*  [3, 4, 5]*  [6, 7, 8]*  [9, 10, 11, 12]*  [13, 14, 15]*/private static void testbuffer2() {Observable.interval(300, TimeUnit.MILLISECONDS).buffer(1,TimeUnit.SECONDS).take(5).subscribe((List<Long> list) -> {System.out.println(list);});}

buffer还有一个很厉害的重载版本,它可以控制何时开始缓冲事件,何时停止缓冲事件。

假设现在有这么个一个业务,服务端不停地给客户端推送数据,这个数据非常的频繁。为了减少客户端的压力,客户端想要每隔一段时间,处理一部分数据就可以了,不需要处理所有的数据。而且在白天业务繁忙阶段,加快接受数据的频率,晚上减少频率。

  • 9:00-17:00 ,客户端每隔5秒 截取1秒的数据;
  • 其他的时间,客户端每隔10秒,截取1秒的数据;

我们的数据是每500毫秒发送一个数据,截取1秒的数据,大约是两条数据,输出结果符合我们的预测。

    private static long startTime=System.currentTimeMillis();private static void testBuffer3() {Observable<Duration> insideBusinessHours=Observable.interval(5,TimeUnit.SECONDS).filter(x->isBusinessHour()).map(x->Duration.ofMillis(1000));Observable<Duration> outsideBusinessHours=Observable.interval(10,TimeUnit.SECONDS).filter(x->!isBusinessHour()).map(x->Duration.ofMillis(1000));Observable<Duration> openings=Observable.merge(insideBusinessHours,outsideBusinessHours).doOnNext(s-> System.out.println("时间间隔:"+s.toMillis()+"毫秒"));Observable.interval(500, TimeUnit.MILLISECONDS).buffer(openings,duration -> Observable.empty().delay(duration.toMillis(),TimeUnit.MILLISECONDS)).subscribe((List<Long> list) -> {long endTime=System.currentTimeMillis();long result=endTime-startTime;System.out.println("time="+(result)+"---"+list);startTime=endTime;});}/*** 判断是否是  9:00 --17:00 之内的时间段*/private static final LocalTime BUSINESS_START = LocalTime.of(9, 0);private static final LocalTime BUSINESS_END = LocalTime.of(17, 0);private static boolean isBusinessHour(){ZonedDateTime zdt=ZonedDateTime.now();LocalTime localTime=zdt.toLocalTime();return !localTime.isBefore(BUSINESS_START)&&!localTime.isAfter(BUSINESS_END);}//输出结果
时间间隔:1000毫秒
time=6229---[10, 11]
时间间隔:1000毫秒
time=4981---[20, 21]
时间间隔:1000毫秒
time=5000---[30, 31]
时间间隔:1000毫秒
time=4997---[40, 41]
时间间隔:1000毫秒
time=5002---[49, 50, 51]
时间间隔:1000毫秒
time=5002---[60, 61]
时间间隔:1000毫秒
time=4996---[69, 70, 71]
时间间隔:1000毫秒
time=5000---[80, 81]
时间间隔:1000毫秒
time=4998---[90, 91]

4、window()-批量处理

window() 和buffer()的功能几乎是完全一样,上面buffer()能实现的功能,window()操作符也都可以实现。

buffer()操作符会在缓冲的时候,不断创建List对象,这在内存消耗方面是难以预料;window()操作符则不会进行中间的缓存,而是采取直接消费的方式。

buffer()的返回结果是:Observable<List<Integer>> 

window()的返回结果是:Observable<Observable<Integer>>

5、debounce()-抛弃频繁变动

debounce()会对上游数据的发布频率进行监听,比如:debounce(1秒) ,如果两条数据发布的时间间隔小于1秒,那么这两条数据都不会发布,这时候debounce()会监听第三个数据,如果第三个数据从发布到1秒后,都没有其他数据发布,那么这第三条数据就会被发布。

对于下面第二种没有输出的情况,我们可以适用timeout()操作符来报TimeOutException异常来防止无限地静默状态。

还有一种解决方案就是利用timeout()提供备用Observable流。

       /*** 每个值都会输出*/Observable.interval(500, TimeUnit.MILLISECONDS).debounce(100,TimeUnit.MILLISECONDS).subscribe(s-> System.out.println("输出1="+s));/*** 没有任何值输出*/Observable.interval(50, TimeUnit.MILLISECONDS).debounce(100,TimeUnit.MILLISECONDS).subscribe(s-> System.out.println("输出2="+s));------------------------解决方案---递归------------------------timeDebounce(Observable.interval(50, TimeUnit.MILLISECONDS)).subscribe(s-> System.out.println("输出3="+s)) ;private static  Observable<Long> timeDebounce(Observable<Long> upStream){Observable<Long> onTimeout=upStream.take(1).concatWith(Observable.defer(()->timeDebounce(upStream)));return upStream.debounce(100,TimeUnit.MILLISECONDS).timeout(1,TimeUnit.SECONDS,onTimeout);}
//输出结果:
输出3=0
输出3=0
输出3=0
输出3=0
输出3=0
输出3=0

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  相关解决方案