Future 패턴

패턴 명칭

Future

필요한 상황

스레드는 코드의 실행에 초점이 맞춰져 있고, 그 결과를 받는 시점이 불분명하다. 스레드가 단순히 코드를 실행하는 것에서 끝나는 것이 아니라 그 실행의 결과를 다른 스레드에서 받기 위한 패턴이다.

예제 코드

Proxy 클래스는 어떤 결과를 얻기 위한 코드의 실행을 스레드로 실행해 주는 대리자이다. Proxy는 어떤 코드의 실행을 스레드로 실행해주고 바로 Result 클래스 타입의 객체를 반환한다. 바로 이 Result를 통해 스레드의 결과를 얻을 수 있다. 스레드의 실행 결과를 얻을 수 있는 상태가 되면 Result 객체의 setRealResult를 통해 실제 결과를 담는 RealResult 객체를 Result 클래스의 필드값에 담는다. 추후 적당한 시점에서 Result 클래스의 get 함수를 호출해서 실제 결과를 얻는다. 만약 실제 결과를 얻을 수 없을 때는 실제 결과가 완성될때까지 get 함수는 동기화(Blocking) 된다. 이러한 클래스들을 사용하는 코드는 아래와 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		System.out.println("START");
		
		Proxy proxy = new Proxy();
		Result result1 = proxy.run(10, 'A');
		Result result2 = proxy.run(30, 'B');
		Result result3 = proxy.run(20, 'C');
				
		System.out.println("result1 = " + result1.get());
		System.out.println("result2 = " + result2.get());
		System.out.println("result3 = " + result3.get());
		
		System.out.println("END");
	}
}

Proxy 클래스는 다음과 같다.

package tstThread;

public class Proxy {
	public Result run(final int count, final char c) {
		final Result result = new Result();
		
		new Thread() {
			public void run() {
				RealResult realData = new RealResult(count, c);
				result.setRealResult(realData);
			}
		}.start();
		
		return result;
	}
}

Result 클래스는 다음과 같다.

package tstThread;

public class Result {
	private RealResult real = null;
	
	public synchronized void setRealResult(RealResult real) {
		if(this.real != null) {
			return;
		}
		
		this.real = real;
		
		notifyAll();
	}
	
	public synchronized String get() {
		while(real == null) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		return real.get();
	}
}

RealResult 클래스는 다음과 같다.

package tstThread;

public class RealResult extends Result {
	private final String resultData;
	
	public RealResult(int count, char c) {
		char[] buffer = new char[count];
		for(int i=0; i<count; i++) {
			buffer[i] = c;
			try {
				Thread.sleep(100);
			} catch(InterruptedException e) {
				//.
			}
		}

		this.resultData = new String(buffer);
	}
	
	@Override
	public String get() {
		return resultData;
	}
}

실행 결과는 다음과 같다.

START
result1 = AAAAAAAAAA
result2 = BBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
result3 = CCCCCCCCCCCCCCCCCCCC
END

Worker Thread 패턴

패턴 명칭

Worker Thread

필요한 상황

어떤 데이터가 생성되면, 이 데이터를 또 다른 여러 개의 스레드에서 처리한다. 물론 데이터의 생성 역시 또 다른 여러 개의 스레드에서 처리한다. 데이터의 처리를 동시에 처리하면서 처리하는 방식은 한가지로 정해진 것이 아닌 다양한 방식으로 수행되며, 간단이 추가될 수 있어야 한다. 이때 사용할 수 있는 패턴이다. 사실, 데이터의 다양한 방식의 처리는 Worker Thread의 응용이다.

예제 코드

Client 클래스는 처리할 데이터를 생성한다. 이렇게 생성된 데이터는 Request라는 클래스에 담기게 되는데 이 Request는 추상 클래스이며, 이 클래스를 상속받아 데이터에 대한 처리 방식을 정의할 수 있다. Client가 생성한 Request는 바로 처리되는게 아니고 Channel 클래스에 저장된다. 이렇게 저장된 데이터에 대한 Request는 Worker 클래스에의해 스레드로 처리된다. 이 Worker 클래스는 WorkerPool이라는 스레드 저장소에 미리 생성되어 관리된다. 이러한 클래스들을 사용하는 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		WorkerPool workers = new WorkerPool(5, channel);
		workers.start();
		
		new Client("ClientA", channel).start();
		new Client("ClientB", channel).start();
		new Client("ClientC", channel).start();
	}
}

