<Spring> Coroutine Actor 이용해서 단일 서버 락 구현하기
📌 Actor🔵 Actor란?Actor 모델은 병렬 프로그래밍에서 상태를 가진 개체가 메시지를 주고받으며 비동기적으로 동작하는 개념이다.즉, 자체적인 상태를 가지고 메시지를 수신하며 이를 처리하는
wtg1026.tistory.com
여기서 이어지는 글입니다.
📌 리셋
만든 지 하루 만에 갈아엎어버렸다.
문제가 발견된 건 아닌데, 많은 요소가 복잡하게 엮여 있어서 문제가 발생할 가능성이 높다고 생각했다.
이 부분을 해결하고자 했고 처음부터 다시 로직을 구현해 봤다.
📌 구현 2
이전 로직에서 변경한 부분은 다음과 같다.
- 락 획득 기준 : Queue의 맨 앞 -> LockActor에서 실행됨을 획득으로 간주
- Key 별 Actor 할당, 조회 : ConcurrentHashMap -> LockManagerActor의 변수로 처리
- 미사용 Actor 삭제 처리 : 락 로직 내에서 처리 -> 스케줄링으로 처리
- LockManager로의 Return : Channel -> CompletableDeferred
복잡했던 구조를 더 단순하게 가져가고 싶었다.
Actor를 메시지 큐처럼 사용했었는데, 이 방식이 내부적으로 복잡했었다.
Actor는 작업들의 동시성을 지킬 수 있는데, 이를 락 설정에만 낭비하고 있는 것 같다는 생각이 들었다.
이에 락을 적용해야 할 로직을 Actor 내부에서 실행하게 된다면, 위와 같은 복잡한 로직이 필요 없을 것 같다는 생각에서 구현이 시작되었다.
만약 Actor가 pub/sub 용도가 아닌 직접 내부에서 로직을 실행하는 방식으로 간다면, Queue의 필요성이 사라진다.
Channel이 이미 Queue의 역할을 해주기 때문이다.
그렇기에 락 획득을 판단하는 기준이 Queue의 맨 앞에서 LockActor에서 실행되는 대상으로 변경된다.
또한 기존 미사용 Actor 삭제 로직은 Actor 조회, Queue에 채널 삽입 등 여러 로직과 연관되어 있었다.
이에 삭제 로직을 최대한 독립적으로 만들고 싶었다.
Actor 조회와 Actor 삭제 로직은 둘 중 하나만 작동해야 한다.
그렇지 않으면 같은 키를 가진 요청에 대해 2개의 Actor를 생성하고 동시성이 깨지는 상황이 발생할 수 있다.
이를 이전과 같이 ConcurrentHashMap의 compute()를 이용해서 처리할 수 있었다.
하지만 compute()의 경우, suspend function이 아니라서 다른 Actor로 메시지를 보내기 위해선 코루틴을 생성하여 처리해야 했다.
이 기능을 Actor로 구현한다면 두 작업 간의 동시성을 지킬 수 있고, 작업 처리를 위해 코루틴을 생성할 필요가 없었다.
그래서 기존 ConcurrentHashMap을 통해 키에 해당하는 Actor를 관리하는 작업을 새로운 Actor를 구현하여 처리했다.
삭제 로직이 락 작업에 있어서 병목지점, 성능을 저하하는 작업을 하지 않길 원했다.
락을 설정하여 작업을 처리할 경우, 작업이 순차적으로 진행된다.
이미 락으로 인한 오버헤드가 존재하는 작업에서 더 이상의 오버헤드를 추가하고 싶지 않았다.
이에 삭제 작업은 스케줄러를 통해 처리했다.
Actor에서 Service로 반환값을 받기 위해 기존에는 Channel을 이용했다.
하지만 한 Channel을 만들어서 모든 요청에서 사용하는 방식은 return 값에 대한 통일이 필요했다.
그리고 Queue가 사라진 상태에서
이 부분의 제약을 해결하기 위해, 한 Channel을 공유하는 것이 아니라 CompleteableDeferred를 사용하여 다양한 응답 타입에 대응할 수 있도록 했다.
위와 같은 이유로 변경한 결과 로직의 진행은 아래와 같다.
SuspendableLockManager -> LockManagerActor -> LockActor
^ | |
|--------------------------------------------
lockManager에서 lock을 요청하면 두 액터의 로직 처리를 이용하여 lock과 로직을 처리하게 된다.
SuspendableLockManager -> LockManagerActor -> LockActor 순서로 메시지가 전달된다.
또한 Deferred에 응답을 채우는 방식으로 LockManagerActor, LockActor에서 SuspendableLockManager로 응답이 전달된다.
에러 또한 위 방식으로 전달된다.
🔵 SuspendableLockManager
interface LockManager {
/**
* 서비스 로직 (block)에 key를 이용하여 락을 설정한 후 실행한다.
*/
suspend fun <T> lock(key: String, block: suspend () -> T): T
}
/**
* render 안하면 잘 보입니다. render 푸세요
*
* SuspendableLockManager -> LockManagerActor -> LockActor </br>
* ^ | |
* |-----------------------------------
*
* SuspendableLockManager -> LockManagerActor -> LockActor 순서로 메세지가 전달됨
*
* Deferred에 응답을 채우는 방식으로 LockManagerActor / LockActor 에서 SuspendableLockManager 로 응답이 전달됨
* 에러 또한 이 방식으로 전달됨
*/
@Component
class SuspendableLockManager(
private val coroutineExceptionHandler: ErrorPublishingCoroutineExceptionHandler,
private val lockConfig: LockConfig.ActorLockConfig,
) : LockManager {
private val actor = lockManagerActor(
waitTimeMilli = lockConfig.waitTimeMilli,
leaseTimeMilli = lockConfig.leaseTimeMilli
)
override suspend fun <RETURN> lock(key: String, block: suspend () -> RETURN): RETURN {
val result = CompletableDeferred<Any?>()
// 락 시도
actor.send(
LockManagerMsg.TryLock(
key = key,
block = block,
result = result
)
)
return try {
// 서비스 로직 반환값 or 에러
val rtn = withTimeout(lockConfig.waitTimeMilli + lockConfig.leaseTimeMilli + 1000) {
result.await()
}
// 반환값이 에러면 throw
if (rtn is Exception) {
throw rtn
}
// TODO: as RETURN 안하는 방법 찾아서 수정 바람
rtn as RETURN
} catch (e: TimeoutCancellationException) {
// 락 획득 시간 에러 처리
throw FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR)
} catch (e: Exception) {
// 이외의 에러 처리
throw e
}
}
/**
* channel이 빈 엑터 map에서 삭제하는 스케줄러
*/
@Scheduled(fixedDelay = 1000 * 60)
private fun scheduledClearEmptyActor() {
CoroutineScope(Dispatchers.IO + coroutineExceptionHandler.handler).launch {
clearEmptyActor()
}
}
/**
* channel이 빈 엑터 map에서 삭제
*/
suspend fun clearEmptyActor() {
actor.send(LockManagerMsg.ClearActor())
}
}
LockManager는 lock 서비스를 제공하는 역할을 한다.
이 인터페이스에 대한 구현체로 SuspendableLockManager를 생성했다.
락 설정에 대해 Actor에 요청하고 대기한 후, 응답에 관해 처리한다.
또한 미사용 Actor에 대한 삭제 처리도 이 객체가 담당한다.
🔵 LockManagerActor
private sealed class LockManagerMsg {
/** 락 시도 */
class TryLock(
val requestTime: LocalDateTime = LocalDateTime.now(),
val key: String,
val block: suspend () -> Any?,
val result: CompletableDeferred<Any?>,
) : LockManagerMsg()
/** 미사용 엑터 삭제 */
class ClearActor : LockManagerMsg()
}
/**
* 키에 해당하는 actor에 요청을 보내는 역할을 하는 actor
*/
@OptIn(ObsoleteCoroutinesApi::class)
private fun lockManagerActor(
waitTimeMilli: Long,
leaseTimeMilli: Long,
) = CoroutineScope(Dispatchers.IO).actor<LockManagerMsg>(capacity = 1000) {
val actorMap = HashMap<String, SendChannel<LockMsg>>()
for (msg in channel) {
when (msg) {
is LockManagerMsg.TryLock -> {
if (msg.requestTime.isBefore(LocalDateTime.now().minusSeconds(waitTimeMilli.milliToSec()))) {
// 락 획득 시간보다 더 오랜 시간이 걸렸다면 timeout 에러 발생
msg.result.complete(FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT))
} else {
try {
// actor 가져오기
val actor = actorMap.computeIfAbsent(msg.key) { _ ->
lockActor(
waitTimeMilli = waitTimeMilli,
leaseTimeMilli = leaseTimeMilli
)
}
// 락 처리 요청
actor.send(
LockMsg.Lock(
requestTime = LocalDateTime.now(),
block = msg.block,
result = msg.result
)
)
} catch (e: Exception) {
msg.result.complete(e)
}
}
}
is LockManagerMsg.ClearActor -> {
logger.info { "before $actorMap" }
// actor channel 비었는지 조회
actorMap.entries.chunked(100).map { actors ->
val deferreds = actors.map { (key, actor) ->
async {
val result = CompletableDeferred<Boolean>()
actor.send(LockMsg.IsEmpty(result))
val isEmpty = try {
withTimeout(10) {
result.await()
}
} catch (e: Exception) {
// 에러에 대한 처리는 따로 하지않음
// 시간 안에 처리 안되면, 작업이 남았다고 간주
false
}
if (isEmpty) {
// 빔
key
} else {
// 안빔
""
}
}
}.toTypedArray()
// 빈 actor만 필터링
val emptyKeys = awaitAll(*deferreds).filter { it != "" }
// 빈 actor 삭제
emptyKeys.forEach { key -> actorMap.remove(key) }
}
logger.info { "after $actorMap" }
}
}
}
}
이는 Key에 따른 Actor에 요청을 보내는 역할을 하는 Actor이다.
기능은 2개 있다.
- lock 시도
- 미사용 Actor 삭제
Lock 시도 로직은 간단하다.
키에 맞는 Actor를 생성, 조회한 후, 이에 lock 요청을 보내는 것이다.
위에서 언급했듯이 삭제 로직은 Actor를 통해 처리되므로 조회 로직과의 동시성이 유지될 수 있다.
하지만 이 부분 때문에 삭제 로직은 최대한 빠르게 작동해야 한다.
그렇지 않으면 조회 성능이 떨어지게 된다.
이러한 점을 극복하기 위해 병렬 처리와 withTimeout을 이용했다.
삭제를 위해 Actor에 처리할 요청이 남았는지 확인하는 작업이 필요하다.
이 요청이 빠르게 처리되지 않는다면, 채널에 처리할 작업이 남아있다 판단할 수 있다.
그래서 확인 로직을 withTimeout로 감싸서, return이 늦다면 삭제 대상이 아니라 식별하도록 했다.
이를 통해 최대한 빠르게 작동하도록 구현했다.
🔵 LockActor
private sealed class LockMsg {
/** 락 설정 */
class Lock(
val requestTime: LocalDateTime,
val block: suspend () -> Any?,
val result: CompletableDeferred<Any?>,
) : LockMsg()
/** 엑터의 채널이 비었는지 확인 */
class IsEmpty(
val result: CompletableDeferred<Boolean>,
) : LockMsg()
}
/**
* 특정 키에 해당된 작업을 순차적으로 수행하는 actor
* msg가 실행된다 == 락을 획득했다 로 여김
*/
@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
private fun lockActor(
waitTimeMilli: Long,
leaseTimeMilli: Long,
) = CoroutineScope(Dispatchers.IO).actor<LockMsg>(capacity = 1000) {
for (msg in channel) {
when (msg) {
is LockMsg.Lock -> {
if (msg.requestTime.isBefore(LocalDateTime.now().minusSeconds(waitTimeMilli.milliToSec()))) {
// 락은 획득했지만 락 획득 시간보다 더 오랜 시간이 걸렸다면 timeout 에러 발생
msg.result.complete(FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT))
} else {
try {
// 로직 실행 및 deferred에 결과값 넣기
withTimeout(leaseTimeMilli) {
val rtn = msg.block()
msg.result.complete(rtn)
}
} catch (e: TimeoutCancellationException) {
// 락 획득 시간 에러 처리
msg.result.complete(FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR))
} catch (e: Exception) {
// 이외의 에러 처리
msg.result.complete(e)
}
}
}
is LockMsg.IsEmpty -> {
msg.result.complete(channel.isEmpty)
}
}
}
}
이 부분이 실질적으로 락을 설정하고 서비스 로직을 실행하는 Actor이다.
이 Actor에서 로직이 실행된다는 것을 락을 획득했다고 여긴다.
처리에 대한 결과, 에러는 전달받은 CompletableDeferred를 통해 SuspendableLockManager로 전달한다.
📌 마무리 2

