Reactor, 제대로 사용하기 - Error Handling

2022. 11. 27. 23:54Spring

반응형

Project Reactor의 Reactor API 사용 중 발생하는 오류를 처리하기 위한 메소드를 이해하고 사용하는 것이 해당 포스팅의 목표입니다. 

 

onErrorXxx

onErrorXxx 형태의 메소드는 Reactor API 처리 중 오류가 발생하면 일어나는 onError Signal을 대상으로 처리합니다.

다룰 메소드를 훑어보면 아래와 같습니다.

 

onErrorComplete():
: onError Signal을 onComplete Signal로 변경 

onErrorContinue(BiConsumer<Throwable, Object>)
: 해당 Element 스킵 후 다음 Element로 로직 계속 진행. Consumer에서 적절한 처리 지정

onErrorMap()
: Exception을 다른 Exception으로 타입 전환

onErrorResume(Function<Throwable, Mono<T>>)
: 지정된 Fallback method 실행 

onErrorReturn(T) 
: 예외 발생한 Element를 Fallback Value로 대체

 

 

각 메소드의 형태는 대부분 비슷한 형태로 예외를 다룹니다.

크게 세 가지의 형태로 분류할 수 있습니다. 

(1) 조건 파라미터가 없는 메소드, (2) 클래스 타입을 조건 파라미터로 받는 메소드, (3) Predicate를 조건 파라미터로 받아 매칭하는 메소드가 있습니다.

 

(1) onErrorXxx()
(2) onErrorXxx(Class<?> type)
(3) onErrorXxx(Predicate<?> errorPredicate)

 

(1) 특정 조건이 없는 메소드면 모든 예외 발생 시 해당 메소드를 실행하고,

(2), (3) 특정 조건을 명시하면 해당 조건에 맞는 경우에만 해당 메소드를 실행시킵니다.

 

가령, onErrorXxx(RuntimeException.class) 와 같이 사용하면 RuntimeException만을 해당 메소드에서 처리하게 됩니다.

 

 

해당 포스팅에서는 위의 메소드들을 살피며 예문을 살피며 에러 핸들링에 대해 다루겠습니다.

 

 

 

 

 doOnEach

들어가기에 앞서 doOnEach를 먼저 소개해드리고자 합니다.

onErrorXxx 를 실행하기 전과 후에 로그를 남겨 메소드 실행을 눈으로 확인하려고 하는데요.

이 때 doOnEach()로 로그를 남기고자 합니다.

 

 

doOnNext vs doOnEach

Mono와 Flux를 사용하면서 흔히 doOnNext를 사용하곤 할텐데요.

그런데 왜 doOnNext가 아닌 doOnEach를 사용했을까요?

doOnNext는 오류가 발생했을 때는 실행되지 않으며, doOnEach는 오류가 발생했을 때에도 실행되는 메소드입니다.

따라서, 해당 포스팅에서는 오류를 위한 로그를 남기기 위해 doOnEach를 사용합니다.

 

 

 

 

📌 onErrorComplete

Simply complete the sequence by replacing an onError signal with an onComplete signal. if the error matches the given Class or Predicate.

 

 

onErrorComplete 메소드는 오류 발생 시 전달되는 onError Signal을 onComplete Signal로 변경합니다.

Sequence를 완료 조건으로 마무리하는 간단한 방법입니다.

메소드의 인자로 Class 나 Predicate 조건을 넘기면 해당 조건에 맞는 에러만 onComplete Signal로 변경합니다.

 

public final Mono<T> onErrorComplete()
public final Mono<T> onErrorComplete(Class<? extends Throwable> type)
public final Mono<T> onErrorComplete(Predicate<? super Throwable> predicate)

 

 

위의 그림을 보면 첫 번째 인자 다음으로 X 표시가 보이죠. X는 리액터 다이어그램에서 오류가 발생했다는 의미이고 onErrorComplete() 메소드를 거친 후 정상 완료 표시인 | 을 확인할 수 있습니다.

예시를 통해 같이 살펴보도록 하겠습니다.

 

@Test
public void onErrorComplete() {
	final AtomicInteger datasource = new AtomicInteger(0);
	Mono<Integer> monoWithError = Mono.just(datasource).map(i -> 100 / i.get());

	StepVerifier.create(monoWithError)
		.verifyErrorMessage("/ by zero");
}

 

위 코드는 Mono API 처리 중 발생한 오류를 검증verifyError합니다.

Mono<Integer> 타입의 monoWithError는 숫자를 0으로 나누어 오류를 발생시킵니다.

 

이번엔 여기에 onErrorComplete() 메소드를 추가해서 onComplete Signal로 마무리하도록 만들겠습니다.

 

@Test
public void onErrorComplete() {
	final AtomicInteger datasource = new AtomicInteger(0);
	Mono<Integer> mono = Mono.just(datasource)
		.map(i -> 100 / i.get())
		.onErrorComplete();
        
	StepVerifier.create(mono).verifyComplete();
}

 

verifyComplete()로 검증하여 onComplete Signal로 마무리된 것을 확인할 수 있습니다.

