[Kotlin Coroutines] 24장. 공유 플로우와 상태 플로우
공유 플로우 Link to heading
- 브로드캐스트 채널과 비슷한 MutableSharedFlow
- 공유 플로우를 통해 메시지를 보내면 (내보내면) 대기하고 있는 모든 코루틴이 수신하게 됨
suspend fun main(): Unit =
coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(replay = 0)
// or MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
mutableSharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
// (1 sec)
// #2 received Message1
// #2 received Message2
// #1 received Message1
// #1 received Message2
// (program never ends)
- 자식 코루틴이 launch 로 시작된 뒤 MutableSharedFlow 를 감지하고 있어서 종료가 안됨
- MutableSharedFlow 를 종료할 방법은 없으므로 프로그램을 종료하려면 전체 스코프를 취소하는 방법밖에 없음
- MutableSharedFlow 는 메시지를 보내는 작업을 유지할 수도 있음
- replay 인자 (default 0) 를 설정하면 마지막으로 전송한 값들이 정해진 수만큼 저장됨
- 코루틴이 감지를 시작하면 저장된 값들을 먼저 받게 됨
- resetReplayCache 를 사용하면 값을 저장한 캐시를 초기화할 수 있음
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main(): Unit =
coroutineScope {
val mutableSharedFlow =
MutableSharedFlow<String>(
replay = 2,
)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
mutableSharedFlow.emit("Message3")
println(mutableSharedFlow.replayCache)
// [Message2, Message3]
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
// #1 received Message2
// #1 received Message3
}
delay(100)
mutableSharedFlow.resetReplayCache()
println(mutableSharedFlow.replayCache) // []
}
- MutableSharedFlow 는 SharedFlow 와 FlowCollector 모두를 상속함
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
override suspend fun emit(value: T)
public fun tryEmit(value: T): Boolean
public val subscriptionCount: StateFlow<Int>
@ExperimentalCoroutinesApi
public fun resetReplayCache()
}
public interface SharedFlow<out T> : Flow<T> {
public val replayCache: List<T>
override suspend fun collect(collector: FlowCollector<T>): Nothing
}
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
suspend fun main(): Unit =
coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect {
println("#1 received $it")
}
}
launch {
sharedFlow.collect {
println("#2 received $it")
}
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
// (1 sec)
// #2 received Message1
// #2 received Message2
// #1 received Message1
// #1 received Message2
shareIn Link to heading
- 플로우는 사용자 액션, 데이터베이스 변경, 또는 새로운 메시지와 같은 변화를 감지할 때 주로 사용됨
- 다양한 클래스가 변화를 감지하는 상황에서 하나의 플로우로 여러 개의 플로우를 만들고 싶다면?
- SharedFlow 가 해결책이며, Flow → SharedFlow 로 바꾸는 가장 쉬운 방법이 shareIn 함수를 사용하는 것
suspend fun main(): Unit =
coroutineScope {
val flow =
flowOf("A", "B", "C")
.onEach { delay(1000) }
val sharedFlow: SharedFlow<String> =
flow.shareIn(
scope = this,
started = SharingStarted.Eagerly,
// replay = 0 (default)
)
delay(500)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
// (1 sec)
// #1 A
// (1 sec)
// #2 B
// #1 B
// (1 sec)
// #1 C
// #2 C
// #3 C
shareIn 함수는 첫 번째 인자로 코루틴 스코프를 받음
세번째 인자로는 기본값이 0인 replay
두번째 인자 started 가 흥미로운데, 리스너의 수에 따라 값을 언제부터 감지할지 결정함
SharingStarted.Eagerly
- 즉시 값을 감지하기 시작
- 플로우로 값을 전송
- replay 값에 제한이 있고, 감지를 시작하기 전에 값이 나오면 일부를 유실할 수 있음
suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B", "C") val sharedFlow: SharedFlow<String> = flow.shareIn( scope = this, started = SharingStarted.Eagerly, ) delay(100) launch { sharedFlow.collect { println("#1 $it") } } print("Done") } // (0.1 sec) // Done
SharingStarted.Lazily
- 첫번째 구독자가 나올 때 감시하기 시작함
- 첫번째 구독자는 내보내진 모든 값을 수신하는 것이 보장되며, 이후의 구독자는 replay 수만큼 가장 최근에 저장된 값들을 받게 됨
- 모든 구독자가 사라져도 업스트림 (데이터를 방출하는) 플로우는 액티브 상태지만, 구독자가 없으면 replay 수만큼 가장 최근의 값들만 캐싱함
suspend fun main(): Unit = coroutineScope { val flow1 = flowOf("A", "B", "C") val flow2 = flowOf("D") .onEach { delay(1000) } val sharedFlow = merge(flow1, flow2).shareIn( scope = this, started = SharingStarted.Lazily, ) delay(100) launch { sharedFlow.collect { println("#1 $it") } } delay(1000) launch { sharedFlow.collect { println("#2 $it") } } } // (0.1 sec) // #1 A // #1 B // #1 C // (1 sec) // #2 D // #1 D
WhileSubscribed()
- 첫번째 구독자가 나올 때 감시하기 시작함
- 마지막 구독자가 사라지면 플로우도 멈춤
- 마지막 구독자가 사라지고 난 뒤 감지할 시간을 나타내는 파라미터 (stopTimeoutMillis)
- 멈춘 뒤 리플레이 값을 가지고 있는 시간을 나타내는 파라미터 (replayExpirationMillis)
SharingStarted 인터페이스를 구현하여 커스텀화된 전략을 정의하는 것도 가능
suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B", "C", "D") .onStart { println("Started") } .onCompletion { println("Finished") } .onEach { delay(1000) } val sharedFlow = flow.shareIn( scope = this, started = SharingStarted.WhileSubscribed(), ) delay(3000) launch { println("#1 ${sharedFlow.first()}") } launch { println("#2 ${sharedFlow.take(2).toList()}") } delay(3000) launch { println("#3 ${sharedFlow.first()}") } } // (3 sec) // Started // (1 sec) // #1 A // (1 sec) // #2 [A, B] // Finished // (1 sec) // Started // (1 sec) // #3 A // Finished
동일한 변화를 감지하려고 하는 서비스가 여러 개일 때 shareIn 을 사용하면 편리함
상태 플로우 Link to heading
- 상태플로우는 공유플로우의 개념을 확장시킨 것
- replay 인자 값이 1인 공유플로우와 비슷하게 작동함
- 상태플로우는 value 프로퍼티로 접근 가능한 값 하나를 항상 가지고 있음
- 초기 값은 생성자를 통해 전달되어야 함
- value 프로퍼티로 값을 얻어올 수도 있고 설정할 수도 있음
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
public override var value: T
public fun compareAndSet(expect: T, update: T): Boolean
}
- MutableStateFlow 는 값을 감지할 수 있는 보관소
- 상태플로우는 데이터가 덮어 씌워지기 때문에, 관찰이 느린 경우 상태의 중간 변화를 받을 수 없는 경우도 있음
- 모든 이벤트를 다 받으려면 공유플로우를 사용해야 함
suspend fun main() =
coroutineScope {
val state = MutableStateFlow("A")
println(state.value) // A
launch {
state.collect { println("Value changed to $it") }
// Value changed to A
}
delay(1000)
state.value = "B" // Value changed to B
delay(1000)
launch {
state.collect { println("and now it is $it") }
// and now it is B
}
delay(1000)
state.value = "C" // Value changed to C and now it is C
}
stateIn Link to heading
- stateIn 은 Flow
를 StateFlow 로 변환하는 함수 - 스코프에서만 호출 가능하지만 중단 함수이기도 함
- StateFlow 는 항상 값을 가져야 하기 때문에 값을 명시하지 않았을 때는 첫 번째 값이 계산될 때까지 기다려야 함
suspend fun main(): Unit =
coroutineScope {
val flow =
flowOf("A", "B", "C")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> = flow.stateIn(this)
println("Listening")
println(stateFlow.value)
stateFlow.collect { println("Received $it") }
}
// (1 sec)
// Produced A
// Listening
// A
// Received A
// (1 sec)
// Produced B
// Received B
// (1 sec)
// Produced C
// Received C
suspend fun main(): Unit =
coroutineScope {
val flow =
flowOf("A", "B")
.onEach { delay(1000) }
.onEach { println("Produced $it") }
val stateFlow: StateFlow<String> =
flow.stateIn(
scope = this,
started = SharingStarted.Lazily,
initialValue = "Empty",
)
println(stateFlow.value)
delay(2000)
stateFlow.collect { println("Received $it") }
}
// Empty
// (2 sec)
// Received Empty
// (1 sec)
// Produced A
// Received A
// (1 sec)
// Produced B
// Received B
- 하나의 데이터 소스에서 값이 변경된 걸 감지하는 경우에 주로 stateIn 함수를 사용함
요약 Link to heading
- 안드로이드 개발자에게 특히 중요함