Producer-Consumer 패턴

패턴 명칭

Producer-Consumer

필요한 상황

어떤 데이터가 생성되면, 그 생성된 데이터를 받아 처리하는 상황에서 멀티 스레드를 활용하여 처리량과 속도를 늘리고자할 적용할 수 있는 패턴이다. 데이터를 생성하는 스레드와 이를 처리하는 스레드를 분리할 수 있으며 각각은 1개 이상의 스레드로 구성될 수 있다. 데이터를 생성하는 스레드를 Producer, 데이터를 처리하는 스레드를 Consumer이다. Producer는 처리하고자 하는 데이터를 생성해 저장소에 저장해 두면 Consumer는 이 저장소에서 데이터를 받아 처리한다. 여기서 저장소는 Channel이라고 한다. Channel은 데이터를 저장할 수 있는 물리적 한계가 있으며, Producer는 이 물리적 한계 내에서 데이터를 저장하고 저장할 수 있는 공간이 없을 경우 대기한다. Consumer는 Channel에 저장된 데이터를 가져와 처리하는데, 만약 Channel에 데이터가 없을 경우 대기한다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스들은 앞서 설명한 내용과 같다. 먼저 이 클래스들을 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		
		new Producer("PRODUCER-1", channel).start();
		new Producer("PRODUCER-2", channel).start();
		
		new Consumer("CONSUMER-A", channel).start();
		new Consumer("CONSUMER-B", channel).start();
		new Consumer("CONSUMER-C", channel).start();
		new Consumer("CONSUMER-D", channel).start();
	}
}

Producer 스레드는 2개, Consumer 스레드는 4개를 구성시킨다. Channel에 저장할 수 있는 데이터의 최대 용량은 10으로 지정했다. 이 예제에서 Producer가 생성하는 데이터는 문자열이며, Producer 클래스는 다음과 같다.

package tstThread;

public class Producer extends Thread {
	private final Channel channel;
	private int number = 0;
	
	public Producer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				//Thread.sleep(200);
				
				String data = "[DATA-" + number + "@" + getName() + "]";
				channel.put(data);
				number++;
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

생성된 데이터를 받아 처리하는 Consumer 클래스는 다음과 같은데, 받은 문자열 데이터를 조합한 새로운 문자열을 출력한다.

package tstThread;

public class Consumer extends Thread {
	private final Channel channel;
	
	public Consumer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				String data = channel.get();
				
				//Thread.sleep(100);
				System.out.println("#: " + getName() + " consumes " + data);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

다음은 Producer가 생성한 데이터를 저장하는 Channel 클래스이다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final LinkedList<String> dataBuffer = new LinkedList<String>();
	private final int maxSize;
	
	public Channel(int maxSize) {
		this.maxSize = maxSize;
	}
	
	public synchronized void put(String data) throws InterruptedException {
		while(dataBuffer.size() >= maxSize) {
			wait();
		}
		
		dataBuffer.addLast(data);
		
		notifyAll();
	}
	
	public synchronized String get() throws InterruptedException {
		while(dataBuffer.size() <= 0) {
			wait();
		}
		
		String data = dataBuffer.pollFirst();
		notifyAll();
		
		//System.out.println("\t" + dataBuffer.size());
		
		return data;
	}
}

실행결과는 다음과 같다.

#: CONSUMER-C consumes [DATA-58449@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59980@PRODUCER-2]
#: CONSUMER-B consumes [DATA-58456@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59981@PRODUCER-2]
#: CONSUMER-B consumes [DATA-59982@PRODUCER-2]
#: CONSUMER-C consumes [DATA-59983@PRODUCER-2]

.
.
.

#: CONSUMER-B consumes [DATA-58476@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58477@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58478@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58479@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58480@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58481@PRODUCER-1]
#: CONSUMER-C consumes [DATA-58475@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59988@PRODUCER-2]
#: CONSUMER-D consumes [DATA-58472@PRODUCER-1]

Read-Write Lock 패턴

패턴 명칭

Read-Write Lock

필요한 상황

Database에서 어떤 테이블이 있다고 하자. 이 테이블은 다수의 클라이언트에서 동시에 읽고 쓰이는 대상이다. 어떤 클라이언트는 이 테이블에 데이터를 쓰고, 어떤 또 다른 클라이언트는 이 테이블에서 데이터를 읽는다. 그런데 만약 2개의 클라이언트에서 특정 레코드 데이터를 동시에 각각 쓰고 읽기를 수행하면 읽는 쓰레드 쪽에서는 망가진 데이터를 읽을 수 있다. 또한 만약 2개의 클라이언트에서 동시에 특정 레코드 데이터를 쓰려고 할때 망가진 데이터가 저장될 수 있다. 문제가 없는 경우는 2개의 클라이언트가 동시에 특정 레코드의 데이터를 읽을 때 뿐이다. 이 패턴은 어떤 데이터에 대해 동시에 읽고 쓸때의 상황에서, 또는 동시에 데이터를 쓰는 상황에서 문제가 발생하지 않도록 해주는데 목적이 있다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스 중 Reader와 Writer는 각각 어떤 데이터를 읽고 쓰는 스레드이며, Data는 이 쓰레드가 읽거나 쓰는 대상이 되는 데이터를 담고 있는 클래스이다. Lock은 읽기와 쓰기에 대한 스레드 제어를 위한 클래스이다. 이 클래스들 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Data data = new Data(10);
		
