Java如何关闭或关闭BlockingQueue?

在此示例中,您将学习如何BlockingQueue在队列中没有其他可用元素时关闭或关闭。我们将通过使生产者在“生产者-消费者”场景中发送标记对象来使用通用策略。此标记对象也称为有毒对象,将被视为队列中不再包含需要处理的对象的标志。这将使我们能够中断使用者线程的操作。

package org.nhooo.example.util.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueShutdown {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(32);

        MyDataProducer producer = new MyDataProducer(queue);
        MyDataConsumer consumer = new MyDataConsumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

下面是Producer将数据放入队列的对象。字符串DONE是我们的标记对象。这是最后一个数据将被放置在队列中,以供消费者提取。

package org.nhooo.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataProducer implements Runnable {
    BlockingQueue<String> queue;

    public MyDataProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataProducer.run");
        String[] data = {"D001", "D002", "D003", "D004", "D005", "DONE"};

        try {
            for (String element : data) {
                queue.put(element);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

所述Consumer对象循环以从队列中检索的元素。当它从队列中检索标记对象时,它将中断循环并结束线程。

package org.nhooo.example.util.concurrent;

import java.util.concurrent.BlockingQueue;

public class MyDataConsumer implements Runnable {
    BlockingQueue<String> queue;

    public MyDataConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("MyDataConsumer.run");

        while (true) {
            try {
                String element = queue.take();
                if ("DONE".equals(element)) {
                    System.out.println("Exiting consumer thread, " +
                            "end of data reached.");
                    break;
                }
                System.out.println("Element = " + element);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}