当前位置: 代码迷 >> Java相关 >> 生产者-消费者形式
  详细解决方案

生产者-消费者形式

热度:361   发布时间:2016-04-22 19:12:04.0
生产者-消费者模式
  • 简介

  生产者-消费者模式是经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,有两类线程:若干个生产者线程和若干个消费者线程。生产者负责提交用户请求,消费者用于具体的处理生产者提交的任务。生产者和消费者通过共享内存缓冲区进行数据通信。

  生产者-消费者模式的基本结构如下图:

  通过上图可以看出,生产者和消费者通过共享内存缓冲区进行通信,他们之间并不进行直接的通信,从而减少了他们之间的耦合,生产者不需要直到消费者的存在,消费者也不需要知道生产者的存在。内存缓冲区主要的功能是实现数据在多线程间的共享,此外,通过该缓冲区,还可以缓解生产者和消费者之间的性能差异。

  • 生产者-消费者模式实现

  下面以生产者-消费者模式的简单实现,介绍该模式的优点:

  生产者代码:

 1 public class Producer implements Runnable { 2      3     private volatile boolean isRunnig = true; 4      5     private BlockingQueue<PCData> queue;//缓冲队列 6      7     private static AtomicInteger count = new AtomicInteger(); 8      9     private static final int SLEEPTIME = 1000;10     11     12     public Producer(BlockingQueue<PCData> queue) {13         this.queue = queue;14     }15 16 17     @Override18     public void run() {19         PCData pcData = null;20         Random r = new Random();21         22         System.out.println("start producer id:"+Thread.currentThread().getId());23         24         try {25             while(true){26                 Thread.sleep(r.nextInt(SLEEPTIME));27                 pcData = new PCData(count.incrementAndGet());28                 System.out.println(pcData +"is put into queue");29                 if(!queue.offer(pcData, 2, TimeUnit.SECONDS)){30                     System.err.println("fail to put data:"+pcData);31                 }32             }33         } catch (Exception e) {34             // TODO: handle exception35             e.printStackTrace();36             Thread.currentThread().interrupt();37         }38         39     }40     41     public void stop(){42         isRunnig = false;43     }44     45 46 }

  消费者代码:

 1 public class Consumer implements Runnable { 2      3     private BlockingQueue<PCData> queue;//缓冲队列 4      5     private static final int SLEEPTIME=1000; 6      7      8      9     public Consumer(BlockingQueue<PCData> queue) {10         this.queue = queue;11     }12 13 14 15     @Override16     public void run() {17         System.out.println("start constomer id:"+Thread.currentThread().getId());18         Random r = new Random();19         try {20             while(true){21                 PCData data = queue.take();22                 int re = data.getIntData()*data.getIntData();23                 System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(),data.getIntData(),re));24                 25                 Thread.sleep(r.nextInt(SLEEPTIME));26             }27         } catch (Exception e) {28             // TODO: handle exception29             e.printStackTrace();30             Thread.currentThread().interrupt();31         }32         33     }34 35 }

  消费者、生产者之间的共享数据模型:

 1 public final class PCData { 2     private final int intData; 3  4     public PCData(int intData) { 5         this.intData = intData; 6     } 7      8     public PCData(String strData){ 9         this.intData = Integer.valueOf(strData);10     }11     12     public synchronized int getIntData() {13         return intData;14     }15 16     @Override17     public String toString() {18         return "data:" + intData;19     }20     21     22     23 }

  在客户端中,启动3个消费者和3个生产者,并让他们协作运行:

 1 public class Client { 2     public static void main(String[] args) throws InterruptedException { 3         BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);  4          5         Producer  p1 = new Producer(queue); 6         Producer  p2 = new Producer(queue); 7         Producer  p3 = new Producer(queue); 8          9         Consumer  c1 = new Consumer(queue);10         Consumer  c2 = new Consumer(queue);11         Consumer  c3 = new Consumer(queue);12         13         ExecutorService exe = Executors.newCachedThreadPool();14         exe.execute(p1);15         exe.execute(p2);16         exe.execute(p3);17         18         exe.execute(c1);19         exe.execute(c2);20         exe.execute(c3);21         22         Thread.sleep(10*1000);23         24         p1.stop();25         p2.stop();26         p3.stop();27         28         Thread.sleep(3000);29         exe.shutdown();30     }31 }    

  优点:生产者-消费者模式能很好的对生产者线程和消费者线程进行解耦,优化系统的整体结构。同时,由于缓冲区的存在,运行生产者和消费者在性能上存在一定的差异,从而一定程度上缓解了性能瓶颈对系统性能的影响。

  相关解决方案