Channel에 최대로 저장할 수 있는 Request의 개수는 10개, 데이터를 처리하는 Worker의 개수는 5개로 정했으며, 데이터를 생성하는 Client 스레드의 개수는 3개이다. Client 클래스의 코드는 다음과 같다.

package tstThread;

import java.util.Random;

public class Client extends Thread {
	private final Channel channel;
	private static final Random random = new Random();
	
	public Client(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			for(int i = 0; true; i++) {
				Request request;

				Thread.sleep(random.nextInt(1000));
				if(random.nextInt(2) == 0) { 
					request = new OneRequest(getName(), i);
				} else {
					request = new TwoRequest(getName(), i);
				}
				
				channel.putRequest(request);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

처리해야할 데이터는 정수값이며, 이 정수값의 데이터에 대한 처리는 무작위로 결정되는데, 실제 처리에 대한 코드는 OneRequest와 TwoRequest 클래스에 정의되어 있다. Request 추상 클래스에 대한 코드는 다음과 같다.

package tstThread;

public abstract class Request {
	protected final String clentName;
	protected final int number;
	
	public Request(String clentName, int number) {
		this.clentName = clentName;
		this.number = number;
	}
	
	public abstract void execute();
}

이 추상클래스를 구현하는 OneRequest 클래스는 다음과 같다.

package tstThread;

public class OneRequest extends Request {

	public OneRequest(String name, int number) {
		super(name, number);
	}

	public void execute() {
		String result = "ECHO: " + number + "@" + clentName;
		System.out.println(Thread.currentThread().getName() + " -> " + result);
	}
}

또 다른 처리 방식인 TwoRequest 클래스는 다음과 같다.

package tstThread;

public class TwoRequest extends Request {

	public TwoRequest(String name, int number) {
		super(name, number);
	}

	public void execute() {
		String result = "POWER: " + (number*number) + "@" + clentName;
		System.out.println(Thread.currentThread().getName() + " -> " + result);
	}
}

이 Request 클래스에 대한 객체는 Client가 생성하여 Channel에 저장되는데, Channel 클래스의 코드는 다음과 같다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final int maxCountRequests;
	private final LinkedList<Request> requestQueue = new LinkedList<Request>();
	
	public Channel(int maxCountRequests) {
		this.maxCountRequests = maxCountRequests;
	}
	
	public synchronized void putRequest(Request request) {
		while(requestQueue.size() >= maxCountRequests) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		requestQueue.addLast(request);
		notifyAll();
	}
	
	public synchronized Request takeRequest() {
		while(requestQueue.size() <= 0) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		Request request = requestQueue.pollFirst();
		notifyAll();

		return request;
	}
}

데이터를 처리하는 Worker 클래스는 다음과 같다.

package tstThread;

public class Worker extends Thread {
	private final Channel channel;
	
	public Worker(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		while(true) {
			Request request = channel.takeRequest();
			request.execute();
		}
	}
}

Worker 클래스의 객체는 WorkerPool이라는 클래스를 통해 생성되어 관리되며 코드는 다음과 같다.

package tstThread;

public class WorkerPool {
	private final Worker[] threadPool;
	
	public WorkerPool(int countThreads, Channel channel) {
		threadPool = new Worker[countThreads];
		for(int i=0; i<threadPool.length; i++) {
			threadPool[i] = new Worker("Worker-" + i, channel);
		}
	}
	
	public void start() {
		for(int i=0; i<threadPool.length; i++) {
			threadPool[i].start();
		}
	}
}

실행 결과는 다음과 같다.

Worker-0 -> ECHO: 0@ClientA
Worker-4 -> ECHO: 0@ClientB
Worker-3 -> POWER: 0@ClientC
Worker-2 -> POWER: 1@ClientB
Worker-3 -> POWER: 4@ClientB
Worker-2 -> POWER: 1@ClientC

.
.
.

orker-0 -> POWER: 625@ClientC
Worker-2 -> POWER: 784@ClientB
Worker-0 -> POWER: 841@ClientB
Worker-2 -> POWER: 784@ClientA
Worker-0 -> POWER: 841@ClientA
Worker-2 -> ECHO: 30@ClientB
Worker-0 -> POWER: 676@ClientC
Worker-2 -> ECHO: 30@ClientA
Worker-3 -> POWER: 961@ClientA
Worker-2 -> ECHO: 32@ClientA
Worker-3 -> POWER: 961@ClientB

Producer-Consumer 패턴

패턴 명칭

Producer-Consumer

필요한 상황

어떤 데이터가 생성되면, 그 생성된 데이터를 받아 처리하는 상황에서 멀티 스레드를 활용하여 처리량과 속도를 늘리고자할 적용할 수 있는 패턴이다. 데이터를 생성하는 스레드와 이를 처리하는 스레드를 분리할 수 있으며 각각은 1개 이상의 스레드로 구성될 수 있다. 데이터를 생성하는 스레드를 Producer, 데이터를 처리하는 스레드를 Consumer이다. Producer는 처리하고자 하는 데이터를 생성해 저장소에 저장해 두면 Consumer는 이 저장소에서 데이터를 받아 처리한다. 여기서 저장소는 Channel이라고 한다. Channel은 데이터를 저장할 수 있는 물리적 한계가 있으며, Producer는 이 물리적 한계 내에서 데이터를 저장하고 저장할 수 있는 공간이 없을 경우 대기한다. Consumer는 Channel에 저장된 데이터를 가져와 처리하는데, 만약 Channel에 데이터가 없을 경우 대기한다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스들은 앞서 설명한 내용과 같다. 먼저 이 클래스들을 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		
		new Producer("PRODUCER-1", channel).start();
		new Producer("PRODUCER-2", channel).start();
		
		new Consumer("CONSUMER-A", channel).start();
		new Consumer("CONSUMER-B", channel).start();
		new Consumer("CONSUMER-C", channel).start();
		new Consumer("CONSUMER-D", channel).start();
	}
}