doOnEach를 이용해서 로그를 남겨서 이해를 더해보겠습니다.

 

 

 

✔ Logging: doOnEach

 

아래는 위의 코드와 동일하며 onErrorComplete() 실행 전과 후에 doOnEach()로 로그를 남기는 코드입니다.

 

@Test
public void onErrorComplete() {
	final AtomicInteger datasource = new AtomicInteger(0);
	Mono<Integer> mono = Mono.just(datasource)
		.map(i -> 100 / i.get())
		.doOnEach(signal -> log.info("before {}", signal.toString()))
		.onErrorComplete()
		.doOnEach(signal -> log.info("after {}", signal.toString()));
        
	StepVerifier.create(mono).verifyComplete();
}

 

위는 map 👉🏻 doOnEach 👉🏻 onError 👉🏻 doOnEach 와 같이 실행됩니다.

위의 코드를 실행해보면 아래와 같이 출력됩니다.

 

before onError(java.lang.ArithmeticException: / by zero)
after onComplete()

 

onError가 onComplete로 변경하는 것을 눈으로 확인할 수 있죠.

 

 

 Param: Class

특정 오류 타입만을 onComplete로 변경하고 싶다면, 아래와 같이 특정 타입을 명시할 수 있습니다.

 

@Test
public void onErrorComplete() {
	final AtomicInteger datasource = new AtomicInteger(0);
	Mono<Integer> mono = Mono.just(datasource)
		.map(i -> 100 / i.get())
		.onErrorComplete(ArithmeticException.class);
        
	StepVerifier.create(mono).verifyComplete();
}

 

ArithmeticException의 인스턴스가 발생했을 때만 onError Signal을 변경시킵니다.

가령, 조금 더 큰 범위를 지정하고자 RuntimeException을 지정할 수도 있겠죠.

 

만약 조금 더 복잡한 조건을 지정하고 싶다면 아래와 같이 Predicate를 지정할 수 있습니다.

 

 

 

 Param: Predicate

특정 조건에 따라 onComplete로 변경하고 싶다면, 아래와 같이 Predicate를 명시할 수 있습니다.

 

@Test
public void onErrorComplete() {
	final AtomicInteger datasource = new AtomicInteger(0);
	Mono<Integer> mono = Mono.just(datasource)
		.map(i -> 100 / i.get())
		.onErrorComplete(throwable -> throwable instanceof ArithmeticException));
        
	StepVerifier.create(mono).verifyComplete();
}

 

ArithmeticException과 동일한 예외일 때만 onComplete Signal로 변경합니다.

 

 

 

 

📌 onErrorContinue

Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements. The recovered error and associated value are notified via the provided BiConsumer. Alternatively, throwing from that biconsumer will propagate the thrown exception downstream in place of the original error, which is added as a suppressed exception to the new one.

 

onErrorContinue는 Publisher(Flux)의 수행 중 오류가 발생했을 때,

발생한 오류인 Throwable과 오류가 발생한 Element를 BiConsumer의 인자로써 넘겨줍니다.

 

onErrorContinue을 생각해보면, Mono에서는 사실상 이치에 맞지 않는 메소드입니다.

이유는, 해당 Element를 넘기고 다음을 처리하는 메소드인데, Mono에서는 다음에 처리할 Element가 없기 때문입니다.

 

따라서 인자로 받아 처리할 BiConsumer를 정의합니다.

BiConsumer를 정의해서 가령 예외가 발생했을 때 Third Party 라이브러리의 Message Queue에 전송한다던지의 적절한 처리를 할 수 있습니다.

 

public void onErrorContinue() {
    List<String> valueDropped = new ArrayList<>();
    List<Throwable> errorDropped = new ArrayList<>();

    Flux<String> test = Flux.just("foo", "", "bar", "baz")
        .filter(s -> 3 / s.length() == 1)
        .onErrorContinue(ArithmeticException.class,
            (t, v) -> {
                errorDropped.add(t);
                valueDropped.add((String) v);
            });

    StepVerifier.create(test)
        .expectNext("foo")
        .expectNext("bar")
        .expectNext("baz")
        .verifyComplete();

    assertThat(valueDropped).isEqualTo(List.of(""));
    assertThat(errorDropped.get(0).getMessage()).isEqualTo("/ by zero");
}

 

onErrorContinue() 메소드 내부를 살펴보면

errorDropped, valueDrroped 리스트에 각각 오류와 오류 발생 요소를 추가하는 로직입니다.

 

 

 

 

📌 onErrorMap

Transform an error emitted by this Mono by synchronously applying a function to it if the error matches the given predicate. Otherwise let the error pass through.

 

onErrorMap은 발생한 예외를 다른 예외로 변환합니다.

아래의 그림과 같이 예외 발생 표시인 빨간 ❌ 표시를 노란 ❌표시로 변경하는 것을 확인할 수 있습니다.

 

 

가령 Exception이 발생했을 때 RuntimeException으로 변경하고 싶을 때 해당 메소드를 통해 처리할 수 있습니다.

