当前位置: 代码迷 >> 综合 >> Semaphore 的介绍与实现生产者消费者模式案例
  详细解决方案

Semaphore 的介绍与实现生产者消费者模式案例

热度:78   发布时间:2023-09-22 11:26:13.0

一、介绍 

   Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。它的用法如下:
 


public class Service {private Semaphore semaphore = new Semaphore(1);//只能通过一个线程public void testMethod() {try {semaphore.acquire();//获取许可 permitSystem.out.println(Thread.currentThread().getName()+"begin timer:"+System.currentTimeMillis());Thread.sleep(5000);System.out.println(Thread.currentThread().getName()+"end timer="+System.currentTimeMillis());semaphore.release();//释放持有许可} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}
    • 二、方法摘要

      所有方法  接口方法  具体的方法 
      Modifier and Type Method and Description
      void acquire()

      从该信号量获取许可证,阻止直到可用,或线程为 interrupted 。

      void acquire(int permits)

      从该信号量获取给定数量的许可证,阻止直到所有可用,否则线程为 interrupted 。

      void acquireUninterruptibly()

      从这个信号灯获取许可证,阻止一个可用的。

      void acquireUninterruptibly(int permits)

      从该信号量获取给定数量的许可证,阻止直到所有可用。

      int availablePermits()

      返回此信号量中当前可用的许可数。

      int drainPermits()

      获取并返回所有可立即获得的许可证。

      protected Collection<Thread> getQueuedThreads()

      返回一个包含可能正在等待获取的线程的集合。

      int getQueueLength()

      返回等待获取的线程数的估计。

      boolean hasQueuedThreads()

      查询任何线程是否等待获取。

      boolean isFair()

      如果此信号量的公平设置为真,则返回 true

      protected void reducePermits(int reduction)

      缩小可用许可证的数量。

      void release()

      释放许可证,将其返回到信号量。

      void release(int permits)

      释放给定数量的许可证,将其返回到信号量。

      String toString()

      返回一个标识此信号量的字符串及其状态。

      boolean tryAcquire()

      从这个信号量获得许可证,只有在调用时可以使用该许可证。

      boolean tryAcquire(int permits)

      从这个信号量获取给定数量的许可证,只有在调用时全部可用。

      boolean tryAcquire(int permits, long timeout, TimeUnit unit)

      从该信号量获取给定数量的许可证,如果在给定的等待时间内全部可用,并且当前线程尚未 interrupted 。

      boolean tryAcquire(long timeout, TimeUnit unit)

      如果在给定的等待时间内可用,并且当前线程尚未 到达 interrupted,则从该信号量获取许可。

    • 三、构造方法详细信息

      • Semaphore

        public Semaphore(int permits)

        创建一个 Semaphore与给定数量的许可证和非公平公平设置。

        参数

        permits - permits的初始许可证。 该值可能为负数,在这种情况下,必须在任何获取被授予之前发布释放。

      • Semaphore

        public Semaphore(int permits,boolean fair)

        创建一个 Semaphore与给定数量的许可证和给定的公平设置。

        参数

        permits - permits的初始许可证。 该值可能为负数,在这种情况下,必须在任何获取被授予之前发布释放。

        fair - true如果这个信号量将保证首先在竞争中首先授予许可证,否则 false

四、用Semaphore实现生产者消费者模式

Service 处理业务逻辑:


import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class RepastService {volatile private Semaphore setSemaphore = new Semaphore(10);//厨师volatile private Semaphore getSemaphore = new Semaphore(10);//顾客volatile private ReentrantLock lock = new ReentrantLock(); //锁volatile private Condition setCondition = lock.newCondition();volatile private Condition getCondition = lock.newCondition();volatile private Object[] producePosition = new Object[4];//只有四个餐盘private boolean isEmpty() {boolean isEmpty = true;for (int i = 0; i< producePosition.length;i++) {if (producePosition[i]!=null) {isEmpty=false;break;}}if(isEmpty==true) {return true;}else {return false;}			}private boolean isFull() {boolean isFull = true;for (int i = 0; i < producePosition.length; i++) {if(producePosition[i]==null) {isFull = false;break;}}return isFull;}public void set() {try {setSemaphore.acquire();lock.lock();while(isFull()) {//生产者在等待setCondition.await();}for (int i = 0; i < producePosition.length; i++) {if(producePosition[i]==null) {producePosition[i] = "数据";System.out.println(Thread.currentThread().getName()+" 生产了 "+producePosition[i]);break;}}getCondition.signalAll();lock.unlock();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {setSemaphore.release();}}public void get() {try {getSemaphore.acquire();lock.lock();while (isEmpty()) {getCondition.await();}for (int i = 0; i < producePosition.length; i++) {if(producePosition[i]!=null) {System.out.println(Thread.currentThread().getName()+" 消费了 "+ producePosition[i]);producePosition[i]=null;break;}} setCondition.signalAll();lock.unlock();}catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {getSemaphore.release();}}
}

ThreadCus 消费者:


public class ThreadCus extends Thread {private RepastService service ;public ThreadCus(RepastService service) {super();this.service = service;}public void run() {service.get();}
}

ThreadPro 生产者:


public class ThreadPro extends Thread {private RepastService service ;public ThreadPro(RepastService service) {super();this.service = service;}public void run() {service.set();}
}

测试类:


public class Runtest {public static void main(String[] args) throws InterruptedException {RepastService service = new RepastService();ThreadPro[] arrayP = new ThreadPro[60];ThreadCus[] arrayC = new ThreadCus[60];for (int i = 0; i < arrayP.length; i++) {arrayP[i] = new ThreadPro(service);arrayC[i] = new ThreadCus(service);}Thread.sleep(2000);for (int i = 0; i < arrayP.length; i++) {arrayP[i].start();arrayC[i].start();}}
}

 

  相关解决方案