Reactor 제대로 이해하기, Marble Diagram

2022. 11. 30. 23:27Spring

반응형

본 포스팅은 Project Reactor를 Marble Diagram을 읽는 방법과 함께 쉽게 이해하는 것이 목표입니다.

 

 

시리즈가 시작됐습니다 🔥

프로젝트 리액터를 파볼 예정입니다.

Reactor는 Spring의 WebFlux가 채택한 Reactive Library입니다. 즉, WebFlux의 기반이자 기본 구성 요소이 됩니다.

 

 

| Reactor Series | 

- Reactive Programming, 제대로 이해하기

👉🏻 Reactor 제대로 이해하기, Marble Diagram

Reactor, 제대로 이해하기 - Flux Create

 


 

 

Reactor는 JVM 환경에서 Non-blocking의 Reactive Programming을 전적으로 지원하며, "Backpressure"을 관리하면서 데이터의 흐름을 효과적으로 제어해줍니다. Java 8의 Functional API을 사용하며, 특히 CompletableFuture, Stream, Duration와의 결합된 함수를 많이 볼 수 있습니다. reactor-netty 프로젝트와 동일하게 Non-blocking의 IPC(inter-process communication, 프로세스 간 통신)을 지원합니다. Microservices Architecture 에 적합하며, Reactor Netty는 HTTP (including Websockets), TCP, UDP 을 위한 과도한 요청을 대비한 Backpressure기반의 네트워크 엔진을 지원합니다. Reactive encoding과 decoding이 전적으로 지원됩니다.

 

 

 

Marble Diagrams

Flux와 Mono를 처리하면서 다양한 API를 사용합니다. 이 때, API를 이해하기 위해 projectreactor에서는 쉬운 이해를 위해 Marble Diagram을 제공합니다. Reactor의 공식 문서나 javadoc에서 해당 API에 대한 설명으로 확인할 수 있습니다.

 

하지만 역설적이게도 이 다이어그램이 이해가 어려워 Reactor를 두려워하는 사람이 있는데요.

이 다이어그램은 한 번 이해해두면 정말 유용하기 때문에 이번 포스팅에서 중점적으로 다뤄보려 합니다.

 

 

Timeline

 

아주 단순한 화살표 표시로, 처리 흐름을 의미합니다.

타임라인은 왼쪽에서 오른쪽 방향으로 읽으며, 타임라인의 왼쪽 방향일수록 더 이후에 발생한 사건임을 의미합니다.

Timeline(left to right)

 

수학이나 통계 시간에 배운 그래프의 X 좌표가 시간의 흐름을 의미하는 것과 동일합니다.

개인적인 생각으로 이벤트가 발생한 시간의 흐름이 아니라,

이벤트가 발생한 순서 혹은 읽는 순서라고 생각하는 것이 더 이해하기 쉬울 것 같습니다.

즉, "이 방향으로 읽으세요" → 라고 명시한 것이죠.

 

 

 

Element

두 번째로는 이 타임 라인을 지나는 도형 (🟢🔺🟧🔸 )을 확인할 수 있습니다.

색과 모양과는 상관없이, 타임라인 위의 도형은 Mono나 Flux를 사용하면 다루게 될 데이터 요소를 의미합니다.

 

 

 

Element

 

 

Mono는 0개 혹은 1개의 Element를 Flux는 0개 혹은 그 이상의 Element를 다룹니다.

 

이 때, 타임라인은 왼쪽에서 오른쪽으로 이벤트가 발생하는 것을 표시하기 때문에,

초록색 요소(🟢) 먼저 등장(emit) 하고, 그 이후에 노란색 요소(🟡)가 등장했다고 읽으면 됩니다.

즉,  🟢 🟡 🔵 → 🟠 → 🟣 순으로 등장한 것을 알 수 있습니다.

 

Reactor에서 데이터는 시간에 따라 연속적으로 발생할 수 있으며, 한 개의 데이터 단위를 요소(Element)라고 부릅니다.

또, 이 요소가 발생하는 것을 일종의 이벤트로 봅니다. (Reactor라는 이름을 생각해보면 납득이 가는 듯 합니다)

때문에 특정 시점에 요소가 등장하는 것을 방출(emit)하는 것으로 표현합니다.

즉, 위의 다이어그램에서 타임라인 위에 도형 하나가 위치한 것은 요소가 방출된 것(Element is emited) 입니다.

Reactor에 대한 공식 문서와 해당 포스팅의 이하 내용을 확인할 때 참고하시길 바랍니다. 

 

 

 

