Producer-Consumer 패턴

패턴 명칭

Producer-Consumer

필요한 상황

어떤 데이터가 생성되면, 그 생성된 데이터를 받아 처리하는 상황에서 멀티 스레드를 활용하여 처리량과 속도를 늘리고자할 적용할 수 있는 패턴이다. 데이터를 생성하는 스레드와 이를 처리하는 스레드를 분리할 수 있으며 각각은 1개 이상의 스레드로 구성될 수 있다. 데이터를 생성하는 스레드를 Producer, 데이터를 처리하는 스레드를 Consumer이다. Producer는 처리하고자 하는 데이터를 생성해 저장소에 저장해 두면 Consumer는 이 저장소에서 데이터를 받아 처리한다. 여기서 저장소는 Channel이라고 한다. Channel은 데이터를 저장할 수 있는 물리적 한계가 있으며, Producer는 이 물리적 한계 내에서 데이터를 저장하고 저장할 수 있는 공간이 없을 경우 대기한다. Consumer는 Channel에 저장된 데이터를 가져와 처리하는데, 만약 Channel에 데이터가 없을 경우 대기한다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스들은 앞서 설명한 내용과 같다. 먼저 이 클래스들을 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		
		new Producer("PRODUCER-1", channel).start();
		new Producer("PRODUCER-2", channel).start();
		
		new Consumer("CONSUMER-A", channel).start();
		new Consumer("CONSUMER-B", channel).start();
		new Consumer("CONSUMER-C", channel).start();
		new Consumer("CONSUMER-D", channel).start();
	}
}

Producer 스레드는 2개, Consumer 스레드는 4개를 구성시킨다. Channel에 저장할 수 있는 데이터의 최대 용량은 10으로 지정했다. 이 예제에서 Producer가 생성하는 데이터는 문자열이며, Producer 클래스는 다음과 같다.

package tstThread;

public class Producer extends Thread {
	private final Channel channel;
	private int number = 0;
	
	public Producer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				//Thread.sleep(200);
				
				String data = "[DATA-" + number + "@" + getName() + "]";
				channel.put(data);
				number++;
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

생성된 데이터를 받아 처리하는 Consumer 클래스는 다음과 같은데, 받은 문자열 데이터를 조합한 새로운 문자열을 출력한다.

package tstThread;

public class Consumer extends Thread {
	private final Channel channel;
	
	public Consumer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				String data = channel.get();
				
				//Thread.sleep(100);
				System.out.println("#: " + getName() + " consumes " + data);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

다음은 Producer가 생성한 데이터를 저장하는 Channel 클래스이다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final LinkedList<String> dataBuffer = new LinkedList<String>();
	private final int maxSize;
	
	public Channel(int maxSize) {
		this.maxSize = maxSize;
	}
	
	public synchronized void put(String data) throws InterruptedException {
		while(dataBuffer.size() >= maxSize) {
			wait();
		}
		
		dataBuffer.addLast(data);
		
		notifyAll();
	}
	
