Java 生产者-消费者

示例

生产者-消费者问题解决方案的简单示例。请注意,JDK类(AtomicBoolean和BlockingQueue)用于同步,这减少了创建无效解决方案的机会。有关各种类型的BlockingQueue,请咨询Javadoc。选择不同的实现可能会大大改变此示例的行为(例如DelayQueue或Priority Queue)。

public class Producer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Producer(BlockingQueue<ProducedData> queue) {
       this.queue= queue;
    }

    public void run() {
        int producedCount = 0;
        try {
            while (true) {
                producedCount++;
                //线程中断时,put抛出InterruptedException
                queue.put(new ProducedData());
            }
        } catch (InterruptedException e) {
            // 线程已被中断:清理并退出
            producedCount--;
            //如果需要更高的中断标志,请重新中断线程
            Thread.currentThread().interrupt();
        }
        System.out.println("Produced " + producedCount + " objects");
    }
}

public class Consumer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Consumer(BlockingQueue<ProducedData> queue) {
       this.queue= queue;
    }

    public void run() {
        int consumedCount = 0;
        try {
            while (true) {
                //线程中断时,put抛出InterruptedException
                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
                // 处理数据
                consumedCount++;
            }
        } catch (InterruptedException e) {
            // 线程已被中断:清理并退出
            consumedCount--;
            //如果需要更高的中断标志,请重新中断线程
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed " + consumedCount + " objects");
    }
}


public class ProducerConsumerExample {
    static class ProducedData {    
        // 空数据对象
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
        // 选择队列确定实际行为:请参阅各种BlockingQueue实现

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();

        Thread.sleep(1000);
        producer.interrupt();
        Thread.sleep(10);
        consumer.interrupt();
    }
}