当前位置: 代码迷 >> java >> 何时以及如何使用wait()和notify() 制片人 消费者
  详细解决方案

何时以及如何使用wait()和notify() 制片人 消费者

热度:7   发布时间:2023-07-31 11:12:08.0

我从SO找到了这个例子。 现在,我试图了解wait()notify()/notifyAll()的用法。 在哪种情况下以及为什么需要这样做。

class BlockingQueue<T> {

    private Queue<T> queue = new LinkedList<T>();
    private int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T element) throws InterruptedException {
        while (queue.size() == capacity) {
             System.out.println("Waiting...");
            wait();

        }

        queue.add(element);
        notify(); // notifyAll() for multiple producer/consumer threads
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }

        T item = queue.remove();
        notify(); // notifyAll() for multiple producer/consumer threads
        return item;
    }
}

因此,实现了Runnable和重写的run()方法,如下所示

 @Override
    public void run() {
        // synchronized (this) {
        BlockingQueue<Integer> s = new BlockingQueue(10);
        for (int i = 0; i < 12; i++) {
            try {
                s.put(i);
                if (i > 9) {
                    System.out.println(Thread.currentThread().getName() + "  : " + s.take());
                }
                System.out.println(Thread.currentThread().getName() + " ExtendsThread : Counter : " + i);
            } //}
            //notify();
            catch (InterruptedException ex) {
                Logger.getLogger(ExtendsThread.class.getName()).log(Level.SEVERE, null, ex);
            }

        }

    }

并且,如下运行线程

 ImplementsRunnable rc = new ImplementsRunnable();
        Thread t1 = new Thread(rc, "A");
        t1.start();

当我运行它时,它在counter : 9之后卡住,并一直等待着永远。 有人建议我这是怎么了?

您的概念有些瑕疵。 BlockingQueue可以充当生产者/消费者模式中的桥梁。

也就是说,它允许一个线程向其中写入内容,而另一个线程从中读取内容,但是这样做的前提是:

  • 没有要取的物品,它会等到新物品到达
  • 如果物品太多,它将等待物品被移除

在这种情况下, waitnotifyBlockingQueue实例的内部消息传递

您可以查看“ 。

因此,您应该使用(至少)两个,而不是仅使用一个线程,一个农产品和一个消费者...

制片人

这需要一个BlockingQueue实例,并向其中添加int值。 每次停止1秒钟,然后添加下一个

public class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int index = 0; index < 10; index++) {
            try {
                System.out.println("Put " + index);
                queue.put(index);
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
            }
        }
    }

}

消费者

使用者使用BlockQueue并从中读取int值,该值将被阻塞直到值存在。

public class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer value = queue.take();
                System.out.println("Took " + value);
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(JavaApplication220.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}

您可以使用类似...

BlockingQueue bq = new BlockingQueue(10);
Thread p = new Thread(new Producer(bq));
Thread c = new Thread(new Consumer(bq));
c.setDaemon(true);
c.start();
p.start();

您应该注意, put消息之间的延迟很小,但是took消息之间几乎没有延迟。 这是正在执行的队列。 Consumer正在阻止/等待队列中的东西给它。

您可以与“ Producer和“ Consumer一起玩耍,也许可以改变他们的时间(例如,在拿一件物品之前,“ Consumer会有更长的延迟),以了解这可能会带来怎样的不同影响

当我运行它时,它在计数器9之后停滞不前,并一直等待着永远

这可能是因为您已经超出了队列的容量,并且put方法一直处于阻塞状态,直到您从队列中取出某些东西为止(您实际上有一个死锁,队列正在等待您从中取出某些东西,但是您可以计算负担,因为你锁定在put

要记住的事情:

  • 为了使两个或多个线程可以使用监视器锁,它们必须共享相同的监视器/对象锁实例。 在这种情况下, BlockingQueue的相同实例
  • notify将唤醒一个在监视器锁的wait方法的同一实例上等待的对象。 没有办法知道哪一个。 如果您有多个使用者,但不关心数据的处理顺序,例如,这可能很有用。

更新了其他示例

因此,这将Thread.sleepProducer Thread.sleep (并允许生产者产生100个值),并将Thread.sleep添加到Consumer

这样, Producer将在Consumer耗尽其容量之前达到其容量,迫使其等待直到Consumer可以从中获取价值为止。

public class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int index = 0; index < 100; index++) {
            try {
                System.out.println("Put " + index);
                queue.put(index);
            } catch (InterruptedException ex) {
            }
        }
    }

}

public class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Integer value = queue.take();
                System.out.println("Took " + value);
                Thread.sleep(1000);
            }
        } catch (InterruptedException ex) {
        }
    }

}

在此处添加printlns

public synchronized void put(T element) throws InterruptedException {
    while (queue.size() == capacity) {
        System.out.println("blocked");
        wait();
    }
    queue.add(element);
    notify(); // notifyAll() for multiple producer/consumer threads
    System.out.println("put "+ element);
}

public synchronized T take() throws InterruptedException {
    while (queue.isEmpty()) {
        wait();
    }
    T item = queue.remove();
    notify(); // notifyAll() for multiple producer/consumer threads
    System.out.println("removed " + item);
    return item;
}

并运行此测试

public static void main(String argv[]) throws Exception {
    final BlockingQueue q = new BlockingQueue(2);
    new Thread() {
        public void run() {
            try {
                Thread.sleep(5000);
                q.take();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };
    }.start();
    q.put(1);
    q.put(2);  // will block here until tread 2 takes an element and reduces the capacity
    q.put(3);   
}

它将打印

put 1
put 2
blocked
removed 1
put 3
  相关解决方案