Worker Thread 패턴

패턴 명칭

Worker Thread

필요한 상황

어떤 데이터가 생성되면, 이 데이터를 또 다른 여러 개의 스레드에서 처리한다. 물론 데이터의 생성 역시 또 다른 여러 개의 스레드에서 처리한다. 데이터의 처리를 동시에 처리하면서 처리하는 방식은 한가지로 정해진 것이 아닌 다양한 방식으로 수행되며, 간단이 추가될 수 있어야 한다. 이때 사용할 수 있는 패턴이다. 사실, 데이터의 다양한 방식의 처리는 Worker Thread의 응용이다.

예제 코드

Client 클래스는 처리할 데이터를 생성한다. 이렇게 생성된 데이터는 Request라는 클래스에 담기게 되는데 이 Request는 추상 클래스이며, 이 클래스를 상속받아 데이터에 대한 처리 방식을 정의할 수 있다. Client가 생성한 Request는 바로 처리되는게 아니고 Channel 클래스에 저장된다. 이렇게 저장된 데이터에 대한 Request는 Worker 클래스에의해 스레드로 처리된다. 이 Worker 클래스는 WorkerPool이라는 스레드 저장소에 미리 생성되어 관리된다. 이러한 클래스들을 사용하는 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		WorkerPool workers = new WorkerPool(5, channel);
		workers.start();
		
		new Client("ClientA", channel).start();
		new Client("ClientB", channel).start();
		new Client("ClientC", channel).start();
	}
}

Channel에 최대로 저장할 수 있는 Request의 개수는 10개, 데이터를 처리하는 Worker의 개수는 5개로 정했으며, 데이터를 생성하는 Client 스레드의 개수는 3개이다. Client 클래스의 코드는 다음과 같다.

package tstThread;

import java.util.Random;

public class Client extends Thread {
	private final Channel channel;
	private static final Random random = new Random();
	
	public Client(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			for(int i = 0; true; i++) {
				Request request;

				Thread.sleep(random.nextInt(1000));
				if(random.nextInt(2) == 0) { 
					request = new OneRequest(getName(), i);
				} else {
					request = new TwoRequest(getName(), i);
				}
				
				channel.putRequest(request);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

처리해야할 데이터는 정수값이며, 이 정수값의 데이터에 대한 처리는 무작위로 결정되는데, 실제 처리에 대한 코드는 OneRequest와 TwoRequest 클래스에 정의되어 있다. Request 추상 클래스에 대한 코드는 다음과 같다.

package tstThread;

public abstract class Request {
	protected final String clentName;
	protected final int number;
	
	public Request(String clentName, int number) {
		this.clentName = clentName;
		this.number = number;
	}
	
	public abstract void execute();
}

이 추상클래스를 구현하는 OneRequest 클래스는 다음과 같다.

package tstThread;

public class OneRequest extends Request {

	public OneRequest(String name, int number) {
		super(name, number);
	}

	public void execute() {
		String result = "ECHO: " + number + "@" + clentName;
		System.out.println(Thread.currentThread().getName() + " -> " + result);
	}
}

또 다른 처리 방식인 TwoRequest 클래스는 다음과 같다.

package tstThread;

public class TwoRequest extends Request {

	public TwoRequest(String name, int number) {
		super(name, number);
	}

	public void execute() {
		String result = "POWER: " + (number*number) + "@" + clentName;
		System.out.println(Thread.currentThread().getName() + " -> " + result);
	}
}

이 Request 클래스에 대한 객체는 Client가 생성하여 Channel에 저장되는데, Channel 클래스의 코드는 다음과 같다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final int maxCountRequests;
	private final LinkedList<Request> requestQueue = new LinkedList<Request>();
	
	public Channel(int maxCountRequests) {
		this.maxCountRequests = maxCountRequests;
	}
	
	public synchronized void putRequest(Request request) {
		while(requestQueue.size() >= maxCountRequests) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		requestQueue.addLast(request);
		notifyAll();
	}
	
	public synchronized Request takeRequest() {
		while(requestQueue.size() <= 0) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		Request request = requestQueue.pollFirst();
		notifyAll();

		return request;
	}
}

데이터를 처리하는 Worker 클래스는 다음과 같다.

package tstThread;

public class Worker extends Thread {
	private final Channel channel;
	
	public Worker(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		while(true) {
			Request request = channel.takeRequest();
			request.execute();
		}
	}
}

Worker 클래스의 객체는 WorkerPool이라는 클래스를 통해 생성되어 관리되며 코드는 다음과 같다.

package tstThread;

public class WorkerPool {
	private final Worker[] threadPool;
	
	public WorkerPool(int countThreads, Channel channel) {
		threadPool = new Worker[countThreads];
		for(int i=0; i<threadPool.length; i++) {
			threadPool[i] = new Worker("Worker-" + i, channel);
		}
	}
	
	public void start() {
		for(int i=0; i<threadPool.length; i++) {
			threadPool[i].start();
		}
	}
}

실행 결과는 다음과 같다.

Worker-0 -> ECHO: 0@ClientA
Worker-4 -> ECHO: 0@ClientB
Worker-3 -> POWER: 0@ClientC
Worker-2 -> POWER: 1@ClientB
Worker-3 -> POWER: 4@ClientB
Worker-2 -> POWER: 1@ClientC

.
.
.

orker-0 -> POWER: 625@ClientC
Worker-2 -> POWER: 784@ClientB
Worker-0 -> POWER: 841@ClientB
Worker-2 -> POWER: 784@ClientA
Worker-0 -> POWER: 841@ClientA
Worker-2 -> ECHO: 30@ClientB
Worker-0 -> POWER: 676@ClientC
Worker-2 -> ECHO: 30@ClientA
Worker-3 -> POWER: 961@ClientA
Worker-2 -> ECHO: 32@ClientA
Worker-3 -> POWER: 961@ClientB

Producer-Consumer 패턴

패턴 명칭

Producer-Consumer

필요한 상황

어떤 데이터가 생성되면, 그 생성된 데이터를 받아 처리하는 상황에서 멀티 스레드를 활용하여 처리량과 속도를 늘리고자할 적용할 수 있는 패턴이다. 데이터를 생성하는 스레드와 이를 처리하는 스레드를 분리할 수 있으며 각각은 1개 이상의 스레드로 구성될 수 있다. 데이터를 생성하는 스레드를 Producer, 데이터를 처리하는 스레드를 Consumer이다. Producer는 처리하고자 하는 데이터를 생성해 저장소에 저장해 두면 Consumer는 이 저장소에서 데이터를 받아 처리한다. 여기서 저장소는 Channel이라고 한다. Channel은 데이터를 저장할 수 있는 물리적 한계가 있으며, Producer는 이 물리적 한계 내에서 데이터를 저장하고 저장할 수 있는 공간이 없을 경우 대기한다. Consumer는 Channel에 저장된 데이터를 가져와 처리하는데, 만약 Channel에 데이터가 없을 경우 대기한다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스들은 앞서 설명한 내용과 같다. 먼저 이 클래스들을 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		
		new Producer("PRODUCER-1", channel).start();
		new Producer("PRODUCER-2", channel).start();
		
		new Consumer("CONSUMER-A", channel).start();
		new Consumer("CONSUMER-B", channel).start();
		new Consumer("CONSUMER-C", channel).start();
		new Consumer("CONSUMER-D", channel).start();
	}
}

