=================
== The Archive ==
=================

[Kotlin Coroutines] 14장. 공유 상태로 인한 문제

|

개요

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Downloads users through [api] and keeps them in an in-memory list.
 *
 * The backing list is a plain mutable list and no synchronization is applied
 * in this class, so concurrent calls to [fetchUser] share that state directly.
 */
private class UserDownloader(
    private val api: NetworkService,
) {
    private val users: MutableList<User> = mutableListOf()

    /** Returns a snapshot copy of the users collected so far. */
    fun downloaded(): List<User> {
        return users.toList()
    }

    /** Fetches the user with the given [id] through [api] and appends it to the list. */
    suspend fun fetchUser(id: Int) {
        users.add(api.fetchUser(id))
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/**
 * Test double for [NetworkService]: suspends briefly instead of doing real
 * network I/O, then returns a user named after the requested id.
 */
private class FakeNetworkService : NetworkService {
    override suspend fun fetchUser(id: Int): User {
        // Simulated latency of 2 ms.
        delay(2L)
        val name = "User$id"
        return User(name)
    }
}

/**
 * Launches 1,000,000 concurrent downloads against a single shared downloader
 * and prints how many users ended up in its list.
 */
suspend fun main() {
    val downloader = UserDownloader(FakeNetworkService())
    // coroutineScope suspends until every launched child has completed.
    coroutineScope {
        for (id in 0 until 1_000_000) {
            launch { downloader.fetchUser(id) }
        }
    }
    val downloadedCount = downloader.downloaded().size
    print(downloadedCount) // 815781, 805445, 753823, ...
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Shared mutable state, incremented without any synchronization.
private var counter = 0

/**
 * Increments [counter] from many coroutines at once and prints the final value.
 * The increment is a plain read-modify-write on a shared variable.
 */
fun main() {
    runBlocking {
        massiveRun { counter += 1 }
        println(counter) // 332598, 300346, 375650, ...
    }
}

/**
 * Launches 1000 coroutines on [Dispatchers.Default]; each one invokes [action]
 * 1000 times in sequence. Suspends until all launched coroutines have finished.
 */
private suspend fun massiveRun(action: suspend () -> Unit) {
    withContext(Dispatchers.Default) {
        repeat(times = 1000) {
            launch {
                for (iteration in 1..1000) {
                    action()
                }
            }
        }
    }
}

동기화 블로킹

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// Shared mutable state; every access below happens inside synchronized(lock).
private var counter = 0

/**
 * Increments [counter] from many coroutines, guarding each increment with a
 * JVM monitor lock, then prints the final value.
 */
fun main() {
    runBlocking {
        val lock = Any()
        massiveRun {
            // We are blocking threads!
            synchronized(lock) { counter += 1 }
        }
        println("Counter = $counter") // 1000000
    }
}

/**
 * Launches 1000 coroutines on [Dispatchers.Default]; each one invokes [action]
 * 1000 times in sequence. Suspends until all launched coroutines have finished.
 */
private suspend fun massiveRun(action: suspend () -> Unit) {
    withContext(Dispatchers.Default) {
        repeat(times = 1000) {
            launch {
                for (iteration in 1..1000) {
                    action()
                }
            }
        }
    }
}

원자성

1
2
3
4
5
6
7
8
9
// The reference itself is never reassigned; only the AtomicInteger's internal
// value changes, so this is a `val`.
private val counter = AtomicInteger()

/**
 * Increments [counter] from many coroutines using a single atomic operation
 * per increment, then prints the final value.
 */
fun main() =
    runBlocking {
        massiveRun {
            // incrementAndGet() performs the whole read-modify-write atomically.
            counter.incrementAndGet()
        }
        println(counter.get()) // 1000000
    }
1
2
3
4
5
6
7
8
9
// The reference itself is never reassigned; only the AtomicInteger's internal
// value changes, so this is a `val`.
private val counter = AtomicInteger()

/**
 * Demonstrates that combining two atomic operations does not yield an atomic
 * operation: the final count printed is lower than the number of increments.
 */
fun main() =
    runBlocking {
        massiveRun {
            // get() and set() are each atomic, but another coroutine can update
            // the value between them, so increments are lost.
            counter.set(counter.get() + 1)
        }
        println(counter.get()) // 165668, 172644, 179273, ...
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Downloads users through [api] and keeps them in a read-only list held by an
 * [AtomicReference]. Each update swaps in a new list instead of mutating one.
 */
class UserDownloader(
    private val api: NetworkService,
) {
    private val users: AtomicReference<List<User>> = AtomicReference(emptyList())

    /** Returns the list currently held by the reference. */
    fun downloaded(): List<User> {
        return users.get()
    }

    /** Fetches the user with the given [id] through [api] and atomically appends it. */
    suspend fun fetchUser(id: Int) {
        val fetched = api.fetchUser(id)
        // `current + fetched` builds a new list containing every existing element
        // plus the new one, so the cost of each update grows with the list size.
        users.getAndUpdate { current -> current + fetched }
    }
}

Q. ☝️ 위의 예시가 제대로 종료되지 않음. 너무 오래 걸림 (`it + newUser`가 호출될 때마다 리스트 전체를 새로 복사하기 때문으로 보임)

Q. 어떻게 원자성을 보장하나?

References

싱글스레드로 제한된 디스패처

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// View of Dispatchers.IO limited to a parallelism of 1.
@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher = Dispatchers.IO.limitedParallelism(1)

// Shared mutable state; only touched from within `dispatcher` below.
private var counter = 0

/**
 * Increments [counter] from many coroutines, switching to the
 * parallelism-limited [dispatcher] for each increment, then prints the result.
 */
fun main() {
    runBlocking {
        massiveRun {
            withContext(dispatcher) { counter += 1 }
        }
        println(counter) // 1000000
    }
}

/**
 * Launches 1000 coroutines on [Dispatchers.Default]; each one invokes [action]
 * 1000 times in sequence. Suspends until all launched coroutines have finished.
 */
private suspend fun massiveRun(action: suspend () -> Unit) {
    withContext(Dispatchers.Default) {
        repeat(times = 1000) {
            launch {
                for (iteration in 1..1000) {
                    action()
                }
            }
        }
    }
}

뮤텍스

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Starts five coroutines that each call [delayAndPrint], and suspends until
 * all of them have completed.
 */
suspend fun main() {
    coroutineScope {
        repeat(times = 5) {
            launch { delayAndPrint() }
        }
    }
}

// Shared lock that serializes the body of delayAndPrint across coroutines.
private val mutex = Mutex()

/**
 * Acquires [mutex], waits one second, prints "Done", and releases the mutex.
 *
 * The release happens in a `finally` block so that the mutex is unlocked even
 * if the coroutine is cancelled during [delay] or the body throws; otherwise
 * every later caller would suspend in `lock()` forever.
 */
private suspend fun delayAndPrint() {
    mutex.lock()
    try {
        delay(1000)
        println("Done")
    } finally {
        mutex.unlock()
    }
}

// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
// (1 sec)
// Done
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Lock guarding every access to `counter`.
private val mutex = Mutex()

// Shared mutable state; only modified while holding `mutex`.
private var counter = 0

/**
 * Increments [counter] from many coroutines, holding [mutex] around each
 * increment, then prints the final value.
 */
fun main() {
    runBlocking {
        massiveRun {
            mutex.withLock { counter += 1 }
        }
        println(counter) // 1000000
    }
}

/**
 * Launches 1000 coroutines on [Dispatchers.Default]; each one invokes [action]
 * 1000 times in sequence. Suspends until all launched coroutines have finished.
 */
private suspend fun massiveRun(action: suspend () -> Unit) {
    withContext(Dispatchers.Default) {
        repeat(times = 1000) {
            launch {
                for (iteration in 1..1000) {
                    action()
                }
            }
        }
    }
}

세마포어 (semaphore)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/**
 * Starts five coroutines that each wait one second and print their index
 * while holding a permit from a semaphore created with two permits.
 */
suspend fun main() {
    coroutineScope {
        val semaphore = Semaphore(2)

        for (index in 0 until 5) {
            launch {
                semaphore.withPermit {
                    delay(1000)
                    print(index)
                }
            }
        }
    }
}

// (1 sec)
// 01
// (1 sec)
// 32
// (1 sec)
// 4

요약

References