		new Reader(data).start();
		new Reader(data).start();
		new Reader(data).start();
		
		String[] weeks = {"월", "화", "수", "목", "금", "토", "일"};
		new Writer(data, weeks).start();
		
		String[] numbers = {"ONE", "TWO", "THREE", "FOUR", "FIVE", "SIX", "SEVEN", "EIGHT", "NINE"};
		new Writer(data, numbers).start();
		
		String[] digits = {"1", "2", "3", "4", "5", "6", "7", "8", "9"};
		new Writer(data, digits).start();
	}
}

Reader 클래스는 다음과 같다.

package tstThread;

public class Reader extends Thread {
	private final Data data;
	
	public Reader(Data data) {
		this.data = data;
	}
	
	public void run() {
		try {
			while(true) {
				String v = data.read();
				System.out.println(Thread.currentThread().getName() + " -> " + v);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

Writer 클래스는 다음과 같다.

package tstThread;

import java.util.Random;

public class Writer extends Thread {
	private static final Random random = new Random();
	private final Data data;
	private final String[] inputs;
	private int index = 0;
	
	public Writer(Data data, String[] inputs) {
		this.data = data;
		this.inputs = inputs;
	}
	
	public void run() {
		try {
			while(true) {
				String input = inputs[index];
				index = (index + 1) % inputs.length;
				data.write(input);
				Thread.sleep(random.nextInt(1000));
			}
		} catch(InterruptedException e) {
			//.
		}
	}	
}

Data 클래스는 다음과 같다.

package tstThread;

public class Data {
	private final StringBuilder buffer;
	private final Lock lock = new Lock();
	
	public Data(int size) {
		this.buffer = new StringBuilder("#empty");
	}
	
	public String read() throws InterruptedException {
		lock.readLock();
		try {
			Thread.sleep(100);		
			return buffer.toString();
		} finally {
			lock.readUnlock();
		}
	}
	
	public void write(String v) throws InterruptedException {
		lock.writeLock();
		try {
			Thread.sleep(100);
			buffer.setLength(0);
			buffer.append(v);
		} finally {
			lock.writeUnlock();
		}
	}
}

Lock 클래스는 다음과 같다.

package tstThread;

public class Lock {
	private int readingReaders = 0;
	private int waitingReaders = 0;
	private boolean preferReader = false;
	
	private int waitingWriters = 0;
	private int writingWriters = 0;
	private boolean preferWriter = true;
	
	public synchronized void readLock() throws InterruptedException {
		waitingReaders++;
		try {
			while(writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
				wait();
			}
		} finally {
			waitingReaders--;
		}
		
		readingReaders++;
	}
	
	public synchronized void readUnlock() {
		readingReaders--;
		preferWriter = true;
		preferReader = false;
		notifyAll();
	}
	
	public synchronized void writeLock() throws InterruptedException {
		waitingWriters++;
		try {
			while(readingReaders > 0 || writingWriters > 0 || (preferReader && waitingReaders > 0)) {
				wait();
			}
			
		} finally {
			waitingWriters--;
		}
		
		writingWriters++;
	}
	
	public synchronized void writeUnlock() {
		writingWriters--;
		preferWriter = false;
		preferReader = true;
		notifyAll();
	}
}

실행 결과는 다음과 같다.

Thread-2 -> #empty
Thread-1 -> #empty
Thread-0 -> #empty
Thread-0 -> 월
Thread-2 -> 월
Thread-1 -> 월
Thread-0 -> 1

.
.
.

Thread-0 -> 3
Thread-0 -> FOUR
Thread-2 -> FOUR
Thread-1 -> FOUR