Subscription

더 많은 내용을 다루기 전에 반드시 알고 가야하는 내용을 짚고 가겠습니다.

 

Mono와 Flux는 Publisher이며, 이벤트를 방출시키는 역할을 합니다. 즉, 원하는 데이터를 만들어 제공하는 역할을 하죠. Mono.create( source... ), Flux.create(source ... ), Flux.generate( source... ) 와 같은 형식으로 소스를 생성해냅니다.

 

 

 

 

 

Mono와 Flux로 생성된 데이터는 총 두 가지 방식인 '동기' 혹은 '비동기'로 읽을 수 있습니다.

동기 방식의 대표적인 메소드는 block(), 비동기 방식의 대표적인 메소드는 subscribe() 가 있습니다.

 

예를 들어, Mono.create() 를 한 후 위의 두 방식 중 하나로 읽어 올 수 있는데,

Mono.create(...).subscribe() 와 같은 모습이 될 수 있습니다.

 

 

 

 

onNext

Flux 같은 경우에는 하나 이상의 요소를 제공할 수 있습니다.

요소를 구독(subscribe)하고 나서 다음 요소를 읽기 위해서는 onNext()를 사용해서 다음 요소를 읽습니다.

이 때, onNext는 하나의 요소를 기준으로 타임라인의 오른쪽에 있는 모든 요소가 됩니다.

 

첫 번째 인자 🟢 를 기준으로 onNext()를 수행하면 다음 인자 🟡를 불러오게 되고, 그 다음 onNext()를 수행하면 그 다음 인자 🔵를 불러올 수 있으며, → 🟣 → 🟠 순으로 불러옵니다.

 

모든 인자를 읽어오면 완료하게 되죠. 이 표시는 바로 아래서 확인하겠습니다.

 

 

 

 

Terminal

요소들은 총 3가지의 진행 과정을 가집니다.

: Complete, Error, Process

 

 

 

 |   : 처리되는 요소들의 정상적인 완료를 의미합니다.

 

 

 

: 처리되는 요소들이 오류를 가지고 종료됨을 의미합니다.

 

 

 

: 아무 표시 없이 계속되는 표시는 종료없이 계속 진행됨을 의미합니다.

 

 

 

 

 

 

Input / Operator / Output

위에서 다룬 타임라인은 아래 그림과 같이 operator를 기준으로 두 개 이상 표시됩니다.

Operator의 위에 위치하면서 Operator를 향하는 화살표가 표시가 있는 타임라인의 요소는 Input/Source 데이터를 의미합니다.

Operator의 아래에 위치하면서 Operator에서 파생된 화살표가 지목하는 타임라인의 요소는 Output 데이터를 의미합니다.

Operator는 보통, Input/Source 데이터를 인자로 받아 가공하여 Output으로 반환하는 역할을 하는 연산자를 의미합니다.

 

 

 

위의 다이어그램은 Mono<T> output = source.operator( ... ) 와 같은 형태를 가질 수 있습니다.

 

 

 

EX: filter(🟡 ->  🔘 or ○)

 

아주 간단한 filter 사용 예시로 위의 내용들을 살펴보겠습니다.

 

 

 

 

filter 메소드는 요소 하나하나를 확인하며 조건과 일치하는 요소만을 반환하며, 만약 일치하지 않으면 반환하지 않습니다.

 

 

 

 

 

위의 다이어그램은 아래 코드를 도식화한 그림입니다.

순서대로 2, 30, 22, 5, 60, 1 데이터를 갖는 이벤트가 발생했으며, filter Operator를 통해 30, 22, 60이라는 Output을 반환합니다.

 

Flux<Integer> source = Flux.just(2, 30, 22, 5, 60, 1);
Flux<Integer> output = source.filter(x -> x > 10);

output.subscribe(x -> System.out.println(x + " ")); 
// output: 30 22 60

 

위의 코드는 아래와 정확히 일치합니다.

 

Flux.just(2, 30, 22, 5, 60, 1)
    .filter(x -> x > 10)
    .subscribe(x -> System.out.println(x + " "));
// output: 30 22 60

 

 

 

Signal

Reactor는 Signal을 다루며 Signal에 따라 특정 이벤트가 발생합니다.

가령, subscribe, cancel, request 와 같이 순서대로 방출되는 요소를 가져오는 구독 이벤트, 요소를 더 이상 가져오지 않는 취소 이벤트, 요소를 호출하는 요청 이벤트(unbounded는 무한한 요청, 1은 요청 한 번을 의미)가 있습니다.

 

 

