=================
== The Archive ==
=================

[Kotlin Coroutines] 12장. 디스패처

|

개요

코틀린 코루틴 라이브러리가 제공하는 중요한 기능

디스패처?

코틀린 코루틴에서 코루틴이 어떤 스레드에서 실행될지 정하는 것은 CoroutineContext

기본 디스패처

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Starts 1000 coroutines without naming a dispatcher and prints the thread each one runs on.
 *
 * The scope created here carries no dispatcher of its own, so a bare `launch` behaves like
 * `launch(Dispatchers.Default)`.
 */
suspend fun main() =
    coroutineScope {
        repeat(1000) {
            // Inside runBlocking this would differ: runBlocking uses its own dispatcher
            // whenever no other dispatcher has been set.
            launch { // same as launch(Dispatchers.Default) {
                // Throwaway work whose only purpose is to keep the thread busy for a moment.
                val randomNumbers = List(1000) { Random.nextLong() }
                randomNumbers.maxOrNull()

                val workerName = Thread.currentThread().name
                println("Running on thread: $workerName")
            }
        }
    }

// Running on thread: DefaultDispatcher-worker-3
// Running on thread: DefaultDispatcher-worker-7
// Running on thread: DefaultDispatcher-worker-10
// Running on thread: DefaultDispatcher-worker-5
// Running on thread: DefaultDispatcher-worker-2
// Running on thread: DefaultDispatcher-worker-6
// Running on thread: DefaultDispatcher-worker-8
// Running on thread: DefaultDispatcher-worker-9
// Running on thread: DefaultDispatcher-worker-10
// Running on thread: DefaultDispatcher-worker-2
// Running on thread: DefaultDispatcher-worker-10
// ...

기본 디스패처를 제한하기

메인 디스패처

IO 디스패처

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
/**
 * Measures how long 50 coroutines take when each one blocks its thread for one second
 * on Dispatchers.IO, then prints the elapsed milliseconds.
 */
suspend fun main() {
    val elapsedMillis = measureTimeMillis {
        coroutineScope {
            // coroutineScope returns only after every launched child has finished.
            repeat(times = 50) {
                launch(Dispatchers.IO) { Thread.sleep(1000) }
            }
        }
    }
    println(elapsedMillis) // 1038, 1045, 1038, ...
}

커스텀 스레드 풀을 사용하는 IO 디스패처

정해진 수의 스레드 풀을 가진 디스패처

싱글스레드로 제한된 디스패처

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Counter shared by every coroutine below; deliberately left without any synchronization.
private var i = 0

/**
 * Increments the shared counter from 10,000 coroutines on a multi-threaded dispatcher and
 * prints the result after a one-second delay.
 *
 * The increment is not atomic, so concurrent updates can be lost and the printed value
 * typically ends up below 10,000.
 */
suspend fun main(): Unit =
    coroutineScope {
        repeat(10_000) {
            // Dispatchers.Default shows the same effect.
            launch(Dispatchers.IO) { i += 1 }
        }
        delay(1000)
        println(i) // 9762, 9804, 9813, ...
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Counter shared by every coroutine below.
private var i = 0

/**
 * Same counter experiment, but every coroutine runs on a view of Dispatchers.Default
 * limited to a parallelism of 1, so no two increments execute at the same time.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main(): Unit =
    coroutineScope {
        val serialDispatcher = Dispatchers.Default.limitedParallelism(1)

        repeat(10000) {
            launch(serialDispatcher) { i += 1 }
        }
        delay(1000)
        println(i) // 10000
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/**
 * Shows the cost of blocking on a dispatcher limited to a parallelism of 1: five coroutines
 * that each sleep for one second run one after another, so joining them takes about five seconds.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main(): Unit =
    coroutineScope {
        val serialDispatcher = Dispatchers.Default.limitedParallelism(1)
        val parentJob = Job()
        // Every worker gets the limited dispatcher and becomes a child of parentJob.
        val workerContext = serialDispatcher + parentJob

        repeat(5) {
            launch(workerContext) { Thread.sleep(1000) }
        }

        // Mark the parent as complete so that join() returns once all children are done.
        parentJob.complete()
        val elapsedMillis = measureTimeMillis { parentJob.join() }
        println("Took $elapsedMillis") // Took 5024
    }

프로젝트 룸의 가상 스레드 사용하기

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Coroutine dispatcher that runs every dispatched block on a newly started virtual thread.
 *
 * Needs a JDK that provides `Thread.startVirtualThread` (Project Loom).
 */
object LoomDispatcher : ExecutorCoroutineDispatcher() {
    /** Executor that starts one virtual thread per submitted command. */
    override val executor: Executor =
        Executor { command ->
            Thread.startVirtualThread(command)
        }

    /** Hands [block] to [executor]; [context] is not consulted. */
    override fun dispatch(
        context: CoroutineContext,
        block: Runnable,
    ) {
        executor.execute(block)
    }

    /**
     * Always fails: this dispatcher is a shared singleton and is not meant to be closed.
     *
     * @throws IllegalStateException on every call.
     */
    override fun close() {
        error("Cannot be invoked on Dispatchers.LOOM")
    }
}

/**
 * Exposes [LoomDispatcher] as `Dispatchers.LOOM`, alongside the built-in dispatchers.
 *
 * Declared at top level on purpose: as a member of [LoomDispatcher] it would be a member
 * extension and would not resolve from ordinary call sites such as `launch(Dispatchers.LOOM)`.
 */
val Dispatchers.LOOM: CoroutineDispatcher
    get() = LoomDispatcher
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/**
 * Measures how long 100,000 coroutines take when each one blocks for one second on
 * Dispatchers.LOOM, then prints the elapsed milliseconds.
 */
suspend fun main() {
    val elapsedMillis = measureTimeMillis {
        coroutineScope {
            repeat(100_000) {
                launch(Dispatchers.LOOM) { Thread.sleep(1000) }
            }
        }
    }
    println(elapsedMillis) // 2349, 2219, 2452, ...
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/**
 * Runs the same 100,000 one-second blocking sleeps on a view of Dispatchers.IO whose
 * parallelism limit is raised to 100,000, then prints the elapsed milliseconds.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
    val elapsedMillis = measureTimeMillis {
        // Created inside the measured section, exactly like the work itself.
        val wideIoDispatcher = Dispatchers.IO.limitedParallelism(100_000)

        coroutineScope {
            repeat(100_000) {
                launch(wideIoDispatcher) { Thread.sleep(1000) }
            }
        }
    }
    println(elapsedMillis) // 22989
}

제한받지 않는 디스패처

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Demonstrates that a coroutine on Dispatchers.Unconfined never switches threads by itself:
 * it starts on the thread that launched it and afterwards continues on whichever thread
 * resumed it.
 */
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
suspend fun main(): Unit =
    withContext(newSingleThreadContext("Thread1")) {
        // Continuation of the unconfined coroutine, handed over to the resumer coroutine.
        var parked: Continuation<Unit>? = null

        fun printCurrentThread() = println(Thread.currentThread().name)

        // Resumer: after one second, wakes the parked coroutine from Thread2.
        launch(newSingleThreadContext("Thread2")) {
            delay(1000)
            parked?.resume(Unit)
        }

        launch(Dispatchers.Unconfined) {
            printCurrentThread() // Thread1

            // Suspend and publish the continuation so that the resumer can pick it up.
            suspendCancellableCoroutine<Unit> { cont ->
                parked = cont
            }

            printCurrentThread() // Thread2

            delay(1000)

            // kotlinx.coroutines.DefaultExecutor
            // (the thread used by delay)
            printCurrentThread()
        }
    }

메인 디스패처로 즉시 옮기기

다른 디스패처로 옮기는 걸 확인해보자

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
 * Checks which dispatcher and thread a coroutine ends up on when it moves from
 * Dispatchers.Main to another dispatcher through withContext.
 *
 * Dispatchers.Main is replaced by a single-threaded executor dispatcher before each test.
 *
 * NOTE(review): nothing in this class resets Dispatchers.Main or closes the executor after
 * a test; consider adding a teardown that does both.
 */
class SwitchDispatchersTest {
    // Single-threaded stand-in that is installed as Dispatchers.Main.
    private val mainStandIn = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

    @OptIn(ExperimentalCoroutinesApi::class)
    @BeforeEach
    fun setup() {
        Dispatchers.setMain(mainStandIn)
    }

    /** Switching to Main.immediate keeps both the reported dispatcher name and the thread. */
    @Test
    @DisplayName("Dispatchers.Main -> Dispatchers.Main.immediate")
    fun switchDispatchersTest0(): Unit =
        runTest {
            launch(Dispatchers.Main) {
                val outerDispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
                val outerName = outerDispatcher.toString()
                assert(outerName == "Dispatchers.Main")

                val outerThread = Thread.currentThread().name

                withContext(Dispatchers.Main.immediate) {
                    val innerDispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
                    val innerName = innerDispatcher.toString()

                    assert(innerName == "Dispatchers.Main")
                    assert(outerName == innerName)

                    val innerThread = Thread.currentThread().name
                    assert(innerThread == outerThread)
                }
            }
        }

    /** Switching to Unconfined changes the dispatcher but stays on the same thread. */
    @Test
    @DisplayName("Dispatchers.Main -> Dispatchers.Unconfined")
    fun switchDispatchersTest1(): Unit =
        runTest {
            launch(Dispatchers.Main) {
                val outerDispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
                val outerName = outerDispatcher.toString()
                assert(outerName == "Dispatchers.Main")

                val outerThread = Thread.currentThread().name

                withContext(Dispatchers.Unconfined) {
                    val innerDispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
                    val innerName = innerDispatcher.toString()

                    assert(innerName == "Dispatchers.Unconfined")
                    assert(outerName != innerName)

                    val innerThread = Thread.currentThread().name
                    assert(innerThread == outerThread)
                }
            }
        }
}

컨티뉴에이션 인터셉터

작업의 종류에 따른 각 디스패처의 성능 비교

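| | 중단 | 블로킹 | CPU 집약적인 연산 | 메모리 집약적인 연산 |
|---|---|---|---|---|
| 싱글스레드 | 1,002 | 100,003 | 39,103 | 94,358 |
| 디폴트 디스패처 (스레드 8개) | 1,002 | 13,003 | 8,473 | 21,461 |
| IO 디스패처 (스레드 63개) | 1,002 | 2,003 | 9,893 | 20,776 |
| 스레드 100개 | 1,002 | 1,003 | 16,379 | 21,004 |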

요약