	public synchronized String get() throws InterruptedException {
		while(dataBuffer.size() <= 0) {
			wait();
		}
		
		String data = dataBuffer.pollFirst();
		notifyAll();
		
		//System.out.println("\t" + dataBuffer.size());
		
		return data;
	}
}

실행결과는 다음과 같다.

#: CONSUMER-C consumes [DATA-58449@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59980@PRODUCER-2]
#: CONSUMER-B consumes [DATA-58456@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59981@PRODUCER-2]
#: CONSUMER-B consumes [DATA-59982@PRODUCER-2]
#: CONSUMER-C consumes [DATA-59983@PRODUCER-2]

.
.
.

#: CONSUMER-B consumes [DATA-58476@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58477@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58478@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58479@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58480@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58481@PRODUCER-1]
#: CONSUMER-C consumes [DATA-58475@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59988@PRODUCER-2]
#: CONSUMER-D consumes [DATA-58472@PRODUCER-1]

Read-Write Lock 패턴

패턴 명칭

Read-Write Lock

필요한 상황

Database에서 어떤 테이블이 있다고 하자. 이 테이블은 다수의 클라이언트에서 동시에 읽고 쓰이는 대상이다. 어떤 클라이언트는 이 테이블에 데이터를 쓰고, 어떤 또 다른 클라이언트는 이 테이블에서 데이터를 읽는다. 그런데 만약 2개의 클라이언트에서 특정 레코드 데이터를 동시에 각각 쓰고 읽기를 수행하면 읽는 쓰레드 쪽에서는 망가진 데이터를 읽을 수 있다. 또한 만약 2개의 클라이언트에서 동시에 특정 레코드 데이터를 쓰려고 할때 망가진 데이터가 저장될 수 있다. 문제가 없는 경우는 2개의 클라이언트가 동시에 특정 레코드의 데이터를 읽을 때 뿐이다. 이 패턴은 어떤 데이터에 대해 동시에 읽고 쓸때의 상황에서, 또는 동시에 데이터를 쓰는 상황에서 문제가 발생하지 않도록 해주는데 목적이 있다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스 중 Reader와 Writer는 각각 어떤 데이터를 읽고 쓰는 스레드이며, Data는 이 쓰레드가 읽거나 쓰는 대상이 되는 데이터를 담고 있는 클래스이다. Lock은 읽기와 쓰기에 대한 스레드 제어를 위한 클래스이다. 이 클래스들 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Data data = new Data(10);
		
		new Reader(data).start();
		new Reader(data).start();
		new Reader(data).start();
		
		String[] weeks = {"월", "화", "수", "목", "금", "토", "일"};
		new Writer(data, weeks).start();
		
		String[] numbers = {"ONE", "TWO", "THREE", "FOUR", "FIVE", "SIX", "SEVEN", "EIGHT", "NINE"};
		new Writer(data, numbers).start();
		
		String[] digits = {"1", "2", "3", "4", "5", "6", "7", "8", "9"};
		new Writer(data, digits).start();
	}
}

Reader 클래스는 다음과 같다.

package tstThread;

public class Reader extends Thread {
	private final Data data;
	
	public Reader(Data data) {
		this.data = data;
	}
	
