Reactor, 제대로 이해하기, zip() method

2022. 12. 18. 21:29Spring

반응형

Reactor Publisher인 Mono와 Flux의 요소를 결합하는 zip를 이해하고 예제 코드로 직접 코드를 확인하는 것이 해당 포스팅의 목적입니다.

 

 

| Reactor Series | 

Reactive Programming, 제대로 이해하기

Reactor 제대로 이해하기, Marble Diagram

- Reactor, 제대로 이해하기 - Flux Create

👉🏻 Reactor, 제대로 이해하기, zip() method

 


 

Zip method?

Reactor가 zip method를 사용하면 두 개의 소스를 병합할 수 있습니다. 다양한 형태의 zip 메소드들을 지원하고 있으며, 모든 zip 메소드는 '주어진 입력 소스를 병합하는' 동일한 역할을 합니다. 아래의 그림에서 볼 수 있듯이, zip 메소드는 여러 Source나 Provider를 받습니다.

 

해당 그림의 C3는 두 번째 소스 '3' 아래에 방출됩니다

 

 

그림의 표현된 내용을 보면, 첫 번째 소스는 A부터 E까지의 다섯 개의 소스(🟢)를 방출하고, 두 번째 소스는 1부터4까지의 네 개의 소스 (🟥)를 방출합니다. 모든 소스는 다른 소스에서 방출되는 요소가 있을 때 방출되어 병합됩니다.

각 소스는 병합되어 하나씩 방출되어야 하기 때문에, 다른 소스에서 방출되는 요소가 없다면 대기합니다. 때문에 위의 그림에서 A1, B2, C3, D4이 출력되고 첫 번째 소스에서 방출된 E는 결합되지 않은 것을 알 수 있습니다.

실생활에서 사용하는 '지퍼'와 매우 동일하지 않나요? 치아처럼 생긴 두 부분이 서로 감겨져서 마치 하나를 이루는 모습과 동일하다고 비유할 수 있습니다. 지금부터는 Reactor에서 제공하는 zip 메소드들의 종류를 알아보도록 하겠습니다.

 

 

📌 zip( ) returning Tuple

: 🟢 +🟥 -> { 🟢, 🟥 }

 

가장 간단한 형태의 함수는 두 개의 소스를 받아 하나의 Tuple로 반환하는 zip 함수입니다. 해당 메소드는 입력한 소스의 개수와 동일한 개수의 요소를 가지는 Tuple을 반환합니다. 

 

🟢 +🟥 -> {🟢, 🟥}

 

소스 A와 소스 1을 Tuple로 결합해서 Tuple{A, 1}을 생성한 그림입니다. 입력 값의 개수이자 Tuple에 포함된 요소의 개수는 최소 두 개부터 최대 여덟 개를 취할 수 있습니다. 아래는 관련 메소드의 정의입니다.

 

Mono<Tuple2<T1,T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
Mono<Tuple3<T1,T2,T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Mono<Tuple4<T1,T2,T3,T4>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2,..., Mono<? extends T4> p4)
Mono<Tuple5<T1,T2,T3,T4,T5>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T5> p5)
Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T6> p6)
Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T7> p7)
Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ...,  Mono<? extends T8> p8)

 

위의 함수 정의를 보면 알겠지만, 가령 4개의 입력 값을 전달하면 Tuple4라는 4개의 요소를 가지는 Tuple을 반환 받습니다. 실제 코드를 확인해보며 예시를 살펴보겠습니다.

 

Mono<Tuple4<String, String, String, String>> mono = Mono.zip(
    Mono.just("hello"),
    Mono.just("world"),
    Mono.just("hello"),
    Mono.just("reator")
);
// output: [hello,world,hello,reator]

 

입력 값을 모두 String 타입으로 네 개의 인자를 전달했으며, 그 결과 Tuple4를 얻을 수 있습니다.

 

 

 

 

📌 Mono.zip( combinator )

: 🟢 +🟨 -> {🛑}

 