Producer 스레드는 2개, Consumer 스레드는 4개를 구성시킨다. Channel에 저장할 수 있는 데이터의 최대 용량은 10으로 지정했다. 이 예제에서 Producer가 생성하는 데이터는 문자열이며, Producer 클래스는 다음과 같다.

package tstThread;

public class Producer extends Thread {
	private final Channel channel;
	private int number = 0;
	
	public Producer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				//Thread.sleep(200);
				
				String data = "[DATA-" + number + "@" + getName() + "]";
				channel.put(data);
				number++;
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

생성된 데이터를 받아 처리하는 Consumer 클래스는 다음과 같은데, 받은 문자열 데이터를 조합한 새로운 문자열을 출력한다.

package tstThread;

public class Consumer extends Thread {
	private final Channel channel;
	
	public Consumer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				String data = channel.get();
				
				//Thread.sleep(100);
				System.out.println("#: " + getName() + " consumes " + data);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

다음은 Producer가 생성한 데이터를 저장하는 Channel 클래스이다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final LinkedList<String> dataBuffer = new LinkedList<String>();
	private final int maxSize;
	
	public Channel(int maxSize) {
		this.maxSize = maxSize;
	}
	
	public synchronized void put(String data) throws InterruptedException {
		while(dataBuffer.size() >= maxSize) {
			wait();
		}
		
		dataBuffer.addLast(data);
		
		notifyAll();
	}
	
	public synchronized String get() throws InterruptedException {
		while(dataBuffer.size() <= 0) {
			wait();
		}
		
		String data = dataBuffer.pollFirst();
		notifyAll();
		
		//System.out.println("\t" + dataBuffer.size());
		
		return data;
	}
}

실행결과는 다음과 같다.

#: CONSUMER-C consumes [DATA-58449@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59980@PRODUCER-2]
#: CONSUMER-B consumes [DATA-58456@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59981@PRODUCER-2]
#: CONSUMER-B consumes [DATA-59982@PRODUCER-2]
#: CONSUMER-C consumes [DATA-59983@PRODUCER-2]

.
.
.

#: CONSUMER-B consumes [DATA-58476@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58477@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58478@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58479@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58480@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58481@PRODUCER-1]
#: CONSUMER-C consumes [DATA-58475@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59988@PRODUCER-2]
#: CONSUMER-D consumes [DATA-58472@PRODUCER-1]