Backend

<Spring> Coroutine Actor ์ด์šฉํ•ด์„œ ๋‹จ์ผ ์„œ๋ฒ„ ๋ฝ ๊ตฌํ˜„ํ•˜๊ธฐ

wjdtkdgns 2024. 8. 20. 01:53

๐Ÿ“Œ Actor

๐Ÿ”ต Actor๋ž€?

Actor ๋ชจ๋ธ์€ ๋ณ‘๋ ฌ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์—์„œ ์ƒํƒœ๋ฅผ ๊ฐ€์ง„ ๊ฐœ์ฒด๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ฃผ๊ณ ๋ฐ›์œผ๋ฉฐ ๋น„๋™๊ธฐ์ ์œผ๋กœ ๋™์ž‘ํ•˜๋Š” ๊ฐœ๋…์ด๋‹ค.

์ฆ‰, ์ž์ฒด์ ์ธ ์ƒํƒœ๋ฅผ ๊ฐ€์ง€๊ณ  ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•˜๋ฉฐ ์ด๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋…ผ๋ฆฌ๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ์‹คํ–‰ ๋‹จ์œ„์ด๋‹ค.

์ฝ”๋ฃจํ‹ด์—์„œ๋„ ์Šค๋ ˆ๋“œ ๊ฐ„ ๋™๊ธฐํ™”๋ฅผ ์ง€์›ํ•˜๊ธฐ ์œ„ํ•ด ์ด๋ฅผ ์ง€์›ํ•ด ์ค€๋‹ค.

 

actor

Launches new coroutine that is receiving messages from its mailbox channel and returns a reference to its mailbox channel as a SendChannel. The resulting object can be used to send messages to this coroutine. The scope of the coroutine contains ActorScope

kotlinlang.org

์ƒํƒœ์˜ ์•ก์„ธ์Šค๋Š” ๋‹จ์ผ ์Šค๋ ˆ๋“œ๋กœ ํ•œ์ •๋˜๋ฉฐ, actor์™€์˜ ํ†ต์‹ ์€ SendChannel์„ ํ†ตํ•ด ์ด๋ค„์ง„๋‹ค.

Channel์„ ํ†ตํ•ด ์ „๋‹ฌ๋œ ๋ฉ”์‹œ์ง€๋Š” ํ•˜๋‚˜์”ฉ ์ˆœ์„œ๋Œ€๋กœ ์ฒ˜๋ฆฌ๋˜๊ฒŒ ๋œ๋‹ค.

 

๐Ÿ”ต ์ด๊ฑธ๋กœ ์–ด๋–ป๊ฒŒ ๋ฝ์„?

actor์˜ ๋‹จ์ผ ์Šค๋ ˆ๋“œ๋กœ ์ž‘๋™ํ•œ๋‹ค๋Š” ํŠน์ง•์„ ์ด์šฉํ•˜๋ฉด ๋  ๊ฒƒ ๊ฐ™์•˜๋‹ค.

๋‹จ์ผ ์Šค๋ ˆ๋“œ๋ผ์„œ ์ž‘์—…์„ ํ•˜๋‚˜์”ฉ ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋˜๊ณ , ์ด ๋•Œ๋ฌธ์— ๋ฝ ์ž‘์—…์— ์žˆ์–ด์„œ ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

๋˜ํ•œ ์ฝ”๋ฃจํ‹ด์ด๋ผ์„œ ๋™์‹œ์„ฑ ์ฒ˜๋ฆฌ + suspend๊ฐ€ ๊ฐ€๋Šฅํ•˜์ง€ ์•Š์„๊นŒ ์ƒ๊ฐํ–ˆ๋‹ค.

 

 

๐Ÿ“Œ ๊ทธ๋ž˜์„œ ์–ด๋–ป๊ฒŒ ๋งŒ๋“ค์—ˆ๋‚˜?

๐Ÿ”ต Pub / Sub

๋ฝ์„ ๋งŒ๋“ค๋ ค ํ•  ๋•Œ, ํฌ๊ฒŒ 2๊ฐ€์ง€ ์„ ํƒ์ง€๊ฐ€ ์žˆ๋‹ค

  1. ์Šคํ•€๋ฝ
  2. pub / sub

