=================
== The Archive ==
=================

[Kotlin Coroutines] 24장. 공유 플로우와 상태 플로우

|

공유 플로우

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
suspend fun main(): Unit =
    coroutineScope {
        val mutableSharedFlow =
            MutableSharedFlow<String>(replay = 0)
        // or MutableSharedFlow<String>()

        launch {
            mutableSharedFlow.collect {
                println("#1 received $it")
            }
        }
        launch {
            mutableSharedFlow.collect {
                println("#2 received $it")
            }
        }

        delay(1000)
        mutableSharedFlow.emit("Message1")
        mutableSharedFlow.emit("Message2")
    }

// (1 sec)
// #2 received Message1
// #2 received Message2
// #1 received Message1
// #1 received Message2
// (program never ends)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main(): Unit =
    coroutineScope {
        val mutableSharedFlow =
            MutableSharedFlow<String>(
                replay = 2,
            )
        mutableSharedFlow.emit("Message1")
        mutableSharedFlow.emit("Message2")
        mutableSharedFlow.emit("Message3")

        println(mutableSharedFlow.replayCache)
        // [Message2, Message3]

        launch {
            mutableSharedFlow.collect {
                println("#1 received $it")
            }
            // #1 received Message2
            // #1 received Message3
        }

        delay(100)
        mutableSharedFlow.resetReplayCache()
        println(mutableSharedFlow.replayCache) // []
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    override suspend fun emit(value: T)
    public fun tryEmit(value: T): Boolean
    public val subscriptionCount: StateFlow<Int>
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}

public interface SharedFlow<out T> : Flow<T> {
    public val replayCache: List<T>
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
suspend fun main(): Unit =
    coroutineScope {
        val mutableSharedFlow = MutableSharedFlow<String>()
        val sharedFlow: SharedFlow<String> = mutableSharedFlow
        val collector: FlowCollector<String> = mutableSharedFlow

        launch {
            mutableSharedFlow.collect {
                println("#1 received $it")
            }
        }

        launch {
            sharedFlow.collect {
                println("#2 received $it")
            }
        }

        delay(1000)
        mutableSharedFlow.emit("Message1")
        collector.emit("Message2")
    }

// (1 sec)
// #2 received Message1
// #2 received Message2
// #1 received Message1
// #1 received Message2

shareIn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
suspend fun main(): Unit =
    coroutineScope {
        val flow =
            flowOf("A", "B", "C")
                .onEach { delay(1000) }

        val sharedFlow: SharedFlow<String> =
            flow.shareIn(
                scope = this,
                started = SharingStarted.Eagerly,
                // replay = 0 (default)
            )

        delay(500)

        launch {
            sharedFlow.collect { println("#1 $it") }
        }

        delay(1000)

        launch {
            sharedFlow.collect { println("#2 $it") }
        }

        delay(1000)

        launch {
            sharedFlow.collect { println("#3 $it") }
        }
    }

// (1 sec)
// #1 A
// (1 sec)
// #2 B
// #1 B
// (1 sec)
// #1 C
// #2 C
// #3 C

상태 플로우

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
suspend fun main() =
    coroutineScope {
        val state = MutableStateFlow("A")
        println(state.value) // A
        launch {
            state.collect { println("Value changed to $it") }
            // Value changed to A
        }

        delay(1000)
        state.value = "B" // Value changed to B

        delay(1000)
        launch {
            state.collect { println("and now it is $it") }
            // and now it is B
        }

        delay(1000)
        state.value = "C" // Value changed to C and now it is C
    }

stateIn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
suspend fun main(): Unit =
    coroutineScope {
        val flow =
            flowOf("A", "B", "C")
                .onEach { delay(1000) }
                .onEach { println("Produced $it") }
        val stateFlow: StateFlow<String> = flow.stateIn(this)

        println("Listening")
        println(stateFlow.value)
        stateFlow.collect { println("Received $it") }
    }

// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
suspend fun main(): Unit =
    coroutineScope {
        val flow =
            flowOf("A", "B")
                .onEach { delay(1000) }
                .onEach { println("Produced $it") }

        val stateFlow: StateFlow<String> =
            flow.stateIn(
                scope = this,
                started = SharingStarted.Lazily,
                initialValue = "Empty",
            )

        println(stateFlow.value)

        delay(2000)
        stateFlow.collect { println("Received $it") }
    }

// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B

요약