코드를 통해 예시를 살펴보겠습니다.

 

@Test
public void onErrorMapSimple() {
	Mono<Integer> data = Mono.<Integer>error(new Exception())
		.onErrorMap(t -> new MonoException(t.getMessage()));

	StepVerifier.create(data)
		.expectError(MonoException.class)
		.verify();
}

 

위는 예외가 발생하면 MonoException으로 전환하는 코드입니다.

 

이번에는 메소드에 조건을 인자로 넘겨 특정 조건에 맞는 에러 핸들링을 처리해보겠습니다.

이 외에도 인자로 Class 타입이나 Predicate를 명시해서 특정 예외만 처리할 수 있습니다.

 

 

onErrorMap(Class type, Function mapper)
onErrorMap(Predicate predicate, Function mapper)

 

 

위 의 코드를 테스트 코드에 적용해본다면 아래와 같이 작성해 볼 수 있습니다.

 

// Mono<T> onErrorMap(Class<E> type, Function<? super E,? extends Throwable> mapper)
Mono.<Integer>error(new Exception())
	.onErrorMap(RuntimeException.class, t -> new MonoException(t.getMessage()))


// Mono<T> onErrorMap(Predicate predicate, Function<? super Throwable,? extends Throwable> mapper)
Mono.<Integer>error(new Exception())
    .onErrorMap(t -> t.getMessage().contains("ERROR"), t -> new MonoException(t.getMessage()));

 

 

 

 

 

📌 onErrorResume

Subscribe to a fallback publisher when any error occurs, using a function to choose the fallback depending on the error.

 

onErrorResume() 메소드는 로직 실행 중 예외가 발생했을 때 정상적인 처리로 계속 진행을 위해 fallback 을 지정합니다.

로직 처리 중 예외가 발생해면 onErrorResume()이 실행되고, 지정한 Fallback이 실행됩니다.

 

아래 다이어그램을 확인해 보면 ❌ (예외)가 발생했을 때, onErrorResume() 내부에서

새로운 Publisher(→, Fallback)을 subscribe하여 다시 다른 Element 🔴를 가져오는 것을 확인할 수 있습니다.

 

 

 

예시 코드로 다시 한 번 살펴보겠습니다.

 

public void onErrorResume() {
	Function<Throwable, Mono<String>> fallback = (thr) -> {		// Error occur: '<<ERROR>>'. ignore current element 'MONO_TEST'.
		return Mono.just("FALLBACK_MESSAGE");
	};

	Mono<String> data = Mono.just("MONO_TEST")
		.map(str -> { throw new MonoException("<<ERROR>>"); })
		.doOnEach(signal -> log.info("before {}", signal.toString()))
		.onErrorResume(fallback)
		.doOnEach(signal -> log.info("after {}", signal.toString()))
		.subscribe();
}

 

위의 코드를 실행시키면 아래와 같은 출력 값을 확인할 수 있습니다.

 

before onError(com.gngsn.webClient.reactor.MonoOnErrorXxxTest$MonoException: <<ERROR>>)
after doOnEach_onNext(FALLBACK_MESSAGE)
after onComplete()

 

조건 값을 넣고 싶다면 아래와 같은 형식으로 지정할 수 있습니다.

 

// <E extends Throwable> Mono<T> onErrorResume(Class<E> type, Function<? super E,? extends Mono<? extends T>> fallback)
Mono.<String>error(new Exception())
	.onErrorResume(RuntimeException.class, t -> Mono.just("FALLBACK_MESSAGE"))

// onErrorResume(Predicate<? super Throwable> predicate, Function<? super Throwable,? extends Mono<? extends T>> fallback)
Mono.<String>error(new Exception())
    .onErrorResume(
    	t -> t instanceof MonoException), 
    	t -> Mono.just("FALLBACK_MESSAGE"))
    );

 

 

 

 

📌 onErrorReturn

Simply emit a captured fallback value when any error is observed on this Mono.

 

onErrorReturn() 메소드는 만약 예외가 발생하면, 대신 반환할 Element를 미리 지정합니다.

예외가 발생하면 미리 지정해둔 FallbackValue가 대신 반환됩니다.

 

 

예시 코드를 확인해보도록 하겠습니다.

 

	@Test
	public void onErrorReturn() {
		Mono<String> data = Mono.just("MONO_TEST");
		String fallbackValue = "FALLBACK_MESSAGE";

		data.map(str -> { throw new MonoException("<<ERROR>>"); })
			.doOnEach(signal -> log.info("before {}.", signal.toString()))		// before onError(com.gngsn.webClient.reactor.MonoOnErrorXxxTest$MonoException: <<ERROR>>).
			.onErrorReturn(fallbackValue)										// onError -> onComplete: all Exception will be transformed
			.doOnEach(signal -> log.info("after {}", signal.toString()))		// it is called twice. (1) after doOnEach_onNext(FALLBACK_MESSAGE). => (2) after onComplete()
			.subscribe(str -> log.info("fallback message is '{}'.", str.toString()));
	}

 

 

 

 

 

 

 

 

 

 

 

 

 

반응형