์ฝ”๋ฃจํ‹ด์„ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๋Š” ์ƒํƒœ์—์„œ ์Šคํ•€๋ฝ์„ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด, ์ฝ”๋ฃจํ‹ด์˜ suspend function์˜ ์žฅ์ ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†๋‹ค๊ณ  ํŒ๋‹จํ–ˆ๋‹ค.

์ด์— pub / sub์œผ๋กœ ๊ตฌํ˜„ํ•ด์•ผ ํ–ˆ๋‹ค.

 

๊ทธ๋ ‡๋‹ค๋ฉด ๋ฝ ํš๋“์„ ์š”์ฒญํ•˜๊ณ , ๋ฝ์„ ํš๋“ํ–ˆ์„ ๋•Œ ๋ฝ ํš๋“์„ ์–ด๋–ป๊ฒŒ ์•Œ๋ฆด ์ˆ˜ ์žˆ์„๊นŒ?๋ผ๋Š” ๊ณ ๋ฏผ์„ ํ•ด๊ฒฐํ•ด์•ผ ํ–ˆ๋‹ค.

Actor๋Š” SendChannel์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•  ๋ฟ, Actor์ชฝ์—์„œ Message Sender์ชฝ์œผ๋กœ ํŠน์ • ์‘๋‹ต์„ ์ค„ ์ˆ˜ ์—†๋‹ค.

๊ทธ๋ ‡๋‹ค๋ฉด ํŠน์ • ์š”์ฒญ์— ํ•ด๋‹นํ•˜๋Š” Channel์„ Actor๋กœ ๋„˜๊ฒจ์ฃผ๊ณ , Actor์—์„œ ๋ฝ ๊ด€๋ จ ์š”์ฒญ์— ๊ด€ํ•œ ์‘๋‹ต์„ ์ „๋‹ฌํ•œ ์ฑ„๋„์„ ํ†ตํ•ด ๋ณด๋‚ด์ค€๋‹ค๋ฉด, ๋ฝ ํš๋“์— ๋Œ€ํ•œ ๋ฉ”์„ธ์ง€๋ฅผ ๋ฐ›์„ ์ˆ˜ ์žˆ์„ ๊ฒƒ์ด๋ผ ์ƒ๊ฐํ–ˆ๋‹ค.

 

๋˜ํ•œ ์ฝ”๋ฃจํ‹ด์„ ์ด์šฉํ•˜๊ธฐ ๋•Œ๋ฌธ์— non-blocking ํ•˜๊ฒŒ ๋ฝ์— ๋Œ€ํ•œ ์š”์ฒญ์„ ํ•  ์ˆ˜ ์žˆ์„ ๊ฒƒ์ด๋ผ ์ƒ๊ฐํ–ˆ๋‹ค.

 

๐Ÿ”ต LockActor

private enum class LockReturn {
    /**
     * ๋ฝ ์‹คํ–‰
     */
    PROCESS_LOCK,

    /**
     * ๋ฝ ํ•ด์ œ๋จ
     */
    UNLOCK,

    /**
     * ๋“ฑ๋ก๋œ ์ฑ„๋„ ์‚ญ์ œ
     */
    DELETE_CHANNEL,

    /**
     * ๋ฝ ํ๊ฐ€ ์•ˆ๋น„์—ˆ์Œ
     */
    NOT_EMPTY_QUEUE,

    /**
     * ๋ฝ ํ๊ฐ€ ๋น„์—ˆ์Œ
     */
    EMPTY_QUEUE,
    ;
}

private sealed class LockMsg {
    /** ๋ฝ ํš๋“ ์‹œ๋„ */
    class TryLock(val channel: SendChannel<LockReturn>) : LockMsg()

    /** ๋ฝ ํ•ด์ œ */
    class UnLock(val channel: SendChannel<LockReturn>) : LockMsg()

