=================
== The Archive ==
=================

Bucket4j 로 API Rate Limiting 를 구현해보자

|

개요

최근 사이드 프로젝트를 진행하면서, API 에 Rate limiting 을 적용 해야 할 일이 생겼다.

어떻게 제한을 둘까 고민하던 와중 Bucket4J 를 알게 되었다.

Bucket4JToken bucket 알고리즘을 구현한 라이브러리로,

이를 적용하면 API Rate Limiting 를 구현할 수 있을 것 같았다.

하지만 찾아보니 생각보다 예제가 충분하지 않았고, 오래된 정보들이 많아 적용하는데 어려움을 겪었다.

그래서 아예 따로 spring-boot-starter-webspring-boot-starter-webflux 를 간단한 demo 로

각각 간단하게 적용해보았다.

Getting Started

라이브러리는 아래와 같이 추가해주면 된다.

1
2
3
4
5
6
dependencies {
    ...

    // Bucket4j
    implementation("com.bucket4j:bucket4j-core:8.6.0")
}

Basic

간단하게 어떻게 동작하는지 테스트 코드로 작성해본 내용이다.

tryConsume

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Test
fun basicTest() {
    // bucket with capacity 20 tokens and
    // with refilling speed 1 token per each 6 second (refillTokens = 10, refillPeriod = 60s)
    val bucket =
        Bucket.builder()
            .addLimit {
                it.capacity(20).refillGreedy(10, Duration.ofMinutes(1L))
            }
            .build()

    repeat(20) {
        assertTrue(bucket.tryConsume(1))
    }
    assertFalse(bucket.tryConsume(1))
}

Refill Greedy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
fun basicRefillGreedyTest() =
    runBlocking {
        val tokenCapacity = 20L
        val refillTokens = 10
        val refillPeriod = Duration.ofSeconds(10L)
        // bucket with capacity 20 tokens and
        // with refilling speed 1 token per each 1 second (refillTokens = 10, refillPeriod = 10s)
        val bucket =
            Bucket.builder()
                .addLimit {
                    it.capacity(tokenCapacity).refillGreedy(refillTokens.toLong(), refillPeriod)
                }
                .build()

        bucket.tryConsume(tokenCapacity) // Consume all directly
        assertFalse(bucket.tryConsume(1)) // Check emtpy

        repeat(refillTokens) {
            delay(refillPeriod.toMillis() / refillTokens) // Delay until refill greedy
            assertTrue(bucket.tryConsume(1)) // Check refill success
            assertFalse(bucket.tryConsume(1)) // Check refill only 1 token
        }
        assertFalse(bucket.tryConsume(1))
    }

구현 전제

먼저 필자는

코드로 보면 아래와 같다.

1
2
3
4
5
6
enum class UserRole {
    ADMIN,
    PREMIUM,
    USER,
    ANONYMOUS,
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
enum class PricingPlan(val limit: Bandwidth) {
    FREE(
        Bandwidth.builder()
            .capacity(20)
            .refillIntervally(20, Duration.ofHours(1))
            .build(),
    ),
    BASIC(
        Bandwidth.builder()
            .capacity(40)
            .refillIntervally(40, Duration.ofHours(1))
            .build(),
    ),
    PROFESSIONAL(
        Bandwidth.builder()
            .capacity(100)
            .refillIntervally(100, Duration.ofHours(1))
            .build(),
    ),
    ;