해당 함수는 여러 Mono 소스를 원하는 타입과 값으로 결합할 수 있습니다. 위의 zip 함수와 동일한 이름이지만, 지정된 입력 형식과 지정된 결과 값을 도출하지 않는다는 점이 다릅니다. 해당 함수는 여러 Mono 타입의 소스를 다루며, 원하는 방식으로 결합할 수 있도록 combinator를 정의할 수 있습니다. 

 

🟢 +🟨 -> {🛑}

 

그림을 보면, 위의 함수에서는 결과 값을 Tuple{A, 1}이었지만, 해당 함수는 A1을 하나의 Element로 표시하는 것을 확인할 수 있습니다. 해당 함수 정의를 살펴보도록 해봅시다.

 

/*
  R     -> Resource Type
  R'    -> ? extends R
  O'[]  -> ? super Object[]
  M'    -> ? extends Mono<?>
*/
Mono<R> zip(Iterable<M> monos, Function<O'[], R'> combinator)
Mono<R> zip(Function<O'[], R'> combinator, Mono<?>... monos)

 

위의 두 메소드는 다른 입력 값을 받는 것 같지만 실제로는 다수의 Mono 데이터와 해당 Mono 데이터를 결합하는 combinator를 정의할 수 있습니다. 예시 코드를 통해 이해를 더해보도록 해봅시다.

 

 

Example1. with Iterable Monos

Mono<Tuple3> mono = Mono.zip(
    Arrays.asList(Mono.just("foo"), Mono.just("foo2"), Mono.just("foo3")), // (1)
    Tuples.fn3()	// (2)
);
// output: [foo,foo2,foo3]	// (3)

 

(1) Mono로 감싼 세 개의 소스 foo, foo2, foo3 (monos)와

(2) 해당 소스들을 변환할 Tuples 클래스의 fn3 (combinator)를 입력 값으로 넣어주어

(3) Mono에 감싸진 Tuple3{foo, foo2, foo3} 를 반환받습니다.

 

 

Example2. with spread of multiple Monos

Function<Object[], String> conbinator = obArr -> Arrays.stream(obArr)
    .map(Object::toString)
    .reduce((a,b) -> a.concat(" " + b))
    .orElse("");

Mono<String> mono = Mono.zip(conbinator,
    Mono.just("hello"),
    Mono.just("world,"),
    Mono.just("hello"),
    Mono.just("reator")
);
// output: hello world, hello reator

 

첫 번째 예시에서 사용한 것처럼 combinator가 미리 정의된 함수될 수도 있고, 두 번째 예시에서 처럼 사용자의 입맛대로 정의할 수 있습니다.

 

 

 

📌 Flux.zip( combinator )

: 🟢 + 🟨 -> {🛑}

 

Sources 혹은 Publisher를 입력받아 combinator로 변환하여 Flux로 감싼 결과 값으로 반환합니다. Mono.zip과 비슷하게 Flux에서도 결합 방식을 combinator로 정의하기 때문에, 방출되는 소스를 원하는 타입과 값으로 결합할 수 있습니다. 

 

🟢 +🟨 -> {🛑}

 

Flux의 zip메소드는 Mono보다 더 다양하게 정의되어 있습니다. 하지만 모두 입력 소스와 combinator 를 받는 의미에서 동일합니다.

 

Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)

Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)

 

처음보기에 낯선 단어인 prefetch가 등장합니다. 

 

✔️ Prefetch

Prefetch는 Flux에서 Backpressure를 위해 실제 메소드 역할을 수행하기 전에 미리 실행을 수행합니다. Prefetch는 zip 메소드 뿐만 아니라, 몇몇의 Flux 메소드에서 확인할 수 있는 범용적인 개념입니다. downstream이 upstream으로 request를 보내 downstream이 요구한 데이터를 upstream이 미리 방출합니다. 만약 operator가 prefetch의 75%정도를 수행했다면, prefetch의 75%정도를 다시 upstream에게 미리 요청합니다. 

해당 포스팅에서는 여기까지만 다루고, 다음에 기회가 생기면 다뤄보도록 하겠습니다. 해당 내용은 공식 문서에서 확인할 수 있습니다.

 

 

Flux.zip()의 예시 코드를 확인해보도록 하겠습니다.

 