    /** ๋“ฑ๋ก๋œ ์ฑ„๋„ ์ง€์šฐ๊ธฐ */
    class DeleteChannel(val channel: SendChannel<LockReturn>) : LockMsg()

    /** ํ ๋น„์—ˆ๋Š”์ง€ ํ™•์ธ */
    class CheckQueueEmpty(val channel: SendChannel<LockReturn>) : LockMsg()
}

@OptIn(ObsoleteCoroutinesApi::class)
private fun lockActor() = CoroutineScope(Dispatchers.IO).actor<LockMsg>(capacity = 1000) {
    // queue ๋งจ ์•ž == ๋ฝ ์„ค์ •
    val lockQueue = LinkedList<SendChannel<LockReturn>>()

    for (msg in channel) {
        when (msg) {
            is LockMsg.TryLock -> {
                // ํ์— ์ฑ„๋„ ๋“ฑ๋กํ•˜๊ธฐ
                lockQueue.offer(msg.channel)

                // ๋งŒ์•ฝ ๋ฐฉ๊ธˆ ๋“ฑ๋กํ•œ ์ฑ„๋„์ด ํ์˜ ๋งจ ์•ž์ด๋ผ๋ฉด ๋ฐ”๋กœ ์‹คํ–‰
                if (lockQueue.peek() == msg.channel) {
                    msg.channel.send(LockReturn.PROCESS_LOCK)
                }
            }

            is LockMsg.UnLock -> {
                // ํ˜„์žฌ ๋ฝ์„ ํš๋“ํ•œ ์ฑ„๋„์„ ํ์—์„œ ์‚ญ์ œ
                lockQueue.poll()

                // ๋‹ค์Œ ๋ฝ ํš๋“ ๋Œ€์ƒ notifyํ•˜๊ธฐ
                if (lockQueue.peek() != null) {
                    lockQueue.peek().send(LockReturn.PROCESS_LOCK)
                }

                // ๋ฝ ํ•ด์ œ ๋ฐ ํ ์‚ญ์ œ ์™„๋ฃŒ ์•Œ๋ฆฌ๊ธฐ
                msg.channel.send(LockReturn.UNLOCK)
            }

            is LockMsg.DeleteChannel -> {
                if (lockQueue.peek() == msg.channel) {
                    // ์‚ญ์ œํ•˜๋ ค๋Š” ์ฑ„๋„์ด ํ์˜ ๋งจ ์•ž์ผ ๋•Œ, ํ์—์„œ ์‚ญ์ œํ•˜๊ณ  ๋‹ค์Œ๊บผ ์‹คํ–‰
                    lockQueue.poll()
                    if (lockQueue.peek() != null) {
                        lockQueue.peek().send(LockReturn.PROCESS_LOCK)
                    }
                } else {
                    // ์‚ญ์ œํ•˜๋ ค๋Š” ์ฑ„๋„์ด ํ์˜ ๋งจ ์•ž์ด ์•„๋‹ ๋•Œ, ํ์—์„œ๋งŒ ์‚ญ์ œ
                    lockQueue.remove(msg.channel)
                }

                // ์‚ญ์ œ ์™„๋ฃŒ ์ฒ˜๋ฆฌ ์•Œ๋ฆฌ๊ธฐ
                msg.channel.send(LockReturn.DELETE_CHANNEL)
            }

            is LockMsg.CheckQueueEmpty -> {
                if (lockQueue.peek() == null) {
                    msg.channel.send(LockReturn.EMPTY_QUEUE)
                } else {
                    msg.channel.send(LockReturn.NOT_EMPTY_QUEUE)
                }
            }
        }
    }
}

 

actor์˜ ๊ธฐ๋Šฅ์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค

  1. ๋ฝ ์„ค์ •
  2. ๋ฝ ํ•ด์ œ
  3. ๋“ฑ๋ก๋œ ์ฑ„๋„ ์‚ญ์ œ
  4. ๋“ฑ๋ก๋œ ์ฑ„๋„์ด ์žˆ๋Š”์ง€ ํ™•์ธ

