Java中的BlockingQueue接口
Java中的BlockingQueue
接口是Java5中的一个重要类,它是用来实现阻塞队列的接口,可以让我们实现线程安全的队列。我们在进行多线程编程、生产消费模式的时候,为了保证数据的一致性和并发执行的效率,通常使用阻塞队列。
什么是阻塞队列?
阻塞队列是指在数据存储空间满时,插入线程进行等待,或者是在数据存储空间为空时,取出线程进行等待的队列。这种队列的主要特点就是当队列为空时,获取元素的线程会阻塞等待队列中有新的元素;当队列满时,插入元素的线程会阻塞等待队列中有空的元素。
阻塞队列的实现主要是利用Lock
接口中的Condition
条件来实现的,通过await()
和signal()
方法来进行线程的等待和唤醒。
BlockingQueue的实现机制
BlockingQueue
接口有三个重要实现类:
ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
ArrayBlockingQueue
ArrayBlockingQueue
是一个基于数组实现的有界阻塞队列,内部维护了一个定长数组,数组的大小在创建时指定,之后大小不能更改。
下面是一个简单的示例代码:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
queue.put("a");
queue.put("b");
queue.put("c");
String ele = queue.take();
System.out.println(ele);
在这个例子中,我们先创建了一个有界阻塞队列,大小为10,然后尝试往队列中放入三个元素a
、b
、c
,最后从队列中取出一个元素,并输出它。如果队列为空,则取出元素的线程会阻塞等待队列中有元素。
LinkedBlockingQueue
LinkedBlockingQueue
是一个基于链表实现的无界阻塞队列,内部维护了一个链表,其大小默认为Integer.MAX_VALUE
。
下面是一个简单的示例代码:
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
queue.put("a");
queue.put("b");
queue.put("c");
String ele = queue.take();
System.out.println(ele);
在这个例子中,我们先创建了一个无界阻塞队列,然后尝试往队列中放入三个元素a
、b
、c
,最后从队列中取出一个元素,并输出它。如果队列为空,则取出元素的线程会阻塞等待队列中有元素。
SynchronousQueue
SynchronousQueue
是一个不存储元素的阻塞队列,每个put
操作必须等待一个take
操作,否则无法添加元素。在同步队列中,没有任何存储元素的空间,因此当然也就不能插入元素。在同步队列种,每一个插入操作都要等待另一个线程的移除操作,所以它的吞吐量相对较小。
下面是一个简单的示例代码:
BlockingQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println("添加元素");
queue.put("a");
System.out.println("添加完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("获取元素");
String ele = queue.take();
System.out.println("获取完成:" + ele);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
在这个例子中,我们创建了一个不存储元素的阻塞队列SynchronousQueue
,然后在两个线程中分别执行了往队列中添加元素和从队列中获取元素的操作。由于SynchronousQueue
是一个无法存储元素的队列,因此只有当一个线程执行了put()
方法之后,另一个线程才能够执行take()
方法。
BlockingQueue的常用方法
BlockingQueue
接口提供了一些常用的方法,用于队列的添加、获取、删除和判断等操作。下面我们来介绍一下这些方法:
put(E e)
:向队列末尾添加一个元素,如果队列已满,则线程会等待队列中有空间。该操作是阻塞式的。take()
:从队列头部获取一个元素,如果队列为空,则线程会等待队列中有元素。该操作是阻塞式的。poll(long timeout, TimeUnit unit)
:从队列头部获取一个元素,如果队列为空,则线程会等待一段时间,如果等待时间超时,则返回null
。该操作是阻塞式的。add(E e)
:向队列末尾添加一个元素,如果队列已满,则抛出IllegalStateException
异常。offer(E e)
:向队列末尾添加一个元素,如果队列已满,则返回false
。offer(E e, long timeout, TimeUnit unit)
:向队列末尾添加一个元素,如果队列已满,则等待一定时间,如果等待时间超时,则返回false
,否则返回true
。peek()
:获取队列头部的元素,但不将其从队列中移除,如果队列为空,则返回null
。poll()
:从队列头部获取一个元素,如果队列为空,则返回null
。remove()
:获取并移除队列头部的元素,如果队列为空,则抛出NoSuchElementException
异常。contains(Object o)
:判断队列中是否包含指定元素,如果包含,则返回true
,否则返回false
。
示例代码
下面是一个示例代码,用于展示BlockingQueue
的使用:
public class BlockingQueueDemo {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "生产者线程").start();
new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
Integer ele = queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + ele);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者线程").start();
}
}
在这个例子中,我们创建了一个容量为10的阻塞队列LinkedBlockingQueue
,并启动了两个线程,一个线程用于生产数据,另一个线程用于消费数据。通过在put()
方法和take()
方法处添加Thread.sleep()
方法,我们模拟了生产和消费的过程。
结论
BlockingQueue
接口提供了一种实现线程安全队列的方式,可以用于实现多线程编程中的生产消费模式。同时,由于BlockingQueue
接口提供的方法都是线程安全的,因此在多线程环境下使用时,无需担心数据的不一致性和并发执行的效率问题。