Flux<Integer> evenNumbers = Flux
    .range(1, 5)
    .filter(x -> x % 2 == 0);

Flux<Integer> oddNumbers = Flux
    .range(1, 5)
    .filter(x -> x % 2 > 0);

Flux<Integer> fluxOfIntegers = Flux.zip(
    evenNumbers,
    oddNumbers,
    (a, b) -> a + b); // 3 7
 
 

📌 zipWith()

: 🟢.zipWith(🟨) -> {🛑}

 

zipWith()는 Flux에서 방출되는 요소를 파라미터로 받은 또 다른 Publisher 소스와 병합합니다.

 

🟢.zipWith(🟨) -> {🛑}
 
해당 메소드는 위에서 살펴본 방식대로 해당 메소드가 정의된 방식을 두 가지로 나눠보면 아래와 같습니다.
- 입력된 소스들을 병합해서 Tuple2 (두 개의 요소를 갖는 Tuple)로 방출
- 입력된 소스들을 병합하는 방식을 combinator 정의해서 원하는 형식으로 방출
 
// # Mono
Mono<Tuple2<T,T2>> zipWith(Mono<? extends T2> other)
Mono<O> zipWith(Mono<? extends T2> other, BiFunction<? super T,? super T2,? extends O> combinator)


// # Flux
Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)

 

combinator는 BiFunction 타입을 가지며 ((🟢, 🟨) -> 🛑), 해당 메소드를 호출하는 원본 소스와 해당 메소드의 파라미터로 입력받은 소스를 병합합니다. 

 

아래의 예시를 확인해보도록 해볼게요.

 

Flux<Integer> evenNumbers = Flux
    .range(1, 5)
    .filter(x -> x % 2 == 0);

Flux<Integer> oddNumbers = Flux
    .range(1, 5)
    .filter(x -> x % 2 > 0);

Flux<Integer> fluxOfIntegers = evenNumbers
            .zipWith(oddNumbers, (a, b) -> a * b); // 2  12

 

 

 

중첩 flatMap 해결

zipWith (+ 아래에서 다룰 zipWhen)은 종종 발생하는 중첩 flatMap을 해결하는데 큰 도움이 됩니다.

 

fun getProductById(productId: Int): Mono<Product> {
    return productRepository.findById(productId)
        .flatMap { product ->
            colorRepository.findByProduct(productId).collectList()
                .flatMap { colors ->
                    sizeRepository.findByProduct(productId).collectList()
                        .map { sizes ->
                            product.copy(colors = colors, sizes = sizes)
                        }
        }
    }
}

 

위와 같이 중첩된 flatMap을 zipWith을 통해 간단히 줄일 수 있습니다.

fun getProductById(productId: Int): Mono<Product> {
    return productRepository.findById(productId)
        .zipWith(colorRepository.findByProduct(productId).collectList())
        .map { it.t1.copy(colors = it.t2) }
        .zipWith(sizeRepository.findByProduct(productId).collectList())
        .map { it.t1.copy(sizes = it.t2) }
}

 

 

참고로, 코드는  🔗 Avoiding nested streams in Spring Webflux & Kotlin 에서 참고했습니다.

상세 내용을 참고하면 이해에 큰 도움이 될 듯합니다.

 

 

 

📌 zipWithIterable()

: 🟢.zipWithIterable(🟨, 🟨, 🟨, …) -> {🛑}

 

zipWithIterable() 메소드는 Iterable이 방출하는 요소를 결합합니다.

 

🟢.zipWithIterable(🟨, 🟨, 🟨, &hellip;) -> {🛑}

 

 Flux 클래스에서만 정의되어 있는데, 여러 요소를 방출하는 Flux의 특징을 생각하면 Flux만이 Iterator와의 zip연산을 수행할 수 있다는 것을 알 수 있습니다.

 

// Flux
Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
Flux<V> zipWithIterable(Iterable<? extends T2> iterable, 
                        BiFunction<? super T,? super T2,? extends V> zipper)

 

 

Iterable과 BiFunction을 받아 Flux로 감싼 데이터 소스를 반환하는 쉬운 예시로 준비했습니다.

 

