[Kotlin Coroutines] 16장. 채널
|introduction
- 코루틴끼리의 통신을 위한 기본적인 방법으로 채널 API 가 추가
- 책을 교환하는 데 사용되는 공공 책장을 보면, 다른 사람이 찾는 책을 한 사람이 먼저 가지고 와야 하는데, 채널이 동작하는 방식과 비슷
- 채널은 송신자와 수신자의 수에 제한이 없으며, 채널을 통해 전송된 모든 값은 단 한 번만 받을 수 있음
- Channel 은 두 개의 서로 다른 인터페이스를 구현한 하나의 인터페이스
- SendChannel 은 원소를 보내거나(또는 더하거나) 채널을 닫는 용도로 사용됨
- ReceiveChannel 은 원소를 받을 때(또는 꺼낼 때) 사용됨
|
|
|
|
|
|
두 인터페이스는 구분되어 있고, 채널의 진입점을 제한하기 위해 둘 중 하나만 노출시키는 것도 가능
send, receive 모두 중단 함수라는 것을 확인할 수 있음
- 원소를 보내고 받는 함수가 중단 함수인 것은 필수적인 특징
receive 를 호출했는데 채널에 원소가 없는 경우
- 코루틴은 원소가 들어올 때까지 중단됨
send 를 호출했는데 채널의 용량이 다 찼을 경우 중단됨
- 대부분의 채널은 용량이 제한되어 있다는 걸 나중에 확인 가능
만약 중단 함수가 아닌 함수로 보내거나 받아야 한다면?
- trySend, tryReceive 를 사용할 수 있음
- 두 연산 모두 성공했는지 실패했는지에 대한 정보를 담고 있는 ChannelResult 를 즉시 반환
- 용량이 제한적인 채널에만 사용 가능
- (버퍼가 없는) 랑데뷰 채널에서는 작동하지 않기 때문
채널은 송신자와 수신자의 수에 제한이 없음
하지만 채널의 양쪽 끝에 각각 하나의 코루틴만 있는 경우가 가장 일반적
채널의 가장 간단한 예
- 각기 다른 코루틴에 생성자(송신자)와 소비자(수신자)가 있어야 함
- 생성자는 원소를 보내고, 소비자는 원소를 받음
- 아래가 그 예
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>() launch { repeat(5) { index -> delay(1000) println("Producing next one") channel.send(index * 2) } } launch { repeat(5) { val received = channel.receive() println(received) } } } // (1 sec) // Producing next one // 0 // (1 sec) // Producing next one // 2 // (1 sec) // Producing next one // 4 // (1 sec) // Producing next one // 6 // (1 sec) // Producing next one // 8
위와 같은 구현 방식은 불완전함
- 수신자는 얼마나 많은 원소를 보내는 지 알아야 함
- 수신자가 이런 정보를 아는 경우는 별로 없기에, 송신자가 보내는 만큼 수신자가 기다리는 방식을 선호
- 채널이 닫힐 때까지 원소를 받기 위해 for 루프 또는 consumeEach 함수를 사용할 수 있음
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>() launch { repeat(5) { index -> println("Producing next one") delay(1000) channel.send(index * 2) } channel.close() } launch { channel.consumeEach { element -> println(element) } // or // for (element in channel) { // println(element) // } } } // Producing next one // (1 sec) // Producing next one // 0 // (1 sec) // Producing next one // 2 // (1 sec) // Producing next one // 4 // (1 sec) // Producing next one // 6 // (1 sec) // 8
위와 같이 원소를 보내는 방식의 문제점
- 예외가 발생했을 때 채널을 닫는 걸 깜박하기 쉽다는 것
- 예외로 인해 코루틴이 원소를 보내는 걸 중단하면, 다른 코루틴은 원소를 영원히 기다려야 함 😱
- ReceiveChannel 을 반환하는 코루틴 빌더인 produce 함수를 사용하는 것이 좀 더 편리함
- produce 함수는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이 채널을 닫음
- 끝나거나, 중단되거나, 취소되거나
- 따라서 반드시 close 를 호출함
- produce 빌더는 채널을 만드는 가장 인기있고, 안전하고, 편리한 방법
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
@OptIn(ExperimentalCoroutinesApi::class) suspend fun main(): Unit = coroutineScope { val channel = produce { repeat(5) { index -> println("Producing next one") delay(1000) send(index * 2) } } for (element in channel) { println(element) } } // Producing next one // (1 sec) // 0 // Producing next one // (1 sec) // 2 // Producing next one // (1 sec) // 4 // Producing next one // (1 sec) // 6 // Producing next one // (1 sec) // 8
채널 타입
- 설정한 용량 크기에 따라 네 가지로 구분
- 무제한 (Unlimited)
- 제한 없는 용량 버퍼를 가진
Channel.UNLIMITED
로 설정된 채널 - send 가 중단되지 않음
- 제한 없는 용량 버퍼를 가진
- 버퍼 (Buffered)
- 특정 용량 크기 또는
Channel.BUFFERED
로 설정된 채널 - Channel.BUFFERED
- 기본 값은 64
- JVM 의 kotlinx.coroutines.channels.defaultBuffer 를 설정하면 오버라이드 가능
- 특정 용량 크기 또는
- 랑데뷰 (Rendezvous)
- ‘약속’을 의미
- 용량이 0 이거나
Channel.RENDEZVOUS
인 채널 Channel.RENDEZVOUS
- 용량이 0
- 송신자와 수신자가 만날 때만 원소를 교환함
- 융합 (Conflated)
- 버퍼의 크기가 1인
Channel.CONFLATED
를 가진 채널 - 새로운 원소가 이전 원소를 대체
- 버퍼의 크기가 1인
- 채널이 가진 용량을 실제 예를 보면서 확인해보자
|
|
|
|
|
|
|
|
버퍼 오버플로일 때
채널을 커스텀화하기 위해 버퍼가 꽉 찼을 때의 행동을 정의할 수 있음
- onBufferOverflow 파라미터
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
internal fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> { val channel = Channel<E>(capacity, onBufferOverflow) val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(newContext, channel) if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion) coroutine.start(start, coroutine, block) return coroutine }
옵션
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
public enum class BufferOverflow { /** * Suspend on buffer overflow. */ SUSPEND, /** * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend. */ DROP_OLDEST, /** * Drop **the latest** value that is being added to the buffer right now on buffer overflow * (so that buffer contents stay the same), do not suspend. */ DROP_LATEST }
- SUSPEND
- 기본 옵션
- 버퍼가 가득 찼을 때, send 메서드가 중단됨
- DROP_OLDEST
- 버퍼가 가득 찼을 때, 가장 오래된 원소가 제거됨
- DROP_LATEST
- 버퍼가 가득 찼을 때, 가장 최근의 원소가 제거됨
- SUSPEND
채널 용량 중 Channel.CONFLATED 는 용량을 1로 설정하고 onBufferOverflow 를 DROP_OLDEST 로 설정한 것을 알 수 있음
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
public fun <E> Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E> = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel } CONFLATED -> { require(onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement) } UNLIMITED -> BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows BUFFERED -> { // uses default capacity with SUSPEND if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement) else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) } else -> { if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement) else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement) } }
require(onBufferOverflow == BufferOverflow.SUSPEND)
조건이 좀 특이한 것으로 보임- 현재 produce 함수에서 onBufferOverflow 를 설정할 수 없으므로, 오버플로 옵션을 변경하려면 Channel 함수를 사용해 채널을 정의해야 함
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
suspend fun main(): Unit = coroutineScope { val channel = Channel<Int>( capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST, ) launch { repeat(5) { index -> channel.send(index * 2) delay(100) println("Sent") } channel.close() } delay(1000) for (element in channel) { println(element) delay(1000) } } // Sent // (0.1 sec) // Sent // (0.1 sec) // Sent // (0.1 sec) // Sent // (0.1 sec) // Sent // (1 - 4 * 0.1 = 0.6 sec) // 6 // (1 sec) // 8
전달되지 않은 원소 핸들러
- Channel 함수에서 반드시 알아야 하는 또 다른 파라미터 onUndeliveredElement
|
|
- 어떠한 이유로 처리되지 않았을 때 호출됨
- 주로 채널에서 보낸 자원을 닫을 때 사용됨
팬아웃(Fan-out)
- 여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수도 있음
- 하지만 원소를 적절히 처리하려면 반드시 for 루프를 사용해야 함
- consumeEach 는 여러 개의 코루틴이 사용하기에는 안전하지 않습니다.
why?
- consumeEach 는 여러 개의 코루틴이 사용하기에는 안전하지 않습니다.
|
|
- 원소는 공평하게 배분됨
- 채널은 원소를 기다리는 코루틴들을 FIFO 큐로 가지고 있음
- 위 예제에서 코루틴이 순차적으로 원소를 받는 이유 (0, 1, 2, 0, 1, 2, …)
팬인(Fan-in)
- 여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있음
|
|
- 다수의 채널을 하나의 채널로 합쳐야 할 경우가 있음
- 이런 경우 produce 함수로 여러 개의 채널을 합치는
fanIn
함수를 사용할 수 있음
- 이런 경우 produce 함수로 여러 개의 채널을 합치는
파이프라인
- 한 채널로부터 받은 원소를 다른 채널로 전송하는 경우를 파이프라인이라고 부름
|
|
통신의 기본 형태로서의 채널
- 채널은 서로 다른 코루틴이 통신할 때 유용함
- 충돌이 발생하지 않으며 공평함을 보장
- 공유 상태로 인한 문제가 일어나지 않음
- 예시
- 여러 바리스타가 커피를 만드는 상황
- 각각의 바리스타는 서로 독립적으로 작업을 수행하는 코루틴이라 할 수 있음
- 커피의 종류가 다르면 준비하는 데 걸리는 시간도 다르지만, 주문은 받은 순서대로 처리하고 싶음
- 이를 해결하는 가장 쉬운 방법
- 주문을 채널로 받고 만들어진 커피를 다른 채널로 보내는 것
- 바리스타는 produce 빌더를 사용해 정의할 수 있음
- 파이프라인을 설정하고 이전에 정의한 fanIn 함수를 사용해 다른 바리스타들이 생성한 결과를 하나로 합칠 수 있음
실제 사용 예
- 온라인 쇼핑몰
- 엄청난 수의 판매자들이 제공하는 상품 정보가 변경되는 것을 감지해야 함
- 판매자가 정보를 변경할 때마다 갱신해야 할 상품 리스트를 찾고, 하나씩 업데이트하게 됨
- 음… 채널을 제공해서 잘 처리해보자.
요약
- 채널은 코루틴끼리 통신할 때 사용하는 강력한 기본 도구
- 송신자와 수신자의 수에 제한이 없음
- 채널을 통해 보내진 데이터는 단 한 번 받는 것이 보장됨
- 보통 produce 빌더를 통해 채널을 생성
- 채널은 특정 작업에 사용되는 코루틴의 수를 조절하는 파이프라인을 설정할 때 사용될 수 있음
- 최근에는 플로우를 채널과 연결해서 사용하는 경우가 많음