ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring MVC Stack - WebFlux/WebClient Logging with Reactor Context + MDC
    Spring 2021. 5. 21. 15:42

    Reactor Context

    Reactor Context는 스트림을 따라 전달되는 인터페이스이며, Map과 유사하게 key/value store 구조이다.

    Runtime 단계에서 필요한 Context 정보에 엑세스 할 수 있도록 하는 것이다.

    즉, 스트림에 Context를 제공할 수 있는 유일한 방법일 뿐만 아니라, 조립/구독 단계를 포함해 전체 런타임 동안 사용 할 수 있는 데이터를 동적으로 제공

    전체 생명 주기 중에서 각 Subscriber에게 별도의 컨텍스트가 제공될 수 있는 유일한 단계는 '구독단계'이다

    Reactor With MDC

    MDC 는 JAVA 로깅 프레임워크(logback, log4j 등) 에서 제공하는 로그 관련 라이브러리에서 여러 메타 정보를 넣을 수 있고 공유되는 Map으로 key, value 형식으로 저장하고 사용 가능하도록 하는 기능이며, 가장 많이 활용하는 것은 바로 요청에 대한 식별을 통해 로그를 쉽게 추출할 수 있습니다.

    그러나 대부분의 MDC 동작은 단일 Thread에서만 정상 동작하고, Thread가 바뀌는 경우 일관성 있는 데이터를 전달 할 수 없다.

    그렇기 때문에, Weflux를 사용하여 Asynchronous/Nonblock 기반으로 스트림을 처리할 경우, 수십번의 Thread의 변경이 일어 날 수 있다.

     

    그렇기 때문에, Context 변경이 일어날 때마다 MDC를 복사하여, 로깅 추적이 가능하도록 설정해 주면 로그를 통해 쉽게 추적 및 추출이 가능하다.

     

    아래 예제를 통해서, Spring MVC Stack에서 Webflux/Webclient의 로깅 추적을 가능하도록 하는 방법에 대하여 살펴보도록 하자.

    Example

    • CoreSubscriber 구현
    import java.util.Map;
    import java.util.stream.Collectors;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.reactivestreams.Subscription;
    import org.slf4j.MDC;
    import reactor.core.CoreSubscriber;
    import reactor.util.context.Context;
    
    /** Helper that copies the state of Reactor [Context] to MDC on the #onNext function. */
    @Slf4j
    @RequiredArgsConstructor
    public class MdcContextLifter<T> implements CoreSubscriber<T> {
    
      private final CoreSubscriber<T> coreSubscriber;
    
      @Override
      public void onSubscribe(Subscription subscription) {
        coreSubscriber.onSubscribe(subscription);
      }
    
      @Override
      public void onNext(T t) {
        copyToMdc(coreSubscriber.currentContext());
        coreSubscriber.onNext(t);
      }
    
      @Override
      public void onError(Throwable throwable) {
        coreSubscriber.onError(throwable);
      }
    
      @Override
      public void onComplete() {
        coreSubscriber.onComplete();
      }
    
      @Override
      public Context currentContext() {
        return coreSubscriber.currentContext();
      }
    
      /**
       * Extension function for the Reactor [Context]. Copies the current context to the MDC, if context
       * is empty clears the MDC. State of the MDC after calling this method should be same as Reactor
       * [Context] state. One thread-local access only.
       */
      void copyToMdc(Context context) {
        if (context != null && !context.isEmpty()) {
          Map<String, String> map =
              context.stream()
                  .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
    
          MDC.setContextMap(map);
    
        } else {
          MDC.clear();
        }
      }
    }

     

     

    • Hooks.onEachOperator 셋팅
    Hooks.onEachOperator : Publisher Mono/Flux가 생성될때, 해당 Operator 실행  
    (ref :  https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Hooks.html#onEachOperator-java.util.function.Function- )
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import org.springframework.context.annotation.Configuration;
    import reactor.core.publisher.Hooks;
    import reactor.core.publisher.Operators;
    
    @Configuration
    public class MdcContextLifterConfiguration {
    
      public static final String MDC_CONTEXT_REACTOR_KEY =
          MdcContextLifterConfiguration.class.getName();
    
      @PostConstruct
      @SuppressWarnings("unchecked")
      public void contextOperatorHook() {
        Hooks.onEachOperator(
            MDC_CONTEXT_REACTOR_KEY,
            Operators.lift((scannable, subscriber) -> new MdcContextLifter(subscriber)));
      }
    
      @PreDestroy
      public void cleanupHook() {
        Hooks.resetOnEachOperator(MDC_CONTEXT_REACTOR_KEY);
      }
    }

     

    • Controller
      @GetMapping("/logging")
      public String logging() {
        Flux<Integer> integerFlux =
            Flux.range(2017, 2) // 2017, 2018
                .publishOn(Schedulers.boundedElastic()) // 다른 워커에서 작업 처리
                .log()  // 로깅 onNext, onComplete, onError
                .subscriberContext(ctx -> ctx.put(LogConstant.TRACING_ID, LoggingUtil.getTraceId())); // Context 전달
    
        integerFlux.subscribe(year -> log.info("year ::: {}", year));
        return "OK";
      }

     

    • Logging 결과

    아래 로그를 살펴보면 Schedulers.boundedElastic()를 통해 다른 워커 쓰레드에서 작업 처리를 하였지만,

    boundedElastic-1 에서 로깅을 찍었을 경우에도, trace.id를 통해 확인 가능

    * 14:13:30.598 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onSubscribe([Fuseable] FluxHide.SuppressFuseableSubscriber)
    * 14:13:30.599 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | request(unbounded)
    * 14:13:30.602 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
    * 14:13:30.604 [boundedElastic-1] [trace.id=] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
    * 14:13:30.604 [http-nio-6091-exec-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
    * 14:13:30.604 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onNext(2017)
    * 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |k.c.g.c.a.c.TimeAttackController: year ::: 2017
    * 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onContextUpdate(Context1{trace.id=20f728e2-7854-4930-af9d-78b4dcadd555})
    * 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onNext(2018)
    * 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |k.c.g.c.a.c.TimeAttackController: year ::: 2018
    * 14:13:30.605 [boundedElastic-1] [trace.id=20f728e2-7854-4930-af9d-78b4dcadd555] INFO  |reactor.Flux.LiftFuseable.1: | onComplete()

     

    Appendix - Use Spring Reactive Stack

    추가적으로 Spring Reactive Stack 을 사용할 경우, WebFilter에 Context를 추가하여, 모든 요청이 들어오면 trace.id 를 context에 담아서 전달하면면 일괄적으로 로깅 추적이 가능하다.

    public class LoggingFilter implements WebFilter {
        @Override
        public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
            return chain.filter(exchange).subscriberContext(ReactiveLoggingUtils::injectTraceId);
        }
    }

     

    마치며

    이전 글에서 살펴본 Spring MVC Stack에서의 Spring MVC Logging With  AOP + MDC 처리 방법 과

    Spring Reactive Stack (WebFlux/WebClient)에서 로깅 추적이 힘든 부분을, Reactor Context + MDC를 활용하여

    Spring MVC+Reactive Stack Application에서 로깅 처리 및 추적이 가능하게 되었습니다.

     

    빈약한 글이지만, 궁금증 있으시면 댓글 부탁드립니다:D

     

     

    #참조

    https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/
    https://woowabros.github.io/experience/2020/02/19/introduce-shop-display.html

     

     

     

Designed by Tistory.