[Kotlin Coroutines] 21장. 플로우 만들기
|
원시값을 가지는 플로우
- 플로우를 만드는 가장 간단한 방법
- 어떤 값을 가져야 하는지 정의하는
flowOf함수를 사용하는 것
- 어떤 값을 가져야 하는지 정의하는
- 값이 없는 플로우의 경우
emptyFlow()
컨버터
Iterable,Iterator,Sequence를 플로우로 바꾸려면asFlow를 사용
- asFlow 는 즉시 사용 가능한 원소들의 플로우를 만듦
- 플로우 처리 함수를 사용해 처리 가능한 원소들의 플로우를 시작할 때 유용함
함수를 플로우로 바꾸기
- 플로우는 시간상 지연되는 하나의 값을 나타낼 때 자주 사용됨
- 따라서 중단 함수를 플로우로 변환하는 것 또한 가능
- 이때 중단 함수의 결과가 플로우의 유일한 값
- 일반 함수를 변경하려면 함수 참조값이 필요
- 코틀린에서는
::를 사용해서 참조 가능
플로우와 리액티브 스트림
- 리액티브 스트림을 활용하고 있다면 코드를 별로 바꾸지 않고 플로우 적용이 가능
Flux,Flowable,Observable은kotlinx-coroutines-reactive라이브러리의asFlow함수를 사용해Flow로 변환 가능한Publisher인터페이스를 구현하고 있음
- 역으로 변환하려면?
- 좀 더 복잡한 라이브러리를 사용해야 함
- https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-reactor/
| |
플로우 빌더
- 플로우를 만들 때 가장 많이 사용되는 방법은
flow빌더 - 빌더는
flow함수를 먼저 호출하고, 람다식 내부에서emit함수를 사용해 다음 값을 방출함 Channel이나Flow에서 모든 값을 방출하려면emitAll을 사용할 수 있음
플로우 빌더 이해하기
- 플로우 빌더는 플로우를 만드는 가장 기본적인 방법
- 다른 모든 방법 또한 빌더를 기초로 하고 있음
flow빌더는 내부적으로 아주 간단collect메서드 내부에서block함수를 호출하는Flow인터페이스를 구현함
flow빌더를 호출하면 단지 객체를 만들 뿐반면
collect를 호출하면collector인터페이스의block함수를 호출하게 됨- 이 예제의
block함수는 1에서 정의된 람다식
- 리시버는 2에서 정의된 람다식인
collect
- 이 예제의
(
FlowCollector와 같이) 람다식으로 정의된 함수 인터페이스를 정의하면 람다식의 본체는 함수형 인터페이스의 함수 본체로 사용됨- 여기서는
emit - 그러므로
emit함수의 본체는println(value)가 됨
- 여기서는
따라서
collect를 호출하면 1에서 정의된 람다식을 실행하기 시작하고emit을 호출했을 때 2에서 정의된 람다식을 호출함이것이 플로우가 작동하는 원리
플로우의 다른 모든 특성 또한 이런 원리에 기초하여 만들어짐
채널플로우 (channelFlow)
Flow는 콜드 데이터 스트림이므로 필요할 때만 값을 생성몇몇 상황에서는 특정 원소를 찾는 상황이 발생할 수 있다.
- 이럴 때 필요할 때만 다음 페이지를 지연 요청하게 된다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54data class UserInChannelFlowExample(val name: String) interface UserApiInChannelFlowExample { suspend fun takePage(pageNumber: Int): List<UserInChannelFlowExample> } class FakeUserApiInChannelFlowExample : UserApiInChannelFlowExample { private val userInChannelFlowExamples = List(20) { UserInChannelFlowExample("User$it") } private val pageSize: Int = 3 override suspend fun takePage(pageNumber: Int): List<UserInChannelFlowExample> { delay(1000) // suspending return userInChannelFlowExamples .drop(pageSize * pageNumber) .take(pageSize) } } fun allUsersFlowInChannelFlowExample(api: UserApiInChannelFlowExample): Flow<UserInChannelFlowExample> = flow { var page = 0 do { println("Fetching page $page") val users = api.takePage(page++) // suspending emitAll(users.asFlow()) } while (users.isNotEmpty()) } suspend fun main() { val api = FakeUserApiInChannelFlowExample() val users = allUsersFlowInChannelFlowExample(api) val user = users .first { println("Checking $it") delay(1000) // suspending it.name == "User3" } println(user) } // Fetching page 0 // (1 sec) // Checking User(name=User0) // (1 sec) // Checking User(name=User1) // (1 sec) // Checking User(name=User2) // (1 sec) // Fetching page 1 // (1 sec) // Checking User(name=User3) // (1 sec) // User(name=User3)반면 원소를 처리하고 있을 때 미리 페이지를 받아오고 싶은 경우
- 네트워크 호출을 더 빈번하게 하는 단점
- 결과를 더 빠르게 얻어올 수 있는 장점
- 이렇게 하려면 데이터 새성을 하고 소비하는 과정이 별개로 진행되어야 함
- 이는 채널과 같은 핫 데이터 스트림의 전형적인 특징
- 따라서 채널과 플로우를 합친 형태가 필요
channelFlow- 플로우처럼
Flow인터페이스를 구현하기 때문에 플로우가 가지는 특징을 제공 - 채널 플로우 빌더는 일반 함수,
collect와 같은 최종 연산으로 시작됨 - 한 번 시작하기만 하면 리시버를 기다릴 필요 없이 분리된 코루틴에서 값을 생성한다는 점이 채널과 비슷함
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56data class UserInChannelFlowExample2(val name: String) interface UserApiInChannelFlowExample2 { suspend fun takePage(pageNumber: Int): List<UserInChannelFlowExample2>? } class FakeUserApiInChannelFlowExample2 : UserApiInChannelFlowExample2 { private val userInChannelFlowExample2s = List(20) { UserInChannelFlowExample2("User$it") } private val pageSize: Int = 3 override suspend fun takePage(pageNumber: Int): List<UserInChannelFlowExample2> { delay(1000) return userInChannelFlowExample2s .drop(pageSize * pageNumber) .take(pageSize) } } fun allUsersFlowInChannelFlowExample2(api: UserApiInChannelFlowExample2): Flow<UserInChannelFlowExample2> = channelFlow { var page = 0 do { println("Fetching page $page") val users = api.takePage(page++) // suspending users?.forEach { send(it) } } while (!users.isNullOrEmpty()) } suspend fun main() { val api = FakeUserApiInChannelFlowExample2() val users = allUsersFlowInChannelFlowExample2(api) val user = users .first { println("Checking $it") delay(1000) it.name == "User3" } println(user) } // Fetching page 0 // (1 sec) // Checking UserInChannelFlowExample2(name=User0) // Fetching page 1 // (1 sec) // Checking UserInChannelFlowExample2(name=User1) // Fetching page 2 // (1 sec) // Checking UserInChannelFlowExample2(name=User2) // Fetching page 3 // (1 sec) // Checking UserInChannelFlowExample2(name=User3) // Fetching page 4 // (1 sec) // UserInChannelFlowExample2(name=User3)channelFlow는ProducerScope<T>에서 작동ProducerScope는produce빌더가 사용하는 것과 같은 타입ProducerScope는CoroutineScope를 구현했기 때문에 빌더에서 새로운 코루틴을 시작할 때 사용할 수 있음- 원소를 생성하려면
emit대신에send를 사용함 - 채널에 접근해
SendChannel함수를 직접 조작할 수도 있음
1 2 3 4 5 6 7 8 9 10public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> { /** * A reference to the channel this coroutine [sends][send] elements to. * It is provided for convenience, so that the code in the coroutine can refer * to the channel as `channel` as opposed to `this`. * All the [SendChannel] functions on this interface delegate to * the channel instance returned by this property. */ public val channel: SendChannel<E> }- 플로우처럼
여러 개의 값을 독립적으로 계산해야 할 때
channelFlow를 주로 사용함channelFlow는 코루틴 스코프를 생성하여launch와 같은 코루틴 빌더를 직접 시작할 수 있음flow는 코루틴 빌더가 필요로 하는 스코프를 만들지 못함다른 코루틴처럼
channelFlow도 모든 자식 코루틴이 종료 상태가 될 때까지 끝나지 않음
콜백플로우 (callbackFlow)
- 사용자의 클릭이나 활동 변화를 감지해야 하는 이벤트 플로우의 경우, 감지하는 프로세스는 이벤트를 처리하는 프로세스와 독립적이어야 하므로
channelFlow를 사용해도 좋지만, 이 경우에는callbackFlow를 사용하는 것이 더 낫다. channelFlow와callbackFlow의 가장 큰 차이점은callbackFlow가 콜백 함수를 래핑하는 방식으로 변경된 것callbackFlow는ProducerScope<T>에서 작동- 아래는 콜백을 처리하는데 유용한 몇 가지 함수
awaitClose { … }trySendBlocking(value)close()cancel(throwable)
요약
- 플로우를 생성하는 여러 가지 방법에 대해 정리