Java 使用线程中的BlockingQueue解决生产者消费者问题

Java 使用线程中的BlockingQueue解决生产者消费者问题

生产者消费者是Java并发和多线程中最常见的问题。 它在帮助管理多个尝试访问共享资源的线程时产生同步。本文将帮助我们找到使用Java线程中的BlockingQueue解决生产者消费者问题的方法。

生产者消费者问题和BlockingQueue

理解生产者消费者问题

生产者和消费者是使用共享队列的两个不同实体或进程。 该队列是固定大小的缓冲区。生产者生成信息片段并将其存储在队列中。 消费者消耗给定的信息并将其从队列中移除。

实际问题发生的情况

  • 当生产者即使缓冲区已满时仍然继续生成数据。

  • 当消费者尝试从缓冲区中删除数据,即使缓冲区为空时。

  • 生产者或消费者的速度较慢。

  • 两者同时尝试更新缓冲区。

解决方案

  • 当缓冲区已满时,生产者必须停止数据生成。

  • 当缓冲区为空时,消费者必须停止从缓冲区中删除信息。

  • 生产者和消费者仅在缓冲区既不为空也不为满时工作。

BlockingQueue

Java在’java.util.concurrent’包中提供了BlockingQueue接口。使用该队列的主要优点是,在检索和删除项时,它会等待队列变为非空。 同时,在添加项目时,它会等待可用空间。这个特性使它成为生产者消费者问题的完美选择。

语法

BlockingQueue< Type > nameOfObject = new LinkedBlockingQueue<>();

这里,LinkedBlockingQueue是一个实现BlockingQueue接口的类。

使用Java中的BlockingQueue的生产者消费者解决方案

方法

  • 创建两个类及其对应的构造函数。这两个类都将扩展’Thread’类。第一个类是用于生产者,第二个类是用于消费者。

  • 在两个类中,定义类型为’Integer’的BlockingQueue,并将其作为参数传递给构造函数。这里,’Integer’是一个包装类。

  • 在生产者类中,我们重写内置方法’run()’来从生产者的一端产生数据。现在,通过使用’put()’方法,对数据进行5次迭代,并将数据以1秒的间隔存储到BlockingQueue中。

  • 在消费者类中,再次重写内置方法’run()’以使用内置方法’take()’从消费者一端消费数据。

  • 在main()方法中,定义一个BlockingQueue对象,并将其作为参数传递给生产者和消费者类的构造函数。

示例

import java.util.concurrent.*;
class Producr extends Thread {
   protected BlockingQueue<Integer> blcque;
   Producr(BlockingQueue<Integer> blcque) { // constructor
      this.blcque = blcque;
   }
   public void run() { // overriding run method
      while (true) {
         for(int i = 1; i <= 5; i++) {
            try {
               System.out.println("Producer is running " + i);
               blcque.put(i); // to produce data
               // produce data with an interval of 1 sec
               Thread.sleep(1000);
            }
           // to handle exception
           catch (InterruptedException exp) {
              System.out.println("An interruption occurred at Producer");
            }
         }
      }
   }
}
class Consumr extends Thread {
   protected BlockingQueue<Integer> blcque;
   Consumr(BlockingQueue<Integer> blcque) { // constructor
      this.blcque = blcque;
   }
   public void run() { // overriding run method
      try {
         while (true) {
            Integer elem = blcque.take(); // to consume data
            System.out.println("Consumer is running " + elem);
         }
      }
      // to handle exception
      catch (InterruptedException exp) {
         System.out.println("An interruption occurred at Producer");
      }
   }
}
public class Solution {
   public static void main(String[] args) throws InterruptedException {
      // create an object of BlockingQueue
      BlockingQueue<Integer> bufrShr = new LinkedBlockingQueue<>();

      // passing object of BlockingQueue as arguments
      Producr threadProd = new Producr(bufrShr);
      Consumr threadCon = new Consumr(bufrShr);

      // to start the process
      threadProd.start();
      threadCon.start();

      // to exit the process after 5 sec
      Thread.sleep(5000);
      System.exit(0);
   }
}

输出

Producer is running 1
Consumer is running 1
Producer is running 2
Consumer is running 2
Producer is running 3
Consumer is running 3
Producer is running 4
Consumer is running 4
Producer is running 5
Consumer is running 5

结论

我们从定义生产者消费者问题开始本文,在下一节中,我们通过引入BlockingQueue接口提出了可能的解决方案。最后,我们讨论了一个Java程序,展示了我们如何实际使用BlockingQueue来解决给定的问题。

Camera课程

Python教程

Java教程

Web教程

数据库教程

图形图像教程

办公软件教程

Linux教程

计算机教程

大数据教程

开发工具教程