블로킹 큐(Blocking Queue)legacy/Operating System2024. 10. 5. 21:19
Table of Contents
블로킹 큐(Blocking Queue)는 동기화와 스레드 안전성을 보장하는 자료구조이다. 주로 멀티 스레드 환경에서 사용되며, 생산자-소비자 패턴과 같은 문제를 해결할 때 유용하다.
블로킹 큐의 동작 방식
블로킹 큐는 큐가 비어 있거나 가득 찼을 때, 큐에 데이터를 넣거나 빼려는 스레드가 일정 조건을 만족할 때까지 대기 상태(블로킹)에 들어간다. 이를 통해 스레드 간의 안전한 데이터 공유가 가능하다.
- 삽입 시 대기
- 큐가 가득 찬 경우, 데이터를 삽입하려는 스레드는 큐에 빈 공간이 생길 때까지 대기한다.
- 다른 스레드가 큐에 데이터를 소비해서 자리가 생기면, 삽입하려는 스레드는 대기 상태에서 깨어나고 데이터를 큐에 넣을 수 있다.
- 삭제 시 대기
- 큐가 비어 있는 경우, 데이터를 꺼내려는 스레드는 큐에 새로운 데이터가 삽입될 때까지 대기한다.
- 다른 스레드가 큐에 데이터를 넣으면, 대기 중이던 스레드는 깨어나 데이터를 큐에서 꺼낸다.
블로킹 큐의 특징
- 동기화
- 블로킹 큐는 내부적으로 동기화를 지원한다. 여러 스레드가 동시에 큐에 접근할 때 데이터의 일관성을 보장하고, 스레드가 안전하게 데이터를 처리할 수 있도록 설계되어 있다.
- 예를 들어, put() 메서드를 호출하는 스레드는 큐가 가득 차 있으면 블로킹되고, take() 메서드를 호출하는 스레드는 큐가 비어 있으면 블로킹된다.
- 스레드 안전성
- 블로킹 큐는 멀티스레드 환경에서 스레드 안전성을 보장한다. 여러 스레드가 동시에 큐에 접근해도 데이터의 손상이나 경쟁 조건이 발생하지 않도록 설계되어 있다.
- 비동기 작업
- 블로킹 큐는 주로 생산자-소비자 패턴에서 비동기적인 작업을 지원하는 데 사용된다. 생상자는 데이터를 큐에 추가하고, 소비자는 큐에서 데이터를 가져가는 방식으로 비동기적 통신을 할 수 있다.
자바에서 블로킹 큐
자바에서는 java.util.concurrent 패키지에서 제공하는 다양한 블로킹 큐 구현체를 사용할 수 있다.
- ArrayBlockingQueue
- 고정 크기의 블로킹 큐로, 객체 생성 시에 크기를 지정해야 한다.
- LinkedBlockingQueue
- 링크드 리스트 기반의 블로킹 큐이다.
- PriorityBlockingQueue
- 우선순위를 기반으로 한 블로킹 큐이다. FIFO 방식이 아닌 요소들의 우선순위에 따른 데이터를 정렬하고, 가장 높은 우선순위의 데이터가 먼저 처리된다.
- SynchronousQueue
- 큐의 크기가 0인 블로킹 큐이다. 데이터를 생산하는 스레드가 데이터를 넣으려고 할 때 반드시 데이터를 소비하는 스레드가 데이터를 동시에 가져가야 한다. 그렇지 않으면 생산자를 블로킹 상태가 된다.
블로킹 큐와 논블로킹 큐의 차이점
- 블로킹 큐
- 큐가 가득 차거나 비어 있을 때, 큐에 데이터를 넣거나 빼려는 스레드는 대기(블로킹) 상태에 들어간다. 예를 들어, put() 메서드는 큐에 빈 공간이 없으면 대기하고, take() 메서드는 큐에 데이터가 없으면 대기한다.
- 논블로킹 큐
- 스레드가 큐에 데이터를 넣거나 빼려고 할 때, 즉시 실패 여부를 알린다. 대기 상태에 들어가지 않고, 성공하거나 실패 여부를 바로 반환한다. 예를 들어, offer() 메서드는 데이터를 넣을 수 없으면 즉시 실패를 반환한다.
생산자-소비자 문제
블로킹 큐는 생산자-소비자 문제를 해결하는 데 유용하다. 생산자가 데이터를 생성하여 큐에 넣고, 소비자는 큐에서 데이터를 꺼내 처리한다. 블로킹 큐는 생산자와 소비자 사이의 동기화를 보장한다.
블로킹 큐 예제 코드
- 소비자
- take() 메서드는 큐에 소비할 데이터가 없으면 대기한다. 생산자에 의해 큐에 데이터가 삽입되면 대기 중인 소비자는 해당 데이터를 소비한다.
// 소비자는 큐에 저장된 데이터를 소비한다.
public class Consumer implements Runnable {
private BlockingQueue<Integer> que;
public Consumer(BlockingQueue<Integer> que) {
this.que = que;
}
@Override
public void run() {
try {
for(int i = 0; i < 10; i++) {
int data = que.take(); // take() : 큐가 비어 있으면 대기 상태로 들어간다.
System.out.println("Consumed: " + data);
Thread.sleep(150);
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- 생산자
- put() 메서드는 큐가 가득 찬 경우 대기한다. 소비자에 의해 큐에 담긴 데이터가 소비되면 대기 중인 생산자는 생산된 데이터를 삽입한다.
// 생산자는 데이터를 생산하여 큐에 저장한다.
public class Producer implements Runnable {
private BlockingQueue<Integer> que;
public Producer(BlockingQueue<Integer> que) {
this.que = que;
}
@Override
public void run() {
try {
for(int i = 0; i < 10; i++) {
// put() : 큐에 데이터가 가득 찬 경우 대기 상태로 들어간다.
que.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- Main
블로킹 큐에서는 생산자와 소비자가 데이터 삽입 또는 소비에 성공할 때까지 대기한다.
public class Main {
public static void main(String[] args) {
// 크기가 5인 Blocking Queue 생성
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
}
}
생산자(Producer)는 큐에 데이터가 가득 차면 대기하고, 자리가 생기면 데이터를 삽입한다.
소비자(Consumer)는 큐에 데이터가 없으면 대기하고, 데이터가 추가되면 데이터를 소비한다.
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Produced: 4
Consumed: 3
Produced: 5
Consumed: 4
Produced: 6
Produced: 7
Consumed: 5
Produced: 8
Consumed: 6
Produced: 9
Consumed: 7
Consumed: 8
Consumed: 9
논블로킹 큐 예제 코드
- 소비자
- poll() 메서드는 큐가 비어 있는 경우 즉시 실패한다. poll(long, TimeUnit) 메서드는 주어진 시간 동안 큐가 비어 있으면 대기한 후, 해당 시간 내에 데이터가 들어오지 않으면 null을 반환한다. 예제에서는 대기 시간을 0초로 설정하여 데이터 소비에 실패하면 대기하지 않도록 하였다.
public class Consumer implements Runnable{
private BlockingQueue<Integer> que;
public Consumer(BlockingQueue<Integer> que) {
this.que = que;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
Integer data = que.poll(0, TimeUnit.SECONDS);
if(data != null) {
System.out.println("Consumed: " + data);
} else {
System.out.println("큐가 비어 있어서 데이터 소비에 실패하였습니다.");
}
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- 생산자
- offer() 메서드는 큐가 가득찬 경우 즉시 실패한다. offer(T, long, TimeUnit) 메서드는 주어진 시간 동안 큐가 가득 차 있으면 대기한 후, 해당 시간 내에 빈 공간이 생기지 않으면 false를 반환한다. 예제에서는 대기 시간을 0초로 하여 데이터 삽입에 실패한 경우 대기하지 않도록 하였다.
public class Producer implements Runnable {
private BlockingQueue<Integer> que;
public Producer(BlockingQueue<Integer> que) {
this.que = que;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
boolean isOffered = que.offer(i, 0, TimeUnit.SECONDS);
if (isOffered) {
System.out.println("Produced: " + i);
} else {
System.out.println("큐에 데이터 삽입을 실패하였습니다.");
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- Main
논블로킹 큐에서 생산자와 소비자는 데이터 생산 및 소비에 실패한다면 대기하지 않고 실패 처리한다.
public class Main {
public static void main(String[] args) {
// 크기가 5인 Blocking Queue 생성
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(5);
Producer producer = new Producer(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
}
}
생산자는 큐에 데이터가 가득찬 경우 대기하지 않고 실패 처리한다.
소비자는 큐가 비어 있는 경우 대기하지 않고 실패 처리한다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
Consumed: 0
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
큐가 비어 있어서 데이터 소비에 실패하였습니다.
Produced: 0
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Produced: 5
큐에 데이터 삽입을 실패하였습니다.
큐에 데이터 삽입을 실패하였습니다.
큐에 데이터 삽입을 실패하였습니다.
큐에 데이터 삽입을 실패하였습니다.