Producer 스레드는 2개, Consumer 스레드는 4개를 구성시킨다. Channel에 저장할 수 있는 데이터의 최대 용량은 10으로 지정했다. 이 예제에서 Producer가 생성하는 데이터는 문자열이며, Producer 클래스는 다음과 같다.

package tstThread;

public class Producer extends Thread {
	private final Channel channel;
	private int number = 0;
	
	public Producer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				//Thread.sleep(200);
				
				String data = "[DATA-" + number + "@" + getName() + "]";
				channel.put(data);
				number++;
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

생성된 데이터를 받아 처리하는 Consumer 클래스는 다음과 같은데, 받은 문자열 데이터를 조합한 새로운 문자열을 출력한다.

package tstThread;

public class Consumer extends Thread {
	private final Channel channel;
	
	public Consumer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				String data = channel.get();
				
				//Thread.sleep(100);
				System.out.println("#: " + getName() + " consumes " + data);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

다음은 Producer가 생성한 데이터를 저장하는 Channel 클래스이다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final LinkedList<String> dataBuffer = new LinkedList<String>();
	private final int maxSize;
	
	public Channel(int maxSize) {
		this.maxSize = maxSize;
	}
	
	public synchronized void put(String data) throws InterruptedException {
		while(dataBuffer.size() >= maxSize) {
			wait();
		}
		
		dataBuffer.addLast(data);
		
		notifyAll();
	}
	
	public synchronized String get() throws InterruptedException {
		while(dataBuffer.size() <= 0) {
			wait();
		}
		
		String data = dataBuffer.pollFirst();
		notifyAll();
		
		//System.out.println("\t" + dataBuffer.size());
		
		return data;
	}
}

실행결과는 다음과 같다.

#: CONSUMER-C consumes [DATA-58449@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59980@PRODUCER-2]
#: CONSUMER-B consumes [DATA-58456@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59981@PRODUCER-2]
#: CONSUMER-B consumes [DATA-59982@PRODUCER-2]
#: CONSUMER-C consumes [DATA-59983@PRODUCER-2]

.
.
.

#: CONSUMER-B consumes [DATA-58476@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58477@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58478@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58479@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58480@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58481@PRODUCER-1]
#: CONSUMER-C consumes [DATA-58475@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59988@PRODUCER-2]
#: CONSUMER-D consumes [DATA-58472@PRODUCER-1]

Read-Write Lock 패턴

패턴 명칭

Read-Write Lock

필요한 상황

Database에서 어떤 테이블이 있다고 하자. 이 테이블은 다수의 클라이언트에서 동시에 읽고 쓰이는 대상이다. 어떤 클라이언트는 이 테이블에 데이터를 쓰고, 어떤 또 다른 클라이언트는 이 테이블에서 데이터를 읽는다. 그런데 만약 2개의 클라이언트에서 특정 레코드 데이터를 동시에 각각 쓰고 읽기를 수행하면 읽는 쓰레드 쪽에서는 망가진 데이터를 읽을 수 있다. 또한 만약 2개의 클라이언트에서 동시에 특정 레코드 데이터를 쓰려고 할때 망가진 데이터가 저장될 수 있다. 문제가 없는 경우는 2개의 클라이언트가 동시에 특정 레코드의 데이터를 읽을 때 뿐이다. 이 패턴은 어떤 데이터에 대해 동시에 읽고 쓸때의 상황에서, 또는 동시에 데이터를 쓰는 상황에서 문제가 발생하지 않도록 해주는데 목적이 있다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스 중 Reader와 Writer는 각각 어떤 데이터를 읽고 쓰는 스레드이며, Data는 이 쓰레드가 읽거나 쓰는 대상이 되는 데이터를 담고 있는 클래스이다. Lock은 읽기와 쓰기에 대한 스레드 제어를 위한 클래스이다. 이 클래스들 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Data data = new Data(10);
		
