=================
== The Archive ==
=================

[Kotlin Coroutines] 23장. 플로우 처리

|

Introduction

map

1
2
3
4
5
/** Demonstrates `map`: each value is replaced by its square before it is printed. */
suspend fun main() {
    val squares =
        flowOf(1, 2, 3) // [1, 2, 3]
            .map { number -> number * number } // [1, 4, 9]
    squares.collect { square -> print(square) } // prints 149
}

filter

1
2
3
4
5
6
7
8
/** Demonstrates chained `filter` calls: keep values up to 5, then keep only the even ones. */
suspend fun main() {
    val numbers = (1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    val smallEvens =
        numbers
            .filter { number -> number <= 5 } // [1, 2, 3, 4, 5]
            .filter { number -> isEven(number) } // [2, 4]
    smallEvens.collect { number -> print(number) } // prints 24
}

/** Returns `true` when [num] leaves no remainder after division by 2. */
private fun isEven(num: Int): Boolean {
    return num.rem(2) == 0
}

take 와 drop

take

1
2
3
4
5
/** Demonstrates `take`: only the first five letters reach the collector. */
suspend fun main() {
    val alphabet = ('A'..'Z').asFlow()
    val firstFive = alphabet.take(5) // [A, B, C, D, E]
    firstFive.collect { letter -> print(letter) } // prints ABCDE
}

drop

1
2
3
4
5
/** Demonstrates `drop`: the first twenty letters are skipped and the rest are printed. */
suspend fun main() {
    val alphabet = ('A'..'Z').asFlow()
    val remaining = alphabet.drop(20) // [U, V, W, X, Y, Z]
    remaining.collect { letter -> print(letter) } // prints UVWXYZ
}

컬렉션 처리는 어떻게 작동할까?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/** Upper-cases two characters with `map` and prints the results back to back. */
suspend fun main() {
    val upperCased = flowOf('a', 'b').map { letter -> letter.uppercase() }
    upperCased.collect { text -> print(text) } // prints AB
}

/**
 * Simplified `map`: builds a new flow that collects the receiver and emits
 * the result of [transform] for every collected value.
 */
private fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R> {
    val upstream = this
    return flow {
        upstream.collect { item ->
            val mapped = transform(item)
            emit(mapped)
        }
    }
}

/** Simplified `flowOf`: builds a flow that emits [elements] one by one, in argument order. */
private fun <T> flowOf(vararg elements: T): Flow<T> {
    return flow {
        elements.forEach { item -> emit(item) }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Two nested flow builders written out by hand: the inner one (labelled flowOf) emits the
// array elements, the outer one (labelled map) collects them and emits their upper-cased form.
// The labels make explicit which builder's receiver each emit call goes through; the trailing
// numbers mark the individual steps.
suspend fun main() {
    flow map@{ // 1: outer builder, labelled map
        flow flowOf@{ // 2: inner builder, labelled flowOf
            for (element in arrayOf('a', 'b')) { // 3: iterate over the source values
                this@flowOf.emit(element) // 4: emit the raw element through the inner builder's receiver
            }
        }.collect { value -> // 5: collect the inner flow
            this@map.emit(value.uppercase()) // 6: emit the upper-cased value through the outer builder's receiver
        }
    }.collect { // 7: collect the outer flow
        print(it) // 8: print each received value
    }
}

merge, zip, combine

merge

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Demonstrates `merge`: an `Int` flow and a `Double` flow are combined into a single
 * `Flow<Number>`, whose values are gathered into a list and printed.
 */
suspend fun main() {
    val wholeNumbers: Flow<Int> = flowOf(1, 2, 3)
    val fractions: Flow<Double> = flowOf(0.1, 0.2, 0.3)

    val merged: Flow<Number> = merge(wholeNumbers, fractions)
    val collected = merged.toList()
    print(collected)
    // Possible output (the interleaving of the two sources is not fixed):
    // [1, 2, 3, 0.1, 0.2, 0.3]
    // or [1, 0.1, 0.2, 0.3, 2, 3]
    // or [0.1, 0.2, 0.3, 1, 2, 3]
    // or [0.1, 1, 2, 3, 0.2, 0.3]
    // or any other combination
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
/**
 * Demonstrates `merge` when one source is slow: every `Int` is held back by a
 * 1000 ms delay, while the `Double` flow has no delay at all.
 */
suspend fun main() {
    val slowInts: Flow<Int> = flowOf(1, 2, 3).onEach { delay(1000) }
    val fractions: Flow<Double> = flowOf(0.1, 0.2, 0.3)

    val merged: Flow<Number> = merge(slowInts, fractions)
    merged.collect { number -> println(number) }
}

// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3

zip

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/**
 * Demonstrates `zip`: letters (400 ms apart) are paired position by position with
 * numbers (1000 ms apart) and each pair is printed as "letter_number".
 */
suspend fun main() {
    val letters = flowOf("A", "B", "C").onEach { delay(400) }
    val numbers = flowOf(1, 2, 3, 4).onEach { delay(1000) }

    val pairs = letters.zip(numbers) { f1, f2 -> "${f1}_$f2" }
    pairs.collect { pair -> println(pair) }
}

// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3

combine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Demonstrates `combine` on the same two sources used for `zip`: letters arrive
 * 400 ms apart, numbers 1000 ms apart, and every combined value is printed.
 */
suspend fun main() {
    val letters = flowOf("A", "B", "C").onEach { delay(400) }
    val numbers = flowOf(1, 2, 3, 4).onEach { delay(1000) }

    val combined = letters.combine(numbers) { f1, f2 -> "${f1}_$f2" }
    combined.collect { value -> println(value) }
}

// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4

fold 와 scan

fold

1
2
3
4
5
6
7
/** Demonstrates `fold` on a list: once as a sum starting at 0, once as a product starting at 1. */
fun main() {
    val numbers = listOf(1, 2, 3, 4)

    val sum = numbers.fold(0) { total, number -> total + number }
    println(sum) // 10

    val product = numbers.fold(1) { running, number -> running * number }
    println(product) // 24
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/** Demonstrates `fold` on a flow: the sum is printed once all four delayed values have been folded. */
suspend fun main() {
    val delayedNumbers = flowOf(1, 2, 3, 4).onEach { delay(1000) }
    val sum = delayedNumbers.fold(0) { total, number -> total + number }
    println(sum)
}

// (4 sec)
// 10

scan

1
2
3
4
5
/** Demonstrates `scan` on a list: the result holds the initial value and every running sum. */
fun main() {
    val numbers = listOf(1, 2, 3, 4)
    val runningSums = numbers.scan(0) { total, number -> total + number }
    println(runningSums) // [0, 1, 3, 6, 10]
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/** Demonstrates `scan` on a flow: each running sum of the delayed values is printed. */
suspend fun main() {
    val delayedNumbers = flowOf(1, 2, 3, 4).onEach { delay(1000) }
    val runningSums = delayedNumbers.scan(0) { total, number -> total + number }
    runningSums.collect { sum -> println(sum) }
}

// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10

flatMapConcat, flatMapMerge, flatMapLatest

flatMapConcat

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** Builds a flow of the strings "1_elem ", "2_elem ", "3_elem ", waiting 1000 ms before each one. */
private fun flowFrom(elem: String) =
    flowOf(1, 2, 3).onEach { delay(1000) }.map { "${it}_$elem " }

/** Demonstrates `flatMapConcat`: each letter is turned into a flow via `flowFrom` and the results are printed. */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
    val letters = flowOf("A", "B", "C")
    val flattened = letters.flatMapConcat { letter -> flowFrom(letter) }
    flattened.collect { line -> println(line) }
}

// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

flatMapMerge

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/** Builds a flow of the strings "1_elem ", "2_elem ", "3_elem ", waiting 1000 ms before each one. */
private fun flowFrom(elem: String) =
    flowOf(1, 2, 3).onEach { delay(1000) }.map { "${it}_$elem " }

/** Demonstrates `flatMapMerge`: each letter is turned into a flow via `flowFrom` and the results are printed. */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
    val letters = flowOf("A", "B", "C")
    val flattened = letters.flatMapMerge { letter -> flowFrom(letter) }
    flattened.collect { line -> println(line) }
}

// (1 sec)
// 1_A
// 1_B
// 1_C
// (1 sec)
// 2_A
// 2_B
// 2_C
// (1 sec)
// 3_A
// 3_B
// 3_C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/** Builds a flow of the strings "1_elem ", "2_elem ", "3_elem ", waiting 1000 ms before each one. */
private fun flowFrom(elem: String) =
    flowOf(1, 2, 3).onEach { delay(1000) }.map { "${it}_$elem " }

/** Demonstrates `flatMapMerge` with `concurrency = 2`, applied to the flows produced by `flowFrom`. */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
    val letters = flowOf("A", "B", "C")
    val flattened = letters.flatMapMerge(concurrency = 2) { letter -> flowFrom(letter) }
    flattened.collect { line -> println(line) }
}

// (1 sec)
// 1_A
// 1_B
// (1 sec)
// 2_A
// 2_B
// (1 sec)
// 3_A
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

flatMapLatest

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/** Builds a flow of the strings "1_elem ", "2_elem ", "3_elem ", waiting 1000 ms before each one. */
private fun flowFrom(elem: String) =
    flowOf(1, 2, 3).onEach { delay(1000) }.map { "${it}_$elem " }

/** Demonstrates `flatMapLatest`: each letter is turned into a flow via `flowFrom` and the results are printed. */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
    val letters = flowOf("A", "B", "C")
    val flattened = letters.flatMapLatest { letter -> flowFrom(letter) }
    flattened.collect { line -> println(line) }
}

// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** Builds a flow of the strings "1_elem ", "2_elem ", "3_elem ", waiting 1000 ms before each one. */
private fun flowFrom(elem: String) =
    flowOf(1, 2, 3).onEach { delay(1000) }.map { "${it}_$elem " }

/**
 * Demonstrates `flatMapLatest` with a slowed-down source: the letters are spaced
 * 1200 ms apart before being turned into flows via `flowFrom`.
 */
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun main() {
    val slowLetters = flowOf("A", "B", "C").onEach { delay(1200) }
    val flattened = slowLetters.flatMapLatest { letter -> flowFrom(letter) }
    flattened.collect { line -> println(line) }
}

// (2.2 sec)
// 1_A
// (1.2 sec)
// 1_B
// (1.2 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

재시도 (retry)

1
2
3
4
5
6
7
/**
 * Retries the flow through [retryWhen] while fewer than [retries] retries have
 * been made and [predicate] accepts the failure cause.
 *
 * @param retries maximum number of retries; must be positive.
 * @param predicate decides, per failure cause, whether a retry is allowed; accepts everything by default.
 * @throws IllegalArgumentException when [retries] is not positive.
 */
public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { failure, attemptIndex ->
        // The predicate is only consulted while the retry budget is not used up.
        if (attemptIndex < retries) predicate(failure) else false
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/**
 * Builds a flow that repeats the `catchImpl` call for as long as it reports a failure
 * and [predicate] agrees to retry.
 *
 * [predicate] receives the reported cause and the number of retries made so far
 * (starting at 0). When it returns `false`, the cause is rethrown to the collector.
 */
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
    flow {
        var attempt = 0L // retries made so far; passed to predicate
        var shallRetry: Boolean
        do {
            shallRetry = false // reset on every pass; only an accepted failure sets it again
            // NOTE(review): catchImpl is not visible here; presumably it collects the upstream
            // into this collector and returns the thrown exception, or null on normal completion — confirm.
            val cause = catchImpl(this)
            if (cause != null) {
                if (predicate(cause, attempt)) {
                    shallRetry = true
                    attempt++
                } else {
                    throw cause // predicate declined: surface the failure
                }
            }
        } while (shallRetry)
    }
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
/**
 * Demonstrates `retry(3)`: the flow emits 1 and 2 and then fails with "E";
 * the predicate prints the failure message and always asks for another attempt.
 */
@Suppress("UNREACHABLE_CODE")
suspend fun main() {
    val failing =
        flow {
            emit(1)
            emit(2)
            error("E")
            emit(3)
        }
    val retried =
        failing.retry(3) { cause ->
            print(cause.message)
            true
        }
    retried.collect { value -> print(value) } // 12E12E12E12(exception thrown)
}

중복 제거 함수

distinctUntilChanged

1
2
3
4
5
/** Demonstrates `distinctUntilChanged`: a value equal to the one right before it is dropped. */
suspend fun main() {
    val source = flowOf(1, 2, 2, 3, 2, 1, 1, 3)
    val withoutRepeats = source.distinctUntilChanged()
    withoutRepeats.collect { value -> print(value) } // prints 123213
}
1
2
3
4
5
/**
 * Returns the receiver unchanged when it is a [StateFlow]; otherwise delegates to
 * `distinctUntilChangedBy` with `defaultKeySelector` and `defaultAreEquivalent`.
 */
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> =
    when (this) {
        is StateFlow<*> -> this // state flows are always distinct
        // NOTE(review): the two defaults are declared elsewhere; presumably identity key and equality check — confirm.
        else -> distinctUntilChangedBy(keySelector = defaultKeySelector, areEquivalent = defaultAreEquivalent)
    }
1
2
3
4
5
6
7
/**
 * Wraps the receiver in a `DistinctFlowImpl` built from [keySelector] and [areEquivalent],
 * unless the receiver already is a `DistinctFlowImpl` holding these very same two
 * function instances, in which case the receiver itself is returned.
 */
private fun <T> Flow<T>.distinctUntilChangedBy(
    keySelector: (T) -> Any?,
    areEquivalent: (old: Any?, new: Any?) -> Boolean
): Flow<T> = when {
    // Both functions are compared by reference (===), not by equals.
    this is DistinctFlowImpl<*> && this.keySelector === keySelector && this.areEquivalent === areEquivalent -> this // same
    else -> DistinctFlowImpl(this, keySelector, areEquivalent)
}

distinctUntilChangedBy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** A user whose string form is its id in square brackets followed by its name. */
private data class User(val id: Int, val name: String) {
    override fun toString(): String {
        return "[$id] $name"
    }
}

/**
 * Demonstrates three ways of dropping consecutive duplicates from the same user flow:
 * by id, by name, and with a custom equivalence check.
 */
suspend fun main() {
    val users =
        flowOf(
            User(1, "Alex"),
            User(1, "Bob"),
            User(2, "Bob"),
            User(2, "Celine"),
        )

    val distinctById = users.distinctUntilChangedBy { user -> user.id }.toList()
    println(distinctById) // [[1] Alex, [2] Bob]

    val distinctByName = users.distinctUntilChangedBy { user -> user.name }.toList()
    println(distinctByName) // [[1] Alex, [1] Bob, [2] Celine]

    val distinctByIdOrName =
        users.distinctUntilChanged { prev, next ->
            prev.id == next.id || prev.name == next.name
        }.toList()
    println(distinctByIdOrName) // [[1] Alex, [2] Bob]
    // [2] Bob is emitted because each element is compared with the previously
    // emitted one ([1] Alex), not with the skipped [1] Bob.
}

최종 연산

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/** Demonstrates the terminal operations `first`, `count`, `reduce` and `fold` on a flow of squares. */
suspend fun main() {
    val squares =
        flowOf(1, 2, 3, 4) // [1, 2, 3, 4]
            .map { number -> number * number } // [1, 4, 9, 16]

    val firstSquare = squares.first()
    println(firstSquare) // 1

    val squareCount = squares.count()
    println(squareCount) // 4

    val product = squares.reduce { running, square -> running * square }
    println(product) // 576

    val sum = squares.fold(0) { total, square -> total + square }
    println(sum) // 30
}

요약

References