Spring

Spring MVC Stack - WebFlux/WebClient Logging with Reactor Context + MDC

Lucas-Kim 2021. 5. 21. 15:42

Reactor Context

Reactor Context는 스트림을 따라 전달되는 인터페이스이며, Map과 유사하게 key/value store 구조이다.

Runtime 단계에서 필요한 Context 정보에 엑세스 할 수 있도록 하는 것이다.

즉, 스트림에 Context를 제공할 수 있는 유일한 방법일 뿐만 아니라, 조립/구독 단계를 포함해 전체 런타임 동안 사용 할 수 있는 데이터를 동적으로 제공

전체 생명 주기 중에서 각 Subscriber에게 별도의 컨텍스트가 제공될 수 있는 유일한 단계는 '구독단계'이다

Reactor With MDC

MDC 는 JAVA 로깅 프레임워크(logback, log4j 등) 에서 제공하는 로그 관련 라이브러리에서 여러 메타 정보를 넣을 수 있고 공유되는 Map으로 key, value 형식으로 저장하고 사용 가능하도록 하는 기능이며, 가장 많이 활용하는 것은 바로 요청에 대한 식별을 통해 로그를 쉽게 추출할 수 있습니다.

그러나 대부분의 MDC 동작은 단일 Thread에서만 정상 동작하고, Thread가 바뀌는 경우 일관성 있는 데이터를 전달 할 수 없다.

그렇기 때문에, Weflux를 사용하여 Asynchronous/Nonblock 기반으로 스트림을 처리할 경우, 수십번의 Thread의 변경이 일어 날 수 있다.

 

그렇기 때문에, Context 변경이 일어날 때마다 MDC를 복사하여, 로깅 추적이 가능하도록 설정해 주면 로그를 통해 쉽게 추적 및 추출이 가능하다.

 

아래 예제를 통해서, Spring MVC Stack에서 Webflux/Webclient의 로깅 추적을 가능하도록 하는 방법에 대하여 살펴보도록 하자.

Example

  • CoreSubscriber 구현
import java.util.Map;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import org.slf4j.MDC;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

/** Helper that copies the state of Reactor [Context] to MDC on the #onNext function. */
@Slf4j
@RequiredArgsConstructor
public class MdcContextLifter<T> implements CoreSubscriber<T> {

  private final CoreSubscriber<T> coreSubscriber;

  @Override
  public void onSubscribe(Subscription subscription) {
    coreSubscriber.onSubscribe(subscription);
  }

  @Override
  public void onNext(T t) {
    copyToMdc(coreSubscriber.currentContext());
    coreSubscriber.onNext(t);
  }

  @Override
  public void onError(Throwable throwable) {
    coreSubscriber.onError(throwable);
  }

  @Override
  public void onComplete() {
    coreSubscriber.onComplete();
  }

  @Override
  public Context currentContext() {
    return coreSubscriber.currentContext();
  }

  /**
   * Extension function for the Reactor [Context]. Copies the current context to the MDC, if context
   * is empty clears the MDC. State of the MDC after calling this method should be same as Reactor
   * [Context] state. One thread-local access only.
   */
  void copyToMdc(Context context) {
    if (context != null && !context.isEmpty()) {
      Map<String, String> map =
          context.stream()
              .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));

      MDC.setContextMap(map);

    } else {
      MDC.clear();
    }
  }
}

 

 

  • Hooks.onEachOperator 셋팅
Hooks.onEachOperator : Publisher Mono/Flux가 생성될때, 해당 Operator 실행  
(ref :  https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Hooks.html#onEachOperator-java.util.function.Function- )
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;

@Configuration
public class MdcContextLifterConfiguration {

  public static final String MDC_CONTEXT_REACTOR_KEY =
      MdcContextLifterConfiguration.class.getName();

  @PostConstruct
  @SuppressWarnings("unchecked")
  public void contextOperatorHook() {
    Hooks.onEachOperator(
        MDC_CONTEXT_REACTOR_KEY,
        Operators.lift((scannable, subscriber) -> new MdcContextLifter(subscriber)));
  }

  @PreDestroy
  public void cleanupHook() {
    Hooks.resetOnEachOperator(MDC_CONTEXT_REACTOR_KEY);
  }
}

 

  • Controller
  @GetMapping("/logging")
  public String logging() {
    Flux<Integer> integerFlux =
        Flux.range(2017, 2) // 2017, 2018
            .publishOn(Schedulers.boundedElastic()) // 다른 워커에서 작업 처리
            .log()  // 로깅 onNext, onComplete, onError
            .subscriberContext(ctx -> ctx.put(LogConstant.TRACING_ID, LoggingUtil.getTraceId())); // Context 전달

    integerFlux.subscribe(year -> log.info("year ::: {}", year));
    return "OK";
  }

 

  • Logging 결과

아래 로그를 살펴보면 Schedulers.boundedElastic()를 통해 다른 워커 쓰레드에서 작업 처리를 하였지만,

boundedElastic-1 에서 로깅을 찍었을 경우에도, trace.id를 통해 확인 가능

* 14:13:30.598 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onSubscribe([Fuseable] FluxHide.SuppressFuseableSubscriber)
* 14:13:30.599 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | request(unbounded)
* 14:13:30.602 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.604 [boundedElastic-1] [trace.id=] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.604 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.604 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onNext(2017)
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |k.c.g.c.a.c.TimeAttackController: year ::: 2017
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onNext(2018)
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |k.c.g.c.a.c.TimeAttackController: year ::: 2018
* 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onComplete()

 

Appendix - Use Spring Reactive Stack

추가적으로 Spring Reactive Stack 을 사용할 경우, WebFilter에 Context를 추가하여, 모든 요청이 들어오면 trace.id 를 context에 담아서 전달하면면 일괄적으로 로깅 추적이 가능하다.

public class LoggingFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(exchange).subscriberContext(ReactiveLoggingUtils::injectTraceId);
    }
}

 

마치며

이전 글에서 살펴본 Spring MVC Stack에서의 Spring MVC Logging With  AOP + MDC 처리 방법 과

Spring Reactive Stack (WebFlux/WebClient)에서 로깅 추적이 힘든 부분을, Reactor Context + MDC를 활용하여

Spring MVC+Reactive Stack Application에서 로깅 처리 및 추적이 가능하게 되었습니다.

 

빈약한 글이지만, 궁금증 있으시면 댓글 부탁드립니다:D

 

 

#참조

https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/
https://woowabros.github.io/experience/2020/02/19/introduce-shop-display.html