Backend

<Spring> Coroutine Actor 이용해서 단일 서버 락 구현하기 2

wjdtkdgns 2024. 8. 22. 11:45
 

<Spring> Coroutine Actor 이용해서 단일 서버 락 구현하기

📌 Actor🔵 Actor란?Actor 모델은 병렬 프로그래밍에서 상태를 가진 개체가 메시지를 주고받으며 비동기적으로 동작하는 개념이다.즉, 자체적인 상태를 가지고 메시지를 수신하며 이를 처리하는

wtg1026.tistory.com

여기서 이어지는 글입니다.

 

 

📌 리셋

만든 지 하루 만에 갈아엎어버렸다.

문제가 발견된 건 아닌데, 많은 요소가 복잡하게 엮여 있어서 문제가 발생할 가능성이 높다고 생각했다.

이 부분을 해결하고자 했고 처음부터 다시 로직을 구현해 봤다.

 

 

📌 구현 2

이전 로직에서 변경한 부분은 다음과 같다.

  • 락 획득 기준 : Queue의 맨 앞 -> LockActor에서 실행됨을 획득으로 간주
  • Key 별 Actor 할당, 조회 : ConcurrentHashMap -> LockManagerActor의 변수로 처리
  • 미사용 Actor 삭제 처리 : 락 로직 내에서 처리 -> 스케줄링으로 처리
  • LockManager로의 Return : Channel -> CompletableDeferred

 

복잡했던 구조를 더 단순하게 가져가고 싶었다.

Actor를 메시지 큐처럼 사용했었는데, 이 방식이 내부적으로 복잡했었다.

Actor는 작업들의 동시성을 지킬 수 있는데, 이를 락 설정에만 낭비하고 있는 것 같다는 생각이 들었다.

이에 락을 적용해야 할 로직을 Actor 내부에서 실행하게 된다면, 위와 같은 복잡한 로직이 필요 없을 것 같다는 생각에서 구현이 시작되었다.

 

만약 Actor가 pub/sub 용도가 아닌 직접 내부에서 로직을 실행하는 방식으로 간다면, Queue의 필요성이 사라진다.

Channel이 이미 Queue의 역할을 해주기 때문이다.

그렇기에 락 획득을 판단하는 기준이 Queue의 맨 앞에서 LockActor에서 실행되는 대상으로 변경된다.

 

또한 기존 미사용 Actor 삭제 로직은 Actor 조회, Queue에 채널 삽입 등 여러 로직과 연관되어 있었다.

이에 삭제 로직을 최대한 독립적으로 만들고 싶었다.

Actor 조회와 Actor 삭제 로직은 둘 중 하나만 작동해야 한다.

그렇지 않으면 같은 키를 가진 요청에 대해 2개의 Actor를 생성하고 동시성이 깨지는 상황이 발생할 수 있다.

이를 이전과 같이 ConcurrentHashMap의 compute()를 이용해서 처리할 수 있었다.

하지만 compute()의 경우, suspend function이 아니라서 다른 Actor로 메시지를 보내기 위해선 코루틴을 생성하여 처리해야 했다.

이 기능을 Actor로 구현한다면 두 작업 간의 동시성을 지킬 수 있고, 작업 처리를 위해 코루틴을 생성할 필요가 없었다.

그래서 기존 ConcurrentHashMap을 통해 키에 해당하는 Actor를 관리하는 작업을 새로운 Actor를 구현하여 처리했다.

 

삭제 로직이 락 작업에 있어서 병목지점, 성능을 저하하는 작업을 하지 않길 원했다.

락을 설정하여 작업을 처리할 경우, 작업이 순차적으로 진행된다.

이미 락으로 인한 오버헤드가 존재하는 작업에서 더 이상의 오버헤드를 추가하고 싶지 않았다.

이에 삭제 작업은 스케줄러를 통해 처리했다.

 

Actor에서 Service로 반환값을 받기 위해 기존에는 Channel을 이용했다.

하지만 한 Channel을 만들어서 모든 요청에서 사용하는 방식은 return 값에 대한 통일이 필요했다.

그리고 Queue가 사라진 상태에서

이 부분의 제약을 해결하기 위해, 한 Channel을 공유하는 것이 아니라 CompleteableDeferred를 사용하여 다양한 응답 타입에 대응할 수 있도록 했다.

 

위와 같은 이유로 변경한 결과 로직의 진행은 아래와 같다.

SuspendableLockManager -> LockManagerActor -> LockActor 
     ^                   |               |
     |--------------------------------------------

lockManager에서 lock을 요청하면 두 액터의 로직 처리를 이용하여 lock과 로직을 처리하게 된다.

SuspendableLockManager -> LockManagerActor -> LockActor 순서로 메시지가 전달된다.

또한 Deferred에 응답을 채우는 방식으로 LockManagerActor, LockActor에서 SuspendableLockManager로 응답이 전달된다.

에러 또한 위 방식으로 전달된다.

 

🔵 SuspendableLockManager

interface LockManager {
    /**
     * 서비스 로직 (block)에 key를 이용하여 락을 설정한 후 실행한다.
     */
    suspend fun <T> lock(key: String, block: suspend () -> T): T
}
/**
 * render 안하면 잘 보입니다. render 푸세요
 *
 * SuspendableLockManager -> LockManagerActor -> LockActor </br>
 *              ^                   |               |
 *              |-----------------------------------
 *
 * SuspendableLockManager -> LockManagerActor -> LockActor 순서로 메세지가 전달됨
 *
 * Deferred에 응답을 채우는 방식으로 LockManagerActor / LockActor 에서 SuspendableLockManager 로 응답이 전달됨
 * 에러 또한 이 방식으로 전달됨
 */
@Component
class SuspendableLockManager(
    private val coroutineExceptionHandler: ErrorPublishingCoroutineExceptionHandler,
    private val lockConfig: LockConfig.ActorLockConfig,
) : LockManager {
    private val actor = lockManagerActor(
        waitTimeMilli = lockConfig.waitTimeMilli,
        leaseTimeMilli = lockConfig.leaseTimeMilli
    )

    override suspend fun <RETURN> lock(key: String, block: suspend () -> RETURN): RETURN {
        val result = CompletableDeferred<Any?>()

        // 락 시도
        actor.send(
            LockManagerMsg.TryLock(
                key = key,
                block = block,
                result = result
            )
        )

        return try {
            // 서비스 로직 반환값 or 에러
            val rtn = withTimeout(lockConfig.waitTimeMilli + lockConfig.leaseTimeMilli + 1000) {
                result.await()
            }

            // 반환값이 에러면 throw
            if (rtn is Exception) {
                throw rtn
            }

            // TODO: as RETURN 안하는 방법 찾아서 수정 바람
            rtn as RETURN
        } catch (e: TimeoutCancellationException) {
            // 락 획득 시간 에러 처리
            throw FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR)
        } catch (e: Exception) {
            // 이외의 에러 처리
            throw e
        }
    }

    /**
     * channel이 빈 엑터 map에서 삭제하는 스케줄러
     */
    @Scheduled(fixedDelay = 1000 * 60)
    private fun scheduledClearEmptyActor() {
        CoroutineScope(Dispatchers.IO + coroutineExceptionHandler.handler).launch {
            clearEmptyActor()
        }
    }

    /**
     * channel이 빈 엑터 map에서 삭제
     */
    suspend fun clearEmptyActor() {
        actor.send(LockManagerMsg.ClearActor())
    }
}

LockManager는 lock 서비스를 제공하는 역할을 한다.

이 인터페이스에 대한 구현체로 SuspendableLockManager를 생성했다.

락 설정에 대해 Actor에 요청하고 대기한 후, 응답에 관해 처리한다.

또한 미사용 Actor에 대한 삭제 처리도 이 객체가 담당한다.

 

🔵 LockManagerActor

private sealed class LockManagerMsg {
    /** 락 시도 */
    class TryLock(
        val requestTime: LocalDateTime = LocalDateTime.now(),
        val key: String,
        val block: suspend () -> Any?,
        val result: CompletableDeferred<Any?>,
    ) : LockManagerMsg()

    /** 미사용 엑터 삭제 */
    class ClearActor : LockManagerMsg()
}

/**
 * 키에 해당하는 actor에 요청을 보내는 역할을 하는 actor
 */
@OptIn(ObsoleteCoroutinesApi::class)
private fun lockManagerActor(
    waitTimeMilli: Long,
    leaseTimeMilli: Long,
) = CoroutineScope(Dispatchers.IO).actor<LockManagerMsg>(capacity = 1000) {
    val actorMap = HashMap<String, SendChannel<LockMsg>>()

    for (msg in channel) {
        when (msg) {
            is LockManagerMsg.TryLock -> {
                if (msg.requestTime.isBefore(LocalDateTime.now().minusSeconds(waitTimeMilli.milliToSec()))) {
                    // 락 획득 시간보다 더 오랜 시간이 걸렸다면 timeout 에러 발생
                    msg.result.complete(FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT))
                } else {
                    try {
                        // actor 가져오기
                        val actor = actorMap.computeIfAbsent(msg.key) { _ ->
                            lockActor(
                                waitTimeMilli = waitTimeMilli,
                                leaseTimeMilli = leaseTimeMilli
                            )
                        }

                        // 락 처리 요청
                        actor.send(
                            LockMsg.Lock(
                                requestTime = LocalDateTime.now(),
                                block = msg.block,
                                result = msg.result
                            )
                        )
                    } catch (e: Exception) {
                        msg.result.complete(e)
                    }
                }
            }

            is LockManagerMsg.ClearActor -> {
                logger.info { "before $actorMap" }

                // actor channel 비었는지 조회
                actorMap.entries.chunked(100).map { actors ->
                    val deferreds = actors.map { (key, actor) ->
                        async {
                            val result = CompletableDeferred<Boolean>()

                            actor.send(LockMsg.IsEmpty(result))

                            val isEmpty = try {
                                withTimeout(10) {
                                    result.await()
                                }
                            } catch (e: Exception) {
                                // 에러에 대한 처리는 따로 하지않음
                                // 시간 안에 처리 안되면, 작업이 남았다고 간주
                                false
                            }

                            if (isEmpty) {
                                // 빔
                                key
                            } else {
                                // 안빔
                                ""
                            }
                        }
                    }.toTypedArray()

                    // 빈 actor만 필터링
                    val emptyKeys = awaitAll(*deferreds).filter { it != "" }

                    // 빈 actor 삭제
                    emptyKeys.forEach { key -> actorMap.remove(key) }
                }

                logger.info { "after $actorMap" }
            }
        }
    }
}

이는 Key에 따른 Actor에 요청을 보내는 역할을 하는 Actor이다.

기능은 2개 있다.

  1. lock 시도
  2. 미사용 Actor 삭제

 

Lock 시도 로직은 간단하다.

키에 맞는 Actor를 생성, 조회한 후, 이에 lock 요청을 보내는 것이다.

 

위에서 언급했듯이 삭제 로직은 Actor를 통해 처리되므로 조회 로직과의 동시성이 유지될 수 있다.

하지만 이 부분 때문에 삭제 로직은 최대한 빠르게 작동해야 한다.

그렇지 않으면 조회 성능이 떨어지게 된다.

이러한 점을 극복하기 위해 병렬 처리와 withTimeout을 이용했다.

삭제를 위해 Actor에 처리할 요청이 남았는지 확인하는 작업이 필요하다.

이 요청이 빠르게 처리되지 않는다면, 채널에 처리할 작업이 남아있다 판단할 수 있다.

그래서 확인 로직을 withTimeout로 감싸서, return이 늦다면 삭제 대상이 아니라 식별하도록 했다.

이를 통해 최대한 빠르게 작동하도록 구현했다.

 

🔵 LockActor

private sealed class LockMsg {
    /** 락 설정 */
    class Lock(
        val requestTime: LocalDateTime,
        val block: suspend () -> Any?,
        val result: CompletableDeferred<Any?>,
    ) : LockMsg()

    /** 엑터의 채널이 비었는지 확인 */
    class IsEmpty(
        val result: CompletableDeferred<Boolean>,
    ) : LockMsg()
}

/**
 * 특정 키에 해당된 작업을 순차적으로 수행하는 actor
 * msg가 실행된다 == 락을 획득했다 로 여김
 */
@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
private fun lockActor(
    waitTimeMilli: Long,
    leaseTimeMilli: Long,
) = CoroutineScope(Dispatchers.IO).actor<LockMsg>(capacity = 1000) {
    for (msg in channel) {
        when (msg) {
            is LockMsg.Lock -> {
                if (msg.requestTime.isBefore(LocalDateTime.now().minusSeconds(waitTimeMilli.milliToSec()))) {
                    // 락은 획득했지만 락 획득 시간보다 더 오랜 시간이 걸렸다면 timeout 에러 발생
                    msg.result.complete(FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT))
                } else {
                    try {
                        // 로직 실행 및 deferred에 결과값 넣기
                        withTimeout(leaseTimeMilli) {
                            val rtn = msg.block()

                            msg.result.complete(rtn)
                        }
                    } catch (e: TimeoutCancellationException) {
                        // 락 획득 시간 에러 처리
                        msg.result.complete(FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR))
                    } catch (e: Exception) {
                        // 이외의 에러 처리
                        msg.result.complete(e)
                    }
                }
            }

            is LockMsg.IsEmpty -> {
                msg.result.complete(channel.isEmpty)
            }
        }
    }
}

이 부분이 실질적으로 락을 설정하고 서비스 로직을 실행하는 Actor이다.

이 Actor에서 로직이 실행된다는 것을 락을 획득했다고 여긴다.

처리에 대한 결과, 에러는 전달받은 CompletableDeferred를 통해 SuspendableLockManager로 전달한다.

 

📌 마무리 2

테스트도 잘 통과한다

 

아쉬운 점

  1. lock을 통해 서비스 로직이 수행되어 생긴 return 값에 대해 as를 이용하여 강제 형변환을 한다.
  2. timeout 로직이 여러 군데에 혼재되어 있다.

차차 수정해보도록 하자