Java 使用线程中的BlockingQueue解决生产者消费者问题
生产者消费者是Java并发和多线程中最常见的问题。 它在帮助管理多个尝试访问共享资源的线程时产生同步。本文将帮助我们找到使用Java线程中的BlockingQueue解决生产者消费者问题的方法。
生产者消费者问题和BlockingQueue
理解生产者消费者问题
生产者和消费者是使用共享队列的两个不同实体或进程。 该队列是固定大小的缓冲区。生产者生成信息片段并将其存储在队列中。 消费者消耗给定的信息并将其从队列中移除。
实际问题发生的情况
- 当生产者即使缓冲区已满时仍然继续生成数据。
-
当消费者尝试从缓冲区中删除数据,即使缓冲区为空时。
-
生产者或消费者的速度较慢。
-
两者同时尝试更新缓冲区。
解决方案
-
当缓冲区已满时,生产者必须停止数据生成。
-
当缓冲区为空时,消费者必须停止从缓冲区中删除信息。
-
生产者和消费者仅在缓冲区既不为空也不为满时工作。
BlockingQueue
Java在’java.util.concurrent’包中提供了BlockingQueue接口。使用该队列的主要优点是,在检索和删除项时,它会等待队列变为非空。 同时,在添加项目时,它会等待可用空间。这个特性使它成为生产者消费者问题的完美选择。
语法
BlockingQueue< Type > nameOfObject = new LinkedBlockingQueue<>();
这里,LinkedBlockingQueue是一个实现BlockingQueue接口的类。
使用Java中的BlockingQueue的生产者消费者解决方案
方法
- 创建两个类及其对应的构造函数。这两个类都将扩展’Thread’类。第一个类是用于生产者,第二个类是用于消费者。
-
在两个类中,定义类型为’Integer’的BlockingQueue,并将其作为参数传递给构造函数。这里,’Integer’是一个包装类。
-
在生产者类中,我们重写内置方法’run()’来从生产者的一端产生数据。现在,通过使用’put()’方法,对数据进行5次迭代,并将数据以1秒的间隔存储到BlockingQueue中。
-
在消费者类中,再次重写内置方法’run()’以使用内置方法’take()’从消费者一端消费数据。
-
在main()方法中,定义一个BlockingQueue对象,并将其作为参数传递给生产者和消费者类的构造函数。
示例
import java.util.concurrent.*;
class Producr extends Thread {
protected BlockingQueue<Integer> blcque;
Producr(BlockingQueue<Integer> blcque) { // constructor
this.blcque = blcque;
}
public void run() { // overriding run method
while (true) {
for(int i = 1; i <= 5; i++) {
try {
System.out.println("Producer is running " + i);
blcque.put(i); // to produce data
// produce data with an interval of 1 sec
Thread.sleep(1000);
}
// to handle exception
catch (InterruptedException exp) {
System.out.println("An interruption occurred at Producer");
}
}
}
}
}
class Consumr extends Thread {
protected BlockingQueue<Integer> blcque;
Consumr(BlockingQueue<Integer> blcque) { // constructor
this.blcque = blcque;
}
public void run() { // overriding run method
try {
while (true) {
Integer elem = blcque.take(); // to consume data
System.out.println("Consumer is running " + elem);
}
}
// to handle exception
catch (InterruptedException exp) {
System.out.println("An interruption occurred at Producer");
}
}
}
public class Solution {
public static void main(String[] args) throws InterruptedException {
// create an object of BlockingQueue
BlockingQueue<Integer> bufrShr = new LinkedBlockingQueue<>();
// passing object of BlockingQueue as arguments
Producr threadProd = new Producr(bufrShr);
Consumr threadCon = new Consumr(bufrShr);
// to start the process
threadProd.start();
threadCon.start();
// to exit the process after 5 sec
Thread.sleep(5000);
System.exit(0);
}
}
输出
Producer is running 1
Consumer is running 1
Producer is running 2
Consumer is running 2
Producer is running 3
Consumer is running 3
Producer is running 4
Consumer is running 4
Producer is running 5
Consumer is running 5
结论
我们从定义生产者消费者问题开始本文,在下一节中,我们通过引入BlockingQueue接口提出了可能的解决方案。最后,我们讨论了一个Java程序,展示了我们如何实际使用BlockingQueue来解决给定的问题。