1. 개요

반응형 스트림 디버깅 은 아마도 이러한 데이터 구조를 사용하기 시작하면 직면하게 될 주요 과제 중 하나일 것입니다.

그리고 지난 몇 년 동안 Reactive Streams가 인기를 얻었다는 점을 염두에 두고 이 작업을 효율적으로 수행할 수 있는 방법을 아는 것이 좋습니다.

이것이 종종 문제가 되는 이유를 알아보기 위해 반응성 스택을 사용하여 프로젝트를 설정하는 것부터 시작하겠습니다.

2. 버그가 있는 시나리오

우리는 여러 비동기 프로세스가 실행되고 있고 결국 예외를 트리거할 코드의 일부 결함을 도입한 실제 시나리오를 시뮬레이트하려고 합니다.

큰 그림을 이해하기 위해 애플리케이션이 id , formattedName수량 필드 만 포함하는 간단한 Foo 개체 의 스트림을 소비하고 처리할 것이라고 언급할 것입니다 .

2.1. 로그 출력 분석

이제 스니펫과 처리되지 않은 오류가 나타날 때 생성되는 출력을 살펴보겠습니다.

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .map(FooReporter::reportResult)
      .subscribe();
}

public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux.map(FooNameHelper::substringFooName)
      .map(FooQuantityHelper::divideFooQuantity)
      .subscribe();
}

응용 프로그램을 몇 초 동안 실행하면 때때로 예외를 기록하고 있음을 알게 됩니다.

오류 중 하나를 자세히 살펴보면 다음과 유사한 내용을 찾을 수 있습니다.

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

근본 원인을 기반으로 스택 추적에 언급된 FooNameHelper 클래스를 확인하면 경우에 따라 Foo 개체가 예상보다 짧은 formattedName 값으로 처리되고 있다고 상상할 수 있습니다 .

물론 이것은 단순화된 경우일 뿐이며 해결책은 다소 명백해 보입니다.

그러나 이것이 일부 컨텍스트 정보 없이는 예외 자체가 문제를 해결하는 데 도움이 되지 않는 실제 시나리오라고 상상해 봅시다.

예외가 processFoo 또는 processFooInAnotherScenario 메서드 의 일부로 트리거되었습니까 ?

이 단계에 도달하기 전에 다른 이전 단계가 formattedName 필드에 영향을 미쳤습니까?

로그 항목은 이러한 질문을 파악하는 데 도움이 되지 않습니다.

설상가상으로 우리 기능 내에서 예외가 발생하지 않는 경우도 있습니다.

예를 들어 Foo 객체 를 유지하기 위해 반응 저장소에 의존한다고 상상해보십시오 . 그 시점에서 오류가 발생하면 코드 디버깅을 시작할 위치에 대한 단서조차 없을 수 있습니다.

반응형 스트림을 효율적으로 디버깅하려면 도구가 필요합니다.

3. 디버그 세션 사용

우리 애플리케이션에서 무슨 일이 일어나고 있는지 알아내는 한 가지 옵션은 우리가 가장 좋아하는 IDE를 사용하여 디버깅 세션을 시작하는 것입니다.

두 개의 조건부 중단점을 설정하고 스트림의 각 단계가 실행될 때 데이터 흐름을 분석해야 합니다.

사실 이것은 특히 우리가 많은 반응형 프로세스를 실행하고 리소스를 공유하는 경우 번거로운 작업이 될 수 있습니다.

또한 Security상의 이유로 디버깅 세션을 시작할 수 없는 경우가 많이 있습니다.

4. doOnErrorMethod 또는 Subscribe 매개변수를 사용하여 정보 로깅

때로는 subscribe 메소드 의 두 번째 매개변수로 Consumer 를 제공하여 유용한 컨텍스트 정보를 추가할 수 있습니다 .