		new Reader(data).start();
		new Reader(data).start();
		new Reader(data).start();
		
		String[] weeks = {"월", "화", "수", "목", "금", "토", "일"};
		new Writer(data, weeks).start();
		
		String[] numbers = {"ONE", "TWO", "THREE", "FOUR", "FIVE", "SIX", "SEVEN", "EIGHT", "NINE"};
		new Writer(data, numbers).start();
		
		String[] digits = {"1", "2", "3", "4", "5", "6", "7", "8", "9"};
		new Writer(data, digits).start();
	}
}

Reader 클래스는 다음과 같다.

package tstThread;

public class Reader extends Thread {
	private final Data data;
	
	public Reader(Data data) {
		this.data = data;
	}
	
	public void run() {
		try {
			while(true) {
				String v = data.read();
				System.out.println(Thread.currentThread().getName() + " -> " + v);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

Writer 클래스는 다음과 같다.

package tstThread;

import java.util.Random;

public class Writer extends Thread {
	private static final Random random = new Random();
	private final Data data;
	private final String[] inputs;
	private int index = 0;
	
	public Writer(Data data, String[] inputs) {
		this.data = data;
		this.inputs = inputs;
	}
	
	public void run() {
		try {
			while(true) {
				String input = inputs[index];
				index = (index + 1) % inputs.length;
				data.write(input);
				Thread.sleep(random.nextInt(1000));
			}
		} catch(InterruptedException e) {
			//.
		}
	}	
}

Data 클래스는 다음과 같다.

package tstThread;

public class Data {
	private final StringBuilder buffer;
	private final Lock lock = new Lock();
	
	public Data(int size) {
		this.buffer = new StringBuilder("#empty");
	}
	
	public String read() throws InterruptedException {
		lock.readLock();
		try {
			Thread.sleep(100);		
			return buffer.toString();
		} finally {
			lock.readUnlock();
		}
	}
	
	public void write(String v) throws InterruptedException {
		lock.writeLock();
		try {
			Thread.sleep(100);
			buffer.setLength(0);
			buffer.append(v);
		} finally {
			lock.writeUnlock();
		}
	}
}

Lock 클래스는 다음과 같다.

package tstThread;

public class Lock {
	private int readingReaders = 0;
	private int waitingReaders = 0;
	private boolean preferReader = false;
	
	private int waitingWriters = 0;
	private int writingWriters = 0;
	private boolean preferWriter = true;
	
	public synchronized void readLock() throws InterruptedException {
		waitingReaders++;
		try {
			while(writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
				wait();
			}
		} finally {
			waitingReaders--;
		}
		
		readingReaders++;
	}
	
	public synchronized void readUnlock() {
		readingReaders--;
		preferWriter = true;
		preferReader = false;
		notifyAll();
	}
	
	public synchronized void writeLock() throws InterruptedException {
		waitingWriters++;
		try {
			while(readingReaders > 0 || writingWriters > 0 || (preferReader && waitingReaders > 0)) {
				wait();
			}
			
		} finally {
			waitingWriters--;
		}
		
		writingWriters++;
	}
	
	public synchronized void writeUnlock() {
		writingWriters--;
		preferWriter = false;
		preferReader = true;
		notifyAll();
	}
}

실행 결과는 다음과 같다.

Thread-2 -> #empty
Thread-1 -> #empty
Thread-0 -> #empty
Thread-0 -> 월
Thread-2 -> 월
Thread-1 -> 월
Thread-0 -> 1

.
.
.

Thread-0 -> 3
Thread-0 -> FOUR
Thread-2 -> FOUR
Thread-1 -> FOUR