Spring MVC Stack - WebFlux/WebClient Logging with Reactor Context + MDC
Reactor Context
Reactor Context는 스트림을 따라 전달되는 인터페이스이며, Map과 유사하게 key/value store 구조이다.
Runtime 단계에서 필요한 Context 정보에 엑세스 할 수 있도록 하는 것이다.
즉, 스트림에 Context를 제공할 수 있는 유일한 방법일 뿐만 아니라, 조립/구독 단계를 포함해 전체 런타임 동안 사용 할 수 있는 데이터를 동적으로 제공
전체 생명 주기 중에서 각 Subscriber에게 별도의 컨텍스트가 제공될 수 있는 유일한 단계는 '구독단계'이다
Reactor With MDC
MDC 는 JAVA 로깅 프레임워크(logback, log4j 등) 에서 제공하는 로그 관련 라이브러리에서 여러 메타 정보를 넣을 수 있고 공유되는 Map으로 key, value 형식으로 저장하고 사용 가능하도록 하는 기능이며, 가장 많이 활용하는 것은 바로 요청에 대한 식별을 통해 로그를 쉽게 추출할 수 있습니다.
그러나 대부분의 MDC 동작은 단일 Thread에서만 정상 동작하고, Thread가 바뀌는 경우 일관성 있는 데이터를 전달 할 수 없다.
그렇기 때문에, Weflux를 사용하여 Asynchronous/Nonblock 기반으로 스트림을 처리할 경우, 수십번의 Thread의 변경이 일어 날 수 있다.
그렇기 때문에, Context 변경이 일어날 때마다 MDC를 복사하여, 로깅 추적이 가능하도록 설정해 주면 로그를 통해 쉽게 추적 및 추출이 가능하다.
아래 예제를 통해서, Spring MVC Stack에서 Webflux/Webclient의 로깅 추적을 가능하도록 하는 방법에 대하여 살펴보도록 하자.
Example
- CoreSubscriber 구현
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import org.slf4j.MDC;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;
/** Helper that copies the state of Reactor [Context] to MDC on the #onNext function. */
@Slf4j
@RequiredArgsConstructor
public class MdcContextLifter<T> implements CoreSubscriber<T> {
private final CoreSubscriber<T> coreSubscriber;
@Override
public void onSubscribe(Subscription subscription) {
coreSubscriber.onSubscribe(subscription);
}
@Override
public void onNext(T t) {
copyToMdc(coreSubscriber.currentContext());
coreSubscriber.onNext(t);
}
@Override
public void onError(Throwable throwable) {
coreSubscriber.onError(throwable);
}
@Override
public void onComplete() {
coreSubscriber.onComplete();
}
@Override
public Context currentContext() {
return coreSubscriber.currentContext();
}
/**
* Extension function for the Reactor [Context]. Copies the current context to the MDC, if context
* is empty clears the MDC. State of the MDC after calling this method should be same as Reactor
* [Context] state. One thread-local access only.
*/
void copyToMdc(Context context) {
if (context != null && !context.isEmpty()) {
Map<String, String> map =
context.stream()
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
MDC.setContextMap(map);
} else {
MDC.clear();
}
}
}
- Hooks.onEachOperator 셋팅
Hooks.onEachOperator : Publisher Mono/Flux가 생성될때, 해당 Operator 실행
(ref : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Hooks.html#onEachOperator-java.util.function.Function- )
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
@Configuration
public class MdcContextLifterConfiguration {
public static final String MDC_CONTEXT_REACTOR_KEY =
MdcContextLifterConfiguration.class.getName();
@PostConstruct
@SuppressWarnings("unchecked")
public void contextOperatorHook() {
Hooks.onEachOperator(
MDC_CONTEXT_REACTOR_KEY,
Operators.lift((scannable, subscriber) -> new MdcContextLifter(subscriber)));
}
@PreDestroy
public void cleanupHook() {
Hooks.resetOnEachOperator(MDC_CONTEXT_REACTOR_KEY);
}
}
- Controller
@GetMapping("/logging")
public String logging() {
Flux<Integer> integerFlux =
Flux.range(2017, 2) // 2017, 2018
.publishOn(Schedulers.boundedElastic()) // 다른 워커에서 작업 처리
.log() // 로깅 onNext, onComplete, onError
.subscriberContext(ctx -> ctx.put(LogConstant.TRACING_ID, LoggingUtil.getTraceId())); // Context 전달
integerFlux.subscribe(year -> log.info("year ::: {}", year));
return "OK";
}
- Logging 결과
아래 로그를 살펴보면 Schedulers.boundedElastic()를 통해 다른 워커 쓰레드에서 작업 처리를 하였지만,
boundedElastic-1 에서 로깅을 찍었을 경우에도, trace.id를 통해 확인 가능
* 14:13:30.598 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onSubscribe([Fuseable] FluxHide.SuppressFuseableSubscriber)
* 14:13:30.599 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | request(unbounded)
* 14:13:30.602 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.604 [boundedElastic-1] [trace.id=] INFO |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.604 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.604 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onNext(2017)
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |k.c.g.c.a.c.TimeAttackController: year ::: 2017
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onNext(2018)
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |k.c.g.c.a.c.TimeAttackController: year ::: 2018
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO |reactor.Flux.LiftFuseable.1: | onComplete()
Appendix - Use Spring Reactive Stack
추가적으로 Spring Reactive Stack 을 사용할 경우, WebFilter에 Context를 추가하여, 모든 요청이 들어오면 trace.id 를 context에 담아서 전달하면면 일괄적으로 로깅 추적이 가능하다.
public class LoggingFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange).subscriberContext(ReactiveLoggingUtils::injectTraceId);
}
}
마치며
이전 글에서 살펴본 Spring MVC Stack에서의 Spring MVC Logging With AOP + MDC 처리 방법 과
Spring Reactive Stack (WebFlux/WebClient)에서 로깅 추적이 힘든 부분을, Reactor Context + MDC를 활용하여
Spring MVC+Reactive Stack Application에서 로깅 처리 및 추적이 가능하게 되었습니다.
빈약한 글이지만, 궁금증 있으시면 댓글 부탁드립니다:D
#참조
https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/
https://woowabros.github.io/experience/2020/02/19/introduce-shop-display.html