public void processFoo(Flux<Foo> flux) {

    // ...

    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

참고: subscribe 메서드에서 추가 처리를 수행할 필요가 없는 경우 게시자에서 doOnError 함수를 연결할 수 있습니다.

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

이제 예외를 생성한 실제 요소에 대한 정보가 많지 않더라도 오류가 발생할 수 있는 위치에 대한 몇 가지 지침이 있습니다.

5. Reactor의 전역 디버그 구성 활성화

Reactor 라이브러리는 Flux Mono 연산자 의 동작을 구성할 수 있는 Hooks 클래스를 제공합니다.

다음 문을 추가하기만 하면 애플리케이션이 게시자의 메서드에 대한 호출을 계측하고 연산자 구성을 래핑하고 스택 추적을 캡처합니다 .

Hooks.onOperatorDebug();

디버그 모드가 활성화되면 예외 로그에 몇 가지 유용한 정보가 포함됩니다.

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.d.b.c.s.FooService.processFoo(FooService.java:24)
    c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.d.b.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

보시다시피 첫 번째 섹션은 비교적 동일하게 유지되지만 다음 섹션에서는 다음에 대한 정보를 제공합니다.

  1. 게시자의 어셈블리 추적 — 여기에서 processFoo 메서드 에서 오류가 처음 생성되었음을 확인할 수 있습니다 .
  2. 연결된 사용자 클래스와 함께 오류가 처음 트리거된 후 오류를 관찰한 연산자입니다.

참고: 이 예에서는 주로 이것을 명확하게 보기 위해 다른 클래스에 작업을 추가합니다.

언제든지 디버그 모드를 켜거나 끌 수 있지만 이미 인스턴스화된 FluxMono 객체에는 영향을 미치지 않습니다.

5.1. 다른 스레드에서 연산자 실행

명심해야 할 또 다른 측면은 스트림에서 작동하는 다른 스레드가 있더라도 어셈블리 추적이 제대로 생성된다는 것입니다.

다음 예를 살펴보겠습니다.

public void processFoo(Flux<Foo> flux) {
    flux.publishOn(Schedulers.newSingle("foo-thread"))
       // ...
      .publishOn(Schedulers.newSingle("bar-thread"))
      .map(FooReporter::reportResult)
      .subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

이제 로그를 확인하면 이 경우 첫 번째 섹션이 약간 변경될 수 있지만 마지막 두 섹션은 거의 동일하게 유지된다는 것을 알 수 있습니다.

첫 번째 부분은 스레드 스택 추적이므로 특정 스레드가 수행하는 작업만 표시합니다.

우리가 본 것처럼 애플리케이션을 디버깅할 때 가장 중요한 섹션이 아니므로 이 변경을 허용할 수 있습니다.

6. 단일 프로세스에서 디버그 출력 활성화

모든 단일 반응 프로세스에서 스택 추적을 계측하고 생성하는 데 비용이 많이 듭니다.

따라서 중요한 경우에만 전자의 접근 방식을 구현해야 합니다 .

어쨌든 Reactor는 단일 중요 프로세스에서 디버그 모드를 활성화하는 방법을 제공합니다. 이는 메모리 소모가 적습니다 .

우리는 체크포인트 연산자를 참조하고 있습니다.

public void processFoo(Flux<Foo> flux) {
    
    // ...

    flux.checkpoint("Observed error on processFoo", true)
      .subscribe();
}

이러한 방식으로 어셈블리 추적은 검사점 단계에서 기록됩니다.

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
	...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

리액티브 체인의 끝을 향해 체크포인트 방식을 구현해야 합니다 .

그렇지 않으면 운영자는 다운스트림에서 발생하는 오류를 관찰할 수 없습니다.

또한 라이브러리는 오버로드된 메서드를 제공합니다. 우리는 다음을 피할 수 있습니다:

  • no-args 옵션을 사용하는 경우 관찰된 오류에 대한 설명 지정
  • 사용자 지정 설명만 제공하여 채워진 스택 추적(가장 비용이 많이 드는 작업) 생성

7. 요소 시퀀스 로깅

마지막으로 Reactor 게시자는 경우에 따라 잠재적으로 유용할 수 있는 방법을 하나 더 제공합니다.

반응 체인에서 log 메서드 를 호출하면 애플리케이션은 해당 단계에 있는 상태로 흐름의 각 요소를 기록합니다 .

예제에서 시도해 봅시다.

public void processFoo(Flux<Foo> flux) {
    flux.map(FooNameHelper::concatFooName)
      .map(FooNameHelper::substringFooName)
      .log();
      .map(FooReporter::reportResult)
      .doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
      })
      .subscribe();
}

그리고 로그를 확인하십시오.

INFO  reactor.Flux.OnAssembly.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.OnAssembly.1 - request(unbounded)
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.OnAssembly.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

이 단계에서 각 Foo 객체의 상태와 예외가 발생할 때 프레임워크가 흐름을 취소하는 방법 을 쉽게 볼 수 있습니다 .

물론 이 방법도 비용이 많이 들고 적당히 사용해야 합니다.

8. 결론

응용 프로그램을 올바르게 디버깅하는 도구와 메커니즘을 모르면 문제 해결에 많은 시간과 노력을 들일 수 있습니다.

이는 우리가 반응형 및 비동기 데이터 구조를 처리하는 데 익숙하지 않고 작동 방식을 파악하기 위해 추가 도움이 필요한 경우에 특히 그렇습니다.

항상 그렇듯이 전체 예제는 GitHub 저장소 에서 사용할 수 있습니다 .

Generic footer banner