actor๋Š” ๋ฝ ํš๋“ ์š”์ฒญ์— ๋Œ€ํ•œ ๊ด€๋ฆฌ์™€ ํ˜„์žฌ ๋ฝ์„ ํš๋“ํ•œ ์š”์ฒญ์—๊ฒŒ ๋ฝ ํš๋“ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด์ฃผ๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.

subscriber์˜ ์ฑ„๋„์„ ํ์— ๋„ฃ์–ด๋‘๊ณ , ๋ฝ ํš๋“์— ์ •๋ณด๋ฅผ ์ฑ„๋„์„ ํ†ตํ•ด ๋ฉ”์„ธ์ง€๋ฅผ ์ „๋‹ฌํ•˜๋Š” ์ผ์„ ํ•œ๋‹ค.

Lock ํš๋“์„ ์š”์ฒญํ•˜๋ฉด ์ฑ„๋„์„ queue์— ๋„ฃ์–ด๋‘๊ณ , Lock ํ•ด์ œ๋ฅผ ์š”์ฒญํ•˜๋ฉด queue์—์„œ ๋ณธ์ธ ์ฑ„๋„์„ ์ œ๊ฑฐํ•œ ํ›„ ๋‹ค๋ฅธ ์ฑ„๋„์— ๋ฝ ํš๋“ ๋ฉ”์„ธ์ง€๋ฅผ ์ „ํ•œ๋‹ค.

๋˜ํ•œ ์—๋Ÿฌ ์ฒ˜๋ฆฌ์™€ ๋ฉ”๋ชจ๋ฆฌ ๊ด€๋ฆฌ๋ฅผ ํ์— ์œ„์น˜ํ•˜๋Š” ์ฑ„๋„ ์ œ๊ฑฐ ๋ฐ ํ์— ๋“ฑ๋ก๋œ ์ฑ„๋„์ด ์žˆ๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๊ธฐ๋Šฅ์„ ๊ตฌํ˜„ํ–ˆ๋‹ค.

 

'๋ฝ์„ ํš๋“ํ–ˆ๋‹ค'์˜ ๊ธฐ์ค€์€ queue์˜ ๋งจ ์•ž์— ๋ฌด์—‡์ด ์žˆ๋Š”๊ฐ€์ด๋‹ค.

queue ๋งจ ์•ž์— ์œ„์น˜ํ•œ ์ฑ„๋„์ด ๋ฝ์„ ์†Œ์œ ํ–ˆ๋‹ค๊ณ  ์—ฌ๊ธด๋‹ค.

lock์„ ํš๋“ํ•˜๋ ค ํ•  ๋•Œ queue์— ์ฑ„๋„์„ ์ถ”๊ฐ€ํ•œ ํ›„ ๋Œ€๊ธฐํ•œ๋‹ค.

๋˜ํ•œ lock์„ ํ•ด์ œํ•  ๋•Œ ๋งจ ์•ž์— ์œ„์น˜ํ•œ ์ฑ„๋„, ์ฆ‰, ํ˜„์žฌ ๋ฝ์„ ์†Œ์œ ํ•œ ์ฑ„๋„์„ ์ œ๊ฑฐํ•˜๊ณ  ๋‹ค์Œ ๋ฝ ํš๋“ ๋Œ€์ƒ์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ฒŒ ๋œ๋‹ค.

์ด๋Ÿฌํ•œ ๋ฐฉ์‹์œผ๋กœ ๋ฝ์˜ ํš๋“๊ณผ ํ•ด์ œ๊ฐ€ ์ด๋ค„์ง„๋‹ค.

 

์šฐ์„  actor์™€ ๋ฝ ์„œ๋น„์Šค ๊ฐ„ ํ†ต์‹ ์€ ์ฑ„๋„์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์ง„ํ–‰๋œ๋‹ค.

lockActor์˜ ๊ธฐ๋Šฅ์˜ ํŒŒ๋ฆฌ๋ฏธํ„ฐ์— channel์„ ํฌํ•จํ•œ๋‹ค.

์ด channel์ด actor์—์„œ ๋ฝ ์„œ๋น„์Šค๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.