    companion object {
        fun resolvePlanFromUserRole(userRole: UserRole): PricingPlan {
            return when (userRole) {
                UserRole.ANONYMOUS -> FREE
                UserRole.USER -> BASIC
                UserRole.PREMIUM -> PROFESSIONAL
                UserRole.ADMIN -> PROFESSIONAL
            }
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@Service
class PricingPlanService(
    private val jwtProvider: JWTProvider,
) {
    private val cache = ConcurrentHashMap<String, Bucket>()

    fun resolveBucket(token: String) = cache.computeIfAbsent(token, this::newBucket)

    private fun newBucket(token: String): Bucket {
        val userRole = jwtProvider.getRole(token) ?: UserRole.ANONYMOUS

        val pricingPlan = PricingPlan.resolvePlanFromUserRole(userRole)

        return Bucket.builder()
            .addLimit(pricingPlan.limit)
            .build()
    }
}

간단한 구조다.

만약 여기서 조금 더 수정한다면, cache 부분을 Redis 로 처리한다는 등이 적용될 것 같다.

이제 상세 구현으로 가보자.

webflux 인 경우

webflux 의 경우 WebFilter 를 구현하여 처리하였다.

먼저 아래에 RateLimitInterceptor 라는 클래스를 보자.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Configuration
class RateLimitInterceptor(
    private val pricingPlanService: PricingPlanService,
) : WebFilter {
    private val logger = KotlinLogging.logger {}

    override fun filter(
        exchange: ServerWebExchange,
        chain: WebFilterChain,
    ): Mono<Void> {
        return Mono.just(getAuthorizationHeader(exchange))
            .map { getRowToken(it) }
            .flatMap { token ->
                val bucket = pricingPlanService.resolveBucket(token)
                val probe = bucket.tryConsumeAndReturnRemaining(1)
                val remainingLimit = probe.remainingTokens
                if (probe.isConsumed) {
                    logger.info { "Remaining limit - $remainingLimit" }
                    exchange.response.headers.set(X_RATE_LIMIT_REMAINING, remainingLimit.toString())
                } else {
                    val waitForRefill = probe.nanosToWaitForRefill / NANO_SECONDS
                    exchange.response.headers.set(X_RATE_LIMIT_RETRY_AFTER_SECONDS, waitForRefill.toString())
                    exchange.response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS)
                    return@flatMap exchange.response.setComplete()
                }
                chain.filter(exchange)
            }
    }

    private fun getAuthorizationHeader(exchange: ServerWebExchange): String {
        return exchange.request.headers[HttpHeaders.AUTHORIZATION]
            ?.firstOrNull { it.isNotEmpty() }
            ?: ""
    }

    private fun getRowToken(authorizationHeader: String): String {
        return if (authorizationHeader.startsWith(BEARER_)) {
            authorizationHeader.substringAfter(BEARER_)
        } else {
            authorizationHeader
        }
    }
}

필자는 토큰이 들어오는 경우와 들어오지 않는 경우 모두 고려했다.

먼저 토큰을 가져와서 파싱 한 뒤, 해당 토큰을 pricingPlanService.resolveBucket 로 전달해서 버켓을 가져오도록 했다.

그리고 해당 버켓이 비어있는지 아닌지에 따라

제공하도록 하였다.

실제 결과는 아래와 같다.

정상적인 경우

HTTP/1.1 200 OK
X-Rate-Limit-Remaining: 19
content-length: 0

<Response body is empty>

Response code: 200 (OK); Time: 130ms (130 ms); Content length: 0 bytes (0 B)

버켓이 비어있는 경우

HTTP/1.1 429 Too Many Requests
X-Rate-Limit-Retry-After-Seconds: 3588
content-length: 0

<Response body is empty>

Response code: 429 (Too Many Requests); Time: 2ms (2 ms); Content length: 0 bytes (0 B)

web 인 경우

이제 spring-boot-starter-web 일 때로 보자.

이때는 HandlerInterceptorpreHandle 을 구현하여 처리하였다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Component
class RateLimitInterceptor(
    private val pricingPlanService: PricingPlanService,
) : HandlerInterceptor {
    private val logger = KotlinLogging.logger {}

    override fun preHandle(
        request: HttpServletRequest,
        response: HttpServletResponse,
        handler: Any,
    ): Boolean {
        val authorizationHeader = request.getHeader(HttpHeaders.AUTHORIZATION) ?: ""
        val token =
            if (authorizationHeader.startsWith(BEARER_)) {
                authorizationHeader.substringAfter(BEARER_)
            } else {
                authorizationHeader
            }

        val bucket = pricingPlanService.resolveBucket(token)
        val probe = bucket.tryConsumeAndReturnRemaining(1)
        val remainingLimit = probe.remainingTokens

        return if (probe.isConsumed) {
            logger.info { "Remaining limit - $remainingLimit" }
            response.addHeader(X_RATE_LIMIT_REMAINING, remainingLimit.toString())
            true
        } else {
            logger.error { "You have exhausted your API Request Quota" }
            val waitForRefill = probe.nanosToWaitForRefill / NANO_SECONDS
            response.addHeader(X_RATE_LIMIT_RETRY_AFTER_SECONDS, waitForRefill.toString())
            response.sendError(
                HttpStatus.TOO_MANY_REQUESTS.value(),
                "You have exhausted your API Request Quota",
            )
            false
        }
    }
}

그리고 추가적으로 WebMvcConfigurer 에서 패턴을 적용하여 중복 적용이 안되게끔 처리하였다.

1
2
3
4
5
6
7
8
9
@Configuration
class AppConfig(
    private val rateLimitInterceptor: RateLimitInterceptor,
) : WebMvcConfigurer {
    override fun addInterceptors(registry: InterceptorRegistry) {
        registry.addInterceptor(rateLimitInterceptor)
            .addPathPatterns("/web/**") // prevent call twice
    }
}

결과는 아래와 같다.

정상적인 경우

HTTP/1.1 200 
X-Rate-Limit-Remaining: 19
Content-Length: 0
Date: Tue, 21 Nov 2023 18:52:25 GMT
Keep-Alive: timeout=60
Connection: keep-alive

<Response body is empty>

Response code: 200; Time: 99ms (99 ms); Content length: 0 bytes (0 B)

버켓이 비어있는 경우

HTTP/1.1 429 
X-Rate-Limit-Retry-After-Seconds: 3532
Content-Type: application/json
Transfer-Encoding: chunked
Date: Tue, 21 Nov 2023 18:53:32 GMT
Keep-Alive: timeout=60
Connection: keep-alive

{
  "timestamp": "2023-11-21T18:53:33.260+00:00",
  "status": 429,
  "error": "Too Many Requests",
  "path": "/web"
}

Response code: 429; Time: 36ms (36 ms); Content length: 100 bytes (100 B)

정리

Bucket4J 라이브러리를 spring-boot-starter-webspring-boot-starter-webflux 로 각각

구현하면서, 어떻게 API Rate Limiting 을 적용할 지 이해하는지와 각 구조를 이해하는 데 도움이 많이 되었다.

작업한 레포는 https://github.com/bossm0n5t3r/spring-boot-bucket4j에 멀티 모듈로 구성되어있으니 참고하면 된다.

References

Categories:

Tags: