๐ SUSU ํ๋ก์ ํธ
YAPP 23๊ธฐ์์ ์งํํ ๊ฒฝ์กฐ์ฌ๋น ๊ด๋ฆฌ ์๋น์ค์ด๋ค.
์์ธํ ๋ด์ฉ์ ์๋ ๋งํฌ๋ฅผ ์ด์ฉํ๋ฉด ๋๋ค.
๊ฒฐํผ์, ๋์์น, ์ฅ๋ก์, ์์ผ ๊ฐ์ ๊ฒฝ์กฐ์ฌ๋ฅผ ์ฑ๊ธด ์ ์๋์?
์ฃผ๊ณ ๋ฐ์ ์์คํ ๋ง์๋ค, ์์์ ํจ๊ป ๊ฒฝ์กฐ์ฌ๋น๋ฅผ ๋๋ํ๊ฒ ๊ด๋ฆฌํด์!
Disquiet*
IT ์๋น์ค ๋ฉ์ด์ปค๋ค์ ์์ ๋คํธ์ํฌ. ๋์ค์ฝฐ์ด์์์ ์๋ก์ ํ๋ก์ ํธ๋ฅผ ๊ณต์ ํด ๋ณด์ธ์!
disquiet.io
๋ง๊ด๋ถ ใ ใ
๐ ์ Coroutine์ ์ผ์๊น
๐ต Coroutine ์ด๋
์์ธํ ์ค๋ช ์ ์ด ๊ธ์์ ์ ํ ๊ฒ์ด๋ค.
๊ฐ๋จํ ์๊ฐ๋ง ํด๋ณด๊ฒ ๋ค.
์ํค๋ฐฑ๊ณผ์๋ ์๋์ ๊ฐ์ด ์ฐ์ฌ์๋ค.
Coroutines are computer program components that allow execution to be suspended and resumed, generalizing subroutines for cooperative multitasking.
์ค์ง, ์ฌ๊ฐ๋ฅผ ์ง์ํ๋ ์ปดํจํฐ ํ๋ก๊ทธ๋จ ๊ตฌ์ฑ์์
ํ๋ ฅ์ ๋ฉํฐํ ์คํน์ ์ํ ์ผ๋ฐํ๋ ์๋ธ ๋ฃจํด
์ด๋ฅผ ๋ง๋ ๋ชฉํ๋ ์ด๋ฌํ๋ค
- No dependency on a particular implementation of Futures or other such rich library;
- Cover equally the "async/await" use case and "generator blocks";
- Make it possible to utilize Kotlin coroutines as wrappers for different existing asynchronous APIs (such as Java NIO, different implementations of Futures, etc).
๋น๋๊ธฐ ์ฒ๋ฆฌ๋ฅผ Coroutine์ ํตํด ์ฒ๋ฆฌํ ์ ์๋๋ก ํ๋ ๊ฒ ์ด๊ฒ์ ๋ชฉํ์ด๋ค.
์ฌ๋๋ค์ ํํ Coroutine์ ๊ฒฝ๋ ์ฐ๋ ๋๋ผ๊ณ ํ๋ค.
์ด๋ ์๋์ ๊ฐ์ ํน์ง ๋๋ฌธ์ด๋ค.
- context-switching ๋น์ฉ์ด ์ ๋ค
- suspend๋ฅผ ์ด์ฉํ ๋ฃจํด์ ์ค์ง์ ์ฌ๊ฐ๋ฅผ ์ง์ํ๋ค.
- ์ ํ๋ฆฌ์ผ์ด์ ์์ค์์ ์ค์ผ์ค๋ง์ ๊ด๋ฆฌํ๋ค.
์ ๋ฆฌํด๋ณด์๋ฉด ์ฝ๋ฃจํด์ ์ด๋ฌํ๋ค.
- ์คํ์ ์ง์ฐ๊ณผ ์ฌ๊ฐ๋ฅผ ํ์ฉํจ์ผ๋ก์จ, ๋น์ ์ ์ ๋ฉํฐํ์คํน์ ์ํ ์๋ธ ๋ฃจํด์ ์ผ๋ฐํํ ์ปดํจํฐ ํ๋ก๊ทธ๋จ ๊ตฌ์ฑ์์
- ๋น๋๊ธฐ + non-blocking ์ง์ํด ์ฃผ๋ ๊ฒฝ๋ ์ค๋ ๋
๐ต ์์์ ๊ธฐ์ ์คํ
์์๋ Kotlin, Spring์ ์ด์ฉํ๋ค.
Spring์ Reactive Stack์ผ๋ก ๊ตฌ์ฑํ๋ค.
ํ์ง๋ง R2DBC๋ ์ฌ์ฉํ์ง ์์๋ค.
๊ฐ๋ฐ ์์ฐ์ฑ์ ๋์ด๊ธฐ ์ํด JPA๋ฅผ ์ฌ์ฉํ๋ค.
์ด๋ฅผ ์ํด ๋น๋๊ธฐ ์ฒ๋ฆฌ๋ฅผ ํด์ผ ํ๊ณ , ์ด๋ฅผ Coroutine์ ์ด์ฉํด ์ฒ๋ฆฌํ๋ค.
๐ต Webflux์ Blocking I/O
Webflux๋ Netty๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์๋ํ๋ค.
Netty๋ ๋ด๋ถ์ ์ผ๋ก ์์์ EventLoop๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์๋ํ๋ค.
EventLoop๋ฅผ ์ด์ฉํ์ฌ ์ด๋ฒคํธ ํ์ ์์ธ ์์ ์ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ด๋ค.
์ด ์์คํ ์ Blocking ๋ก์ง์ด ์ฃผ์ด์ง๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
Reactor MeltDown์ด ๋ฐ์ํ๋ค.
์ด๋ ์์ ์์ ์ด ๋๋์ง ์๊ณ EventLoop๋ฅผ ์ ์ ํด ๋ฒ๋ฆฐ๋ค๋ฉด, ํ์ ์์ธ ์์ ์ ๋๊ธฐํ๋ ์ํฉ์ด ๋๋ค.
์ด๋ ๊ณง ํ๋ก๊ทธ๋จ์ ์ ์ฒด์ ์ธ ์ฑ๋ฅ ์ ํ๋ก ์ด์ด์ง๋ค.
๊ทธ๋ฌ๋ฏ๋ก Blocking ๋ก์ง์ด EventLoop์์ ๋์ํ๋ ์ํฉ์ ๋ง๋ค๋ฉด ์ ๋๋ค.
์ค์ ๋ก ๊ณต์ ๋ฌธ์์๋ ์ด๋ ๊ฒ ์ ํ์๋ค.
Blocking API๋ ๋์์ฑ ๋ชจ๋ธ์ ์ข์ง ์๋ค.
์ฒ๋ฆฌํ ๊ฑฐ๋ฉด ๋ค๋ฅธ ์ค๋ ๋์์ ์ฒ๋ฆฌํด๋ผ.
์ด๊ฒ ๊ณต์ ๋ฌธ์๊ฐ ๊ถํ๋ Webflux์์ Blocking ๋ก์ง์ ์ฒ๋ฆฌํ๋ ๋ฒ์ด๋ค.
SUSU ์๋น์ค์์๋ Blocking ๋ก์ง์ด ์กด์ฌํ๋ค.
JPA๊ฐ ์ด์ ํด๋นํ๋ค.
JPA๊ฐ JDBC ๊ธฐ๋ฐ์ผ๋ก ์๋ํด์ Blocking ํ๊ฒ ์๋ํ๋ค.
๊ทธ๋์ ์ด ๋ถ๋ถ์ ๋น๋๊ธฐ์ฒ๋ฆฌ ํด์ผ ํ๊ณ , ์ด๋ฅผ ์ฝ๋ฃจํด์ ์ด์ฉํด ์ฒ๋ฆฌํ๋ค.
๐ต Webflux, Reactor์ Coroutine
Webflux๋ Project Reactor๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ๊ตฌํ๋์๋ค.
๊ทธ๋์ ๋ด๋ถ ๊ตฌ์กฐ๋ฅผ ์ดํด๋ณด๋ฉด Mono, Flux๋ฅผ ์ด์ฉํ๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
Coroutine์ ์ด์ฉํ์ง ์๊ณ ๊ตฌํํ๋ค๋ฉด, Project Reactor๋ฅผ ์ด์ฉํด ๊ตฌํํ๋ฉด ๋๋ค.
๊ทธ๋ ๋ค๋ฉด Webflux + Reactor์ Coroutine์ ์ด๋ป๊ฒ ๊ณต์กดํ ์ ์๋๊ฐ?
์ด๋ Webflux์ RequestMappingHandlerAdapter handle(...)์ํ์ธํด ๋ณด๋ฉด ์ ์ ์๋ค.
@Override
public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
... // ์๋ต
InvocableHandlerMethod invocableMethod = this.methodResolver.getRequestMappingMethod(handlerMethod);
... // ์๋ต
Mono<HandlerResult> resultMono = this.modelInitializer
.initModel(handlerMethod, bindingContext, exchange)
.then(Mono.defer(() -> invocableMethod.invoke(exchange, bindingContext)))
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
.onErrorResume(ex -> exceptionHandler.handleError(exchange, ex));
... // ์๋ต
}
์์ฒญ์ ์๋ง์ Method๋ฅผ ์ฐพ์์ ์คํํ๋ค.
ํด๋น invoke๋ฅผ ๋ฐ๋ผ ๋ค์ด๊ฐ๋ค ๋ณด๋ฉด, InvocableHandlerMethod์ invokeFunction(...)์ผ๋ก ์ด์ด์ง๋ค.
@Nullable
public static Object invokeFunction(Method method, Object target, Object[] args, boolean isSuspendingFunction, ServerWebExchange exchange) throws InvocationTargetException, IllegalAccessException {
if (isSuspendingFunction) {
Object coroutineContext = exchange.getAttribute("org.springframework.web.server.CoWebFilter.context");
return coroutineContext == null ? CoroutinesUtils.invokeSuspendingFunction(method, target, args) : CoroutinesUtils.invokeSuspendingFunction((CoroutineContext)coroutineContext, method, target, args);
} else {
KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
if (function == null) {
return method.invoke(target, args);
} else {
... // ์๋ต
}
}
}
์ด ํจ์์์ suspend ํจ์๋ ์๋๋ฉด ์ผ๋ฐ ํจ์๋์ ๋ฐ๋ผ์ ๋ถ๊ธฐ๊ฐ ๋ฐ์ํ๋ค.
๋ง์ฝ suspend function์์ ๊ฐ์ ํ๊ณ ์งํํ๋ค๋ฉด, CoroutinesUtils์ invokeSuspendingFunction(...)๋ฅผ ๋ง์ฃผํ๊ฒ ๋๋ค.
/**
* Invoke a suspending function and converts it to {@link Mono} or
* {@link Flux}.
* @param context the coroutine context to use
* @param method the suspending function to invoke
* @param target the target to invoke {@code method} on
* @param args the function arguments. If the {@code Continuation} argument is specified as the last argument
* (typically {@code null}), it is ignored.
* @return the method invocation result as reactive stream
* @throws IllegalArgumentException if {@code method} is not a suspending function
* @since 6.0
*/
@SuppressWarnings("deprecation")
public static Publisher<?> invokeSuspendingFunction(CoroutineContext context, Method method, Object target,
Object... args) {
Assert.isTrue(KotlinDetector.isSuspendingFunction(method), "'method' must be a suspending function");
KFunction<?> function = Objects.requireNonNull(ReflectJvmMapping.getKotlinFunction(method));
if (method.isAccessible() && !KCallablesJvm.isAccessible(function)) {
KCallablesJvm.setAccessible(function, true);
}
Mono<Object> mono = MonoKt.mono(context, (scope, continuation) ->
KCallables.callSuspend(function, getSuspendedFunctionArgs(method, target, args), continuation))
.filter(result -> !Objects.equals(result, Unit.INSTANCE))
.onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);
... // ์๋ต
return mono;
}
์ค๋ช ์๋ ๋์์๋ฏ์ด, suspend function์ ์คํ์ํค๊ณ Mono, Flux๋ก ๋ณํํ์ฌ ๋ฐํํ๋ค.
Mono๋ฅผ Coroutine์ ๋ง์ถฐ ๋ณํํ ๋ค์ suspend function์ ์คํํ๋ ๊ฒ์ด๋ค.
์ด์ ๋๋ถ์ด Reactor Context๋ฅผ Coroutine Context๋ก ๋ณํํด ์ฃผ๋ ์์ ๋ ํด์ค๋ค.
public fun <T> mono(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> {
require(context[Job] === null) { "Mono context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return monoInternal(GlobalScope, context, block)
}
private fun <T> monoInternal(
scope: CoroutineScope, // support for legacy mono in scope
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val reactorContext = context.extendReactorContext(sink.currentContext())
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()
์ด๋ฅผ ํตํด Reactor + Webflux์ Coroutine์ ๊ณต์กดํ ์ ์๊ฒ ๋๋ค.
๐ ์ด๋์ ์ด๋ป๊ฒ Coroutine์ ์ผ์๊น
SUSU ํ๋ก์ ํธ์ ์ฝ๋ฃจํด์ด ์ฒ๋ฆฌ๋ ๊ณณ์ ํฌ๊ฒ 3๊ฐ์ด๋ค.
- Blocking I/O ์ฒ๋ฆฌ
- ๋น๋๊ธฐ ์ฒ๋ฆฌ
- ์ด๋ฒคํธ ๋น๋๊ธฐ ์ฒ๋ฆฌ
๐ต Blocking I/O ์ฒ๋ฆฌ
Coroutine์ ์ฌ๋ฌ Dispatcher๋ฅผ ์ง์ํด ์ค๋ค.
Dispatcher๋ฅผ ๋ณ๊ฒฝํ์ฌ ์ฝ๋ฃจํด์ ์คํํ๊ณ ์ถ์ ๋, withContext๋ฅผ ์ฌ์ฉํ๋ค.
SUSU์์๋ MDC ๊ฐ์ ์์ ์ฝ๋ฃจํด์ ์ ํํ๊ธฐ ์ํด ์๋์ ๊ฐ์ ํจ์๋ฅผ ๋ง๋ค์ด์ ์ฌ์ฉํ๋ค.
suspend fun <T> withMDCContext(
context: CoroutineContext = Dispatchers.IO,
block: suspend () -> T,
): T {
val contextMap = MDC.getCopyOfContextMap() ?: emptyMap()
return withContext(context + MDCContext(contextMap)) { block() }
}
์ด ํจ์๋ ์๋์ ๊ฐ์ด ์ฌ์ฉ๋์๋ค.
suspend fun findByIdOrNull(uid: Long): User? {
return withMDCContext(Dispatchers.IO) { userRepository.findByIdOrNull(uid) }
}
IO ์์ ์ ์ต์ ํ๋ Dispatcher.IO๋ฅผ ํตํด IO ์์ ์ ์ฒ๋ฆฌํจ์ผ๋ก์จ ๋ ํจ์จ์ ์ธ ๋น๋๊ธฐ ์ฒ๋ฆฌ๋ฅผ ํ๊ธฐ ์ํด ๋ ธ๋ ฅํ๋ค
๐ต ๋น๋๊ธฐ ์ฒ๋ฆฌ
์๋น์ค์ latency๋ฅผ ์ค์ด๊ธฐ ์ํด ๋ค์์ ์์ ์ ๋น๋๊ธฐ๋ก ์ฒ๋ฆฌํ๋ค.
๋ํ์ ์ผ๋ก ์๋์ ๊ฐ๋ค.
val userEnvelopeStatistic = parZip(
{ envelopeStatisticService.getRecentSpentFor1Year(user.uid) },
{ envelopeStatisticService.getMostFrequentRelationship(user.uid) },
{ envelopeStatisticService.getMostFrequentCategory(user.uid) },
{ envelopeStatisticService.getMaxReceivedEnvelope(user.uid) },
{ envelopeStatisticService.getMaxSentEnvelope(user.uid) }
) {
... // ์๋ต
}
ํ ๊ฐ์ ๊ตฌํ๊ธฐ ์ํด 5๊ฐ์ ์์ ์ด ๋น๋๊ธฐ๋ก ์ฒ๋ฆฌ๋๊ณ ๊ฐ์ด ๋ฐํ๋๋๋ก ๊ตฌ์ฑํ๋ค.
๋ค์์ ์ฝ๋ฃจํด์ ์์ฑํ์ฌ ๋น๋๊ธฐ ์ฒ๋ฆฌํ ํ, ๊ฐ์ด ๋ชจ๋ ๋ฐํ๋ ๊ฒฝ์ฐ ์ดํ ๋ก์ง์ ์ฒ๋ฆฌํ๋๋ก ํ๋ค.
์์์ ์ฌ์ฉํ parzip์ ๊ฒฝ์ฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ด์ฉํ ๊ฒ์ด๋ค
defered์ awaitall์ ์ด์ฉํด๋ ๋์ง๋ง, ์ด๋ฅผ ์ด์ฉํ๋ฉด ๋ ํธํ๋ค.
์ฌ์ฉ์ ์ถ์ฒํ๋ค.
High-level concurrency | Arrow
Coroutines are one of the
arrow-kt.io
๐ต ์ด๋ฒคํธ ์ฒ๋ฆฌ
์ด๋ฒคํธ ์ฒ๋ฆฌ ํจ์์ ๊ฒฝ์ฐ, suspend function์ผ๋ก ๋ณํ์ด ๋ถ๊ฐ๋ฅํ๋ค.
suspend function์ด ๋ด๋ถ์ ์ผ๋ก ๋ณํ๋๋ฉฐ, Continuation ๊ฐ์ฒด๊ฐ ํ๋ผ๋ฏธํฐ์ ๋ค์ด๊ฐ๊ฒ ๋๋ฉด, ์๋ฌ๊ฐ ๋ฐ์ํ๋ค.
๊ทธ๋ฌ๋ฏ๋ก ์ด๋ฒคํธ ์ฒ๋ฆฌ๋ฅผ ๋น๋๊ธฐ์ ์ผ๋ก ์งํํ๋ ค๋ฉด CoroutineScope๋ฅผ ์ด์ฉํ์ฌ ์๋ก์ด ์ฝ๋ฃจํด ์ค์ฝํ๋ฅผ ๋ง๋ค์ด ์ฌ์ฉํด์ผ ํ๋ค.
CoroutineScope(...)๋ก ์ฌ์ฉํด๋ ๋์ง๋ง, ์ด ๋ํ MDC ๊ด๋ จ ์ด์๋ก ๋ฐ๋ก ๋ง๋ค์ด์ ์ฌ์ฉํ๋ค.
fun mdcCoroutineScope(context: CoroutineContext, traceId: String): CoroutineScope {
val contextMap = MDC.getCopyOfContextMap() ?: emptyMap()
contextMap.plus("traceId" to traceId)
return CoroutineScope(context + MDCContext(contextMap))
}
์ด ํจ์๋ ์๋์ ๊ฐ์ด ์ฌ์ฉ๋์๋ค.
@Component
class SystemActionLogEventListener(
private val systemActionLogService: SystemActionLogService,
) {
@EventListener
fun subscribe(event: SystemActionLogEvent) {
mdcCoroutineScope(Dispatchers.IO + Job(), event.traceId).launch {
SystemActionLog(
ipAddress = event.ipAddress,
path = event.path,
httpMethod = event.method,
userAgent = event.userAgent,
host = event.host,
referer = event.referer,
extra = event.extra
).run { systemActionLogService.record(this) }
}
}
}
์ด๋ฒคํธ ์ฒ๋ฆฌ์ ๊ฒฝ์ฐ ๋ค๋ฅธ ์ค๋ ๋์์ ์ฒ๋ฆฌ๋ ์ ์๋ค๋ ์ ์ ๊ฐ์ํ์ฌ, BaseEvent์ MDC traceId๊ฐ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ด๊ธฐ๊ฒ ๊ตฌ์ฑํ๋ค.
์ด์ ์ด๋ฒคํธ ์ฒ๋ฆฌ๋ฅผ ์ด์ฉํ๋๋ผ๋, ์ด๋ค ์์ฒญ์ ์ด๋ฒคํธ์ธ์ง traceId๋ฅผ ํตํด ์ ์ ์๋๋ก ๊ตฌํํ๋ค.
์์ธํ ์ฝ๋๋ ์๋์์ ํ์ธํ๋ฉด ๋๋ค
GitHub - ok-su-su/oksusu-susu-api: super ๊ทน๋ฝ Server
super ๊ทน๋ฝ Server. Contribute to ok-su-su/oksusu-susu-api development by creating an account on GitHub.
github.com
'Backend' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
<Spring> Slack ๋ฉ์ธ์ง ์ ์ก ์๋ฌ ์ฒ๋ฆฌ (0) | 2024.08.04 |
---|---|
<Spring> Webflux + Coroutine vs MVC (0) | 2024.06.12 |
<Spring> Webflux + Coroutine + MDC (0) | 2024.05.10 |
<Spring> WARN ๋ ๋ฒจ ์ด์ ๋ก๊ทธ Slack ์๋ฆผ ๋ณด๋ด๊ธฐ (0) | 2024.03.17 |
<Spring> Spring Bean, IoC/DI ์ ๋ฆฌ 2 (0) | 2023.10.03 |