테스트도 잘 통과한다
아쉬운 점
- lock을 통해 서비스 로직이 수행되어 생긴 return 값에 대해 as를 이용하여 강제 형변환을 한다.
- timeout 로직이 여러 군데에 혼재되어 있다.
차차 수정해보도록 하자
'Backend' 카테고리의 다른 글
<Spring> Coroutine Actor 이용해서 단일 서버 락 구현하기 (0) | 2024.08.20 |
---|---|
<Spring> Slack 메세지 전송 에러 처리 (0) | 2024.08.04 |
<Spring> Webflux + Coroutine vs MVC (0) | 2024.06.12 |
<Spring> SUSU의 Coroutine (0) | 2024.05.11 |
<Spring> Webflux + Coroutine + MDC (0) | 2024.05.10 |
<Spring> Coroutine Actor 이용해서 단일 서버 락 구현하기
📌 Actor🔵 Actor란?Actor 모델은 병렬 프로그래밍에서 상태를 가진 개체가 메시지를 주고받으며 비동기적으로 동작하는 개념이다.즉, 자체적인 상태를 가지고 메시지를 수신하며 이를 처리하는
wtg1026.tistory.com
여기서 이어지는 글입니다.
📌 리셋
만든 지 하루 만에 갈아엎어버렸다.
문제가 발견된 건 아닌데, 많은 요소가 복잡하게 엮여 있어서 문제가 발생할 가능성이 높다고 생각했다.
이 부분을 해결하고자 했고 처음부터 다시 로직을 구현해 봤다.
📌 구현 2
이전 로직에서 변경한 부분은 다음과 같다.
- 락 획득 기준 : Queue의 맨 앞 -> LockActor에서 실행됨을 획득으로 간주
- Key 별 Actor 할당, 조회 : ConcurrentHashMap -> LockManagerActor의 변수로 처리
- 미사용 Actor 삭제 처리 : 락 로직 내에서 처리 -> 스케줄링으로 처리
- LockManager로의 Return : Channel -> CompletableDeferred
복잡했던 구조를 더 단순하게 가져가고 싶었다.
Actor를 메시지 큐처럼 사용했었는데, 이 방식이 내부적으로 복잡했었다.
Actor는 작업들의 동시성을 지킬 수 있는데, 이를 락 설정에만 낭비하고 있는 것 같다는 생각이 들었다.
이에 락을 적용해야 할 로직을 Actor 내부에서 실행하게 된다면, 위와 같은 복잡한 로직이 필요 없을 것 같다는 생각에서 구현이 시작되었다.
만약 Actor가 pub/sub 용도가 아닌 직접 내부에서 로직을 실행하는 방식으로 간다면, Queue의 필요성이 사라진다.
Channel이 이미 Queue의 역할을 해주기 때문이다.
그렇기에 락 획득을 판단하는 기준이 Queue의 맨 앞에서 LockActor에서 실행되는 대상으로 변경된다.
또한 기존 미사용 Actor 삭제 로직은 Actor 조회, Queue에 채널 삽입 등 여러 로직과 연관되어 있었다.
이에 삭제 로직을 최대한 독립적으로 만들고 싶었다.
Actor 조회와 Actor 삭제 로직은 둘 중 하나만 작동해야 한다.
그렇지 않으면 같은 키를 가진 요청에 대해 2개의 Actor를 생성하고 동시성이 깨지는 상황이 발생할 수 있다.
이를 이전과 같이 ConcurrentHashMap의 compute()를 이용해서 처리할 수 있었다.
하지만 compute()의 경우, suspend function이 아니라서 다른 Actor로 메시지를 보내기 위해선 코루틴을 생성하여 처리해야 했다.
이 기능을 Actor로 구현한다면 두 작업 간의 동시성을 지킬 수 있고, 작업 처리를 위해 코루틴을 생성할 필요가 없었다.
그래서 기존 ConcurrentHashMap을 통해 키에 해당하는 Actor를 관리하는 작업을 새로운 Actor를 구현하여 처리했다.
삭제 로직이 락 작업에 있어서 병목지점, 성능을 저하하는 작업을 하지 않길 원했다.
락을 설정하여 작업을 처리할 경우, 작업이 순차적으로 진행된다.
이미 락으로 인한 오버헤드가 존재하는 작업에서 더 이상의 오버헤드를 추가하고 싶지 않았다.
이에 삭제 작업은 스케줄러를 통해 처리했다.
Actor에서 Service로 반환값을 받기 위해 기존에는 Channel을 이용했다.
하지만 한 Channel을 만들어서 모든 요청에서 사용하는 방식은 return 값에 대한 통일이 필요했다.
그리고 Queue가 사라진 상태에서
이 부분의 제약을 해결하기 위해, 한 Channel을 공유하는 것이 아니라 CompleteableDeferred를 사용하여 다양한 응답 타입에 대응할 수 있도록 했다.
위와 같은 이유로 변경한 결과 로직의 진행은 아래와 같다.
SuspendableLockManager -> LockManagerActor -> LockActor
^ | |
|--------------------------------------------
lockManager에서 lock을 요청하면 두 액터의 로직 처리를 이용하여 lock과 로직을 처리하게 된다.
SuspendableLockManager -> LockManagerActor -> LockActor 순서로 메시지가 전달된다.
또한 Deferred에 응답을 채우는 방식으로 LockManagerActor, LockActor에서 SuspendableLockManager로 응답이 전달된다.
에러 또한 위 방식으로 전달된다.
🔵 SuspendableLockManager
interface LockManager {
/**
* 서비스 로직 (block)에 key를 이용하여 락을 설정한 후 실행한다.
*/
suspend fun <T> lock(key: String, block: suspend () -> T): T
}
/**
* render 안하면 잘 보입니다. render 푸세요
*
* SuspendableLockManager -> LockManagerActor -> LockActor </br>
* ^ | |
* |-----------------------------------
*
* SuspendableLockManager -> LockManagerActor -> LockActor 순서로 메세지가 전달됨
*
* Deferred에 응답을 채우는 방식으로 LockManagerActor / LockActor 에서 SuspendableLockManager 로 응답이 전달됨
* 에러 또한 이 방식으로 전달됨
*/
@Component
class SuspendableLockManager(
private val coroutineExceptionHandler: ErrorPublishingCoroutineExceptionHandler,
private val lockConfig: LockConfig.ActorLockConfig,
) : LockManager {
private val actor = lockManagerActor(
waitTimeMilli = lockConfig.waitTimeMilli,
leaseTimeMilli = lockConfig.leaseTimeMilli
)
override suspend fun <RETURN> lock(key: String, block: suspend () -> RETURN): RETURN {
val result = CompletableDeferred<Any?>()
// 락 시도
actor.send(
LockManagerMsg.TryLock(
key = key,
block = block,
result = result
)
)
return try {
// 서비스 로직 반환값 or 에러
val rtn = withTimeout(lockConfig.waitTimeMilli + lockConfig.leaseTimeMilli + 1000) {
result.await()
}
// 반환값이 에러면 throw
if (rtn is Exception) {
throw rtn
}
// TODO: as RETURN 안하는 방법 찾아서 수정 바람
rtn as RETURN
} catch (e: TimeoutCancellationException) {
// 락 획득 시간 에러 처리
throw FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR)
} catch (e: Exception) {
// 이외의 에러 처리
throw e
}
}
/**
* channel이 빈 엑터 map에서 삭제하는 스케줄러
*/
@Scheduled(fixedDelay = 1000 * 60)
private fun scheduledClearEmptyActor() {
CoroutineScope(Dispatchers.IO + coroutineExceptionHandler.handler).launch {
clearEmptyActor()
}
}
/**
* channel이 빈 엑터 map에서 삭제
*/
suspend fun clearEmptyActor() {
actor.send(LockManagerMsg.ClearActor())
}
}
LockManager는 lock 서비스를 제공하는 역할을 한다.
이 인터페이스에 대한 구현체로 SuspendableLockManager를 생성했다.
락 설정에 대해 Actor에 요청하고 대기한 후, 응답에 관해 처리한다.
또한 미사용 Actor에 대한 삭제 처리도 이 객체가 담당한다.
🔵 LockManagerActor
private sealed class LockManagerMsg {
/** 락 시도 */
class TryLock(
val requestTime: LocalDateTime = LocalDateTime.now(),
val key: String,
val block: suspend () -> Any?,
val result: CompletableDeferred<Any?>,
) : LockManagerMsg()
/** 미사용 엑터 삭제 */
class ClearActor : LockManagerMsg()
}
/**
* 키에 해당하는 actor에 요청을 보내는 역할을 하는 actor
*/
@OptIn(ObsoleteCoroutinesApi::class)
private fun lockManagerActor(
waitTimeMilli: Long,
leaseTimeMilli: Long,
) = CoroutineScope(Dispatchers.IO).actor<LockManagerMsg>(capacity = 1000) {
val actorMap = HashMap<String, SendChannel<LockMsg>>()
for (msg in channel) {
when (msg) {
is LockManagerMsg.TryLock -> {
if (msg.requestTime.isBefore(LocalDateTime.now().minusSeconds(waitTimeMilli.milliToSec()))) {
// 락 획득 시간보다 더 오랜 시간이 걸렸다면 timeout 에러 발생
msg.result.complete(FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT))
} else {
try {
// actor 가져오기
val actor = actorMap.computeIfAbsent(msg.key) { _ ->
lockActor(
waitTimeMilli = waitTimeMilli,
leaseTimeMilli = leaseTimeMilli
)
}
// 락 처리 요청
actor.send(
LockMsg.Lock(
requestTime = LocalDateTime.now(),
block = msg.block,
result = msg.result
)
)
} catch (e: Exception) {
msg.result.complete(e)
}
}
}
is LockManagerMsg.ClearActor -> {
logger.info { "before $actorMap" }
// actor channel 비었는지 조회
actorMap.entries.chunked(100).map { actors ->
val deferreds = actors.map { (key, actor) ->
async {
val result = CompletableDeferred<Boolean>()
actor.send(LockMsg.IsEmpty(result))
val isEmpty = try {
withTimeout(10) {
result.await()
}
} catch (e: Exception) {
// 에러에 대한 처리는 따로 하지않음
// 시간 안에 처리 안되면, 작업이 남았다고 간주
false
}
if (isEmpty) {
// 빔
key
} else {
// 안빔
""
}
}
}.toTypedArray()
// 빈 actor만 필터링
val emptyKeys = awaitAll(*deferreds).filter { it != "" }
// 빈 actor 삭제
emptyKeys.forEach { key -> actorMap.remove(key) }
}
logger.info { "after $actorMap" }
}
}
}
}
이는 Key에 따른 Actor에 요청을 보내는 역할을 하는 Actor이다.
기능은 2개 있다.
- lock 시도
- 미사용 Actor 삭제
Lock 시도 로직은 간단하다.
키에 맞는 Actor를 생성, 조회한 후, 이에 lock 요청을 보내는 것이다.
위에서 언급했듯이 삭제 로직은 Actor를 통해 처리되므로 조회 로직과의 동시성이 유지될 수 있다.
하지만 이 부분 때문에 삭제 로직은 최대한 빠르게 작동해야 한다.
그렇지 않으면 조회 성능이 떨어지게 된다.
이러한 점을 극복하기 위해 병렬 처리와 withTimeout을 이용했다.
삭제를 위해 Actor에 처리할 요청이 남았는지 확인하는 작업이 필요하다.
이 요청이 빠르게 처리되지 않는다면, 채널에 처리할 작업이 남아있다 판단할 수 있다.
그래서 확인 로직을 withTimeout로 감싸서, return이 늦다면 삭제 대상이 아니라 식별하도록 했다.
이를 통해 최대한 빠르게 작동하도록 구현했다.
🔵 LockActor
private sealed class LockMsg {
/** 락 설정 */
class Lock(
val requestTime: LocalDateTime,
val block: suspend () -> Any?,
val result: CompletableDeferred<Any?>,
) : LockMsg()
/** 엑터의 채널이 비었는지 확인 */
class IsEmpty(
val result: CompletableDeferred<Boolean>,
) : LockMsg()
}
/**
* 특정 키에 해당된 작업을 순차적으로 수행하는 actor
* msg가 실행된다 == 락을 획득했다 로 여김
*/
@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
private fun lockActor(
waitTimeMilli: Long,
leaseTimeMilli: Long,
) = CoroutineScope(Dispatchers.IO).actor<LockMsg>(capacity = 1000) {
for (msg in channel) {
when (msg) {
is LockMsg.Lock -> {
if (msg.requestTime.isBefore(LocalDateTime.now().minusSeconds(waitTimeMilli.milliToSec()))) {
// 락은 획득했지만 락 획득 시간보다 더 오랜 시간이 걸렸다면 timeout 에러 발생
msg.result.complete(FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT))
} else {
try {
// 로직 실행 및 deferred에 결과값 넣기
withTimeout(leaseTimeMilli) {
val rtn = msg.block()
msg.result.complete(rtn)
}
} catch (e: TimeoutCancellationException) {
// 락 획득 시간 에러 처리
msg.result.complete(FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR))
} catch (e: Exception) {
// 이외의 에러 처리
msg.result.complete(e)
}
}
}
is LockMsg.IsEmpty -> {
msg.result.complete(channel.isEmpty)
}
}
}
}
이 부분이 실질적으로 락을 설정하고 서비스 로직을 실행하는 Actor이다.
이 Actor에서 로직이 실행된다는 것을 락을 획득했다고 여긴다.
처리에 대한 결과, 에러는 전달받은 CompletableDeferred를 통해 SuspendableLockManager로 전달한다.
📌 마무리 2

테스트도 잘 통과한다
아쉬운 점
- lock을 통해 서비스 로직이 수행되어 생긴 return 값에 대해 as를 이용하여 강제 형변환을 한다.
- timeout 로직이 여러 군데에 혼재되어 있다.
차차 수정해보도록 하자
'Backend' 카테고리의 다른 글
<Spring> Coroutine Actor 이용해서 단일 서버 락 구현하기 (0) | 2024.08.20 |
---|---|
<Spring> Slack 메세지 전송 에러 처리 (0) | 2024.08.04 |
<Spring> Webflux + Coroutine vs MVC (0) | 2024.06.12 |
<Spring> SUSU의 Coroutine (0) | 2024.05.11 |
<Spring> Webflux + Coroutine + MDC (0) | 2024.05.10 |