当前位置: 代码迷 >> Web前端 >> 生产者消费者方式
  详细解决方案

生产者消费者方式

热度:370   发布时间:2013-10-23 11:39:13.0
生产者消费者模式

?生产者消费者 环形缓冲模式

?---------------------------------------------------------

/**

?* 环形缓冲区

?* @author fengbin

?*

?*/

public class CircularBuf {

?

? ?int NMAX = 1000;

?

? ?int iput = 0; // 环形缓冲区的当前放人位置

?

? ?int iget = 0; // 缓冲区的当前取出位置

?

? ?int n = 0; // 环形缓冲区中的元素总数量

?

? ?Object buffer[];

? ?

? ?public CircularBuf() {

? ? ? ?super();

? ? ? ?buffer = new Object[NMAX];

? ?}

?

? ?public CircularBuf(int nmax) {

? ? ? ?super();

? ? ? ?NMAX = nmax;

? ? ? ?buffer = new Object[NMAX];

? ?}

?

? ?/*

? ? * 环形缓冲区的地址编号计算函数,,如果到达唤醒缓冲区的尾部,将绕回到头部。

? ? *?

? ? * 环形缓冲区的有效地址编号为:0到(NMAX-1)

? ? *?

? ? */

? ?public synchronized int addring(int i) {

? ? ? ?return (i + 1) == NMAX ? 0 : i + 1;

? ?}

?

? ?/* 从环形缓冲区中取一个元素 */

? ?public synchronized Object get() {

? ? ? ?int pos;

? ? ? ?

? ? ? ? System.out.println("------------数量:"+n);

? ? ? ?

? ? ? ? ? ?if (n > 0) {

? ? ? ? ? ?

? ? ? ? ? ? ? ?pos = iget;

? ? ? ? ? ? ? ?iget = addring(iget);

? ? ? ? ? ? ? ?n--;

? ? ? ? ? ? ? ?// System.out.println("get-->" + buffer[pos]);

? ? ? ? ? ? ? ?notifyAll();

? ? ? ? ? ? ? ?return buffer[pos];

?

? ? ? ? ? ?} else {

? ? ? ? ? ? ? ?// System.out.println("Buffer is Empty");

? ? ? ? ? ? ? ?try {

? ? ? ? ? ? ? ? ? ?wait();

? ? ? ? ? ? ? ?} catch (InterruptedException e) {

? ? ? ? ? ? ? ? ? ?// TODO Auto-generated catch block

? ? ? ? ? ? ? ? ? ?e.printStackTrace();

? ? ? ? ? ? ? ?}

?

? ? ? ? ? ?}

? ? ? ?return null;

?

? ?}

?

? ?/* 向环形缓冲区中放人一个元素 */

? ?public synchronized void put(Object z) {

?

? ? ? ? ? ?if (n < NMAX) {

? ? ? ? ? ? ? ?buffer[iput] = z;

? ? ? ? ? ? ? ?

? ? ? ? ? ? ? ?System.out.println("put<--" + buffer[iput]);

? ? ? ? ? ? ? ?

? ? ? ? ? ? ? ?iput = addring(iput);

? ? ? ? ? ? ? ?n++;

? ? ? ? ? ? ? ?notifyAll();

? ? ? ? ? ?} else {

? ? ? ? ? ? ? ? System.out.println("Buffer is full");

? ? ? ? ? ? ? ?try {

? ? ? ? ? ? ? ? ? ?wait();

? ? ? ? ? ? ? ? ? ?put(z);//如果满了后,重新执行

? ? ? ? ? ? ? ? ? ?System.out.println("rerun !");

? ? ? ? ? ? ? ?} catch (InterruptedException e) {

? ? ? ? ? ? ? ? ? ?// TODO Auto-generated catch block

? ? ? ? ? ? ? ? ? ?e.printStackTrace();

? ? ? ? ? ? ? ?}

? ? ? ? ? ?}

?

? ?}

?

}

?

?

-----------------------------------------------------------------

?

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

?

import org.apache.commons.lang.StringUtils;

?

/**

?* 环形缓冲区 测试

?* @author fengbin

?*

?*/

public class CircularBufTest {

?

?

CircularBuf circularBuf = new CircularBuf(2);

?

class Producer implements Runnable{

?

String apple = "";

Producer(String str){

apple = str;

}

?

@Override

public void run() {

for(int j=0;j<5;j++){

circularBuf.put(apple+j);

System.out.println("!生产:"+apple+j+"结束");

}

?

}

}

?

class Consumer implements Runnable{

?

Consumer(){

}

?

@Override

public void run() {

while(true){

String str=(String) circularBuf.get();

if(StringUtils.isEmpty(str)){

continue;

}

System.out.println("#消费:"+str+"结束");

?

try {

Thread.sleep(500);

} catch (Exception e) {

System.out.println(e);

}

}

}

?

?

}

?

public void test() {

?

ExecutorService service = Executors.newCachedThreadPool();

Producer producer = new Producer("P");

Consumer consumer = new Consumer();

service.submit(producer);

service.submit(consumer);

}

?

?

?

public static void main(String[] args) {

?

CircularBufTest test = new CircularBufTest();

test.test();

?

}

?

}

?=================================================

使用阻塞队列实现的生产者和消费者模式.

?

public class TestProducterConsumer {

?

class Producter extends Thread {

Queue q;

?

Producter(Queue q) {

this.q = q;

}

?

public void run() {

for (int i = 0; i < 10; i++) {

q.put(i);

System.out.println("producter :" + i);

}

}

}

?

class Consumer extends Thread{

Queue q;

Consumer(Queue q) {

this.q = q;

}

?

public void run() {

while (true) {

System.out.println("Consumer:" + q.get());

}

}

}

?

class Queue {

int value;

boolean bFull = false;

?

public synchronized void put(int i) {

if (!bFull) {

value = i;

bFull = true;

notify();// 必须用在synchronized

}

try {

wait();// 必须捕获异常

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

?

public synchronized int get() {

if (!bFull)

try {

wait();

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

bFull = false;

notify();

return value;

}

?

}

?

public static void main(String[] args) {

TestProducterConsumer con = new TestProducterConsumer();

con.test();

?

}

?

private void test() {

Queue q = new Queue();

Producter p = new Producter(q);

Consumer c = new Consumer(q);

p.start();

c.start();

}

?

}

?

  相关解决方案