当前位置: 代码迷 >> 综合 >> guava 限流的两种方式
  详细解决方案

guava 限流的两种方式

热度:25   发布时间:2023-12-27 05:53:47.0

版权声明: https://blog.csdn.net/mengxpFighting/article/details/79117934

java中对于生产者消费者模型,或者小米手机营销  1分钟卖多少台手机等都存在限流的思想在里面。
关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)
Semaphore:从线程个数限流
RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法
令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌
漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据
应用场景:
漏桶算法:必须读写分流的情况下,限制读取的速度
令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000
实现的方法都是一样。RateLimiter来实现
对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题
 
一、问题描述  某天A君突然发现自己的接口请求量突然涨到之前的10倍,没多久该接口几乎不可使用,并引发连锁反应导致整个系统崩溃。如何应对这种情况呢?生活给了我们答案:比如老式电闸都安装了保险丝,一旦有人使用超大功率的设备,保险丝就会烧断以保护各个电器不被强电流给烧坏。同理我们的接口也需要安装上“保险丝”,以防止非预期的请求对系统压力过大而引起的系统瘫痪,当流量过大时,可以采取拒绝或者引流等机制。 
二、常用的限流算法常用的限流算法有两种:漏桶算法和令牌桶算法,这篇博文介绍得比较清晰(过载保护算法浅析)。漏桶算法思路很简单,请求先进入到漏桶里,漏桶以一定的速度出水,当水请求过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。图1 漏桶算法示意图对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图2所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。图2 令牌桶算法示意图
三、限流工具类RateLimitergoogle开源工具包guava提供了限流工具类RateLimiter,该类基于“令牌桶算法”,非常方便使用。该类的接口描述请参考:RateLimiter接口描述,具体的使用请参考:RateLimiter使用实践。


 

1.关于RateLimter和Semphore简单用法
package concurrent;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.*;
import java.util.stream.IntStream;import static java.lang.Thread.currentThread;/*** ${DESCRIPTION}* 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava)* Semaphore:从线程个数限流* RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍** @author mengxp* @version 1.0* @create 2018-01-15 22:44**/
public class RateLimiterExample {//Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来private final static RateLimiter limiter=RateLimiter.create(0.5d);//同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流private final static Semaphore sem=new Semaphore(3);private static void testSemaphore(){try {sem.acquire();System.out.println(currentThread().getName()+" is doing work...");TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}finally {sem.release();System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");}}public static void runTestSemaphore(){ExecutorService service = Executors.newFixedThreadPool(10);IntStream.range(0,10).forEach((i)->{//RateLimiterExample::testLimiter 这种写法是创建一个线程service.submit(RateLimiterExample::testSemaphore);});}/*** Guava的RateLimiter*/private static void testLimiter(){System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());}//Guava的RateLimiterpublic static void runTestLimiter(){ExecutorService service = Executors.newFixedThreadPool(10);IntStream.range(0,10).forEach((i)->{//RateLimiterExample::testLimiter 这种写法是创建一个线程service.submit(RateLimiterExample::testLimiter);});}public static void main(String[] args) {IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9//runTestLimiter();runTestSemaphore();}
}


2.实现漏桶算法