๊ทธ๋ž˜์„œ actor์™€์˜ ์š”์ฒญ ์ฒ˜๋ฆฌ๋Š” ๋ชจ๋‘ ์•„๋ž˜์™€ ๊ฐ™์€ ๋ฐฉ์‹์œผ๋กœ ์ง„ํ–‰๋œ๋‹ค.

actor.send(LockMsg.TryLock(channel))
channel.receive()

 

๐Ÿ”ต ๋ฝ ์ฒ˜๋ฆฌ

@Component
class SuspendableLockManager : LockManager {
    companion object {
        private const val WAIT_TIME = 3000L
        private const val LEASE_TIME = 3000L
    }

    private val actorMap = ConcurrentHashMap<String, SendChannel<LockMsg>>()

    override suspend fun <T> lock(key: String, block: suspend () -> T): T {
        // lock ๊ด€๋ จ ๋ฆฌํ„ด ๋ฐ›์„ ์ฑ„๋„
        Channel<LockReturn>().run {
            val channel = this

            // ๋ฝ ์„ค์ •
            val actor = tryLock(key, channel)

            try {
                // ๋กœ์ง ์‹คํ–‰
                return withTimeout(LEASE_TIME) {
                    block()
                }
            } catch (e: TimeoutCancellationException) {
                // ๋ฝ ๋ณด์œ  ์‹œ๊ฐ„ ์—๋Ÿฌ ์ฒ˜๋ฆฌ
                throw FailToExecuteException(ErrorCode.LOCK_TIMEOUT_ERROR)
            } catch (e: Exception) {
                // ๋‚˜๋จธ์ง€ ์—๋Ÿฌ ์ฒ˜๋ฆฌ
                throw e
            } finally {
                // ๋ฝ ํ•ด์ œ
                releaseLock(actor, channel)

                // ํ๊ฐ€ ๋นˆ ์•กํ„ฐ ์‚ญ์ œ
                deleteEmptyQueueActor(channel, key)

                logger.info { actorMap }
            }
        }
    }

    private suspend fun tryLock(key: String, channel: Channel<LockReturn>): SendChannel<LockMsg> {
        val actor = actorMap.compute(key) { _, value ->
            val actor = value ?: lockActor()

            runBlocking(Dispatchers.Unconfined) {
                actor.send(LockMsg.TryLock(channel))
            }

            actor
        } ?: throw FailToExecuteException(ErrorCode.FAIL_TO_GET_LOCK)

        try {
            withTimeout(WAIT_TIME) {
                channel.receive()
            }
        } catch (e: TimeoutCancellationException) {
            // ๋ฝ ํš๋“ ์‹œ๊ฐ„ ์—๋Ÿฌ ์ฒ˜๋ฆฌ
            throw FailToExecuteException(ErrorCode.ACQUIRE_LOCK_TIMEOUT)
        } catch (e: Exception) {
            // ์ˆ˜์‹  ์ฑ„๋„ ์ง€์šฐ๊ธฐ
            actor.send(LockMsg.DeleteChannel(channel))
            channel.receive()

            throw FailToExecuteException(ErrorCode.FAIL_TO_EXECUTE_LOCK)
        }

        return actor
    }

    private suspend fun releaseLock(actor: SendChannel<LockMsg>, channel: Channel<LockReturn>) {
        actor.send(LockMsg.UnLock(channel))
        channel.receive()
    }

    private suspend fun deleteEmptyQueueActor(channel: Channel<LockReturn>, key: String) {
        actorMap.computeIfPresent(key) { _, value ->
            val rtn = runBlocking(Dispatchers.Unconfined) {
                value.send(LockMsg.CheckQueueEmpty(channel))
                channel.receive()
            }

            if (rtn == LockReturn.EMPTY_QUEUE) {
                null
            } else {
                value
            }
        }
    }
}

์ฝ”๋“œ๋Š” ํฌ๊ฒŒ 4๊ฐœ ๋ถ€๋ถ„์œผ๋กœ ๋‚˜๋‰œ๋‹ค.

  1. ๋ฝ ํš๋“
  2. ๋ฝ ํ•ด์ œ
  3. actor ์‚ญ์ œ