요청 이벤트는 위와 같이 표시됩니다.

subscribe, request, cancel 로 표시되는 이벤트가 보이는 데요.

하나씩 확인해보도록 하겠습니다.

 

 

 

 

Subscribe

아래 그림은 요소를 받고 있지 않은 상태에서 subscribe()을 요청하면 그 때부터 요소를 받아옵니다.

 

 

 

그리고 모든 요소를 받은 후 (정상/오류) 완료 상태를 갖게되는 것을 확인할 수 있습니다.

위의 그림은 아래 코드와 동일합니다.

 

Flux.just(1, 2).subscribe();

 

1이 초록색으로 표시된 요소(🟢), 2가 노란색으로 표시된 요소(🟡)입니다.

 

 

 

 

Request & Cancel

Request와 Cancel은 Subscription 인터페이스의 메소드입니다.

해당 요청을 보낼 때 아래의 그림과 같이 표시됩니다.

 

⚠️ Cancel 된 요소는 그림과 같이 속이 빈 도형 ⚪️ 으로 표시됩니다.

 

 

 

 

위의 그림을 아래의 코드와 동일합니다.

 

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        requestUnbounded();
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

 

BaseSubscriber를 익명 클래스로 구현한 형태입니다. 

처음 request()로 요청 후 다음 인자에서 취소 cancel() 하는 예시입니다.

 

request of 1
Cancelling after having received 1

 

 

 

 

Functional API

첫 소개말에서 Java 8의 Functional API을 사용하며, 특히 CompletableFuture, Stream, Duration과 결합된 함수가 많다고 했는데요. 이번엔 Functional API의 다이어그램 표시 방법에 대해 살펴보겠습니다.

 

주의할 점은, Operator 자체가 Functional API로 실행되는 것Operator의 파라미터로 Functional API를 받아서 실행되는 것을 구별해야하는데요. Operator 자체가 Functional API로 실행될 때는 아래와 같이 표현할 수 있습니다.

 

1. Functional API Operator

 

 

 

다이어그램을 보면 납득되는 도형으로 표시되었습니다.

예시 하나를 확인해보며 이해를 더해보도록 하겠습니다.

 

 

 

EX: map(Function: 🟡 → 🟧 )

public final <R> Mono<R> map(Function<? super T,? extends R> mapper)

map은 A 타입의 요소를 다른 B 타입으로 전환할 수 있습니다. 

 

 

예를 들면 아래와 같은 코드가 될 수 있죠.

Flux.range(1, 4).map(i -> "DATA" + i + ".").subscribe(System.out::print);
// DATA1.DATA2.DATA3.DATA4.

 

 

 

2. Operator(Functional API)

이번에는 Operator의 파라미터로 Functional API를 받아서 실행될 때의 도식을 확인해 보도록 하겠습니다.

 

 

 

 

조금 헷갈릴 수 있는데요. 위의 도식은 아래와 같이 표시될 수 있습니다.

 

 

 

 

source.sideEffectOp(Functional<> param) 와 같이 실행되는 코드들의 모습입니다.

이번에도 간단한 예시를 확인해보도록 할게요.

 

 

 

 

EX: subscribe( ⏺ )

public final Disposable subscribe(Consumer<? super T> consumer)

 

subscribe() 와 동일한 역할을 하며, 사실 위에서 사용한 적이 있는 메소드입니다.

구독한 요소를 읽고나서 처리할 Consumer를 지정하도록 의도된 메소드입니다.

 

 

 

 

가령 아래와 같은 코드로 표시될 수 있습니다.

 

Flux.range(1, 3).subscribe(i -> System.out.print(i + "."));
// 1.2.3.

 

 

 

 

위에서 살펴본 내용을 이해하시면 기본적인 다이어그램은 쉽게 이해할 수 있습니다.

마블 다이어그램이 얼마나 유용한지도 이해할 수 있습니다.

기본적인 표현식을 이해하면 나머지 처음보는 도식도 어느정도 느낌이 오긴하지만,

모르겠다면 [projectreactor.io] how-to-read-marble-diagram 에서 필요할 때마다 확인해볼 수 있습니다.

 

그럼 시리즈의 다음 편으로 찾아보겠습니다.

 

 

 

| 참고 |

- [projectreactor.io] how-to-read-marble-diagram

- [projectreactor.io] Mono

- [projectreactor.io] Flux

- [medium] read-marble-diagrams-like-a-pro

 

 

반응형

Backend Software Engineer

Gyeongsun Park