패턴 명칭
Producer-Consumer
필요한 상황
어떤 데이터가 생성되면, 그 생성된 데이터를 받아 처리하는 상황에서 멀티 스레드를 활용하여 처리량과 속도를 늘리고자할 적용할 수 있는 패턴이다. 데이터를 생성하는 스레드와 이를 처리하는 스레드를 분리할 수 있으며 각각은 1개 이상의 스레드로 구성될 수 있다. 데이터를 생성하는 스레드를 Producer, 데이터를 처리하는 스레드를 Consumer이다. Producer는 처리하고자 하는 데이터를 생성해 저장소에 저장해 두면 Consumer는 이 저장소에서 데이터를 받아 처리한다. 여기서 저장소는 Channel이라고 한다. Channel은 데이터를 저장할 수 있는 물리적 한계가 있으며, Producer는 이 물리적 한계 내에서 데이터를 저장하고 저장할 수 있는 공간이 없을 경우 대기한다. Consumer는 Channel에 저장된 데이터를 가져와 처리하는데, 만약 Channel에 데이터가 없을 경우 대기한다.
예제 코드
위의 클래스 다이어그램에서 언급된 클래스들은 앞서 설명한 내용과 같다. 먼저 이 클래스들을 사용하는 예제 코드는 다음과 같다.
package tstThread; public class Main { public static void main(String[] args) { Channel channel = new Channel(10); new Producer("PRODUCER-1", channel).start(); new Producer("PRODUCER-2", channel).start(); new Consumer("CONSUMER-A", channel).start(); new Consumer("CONSUMER-B", channel).start(); new Consumer("CONSUMER-C", channel).start(); new Consumer("CONSUMER-D", channel).start(); } }
Producer 스레드는 2개, Consumer 스레드는 4개를 구성시킨다. Channel에 저장할 수 있는 데이터의 최대 용량은 10으로 지정했다. 이 예제에서 Producer가 생성하는 데이터는 문자열이며, Producer 클래스는 다음과 같다.
package tstThread; public class Producer extends Thread { private final Channel channel; private int number = 0; public Producer(String name, Channel channel) { super(name); this.channel = channel; } public void run() { try { while(true) { //Thread.sleep(200); String data = "[DATA-" + number + "@" + getName() + "]"; channel.put(data); number++; } } catch(InterruptedException e) { //. } } }
생성된 데이터를 받아 처리하는 Consumer 클래스는 다음과 같은데, 받은 문자열 데이터를 조합한 새로운 문자열을 출력한다.
package tstThread; public class Consumer extends Thread { private final Channel channel; public Consumer(String name, Channel channel) { super(name); this.channel = channel; } public void run() { try { while(true) { String data = channel.get(); //Thread.sleep(100); System.out.println("#: " + getName() + " consumes " + data); } } catch(InterruptedException e) { //. } } }
다음은 Producer가 생성한 데이터를 저장하는 Channel 클래스이다.
package tstThread; import java.util.LinkedList; public class Channel { private final LinkedList<String> dataBuffer = new LinkedList<String>(); private final int maxSize; public Channel(int maxSize) { this.maxSize = maxSize; } public synchronized void put(String data) throws InterruptedException { while(dataBuffer.size() >= maxSize) { wait(); } dataBuffer.addLast(data); notifyAll(); } public synchronized String get() throws InterruptedException { while(dataBuffer.size() <= 0) { wait(); } String data = dataBuffer.pollFirst(); notifyAll(); //System.out.println("\t" + dataBuffer.size()); return data; } }
실행결과는 다음과 같다.
#: CONSUMER-C consumes [DATA-58449@PRODUCER-1] #: CONSUMER-C consumes [DATA-59980@PRODUCER-2] #: CONSUMER-B consumes [DATA-58456@PRODUCER-1] #: CONSUMER-C consumes [DATA-59981@PRODUCER-2] #: CONSUMER-B consumes [DATA-59982@PRODUCER-2] #: CONSUMER-C consumes [DATA-59983@PRODUCER-2] . . . #: CONSUMER-B consumes [DATA-58476@PRODUCER-1] #: CONSUMER-A consumes [DATA-58477@PRODUCER-1] #: CONSUMER-B consumes [DATA-58478@PRODUCER-1] #: CONSUMER-A consumes [DATA-58479@PRODUCER-1] #: CONSUMER-B consumes [DATA-58480@PRODUCER-1] #: CONSUMER-A consumes [DATA-58481@PRODUCER-1] #: CONSUMER-C consumes [DATA-58475@PRODUCER-1] #: CONSUMER-C consumes [DATA-59988@PRODUCER-2] #: CONSUMER-D consumes [DATA-58472@PRODUCER-1]