์ž์„ธํ•œ ์„ค๋ช…์€ ํ›„์ˆ  ํ•˜๊ฒ ๋‹ค.

 

๐Ÿ”ต ๋ฝ ํš๋“

๋ฝ์˜ ๋ฒ”์œ„๋ฅผ ์ตœ์†Œํ™”ํ•˜๊ณ  ์‹ถ์—ˆ๋‹ค.

๊ทธ๋ž˜์„œ ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ actor๋ฅผ ์ƒ์„ฑํ•ด์„œ ์‚ฌ์šฉํ–ˆ๋‹ค.

์ด๋•Œ, actorMap์˜ ์กฐํšŒ ๋ฐ ์—…๋ฐ์ดํŠธ์— ์žˆ์–ด์„œ ๋™์‹œ์„ฑ์„ ๋ณด์žฅํ•  ํ•„์š”๊ฐ€ ์žˆ์—ˆ๋‹ค.

๊ทธ๋ž˜์„œ ConcurrentHashMap์˜ compute๋ฅผ ์ด์šฉํ–ˆ๋‹ค.

ํ‚ค๊ฐ€ ๋‹ค๋ฅด๋‹ค๋ฉด ๋‹ค๋ฅธ actor๋ฅผ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ ์„œ๋กœ ๋‹ค๋ฅธ ํ‚ค๋ฅผ ์ด์šฉํ•œ ๋ฝ์ด ์„œ๋กœ์—๊ฒŒ ์˜ํ–ฅ์„ ์ค„ ์ˆ˜ ์—†๊ฒŒ ๋ผ์„œ, ๋ฝ์˜ ๋ฒ”์œ„๋ฅผ ํ•ด๋‹น ํ‚ค๋ฅผ ์ด์šฉํ•˜๋Š” ๋กœ์ง์œผ๋กœ ํ•œ์ •ํ•  ์ˆ˜ ์žˆ์—ˆ๋‹ค.
๋˜ํ•œ compute ์—ฐ์‚ฐ์—์„œ actor์— lock ์š”์ฒญ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋„๋ก ํ–ˆ๋‹ค.

๋ฌผ๋ก  ConcurrentHashMap์—์„œ actor๋ฅผ ๊ฐ€์ ธ์˜ฌ ๋•Œ, compute๋ฅผ ํ†ตํ•œ ์—ฐ์‚ฐ์„ ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์„œ๋กœ ๊ฐ„ ์˜ํ–ฅ์ด ์žˆ์ง€๋งŒ, compute ๋‚ด๋ถ€ ๋กœ์ง์—์„œ ๋ฝ ํš๋“์— ๋Œ€ํ•ด ๋Œ€๊ธฐํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์—, ์—ฐ์‚ฐ ์†๋„์— ์žˆ์–ด์„œ ํฐ ์˜ํ–ฅ์€ ์—†์„ ๊ฑฐ๋ผ ์ƒ๊ฐํ•œ๋‹ค.

 

์ด ์š”์ฒญ์„ ๋ฐ›์€ actor๋Š” ๋ฝ ํš๋“ ๊ด€๋ จ ์ฑ„๋„์„ queue์— ๋„ฃ๋Š”๋‹ค.

์ด ๊ณผ์ •์ด ๋‹ค ์ˆ˜ํ–‰๋˜๋”๋ผ๋„ channel.receive()๊ฐ€ ์ฒ˜๋ฆฌ๋˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์—, ๋ฝ์„ ์š”์ฒญํ•œ ์ฝ”๋ฃจํ‹ด์€ suspend ๋œ ์ƒํƒœ์ด๋‹ค.

 