package concurrent.BucketAl;import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;import static java.lang.Thread.currentThread;/*** ${DESCRIPTION}** @author mengxp* @version 1.0* @create 2018-01-20 22:42* 实现漏桶算法 实现多线程生产者消费者模型 限流**/
public class Bucket {//定义桶的大小private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();private final static int  BUCKET_LIMIT=1000;//消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次private final RateLimiter consumerRate=RateLimiter.create(10d);//往桶里面放数据时,确认没有超过桶的最大的容量private Monitor offerMonitor=new Monitor();//从桶里消费数据时,桶里必须存在数据private Monitor consumerMonitor=new Monitor();/*** 往桶里面写数据* @param data*/public void submit(Integer data){if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){try {container.offer(data);System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");} finally {offerMonitor.leave();}}else {//这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常//或者存入MQ DB等后续处理throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");}}/*** 从桶里面消费数据* @param consumer*/public void takeThenConsumer(Consumer<Integer> consumer){if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){try {//不打印时 写 consumerRate.acquire();System.out.println(currentThread()+"  waiting"+consumerRate.acquire());Integer data = container.poll();//container.peek() 只是去取出来不会删掉consumer.accept(data);}finally {consumerMonitor.leave();}}else {//当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据System.out.println("will consumer Data from MQ...");try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}}

2.1 漏桶算法测试类

package concurrent.BucketAl;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;import static java.lang.Thread.currentThread;/*** ${DESCRIPTION}** @author mengxp* @version 1.0* @create 2018-01-20 23:11* 漏桶算法测试* 实现漏桶算法 实现多线程生产者消费者模型 限流**/
public class BuckerTest {public static void main(String[] args) {final Bucket bucket = new Bucket();final AtomicInteger DATA_CREATOR = new AtomicInteger(0);//生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个IntStream.range(0, 10).forEach(i -> {new Thread(() -> {for (; ; ) {int data = DATA_CREATOR.incrementAndGet();try {bucket.submit(data);TimeUnit.MILLISECONDS.sleep(200);} catch (Exception e) {//对submit时,如果桶满了可能会抛出异常if (e instanceof IllegalStateException) {System.out.println(e.getMessage());//当满了后,生产线程就休眠1分钟try {TimeUnit.SECONDS.sleep(60);} catch (InterruptedException e1) {e1.printStackTrace();}}}}}).start();});//消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1IntStream.range(0, 10).forEach(i -> {new Thread(() -> {for (; ; ) {bucket.takeThenConsumer(x -> {System.out.println(currentThread()+"C.." + x);});}}).start();});}
}

3.令牌桶算法

package concurrent.TokenBucket;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.currentThread;
import static java.lang.Thread.interrupted;/*** ${DESCRIPTION}** @author mengxp* @version 1.0* @create 2018-01-21 0:18* 令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌* 漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照定义的速度去消费数据** 应用场景:* 漏桶算法:必须读写分流的情况下,限制读取的速度* 令牌桶算法:必须读写分离的情况下,限制写的速率或者小米手机饥饿营销的场景  只卖1分种抢购1000** 实现的方法都是一样。RateLimiter来实现* 对于多线程问题查找时,很多时候可能使用的类都是原子性的,但是由于代码逻辑的问题,也可能发生线程安全问题**/
public class TokenBuck {//可以使用 AtomicInteger+容量  可以不用Queue实现private AtomicInteger phoneNumbers=new AtomicInteger(0);private RateLimiter rateLimiter=RateLimiter.create(20d);//一秒只能执行五次//默认销售500台private final static int DEFALUT_LIMIT=500;private final int saleLimit;public TokenBuck(int saleLimit) {this.saleLimit = saleLimit;}public TokenBuck() {this(DEFALUT_LIMIT);}public int buy(){//这个check 必须放在success里面做判断,不然会产生线程安全问题(业务引起)//原因当phoneNumbers=99 时 同时存在三个线程进来。虽然phoneNumbers原子性,但是也会发生。如果必须写在这里,在success//里面也需要加上double check/* if (phoneNumbers.get()>=saleLimit){throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...")}*///目前设置超时时间,10秒内没有抢到就抛出异常//这里的TimeOut*Ratelimiter=总数  这里的超时就是让别人抢几秒,所以设置总数也可以由这里的超时和RateLimiter来计算boolean success = rateLimiter.tryAcquire(10, TimeUnit.SECONDS);if (success){if (phoneNumbers.get()>=saleLimit){throw new IllegalStateException("Phone has been sale "+saleLimit+" can not  buy more...");}int phoneNo = phoneNumbers.getAndIncrement();System.out.println(currentThread()+" user has get :["+phoneNo+"]");return phoneNo;}else {//超时后 同一时间,很大的流量来强时,超时快速失败。throw new RuntimeException(currentThread()+"has timeOut can try again...");}}
}

3.1 令牌桶算法的测试类

package concurrent.TokenBucket;import java.util.stream.IntStream;/*** ${DESCRIPTION}** @author mengxp* @version 1.0* @create 2018-01-21 0:40**/
public class TokenBuckTest {public static void main(String[] args) {final TokenBuck tokenBuck=new TokenBuck(200);IntStream.range(0,300).forEach(i->{//目前测试时,让一个线程抢一次,不用循环抢//tokenBuck::buy 这种方式 产生一个Runnablenew Thread(tokenBuck::buy).start();});}
}

--------------------- 本文来自 大数据孟小鹏 的CSDN 博客 ,全文地址请点击:https://blog.csdn.net/mengxpfighting/article/details/79117934?utm_source=copy

 

Moniter相关作用:https://www.cnblogs.com/hupengcool/p/4250903.html

Monitor类是作为ReentrantLock的一个替代,代码中使用 Monitor比使用ReentrantLock更不易出错,可读性也更强,并且也没有显著的性能损失,使用Monitor甚至有潜在的性能得到优化。

public abstract static class Guard:一个标识线程是否等待的布尔条件,Guard类总是与单一的Monitor相关联,Monitor可以在任意时间从任意占用Monitor的线程检查Guard,这样代码的编写将不在关心Guard是否被检查的频率。

public abstract boolean isSatisfied():Guard内部提供的抽象方法,isSatisfied(),当被关联的Monitor被占用时,Guard的此方法会被调用,该方法的实现必须取决于被关联Monitor保护的状态,并且状态不可修改。

Monitor有几个常用的方法

  • enter():进入到当前Monitor,无限期阻塞,等待锁。(这里没有Guard)
  • enter(long time, TimeUnit unit):进入到当前Monitor,最多阻塞给定的时间,返回是否进入Monitor。
  • tryEnter():如果可以的话立即进入Monitor,不阻塞,返回是否进入Monitor。
  • enterWhen(Guard guard):进入当前Monitor,等待Guard的isSatisfied()为true后,继续往下执行 ,但可能会被打断; 为false,会阻塞。
  • enterIf(Guard guard):如果Guard的isSatisfied()为true,进入当前Monitor。等待获得锁(这里会等待获取锁),不需要等待Guard satisfied。
  • tryEnterIf(Guard guard):如果Guard的isSatisfied()为true并且可以的话立即进入Monitor,不等待获取锁(这里不等待获取锁),也不等待Guard satisfied。
  • 感觉  enterWhen enterIf的用的区别就是前者无返回值,后者返回Boolean值。
  • newGuard(Boolean b)为{@代码}监视器创建一个新的{@链接守护}