Flux<Integer> f = Flux.range(1, 5)
     .zipWithIterable(Arrays.asList(10, 20, 30, 40, 50), (a, b) -> a + b);

// output: 11 22 33 44 55

 

 

 

📌 zipWhen

: 🟢.zipWhen(⏱…🟨) -> {🛑}

 

zipWhen() 메소드는 입력 값으로 받는 소스가 방출되는 시점에 결합됩니다.

 

🟢.zipWhen(⏱&hellip;🟨) -> {🛑}

 

 

zipWhen() 은 Mono 에서 정의되었으며, 파라미터 값으로 rightGenerator을 받습니다. rightGenerator는 Mono로 감싼 소스 값을 반환하는 Function입니다. zipWhen() 메소드는 zipWhen() 호출하는 Mono 요소와 rightGenerator에서 방출되는 요소를 결합합니다.

 

Mono<Tuple2<T,T2>> zipWhen(Function<T,Mono<? extends T2>> rightGenerator)
Mono<O> zipWhen(Function<T,Mono<? extends T2>> rightGenerator, BiFunction<T,T2,O> combinator)

 

combinator는 Tuple2가 아닌 특정 값으로 반환하고 싶을 때 유용합니다. 원하는 방식으로 커스터마이징하거나 개발자가 원하는 오브젝트를 반환함으로써 활용할 수 있습니다.

 

예시를 통해 감을 익혀보겠습니다.

 

record User (
        Integer userId,
        String username,
        List<String> attributes
) {}

...

Mono<User> userMono = Mono.just(new User(1, "Sunny", List.of("Lovely", "Curious")));
Mono<Boolean> emailSentMono = Mono.just(/* sending email logic and returning the result */ true);
Mono<String> databaseResultMono = Mono.just("'new user 'sunny' is created");

Mono<String> result = userMono
        .zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
        .zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
            User user = tuple.getT1();
            Boolean emailSent = tuple.getT2();
            return "Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult;
});

result.subscribe(System.out::println);
// Response: User[userId=1, username=Sunny, attributes=[Lovely, Curious]], Email Sent: true, Database Result: 'new user 'sunny' is created

 

 

 

📌 zipDelayError

: ❌.zipDelayError(🟨) -> {…⏱…❌}

 

zipDelayError() 메소드는 소스에서 오류가 발생해도 오류를 바로 반환하지 않고 모든 소스가 완료될 때까지 보류합니다. 

 

❌.zipDelayError(🟨) -> {&hellip;⏱&hellip;❌}

 

 

zipDelayError()는 오직 Mono에서만 정의되어 있으며, 하나 혹은 그 이상의 오류를 합쳐서 한 번에 반환합니다.

 

// Only for Mono
Mono<R> zipDelayError(Function<? super Object[],? extends R> combinator, Mono<?>... monos)
Mono<R> zipDelayError(Iterable<? extends Mono<?>> monos, Function<? super Object[],? extends R> combinator)

Mono<Tuple2<T1,T2>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
Mono<Tuple3<T1,T2,T3>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
...
Mono<Tuple8<T1,T2,...,T8>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T7> p7, Mono<? extends T8> p8)

 

첫 번째 살펴보았던 Tuple을 반환하는 zip() 메소드처럼, zipDelayError() 도 비슷한 형식을 취합니다. N개의 요소를 파라미터로 받아 TupleN의 타입을 Mono로 감싸서 반환합니다. 

 

Exception boom1 = new NullPointerException("boom1");
Exception boom2 = new IllegalArgumentException("boom2");

StepVerifier.create(Mono.zipDelayError(
        Arrays.asList(Mono.just("foo"), Mono.<String>error(boom1), Mono.<String>error(boom2)),
        Tuples.fn3()))
    .verifyErrorMatches(e -> {
            return e.getMessage().equals("Multiple exceptions") &&
                   e.getSuppressed()[0] == boom1 &&
                  e.getSuppressed()[1] == boom2;
     });

 

예시를 보면 NullPointerException과 IllegalArgumentException을 한 번에 반환하는 것을 확인할 수 있습니다.

 

반응형

Backend Software Engineer

Gyeongsun Park