๋งŒ์•ฝ ๋‹ค๋ฅธ ๋ฝ์ด unlock ํ•˜๋ฉด์„œ ์ด ์ฑ„๋„์— ๋ฝ ํš๋“ํ•œ๋‹ค๋ฉด, actor์—์„œ ์ฑ„๋„์„ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๊ฒŒ ๋˜๊ณ , ์ด๋Š” ํ•ด๋‹น ์ฝ”๋ฃจํ‹ด์ด ๋‹ค์‹œ ์‹คํ–‰๋จ์„ ์˜๋ฏธํ•œ๋‹ค.

๊ทธ๋Ÿฌ๋ฏ€๋กœ ๋ฝ์„ ์–ป์€ ์ƒํƒœ์—์„œ ์ฃผ์–ด์ง„ ๋กœ์ง์„ ์ง„ํ–‰ํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋œ๋‹ค.

 

๋ฝ ํš๋“ ๊ณผ์ •์—์„œ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฑฐ๋‚˜, ๋ฝ ํš๋“์— ์˜ค๋žœ ์‹œ๊ฐ„์ด ์†Œ์š”๋  ๊ฒฝ์šฐ, ์—๋Ÿฌ์ฒ˜๋ฆฌ๊ฐ€ ํ•„์š”ํ–ˆ๋‹ค.

๋จผ์ € ๋ฝ ํš๋“์— ์˜ค๋žœ ์‹œ๊ฐ„์ด ์†Œ์š”๋˜๋Š” ๊ฒฝ์šฐ๋Š” withTimeout()์œผ๋กœ ์ฒ˜๋ฆฌํ–ˆ๋‹ค.

๋˜ํ•œ ํš๋“ ๊ณผ์ •์—์„œ์˜ ์—๋Ÿฌ๋Š” actor์˜ queue์—์„œ ์ฑ„๋„์„ ์ œ๊ฑฐํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ์ฒ˜๋ฆฌํ–ˆ๋‹ค.

์ด๋ฅผ ํ†ตํ•ด actor๊ฐ€ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ ์š”์ฒญ์— ๊ด€ํ•ด ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š๋„๋ก ํ–ˆ๋‹ค.

 

๐Ÿ”ต ๋ฝ ํ•ด์ œ

๋ฝ ํ•ด์ œ๋Š” ๋น„๊ต์  ๊ฐ„๋‹จํ•˜๋‹ค.

๋ฝ์„ ํ˜„์žฌ ๋ณด์œ ํ•œ ๋Œ€์ƒ ์ฆ‰, queue์˜ ๋งจ ์•ž ์ฑ„๋„์„ ์ œ๊ฑฐํ•œ ํ›„, queue์˜ ๋งจ ์•ž์— ์œ„์น˜ํ•œ ์ฑ„๋„์— ๋ฝ ํš๋“ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ธ๋‹ค.

 

๐Ÿ”ต Actor ์‚ญ์ œ

์ด ๋ถ€๋ถ„์ด ์—†์–ด๋„ ์ž˜ ์ž‘๋™ํ•  ๊ฒƒ์ด๋‹ค.

ํ•˜์ง€๋งŒ ์šฐ๋ฆฌ๋Š” ์„œ๋ฒ„ ๋ฉ”๋ชจ๋ฆฌ ๋˜ํ•œ ๊ณ ๋ คํ•ด์•ผ ํ•œ๋‹ค.

์ฒ˜๋ฆฌํ•ด์•ผ ํ•˜๋Š” ์š”์ฒญ์ด ์—†๋”๋ผ๋„ ํ•ด๋‹น ์š”์ฒญ์— ๋Œ€ํ•œ ํ‚ค์˜ ์•กํ„ฐ๊ฐ€ ์ƒ์„ฑ๋˜์–ด ์žˆ๋‹ค๋ฉด, ์ด๋Š” ์‚ญ์ œ๋˜์ง€ ์•Š๊ณ  ๋‚จ์•„์„œ ๋ฉ”๋ชจ๋ฆฌ๋งŒ ์ฐจ์ง€ํ•˜๊ฒŒ ๋œ๋‹ค.

๊ทธ๋ž˜์„œ OOM์„ ๋ง‰๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š” ์•กํ„ฐ๋ฅผ ์ง€์›Œ์•ผ ํ–ˆ๋‹ค.

 

