2022. 12. 18. 21:29ㆍSpring
Reactor Publisher인 Mono와 Flux의 요소를 결합하는 zip를 이해하고 예제 코드로 직접 코드를 확인하는 것이 해당 포스팅의 목적입니다.
| Reactor Series |
- Reactive Programming, 제대로 이해하기
- Reactor 제대로 이해하기, Marble Diagram
- Reactor, 제대로 이해하기 - Flux Create
👉🏻 Reactor, 제대로 이해하기, zip() method
Zip method?
Reactor가 zip method를 사용하면 두 개의 소스를 병합할 수 있습니다. 다양한 형태의 zip 메소드들을 지원하고 있으며, 모든 zip 메소드는 '주어진 입력 소스를 병합하는' 동일한 역할을 합니다. 아래의 그림에서 볼 수 있듯이, zip 메소드는 여러 Source나 Provider를 받습니다.
그림의 표현된 내용을 보면, 첫 번째 소스는 A부터 E까지의 다섯 개의 소스(🟢)를 방출하고, 두 번째 소스는 1부터4까지의 네 개의 소스 (🟥)를 방출합니다. 모든 소스는 다른 소스에서 방출되는 요소가 있을 때 방출되어 병합됩니다.
각 소스는 병합되어 하나씩 방출되어야 하기 때문에, 다른 소스에서 방출되는 요소가 없다면 대기합니다. 때문에 위의 그림에서 A1, B2, C3, D4이 출력되고 첫 번째 소스에서 방출된 E는 결합되지 않은 것을 알 수 있습니다.
실생활에서 사용하는 '지퍼'와 매우 동일하지 않나요? 치아처럼 생긴 두 부분이 서로 감겨져서 마치 하나를 이루는 모습과 동일하다고 비유할 수 있습니다. 지금부터는 Reactor에서 제공하는 zip 메소드들의 종류를 알아보도록 하겠습니다.
📌 zip( ) returning Tuple
: 🟢 +🟥 -> { 🟢, 🟥 }
가장 간단한 형태의 함수는 두 개의 소스를 받아 하나의 Tuple로 반환하는 zip 함수입니다. 해당 메소드는 입력한 소스의 개수와 동일한 개수의 요소를 가지는 Tuple을 반환합니다.
소스 A와 소스 1을 Tuple로 결합해서 Tuple{A, 1}을 생성한 그림입니다. 입력 값의 개수이자 Tuple에 포함된 요소의 개수는 최소 두 개부터 최대 여덟 개를 취할 수 있습니다. 아래는 관련 메소드의 정의입니다.
Mono<Tuple2<T1,T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
Mono<Tuple3<T1,T2,T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
Mono<Tuple4<T1,T2,T3,T4>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2,..., Mono<? extends T4> p4)
Mono<Tuple5<T1,T2,T3,T4,T5>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T5> p5)
Mono<Tuple6<T1,T2,T3,T4,T5,T6>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T6> p6)
Mono<Tuple7<T1,T2,T3,T4,T5,T6,T7>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T7> p7)
Mono<Tuple8<T1,T2,T3,T4,T5,T6,T7,T8>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T8> p8)
위의 함수 정의를 보면 알겠지만, 가령 4개의 입력 값을 전달하면 Tuple4라는 4개의 요소를 가지는 Tuple을 반환 받습니다. 실제 코드를 확인해보며 예시를 살펴보겠습니다.
Mono<Tuple4<String, String, String, String>> mono = Mono.zip(
Mono.just("hello"),
Mono.just("world"),
Mono.just("hello"),
Mono.just("reator")
);
// output: [hello,world,hello,reator]
입력 값을 모두 String 타입으로 네 개의 인자를 전달했으며, 그 결과 Tuple4를 얻을 수 있습니다.
📌 Mono.zip( combinator )
: 🟢 +🟨 -> {🛑}
해당 함수는 여러 Mono 소스를 원하는 타입과 값으로 결합할 수 있습니다. 위의 zip 함수와 동일한 이름이지만, 지정된 입력 형식과 지정된 결과 값을 도출하지 않는다는 점이 다릅니다. 해당 함수는 여러 Mono 타입의 소스를 다루며, 원하는 방식으로 결합할 수 있도록 combinator를 정의할 수 있습니다.
그림을 보면, 위의 함수에서는 결과 값을 Tuple{A, 1}이었지만, 해당 함수는 A1을 하나의 Element로 표시하는 것을 확인할 수 있습니다. 해당 함수 정의를 살펴보도록 해봅시다.
/*
R -> Resource Type
R' -> ? extends R
O'[] -> ? super Object[]
M' -> ? extends Mono<?>
*/
Mono<R> zip(Iterable<M> monos, Function<O'[], R'> combinator)
Mono<R> zip(Function<O'[], R'> combinator, Mono<?>... monos)
위의 두 메소드는 다른 입력 값을 받는 것 같지만 실제로는 다수의 Mono 데이터와 해당 Mono 데이터를 결합하는 combinator를 정의할 수 있습니다. 예시 코드를 통해 이해를 더해보도록 해봅시다.
Example1. with Iterable Monos
Mono<Tuple3> mono = Mono.zip(
Arrays.asList(Mono.just("foo"), Mono.just("foo2"), Mono.just("foo3")), // (1)
Tuples.fn3() // (2)
);
// output: [foo,foo2,foo3] // (3)
(1) Mono로 감싼 세 개의 소스 foo, foo2, foo3 (monos)와
(2) 해당 소스들을 변환할 Tuples 클래스의 fn3 (combinator)를 입력 값으로 넣어주어
(3) Mono에 감싸진 Tuple3{foo, foo2, foo3} 를 반환받습니다.
Example2. with spread of multiple Monos
Function<Object[], String> conbinator = obArr -> Arrays.stream(obArr)
.map(Object::toString)
.reduce((a,b) -> a.concat(" " + b))
.orElse("");
Mono<String> mono = Mono.zip(conbinator,
Mono.just("hello"),
Mono.just("world,"),
Mono.just("hello"),
Mono.just("reator")
);
// output: hello world, hello reator
첫 번째 예시에서 사용한 것처럼 combinator가 미리 정의된 함수될 수도 있고, 두 번째 예시에서 처럼 사용자의 입맛대로 정의할 수 있습니다.
📌 Flux.zip( combinator )
: 🟢 + 🟨 -> {🛑}
Sources 혹은 Publisher를 입력받아 combinator로 변환하여 Flux로 감싼 결과 값으로 반환합니다. Mono.zip과 비슷하게 Flux에서도 결합 방식을 combinator로 정의하기 때문에, 방출되는 소스를 원하는 타입과 값으로 결합할 수 있습니다.
Flux의 zip메소드는 Mono보다 더 다양하게 정의되어 있습니다. 하지만 모두 입력 소스와 combinator 를 받는 의미에서 동일합니다.
Flux<O> zip(Function<? super Object[],? extends O> combinator, Publisher<? extends I>... sources)
Flux<O> zip(Function<? super Object[],? extends O> combinator, int prefetch, Publisher<? extends I>... sources)
Flux<O> zip(Iterable<? extends Publisher<?>> sources, Function<? super Object[],? extends O> combinator)
Flux<O> zip(Iterable<? extends Publisher<?>> sources, int prefetch, Function<? super Object[],? extends O> combinator)
Flux<V> zip(Publisher<? extends Publisher<?>> sources, Function<? super TUPLE,? extends V> combinator)
처음보기에 낯선 단어인 prefetch가 등장합니다.
✔️ Prefetch
Prefetch는 Flux에서 Backpressure를 위해 실제 메소드 역할을 수행하기 전에 미리 실행을 수행합니다. Prefetch는 zip 메소드 뿐만 아니라, 몇몇의 Flux 메소드에서 확인할 수 있는 범용적인 개념입니다. downstream이 upstream으로 request를 보내 downstream이 요구한 데이터를 upstream이 미리 방출합니다. 만약 operator가 prefetch의 75%정도를 수행했다면, prefetch의 75%정도를 다시 upstream에게 미리 요청합니다.
해당 포스팅에서는 여기까지만 다루고, 다음에 기회가 생기면 다뤄보도록 하겠습니다. 해당 내용은 공식 문서에서 확인할 수 있습니다.
Flux.zip()의 예시 코드를 확인해보도록 하겠습니다.
Flux<Integer> evenNumbers = Flux
.range(1, 5)
.filter(x -> x % 2 == 0);
Flux<Integer> oddNumbers = Flux
.range(1, 5)
.filter(x -> x % 2 > 0);
Flux<Integer> fluxOfIntegers = Flux.zip(
evenNumbers,
oddNumbers,
(a, b) -> a + b); // 3 7
📌 zipWith()
: 🟢.zipWith(🟨) -> {🛑}
zipWith()는 Flux에서 방출되는 요소를 파라미터로 받은 또 다른 Publisher 소스와 병합합니다.
// # Mono
Mono<Tuple2<T,T2>> zipWith(Mono<? extends T2> other)
Mono<O> zipWith(Mono<? extends T2> other, BiFunction<? super T,? super T2,? extends O> combinator)
// # Flux
Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2)
Flux<Tuple2<T,T2>> zipWith(Publisher<? extends T2> source2, int prefetch)
Flux<V> zipWith(Publisher<? extends T2> source2, BiFunction<? super T,? super T2,? extends V> combinator)
Flux<V> zipWith(Publisher<? extends T2> source2, int prefetch, BiFunction<? super T,? super T2,? extends V> combinator)
combinator는 BiFunction 타입을 가지며 ((🟢, 🟨) -> 🛑), 해당 메소드를 호출하는 원본 소스와 해당 메소드의 파라미터로 입력받은 소스를 병합합니다.
아래의 예시를 확인해보도록 해볼게요.
Flux<Integer> evenNumbers = Flux
.range(1, 5)
.filter(x -> x % 2 == 0);
Flux<Integer> oddNumbers = Flux
.range(1, 5)
.filter(x -> x % 2 > 0);
Flux<Integer> fluxOfIntegers = evenNumbers
.zipWith(oddNumbers, (a, b) -> a * b); // 2 12
중첩 flatMap 해결
zipWith (+ 아래에서 다룰 zipWhen)은 종종 발생하는 중첩 flatMap을 해결하는데 큰 도움이 됩니다.
fun getProductById(productId: Int): Mono<Product> {
return productRepository.findById(productId)
.flatMap { product ->
colorRepository.findByProduct(productId).collectList()
.flatMap { colors ->
sizeRepository.findByProduct(productId).collectList()
.map { sizes ->
product.copy(colors = colors, sizes = sizes)
}
}
}
}
위와 같이 중첩된 flatMap을 zipWith을 통해 간단히 줄일 수 있습니다.
fun getProductById(productId: Int): Mono<Product> {
return productRepository.findById(productId)
.zipWith(colorRepository.findByProduct(productId).collectList())
.map { it.t1.copy(colors = it.t2) }
.zipWith(sizeRepository.findByProduct(productId).collectList())
.map { it.t1.copy(sizes = it.t2) }
}
참고로, 코드는 🔗 Avoiding nested streams in Spring Webflux & Kotlin 에서 참고했습니다.
상세 내용을 참고하면 이해에 큰 도움이 될 듯합니다.
📌 zipWithIterable()
: 🟢.zipWithIterable(🟨, 🟨, 🟨, …) -> {🛑}
zipWithIterable() 메소드는 Iterable이 방출하는 요소를 결합합니다.
Flux 클래스에서만 정의되어 있는데, 여러 요소를 방출하는 Flux의 특징을 생각하면 Flux만이 Iterator와의 zip연산을 수행할 수 있다는 것을 알 수 있습니다.
// Flux
Flux<Tuple2<T,T2>> zipWithIterable(Iterable<? extends T2> iterable)
Flux<V> zipWithIterable(Iterable<? extends T2> iterable,
BiFunction<? super T,? super T2,? extends V> zipper)
Iterable과 BiFunction을 받아 Flux로 감싼 데이터 소스를 반환하는 쉬운 예시로 준비했습니다.
Flux<Integer> f = Flux.range(1, 5)
.zipWithIterable(Arrays.asList(10, 20, 30, 40, 50), (a, b) -> a + b);
// output: 11 22 33 44 55
📌 zipWhen
: 🟢.zipWhen(⏱…🟨) -> {🛑}
zipWhen() 메소드는 입력 값으로 받는 소스가 방출되는 시점에 결합됩니다.
zipWhen() 은 Mono 에서 정의되었으며, 파라미터 값으로 rightGenerator을 받습니다. rightGenerator는 Mono로 감싼 소스 값을 반환하는 Function입니다. zipWhen() 메소드는 zipWhen() 호출하는 Mono 요소와 rightGenerator에서 방출되는 요소를 결합합니다.
Mono<Tuple2<T,T2>> zipWhen(Function<T,Mono<? extends T2>> rightGenerator)
Mono<O> zipWhen(Function<T,Mono<? extends T2>> rightGenerator, BiFunction<T,T2,O> combinator)
combinator는 Tuple2가 아닌 특정 값으로 반환하고 싶을 때 유용합니다. 원하는 방식으로 커스터마이징하거나 개발자가 원하는 오브젝트를 반환함으로써 활용할 수 있습니다.
예시를 통해 감을 익혀보겠습니다.
record User (
Integer userId,
String username,
List<String> attributes
) {}
...
Mono<User> userMono = Mono.just(new User(1, "Sunny", List.of("Lovely", "Curious")));
Mono<Boolean> emailSentMono = Mono.just(/* sending email logic and returning the result */ true);
Mono<String> databaseResultMono = Mono.just("'new user 'sunny' is created");
Mono<String> result = userMono
.zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
.zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
User user = tuple.getT1();
Boolean emailSent = tuple.getT2();
return "Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult;
});
result.subscribe(System.out::println);
// Response: User[userId=1, username=Sunny, attributes=[Lovely, Curious]], Email Sent: true, Database Result: 'new user 'sunny' is created
📌 zipDelayError
: ❌.zipDelayError(🟨) -> {…⏱…❌}
zipDelayError() 메소드는 소스에서 오류가 발생해도 오류를 바로 반환하지 않고 모든 소스가 완료될 때까지 보류합니다.
zipDelayError()는 오직 Mono에서만 정의되어 있으며, 하나 혹은 그 이상의 오류를 합쳐서 한 번에 반환합니다.
// Only for Mono
Mono<R> zipDelayError(Function<? super Object[],? extends R> combinator, Mono<?>... monos)
Mono<R> zipDelayError(Iterable<? extends Mono<?>> monos, Function<? super Object[],? extends R> combinator)
Mono<Tuple2<T1,T2>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2)
Mono<Tuple3<T1,T2,T3>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3)
...
Mono<Tuple8<T1,T2,...,T8>> zipDelayError(Mono<? extends T1> p1, Mono<? extends T2> p2, ..., Mono<? extends T7> p7, Mono<? extends T8> p8)
첫 번째 살펴보았던 Tuple을 반환하는 zip() 메소드처럼, zipDelayError() 도 비슷한 형식을 취합니다. N개의 요소를 파라미터로 받아 TupleN의 타입을 Mono로 감싸서 반환합니다.
Exception boom1 = new NullPointerException("boom1");
Exception boom2 = new IllegalArgumentException("boom2");
StepVerifier.create(Mono.zipDelayError(
Arrays.asList(Mono.just("foo"), Mono.<String>error(boom1), Mono.<String>error(boom2)),
Tuples.fn3()))
.verifyErrorMatches(e -> {
return e.getMessage().equals("Multiple exceptions") &&
e.getSuppressed()[0] == boom1 &&
e.getSuppressed()[1] == boom2;
});
예시를 보면 NullPointerException과 IllegalArgumentException을 한 번에 반환하는 것을 확인할 수 있습니다.
'Spring' 카테고리의 다른 글
Reactor 제대로 이해하기, Marble Diagram (4) | 2022.11.30 |
---|---|
Reactor, 제대로 사용하기 - Error Handling (0) | 2022.11.27 |
Reactive Programming, 제대로 이해하기 (5) | 2022.11.16 |
Spring WebClient, 제대로 사용하기 - exchange (0) | 2022.08.25 |
Spring WebClient, 제대로 사용하기 - retrieve (0) | 2022.08.24 |
Backend Software Engineer
𝐒𝐮𝐧 · 𝙂𝙮𝙚𝙤𝙣𝙜𝙨𝙪𝙣 𝙋𝙖𝙧𝙠