	public void run() {
		try {
			while(true) {
				String v = data.read();
				System.out.println(Thread.currentThread().getName() + " -> " + v);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

Writer 클래스는 다음과 같다.

package tstThread;

import java.util.Random;

public class Writer extends Thread {
	private static final Random random = new Random();
	private final Data data;
	private final String[] inputs;
	private int index = 0;
	
	public Writer(Data data, String[] inputs) {
		this.data = data;
		this.inputs = inputs;
	}
	
	public void run() {
		try {
			while(true) {
				String input = inputs[index];
				index = (index + 1) % inputs.length;
				data.write(input);
				Thread.sleep(random.nextInt(1000));
			}
		} catch(InterruptedException e) {
			//.
		}
	}	
}

Data 클래스는 다음과 같다.

package tstThread;

public class Data {
	private final StringBuilder buffer;
	private final Lock lock = new Lock();
	
	public Data(int size) {
		this.buffer = new StringBuilder("#empty");
	}
	
	public String read() throws InterruptedException {
		lock.readLock();
		try {
			Thread.sleep(100);		
			return buffer.toString();
		} finally {
			lock.readUnlock();
		}
	}
	
	public void write(String v) throws InterruptedException {
		lock.writeLock();
		try {
			Thread.sleep(100);
			buffer.setLength(0);
			buffer.append(v);
		} finally {
			lock.writeUnlock();
		}
	}
}

Lock 클래스는 다음과 같다.

package tstThread;

public class Lock {
	private int readingReaders = 0;
	private int waitingReaders = 0;
	private boolean preferReader = false;
	
	private int waitingWriters = 0;
	private int writingWriters = 0;
	private boolean preferWriter = true;
	
	public synchronized void readLock() throws InterruptedException {
		waitingReaders++;
		try {
			while(writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
				wait();
			}
		} finally {
			waitingReaders--;
		}
		
		readingReaders++;
	}
	
	public synchronized void readUnlock() {
		readingReaders--;
		preferWriter = true;
		preferReader = false;
		notifyAll();
	}
	
	public synchronized void writeLock() throws InterruptedException {
		waitingWriters++;
		try {
			while(readingReaders > 0 || writingWriters > 0 || (preferReader && waitingReaders > 0)) {
				wait();
			}
			
		} finally {
			waitingWriters--;
		}
		
		writingWriters++;
	}
	
	public synchronized void writeUnlock() {
		writingWriters--;
		preferWriter = false;
		preferReader = true;
		notifyAll();
	}
}

실행 결과는 다음과 같다.

Thread-2 -> #empty
Thread-1 -> #empty
Thread-0 -> #empty
Thread-0 -> 월
Thread-2 -> 월
Thread-1 -> 월
Thread-0 -> 1

.
.
.

Thread-0 -> 3
Thread-0 -> FOUR
Thread-2 -> FOUR
Thread-1 -> FOUR

JSON에 대한 Java 객체 직렬화, GSON

구글의 GSON은 JSON 데이터를 Java 객체로 생성해주는 라이브러리입니다.

GSON에 대한 jar 라이브러리를 참조(필자는 gson-2.3.1.jar를 사용)합니다. 먼저 첫번째 예제는 JSON 문자열 대한 객체 직렬화입니다.

String strJSON = "{'name': 'Dip2K', 'age': 44 }";

Gson gson = new GsonBuilder().create();
Person person = gson.fromJson(strJSON, Person.class);
System.out.println(person);

strJSON에 담긴 JSON 데이터를 Person이라는 객체로 생성하고 있습니다. Person은 새롭게 정의한 클래스로 다음과 같습니다.

package tstThread;

import java.util.ArrayList;
import java.util.HashMap;

public class Person {
    private String name;
    private int age;
	
    private Car car = new Car("TEST");
	
    public Person(String name, int age) {	
        this.name = name;
        this.age = age;
    }
	
    @Override
    public String toString() {
        return "[" + name + ", " + age + "]";
    }
}

실행 결과는 다음과 같습니다.

[Dip2K, 44]

다음은 자바 객체를 JSON 데이터로 역직렬화하는 코드예입니다.

Person person = new Person("Dip2K", 44);

Gson gson = new GsonBuilder().create();
String strJson = gson.toJson(person);

System.out.println(strJson);

위의 코드에서 사용된 Person 클래스는 정의는 다음과 같습니다.

package tstThread;

import java.util.ArrayList;
import java.util.HashMap;

public class Person {
    private String name;
    private int age;
	
    private Car car = new Car("TEST");
	
    private ArrayList<String> array = new ArrayList<String>();
    private HashMap<String, Integer> map = new HashMap<String, Integer>();
	
    public Person(String name, int age) {	
        this.name = name;
        this.age = age;
		
        array.add("Card1");
        array.add("Card2");
        array.add("Card3");
		
        map.put("KEY1", 100);
        map.put("KEY2", 200);
        map.put("KEY3", 300);
    }
	
    @Override
    public String toString() {
        return "[" + name + ", " + age + ", " + array.get(1) + "]";
    }
}

새롭게 정의된 Person를 보면 내부 필드 객체로 Car 클래스 타입의 객체를 하나 더 갖고 있는데, 이 Car 클래스는 다음과 같습니다.

package tstThread;

public class Car {
    private String name;
	
    public Car(String name) {
        this.name = name;
    }
}

이 예제는 단순한 클래스 객체 뿐만이 아나리 객체 안에 또 다른 객체가 담겨 있을때에 대한 GSON의 역직렬화가 가능하다는 것을 확인하기 위함입니다. 실행해 보면 다음과 같습니다.

{"name":"Dip2K","age":44,"car":{"name":"TEST"},"array":["Card1","Card2","Card3"],"map":{"KEY2":200,"KEY1":100,"KEY3":300}}

이제 위의 예제에서 얻는 JSON 문자열을 다시 Person 객체로 직열화하는 예제입니다.

String strJSON = "{'name':'Dip2K','age':44,'car':{'name':'TEST'},'array':['Card1','Card2','Card3'],'map':{'KEY2':200,'KEY1':100,'KEY3':300}}";

Gson gson = new GsonBuilder().create();
Person person = gson.fromJson(strJSON, Person.class);
System.out.println(person);

결과는 다음처럼 직열화가 잘된것을 확인할 수 있습니다.

[Dip2K, 44, Card2]

예제로 정리하는 코틀린의 코루틴(Kotlin Coroutine)

코루틴은 스레드와 기능적으로 같지만, 스레드에 비교하면 좀더 가볍고 유연하며 한단계 더 진화된 병렬 프로그래밍을 위한 기술입니다. 하나의 스레드 내에서 여러개의 코루틴이 실행되는 개념인데, 아래의 코드는 동일한 기능을 스레드와 코루틴으로 각각 구현한 코드의 예시입니다.

Thread(Runnable {
    for(i in 1..10) {
        Thread.sleep(1000L)
        print("I'm working in Thread.")
    }
}).start()

GlobalScope.launch() {
    repeat(10) {
        delay(1000L)
        print("I'm working in Coroutine.")
    }
}

아래는 코루틴에 대해서 초점을 맞춰서 가장 간단한 코루틴의 예제입니다.

print("Start Main Thread")

GlobalScope.launch {
    delay(3000)
    print("in Coroutine ...")
}

print("End Main Thread")

코루틴은 GlobalScope.launch로 정의되며 { .. } 으로 묶은 코드가 비동기적으로 실행됩니다. 실행 결과는 다음과 같습니다.

V: Start Main Thread
V: End Main Thread
V: in Coroutine ...

다음은 비동기적으로 실행된 코루틴이 완료되어 그 결과를 반환받는 예제입니다.

GlobalScope.launch {
    launch {
        print("At Here!")
    }

    val value: Int = async {
        var total = 0
        for (i in 1..10) total += i
        total
    }.await()

    print("$value")
}

결과는 다음과 같습니다.

V: At Here!
V: 55

다음 코드 역시 비동기적으로 실행된 코루틴의 완료를 기다리고 그 결과를 반환받아 출력하는 예제입니다.

GlobalScope.launch {
    val x = doSomething()
    print("done something, $x")
}

private suspend fun doSomething():Int {
    val value: Int = GlobalScope.async(Dispatchers.IO) {
        var total = 0
        for (i in 1..10) total += i
        print("do something in a suspend method: $total")
        total
    }.await()

    return value
}

비동기적으로 실행되는 코루틴을 별도의 함수로 분리했는데, 코루틴 내부에서 실행되는 함수는 suspend로 지정해야 합니다. 위의 코드의 결과는 다음과 같습니다.

V: do something in a suspend method: 55
V: done something, 55

이번에는 2개의 코루틴을 실행하고 이 2개의 결과를 받아 출력하는 예제입니다.

print("Start...")
GlobalScope.launch(Dispatchers.Main) {
    val job1 = async(Dispatchers.IO) {
        var total = 0
        for (i in 1..10) {
            total += i
            delay(100)
        }
        print("job1")
        total
    }

    val job2 = async(Dispatchers.Main) {
        var total = 0
        for (i in 1..10) {
            delay(100)
            total += i
        }
        print("job2")
        total
    }

    val result1 = job1.await()
    val result2 = job2.await()

    print("results are $result1 and $result2")
}
print("End.")

위의 코드에서 볼 수 있는 Dispatchers.Main, Dispatchers.IO는 각각 UI 변경 등을 처리하는 메인 스레드 그리고 입출력 연산을 처리하기에 적합한 IO 스레드를 의미하며, 코루틴들은 이처럼 지정된 스레드 내에서 실행됩니다. 위 코드의 결과는 다음과 같습니다.

V: Start...
V: End.
V: job1
V: job2
V: results are 55 and 55

다음은 코루틴이 완료를 기다리기 위한 await 호출을 사용하지 않는 또다른 방법입니다.

GlobalScope.launch(Dispatchers.IO) {
    val v = withContext(Dispatchers.Main) {
        var total = 0
        for (i in 1..10) {
            delay(100)
            total += i
        }

        total
    }

    print("result: $v")
    print("Do something in IO thread")
}

withContext를 써서 새로운 코루틴을 다른 스레드에서 동기적으로 실행하도록 하는 코드입니다. 결과는 다음과 같습니다.

V: result: 55
V: Do something in IO thread

launch는 Job 객체를 반환하는데, 이를 통해 다음 예제처럼 코루틴을 중간에 중단시킬 수 있습니다.

print("start..")

val job = GlobalScope.launch() {
    repeat(10) {
        delay(1000L)
        print("I'm working.")
    }
}

runBlocking {
    delay(3000L)
    job.cancel()
}

print("stop")

실행 결과는 다음과 같습니다.

V: start..
V: I'm working.
V: I'm working.
V: stop

이번에는 Job 객체를 통해 코루틴이 완전이 종료될때까지 기다리는 예제입니다.

print("start..")

val job = GlobalScope.launch() {
    repeat(10) {
        delay(1000L)
        print("I'm working.")
    }
}

runBlocking {
    job.join()
}

print("stop")

결과는 다음과 같습니다.

V: start..
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: I'm working.
V: stop

코루틴은 정해진 시간이 되면 코루틴의 완료되지 못할지라도 중지하게 할 수 있는데, 아래의 코드가 바로 그 예입니다.

print("start")

val job = GlobalScope.launch {
    withTimeout(4000L) {
        repeat(10) {
            delay(1000L)
            print("I'm working.")
        }
    }
}

print("end")

결과는 다음과 같습니다.

V: start
V: end
V: I'm working.
V: I'm working.
V: I'm working.

코루틴은 채널(Channel)이라는 개념을 통해 코루틴에서 생성한 데이터를 또 다른 코루틴에게 전달할 수 있습니다. 아래의 코드는 코루틴에서 1~5까지의 정수에 대한 제곱값을 생성하면 생성된 정수 4개를 또 다른 코루틴에서 받아 출력하는 예입니다.

runBlocking {
    print("start")

    val channel = Channel<Int>()

    launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    repeat(5) {
        val v = channel.receive()
        print("$v")
    }

    print("end")
}

결과는 다음과 같습니다.

V: start
V: 1
V: 4
V: 9
V: 16
V: 25
V: end

데이터를 생성하는 쪽이나 받는 쪽에서는 얼마나 많은 데이터를 생성할지 또는 받을지를 예측할 수 없는 경우가 대부분입니다. 데이터를 생성하는 쪽에서 채널의 close 함수를 호출하면 받는쪽에서 더 이상 데이터가 없다는 것을 인지하게 되는데, 아래는 이에 대한 코드 예입니다.

runBlocking {
    print("start")

    val channel = Channel<Int>()

    launch {
        for(x in 1..5) channel.send(x*x)
        channel.close()
    }

    for(y in channel) print("$y")

    print("end")
}

결과는 다음과 같습니다.

V: start
V: 1
V: 4
V: 9
V: 16
V: 25
V: end

다음은 데이터를 생성하는 코루틴을 함수화하여 이 함수를 통해 생성된 데이터를 처리하는 예제입니다.

runBlocking {
    print("start")

    val squares = procedureSquares()
    squares.consumeEach {
        print("$it")
    }

    print("end")
}

private fun CoroutineScope.procedureSquares(): ReceiveChannel<Int> = produce {
    for(x in 1..5) send(x*x)
}

결과는 다음과 같습니다.

V: start
V: 1
V: 4
V: 9
V: 16
V: 25
V: end

다음은 데이터를 생성하는 코루틴을 파이프라인 형태로 묶어 처리하는 것으로, 첫번째 코루틴에서 생성한 값을 또 다른 코루틴에서 받아 처리하여 또 다른 코루틴으로 전달하는 예제입니다.

runBlocking {
    print("start")

    val numbers = productNumbers()
    val squares = squares(numbers)

    for(i in 1..5) print("${squares.receive()}")

    print("end")

    coroutineContext.cancelChildren()
}

private fun CoroutineScope.productNumbers() = produce<Int> {
    var x = 1
    while(true) {
        print("send ${x} on productNumbers")
        send(x++)
        delay(100)
    }
}

private fun CoroutineScope.squares(numbers:ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for(x in numbers) {
        print("send ${x} on squares")
        send(x*x)
    }
}

결과는 다음과 같습니다.

V: start
V: send 1 on productNumbers
V: send 1 on squares
V: 1
V: send 2 on productNumbers
V: send 2 on squares
V: 4
V: send 3 on productNumbers
V: send 3 on squares
V: 9
V: send 4 on productNumbers
V: send 4 on squares
V: 16
V: send 5 on productNumbers
V: send 5 on squares
V: 25
V: end

데이터를 생성하는 코루틴은 1개지만, 이를 원활하게 처리하기 위해 여러개의 코루틴으로 생성된 데이터를 처리할 수 있습니다. 아래는 데이터를 생성하는 코루틴 1개와 생성된 데이터를 처리하는 5개의 코루틴에 대한 예제입니다.

runBlocking {
    val producer = productNumbers()
    repeat(5) {
        launchProcessor(it, producer)

    }

    delay(1000L)
    producer.cancel()
}

fun CoroutineScope.launchProcessor(id:Int, channel: ReceiveChannel<Int>) {
    launch {
        for(msg in channel) {
            print("Processor #$id received $msg")
        }
    }
}

private fun CoroutineScope.productNumbers() = produce<Int> {
    var x = 1
    while(true) {
        print("send ${x} on productNumbers")
        send(x++)
        delay(100)
    }
}

결과는 다음과 같습니다.

V: send 1 on productNumbers
V: Processor #0 received 1
V: send 2 on productNumbers
V: Processor #0 received 2
V: send 3 on productNumbers
V: Processor #1 received 3
V: send 4 on productNumbers
V: Processor #2 received 4
V: send 5 on productNumbers
V: Processor #3 received 5
V: send 6 on productNumbers
V: Processor #4 received 6
V: send 7 on productNumbers
V: Processor #0 received 7
V: send 8 on productNumbers
V: Processor #1 received 8
V: send 9 on productNumbers
V: Processor #2 received 9
V: send 10 on productNumbers
V: Processor #3 received 10

반대로 데이터를 생성하는 코루틴은 여러개이고 처리하는 코루틴은 1개인 경우도 있습니다. 아래는 데이터를 생성하는 코루틴 2개와 생성된 데이터를 처리하는 코루틴 1개에 대한 예제입니다.

runBlocking {
    val channel = Channel<String>()
    launch {
        sendString(channel, "foo", 200L)
    }

    launch {
        sendString(channel, "BAR", 500L)
    }

    repeat(6) {
        print("${channel.receive()}")
    }

    coroutineContext.cancelChildren()
}

private suspend fun sendString(channel: SendChannel<String>, s:String, time:Long) {
    while(true) {
        delay(time)
        channel.send(s)
    }
}

결과는 다음과 같습니다.

V: foo
V: foo
V: BAR
V: foo
V: foo
V: BAR

아래의 예제는 2개의 코루틴에서 하나의 데이터에 대해 어떤 처리를 해서 주고 받는 기능에 대한 코드입니다.

print("start")
data class Ball(var hits:Int)

suspend fun player(name:String, table: Channel) {
    for(ball in table) {
        ball.hits++
        print("$name $ball")
        delay(300)
        table.send(ball)
    }
}

runBlocking {
    var table = Channel<Ball>()

    launch {
        player("ping", table)
    }

    launch {
        player("pong", table)
    }

    table.send(Ball(0))
    delay(1000)
    coroutineContext.cancelChildren()
}
print("end")

결과는 다음과 같습니다.

V: start
V: ping Ball(hits=1)
V: pong Ball(hits=2)
V: ping Ball(hits=3)
V: pong Ball(hits=4)
V: end