์ด๋•Œ actor๋ฅผ ์กฐํšŒํ•˜๋Š” ๋ถ€๋ถ„๊ณผ์˜ ๋ฝ๋„ ๊ณ ๋ คํ•ด์•ผ ํ–ˆ๋‹ค. 

actor๋ฅผ ์‚ญ์ œํ•˜๋Š” ๋กœ์ง๊ณผ actor๋ฅผ ์กฐํšŒ, ํ์— ์ฑ„๋„์„ ๋“ฑ๋กํ•˜๋Š” ๋กœ์ง์ด ๋™์‹œ์— ์ž‘๋™ํ•œ๋‹ค๋ฉด, ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฒƒ์ด ๋ถ„๋ช…ํ–ˆ๋‹ค.

๊ทธ๋ž˜์„œ ์ด ๋‘ ์ž‘์—…์— ๋Œ€ํ•œ ๋™์‹œ์„ฑ์„ ๊ณ ๋ คํ•˜๊ธฐ ์œ„ํ•ด ConcurrentHashmap์˜ compute๋ฅผ ์ด์šฉํ–ˆ๋‹ค.

 

์ด๋ฅผ ํ†ตํ•ด ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š” actor๋ฅผ ์ง€์›Œ์คŒ์œผ๋กœ์จ ๋ถˆํ•„์š”ํ•œ ๋ฉ”๋ชจ๋ฆฌ ๋‚ญ๋น„๋ฅผ ๋ฐฉ์ง€ํ–ˆ๋‹ค.

 

๐Ÿ”ต ์‚ฌ์šฉ

suspend fun vote(user: AuthUser, id: Long, request: CreateVoteHistoryRequest) {
    lockManager.lock(LockKey.getVoteKey(id)) {
        when (request.isCancel) {
            true -> cancelVote(user.uid, id, request.optionId)
            false -> castVote(user.uid, id, request.optionId)
        }
    }
}

lock ํ•จ์ˆ˜ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์„œ๋น„์Šค ๋กœ์ง์„ ๋„˜๊ฒจ์ฃผ๋ฉด, ํ•ด๋‹น ๋กœ์ง์— ๊ด€ํ•ด ๋ฝ์„ ์ ์šฉํ•˜์—ฌ ๋™์ž‘ํ•˜๊ฒŒ ๋œ๋‹ค.

ํ‚ค ์ƒ์„ฑ์—๋Š” LockKey๋ผ๋Š” ํด๋ž˜์Šค๋ฅผ ๋”ฐ๋กœ ๋งŒ๋“ค์–ด์„œ ์ฒ˜๋ฆฌํ–ˆ๋‹ค.

 

 

๐Ÿ“Œ ๋งˆ๋ฌด๋ฆฌ

์ผ๋‹จ ์ฝ”๋“œ ๋จธ์ง€๋Š” ๋˜์—ˆ์œผ๋‚˜, ๊ฐœ์„ ํ•ด์•ผ ํ•  ๋ถ€๋ถ„์€ ๋‚จ์•˜๋‹ค ์ƒ๊ฐํ•œ๋‹ค.

์•„์ง์€ ์–ด๋”˜์ง€ ๋ชจ๋ฅด๊ฒ ์ง€๋งŒ, ๋ฐ๋“œ๋ฝ์ด ๋ฐœ์ƒํ•  ๊ฒƒ ๊ฐ™์€ ๋ถˆ์•ˆํ•จ(?)์ด ์žˆ๋‹ค.

์ผ๋‹จ ๋‹ค ์ ์šฉํ•˜์ง„ ์•Š์•˜๊ณ , ์‚ฌ์šฉํ•˜๋ฉด์„œ ์•ˆ์ •์„ฑ์ด ๊ฒ€์ฆ๋œ๋‹ค๋ฉด, ๋ฝ์ด ํ•„์š”ํ•œ ๊ณณ์— ์ ์ฐจ ์ ์šฉํ•ด ๋‚˜๊ฐˆ ์ƒ๊ฐ์ด๋‹ค.