Reactive Programming, 제대로 이해하기

2022. 11. 16. 23:58Spring

Spring의 WebFlux에 대한 개념을 쉽고 정확하게 이해하는 것을 목표로 합니다.

 

 

| Reactor Series | 

👉🏻 Reactive Programming, 제대로 이해하기

Reactor 제대로 이해하기, Marble Diagram

- Reactor, 제대로 이해하기 - Flux Create

 

 

Background

많은 서비스를 한 앱에 구현하는 monolithic 구조는 더 이상 사용하지 않습니다. 빠른 속도를 요구하는 시대에서 monolithic 구조는 문제가  점점 도태되었습니다. 대신 작게 분산된 컴포넌트를 구성하여 연결하는 microservice 구조(MSA)와 이를 서비스하는 클라우드 기반의 클러스터를 선호하게 되었습니다. Microservice 구조는 한 앱을 작은 컴포넌트로 분산시킨 뒤, 각 컴포넌트가 서로 데이터를 주고 받으며 하나의 서비스를 제공합니다. 분산된 구조로 높은 확장성을 가능하게 하고 자원을 효율적으로 쓰면서 응답 속도를 빠르게 하는 구조가 될 수 있었죠.

 

이 때, 각 서비스의 통신 방식에 대해 살펴봅시다. 기존의 API 통신 방식은 각 요청마다 하나의 스레드를 할당합니다. 이때, Thread pool 사이즈를 제한하여 스레드의 수가 너무 많아지지 않게 조정합니다. 참고로, Tomcat의 Thread pool 기본 설정 값은 200개(connections) 로, 한 번에 200개를 할당하여 처리할 수 있는 것입니다. 각 스레드는 메모리를 할당받아 사용하는데, 이 때 스레드의 수가 너무 많아지면 성능 저하의 원인이 될 수 있습니다. 그래서 앱의 인스턴스를 더 많이 띄우는 방식(spinning up)으로 수평 확장(horizontal scaling) 을 사용하곤 합니다. 하지만 이렇게 새로운 인스턴스를 만드는 것은 상당한 금액이 필요합니다.

 

 

Sync & Blocking

 

위에 소개한 기존의 REST API는 Blocking과 Synchronous 방식으로 처리됩니다. 서버에 HTTP 요청으로 유저 리스트를 요청한다고 가정해봅시다. 새로운 요청이 서버로 들어오면 서블릿 스레드가 그 요청을 할당받아 요청 로직을 처리하고 DB 요청을 보냅니다. 이 때, 요청을 할당 받은 그 스레드는 DB 호출을 후 DB에서 응답을 줄 때까지 기다립니다(Blocking). 200개의 요청을 한 번에 받을 수 있지만 201번째 들어오는 요청은 이전의 요청 200개가 할당된 스레드의 작업이 종료될 때까지 대기 상태이어야 한다는 의미입니다.

 

그리고, 지금부터 이보다 더 좋은 Event-driven 방법을 소개해드립니다.

 

 

 

Event-driven

스레드가 요청을 처리하고 DB 응답을 기다리는 대신, 종료되었다는 소식을 받을 때까지 다른 일을 하고 있다면 어떨까요? 다음 요청을 처리할 수 있게끔 요청을 받는 스레드를 자유롭게 풀어주는 겁니다. 조회할 데이터를 가진 DB에게 작업을 맡기고, 그 작업을 마치면 호출(Callback)을 할 수 있는 구조입니다. 요청을 맡긴 후 놀고 있는(free) 스레드는 다른 데이터를 처리할 수 있습니다. 훨씬 좋아 보이죠. 이런 방식은 서블릿 스레드가 대기 상태가 되는 것을 피할 수 있습니다.

 

 

 

이런 방식(mechanism)을 이벤트 루프(Event Loop)라고 합니다. 요청이 들어올 때면 기본적인 프로세스를 동일하게 진행하되, DB Driver에게 IO Operation을 이벤트 형식으로 위임합니다. 이 방식은 차례대로 이벤트를 발생시키고, 데이터가 준비되면 언제든 데이터를 가져갈 수 있습니다. 스레드풀 내의 어떤 스레드든 데이터를 처리하고 응답을 반환할 수 있습니다. 덕분에 CPU를 더 효과적으로 사용할 수 있게 하고, 동시에 들어오는 요청을 유연하게 처리할 수 있게 합니다.

 

하지만, 아직 해결하지 못한 기존의 문제가 하다 더 있습니다. 가령 응답의 사이즈가 너무 클 때와 같은 상황입니다. 만약 요청했던 데이터가 우리가 처리할 수 있는 것보다 훨씬 크고, 만약 프론트 앱이 받기 힘든 수준이라면 어떻게 해야할까요? 더 유연한 통신을 할 수는 없을까요? 여전히 이런 문제가 남아있다면 리액티브 프로그래밍이라고 할 수 없겠죠 😉

 

 

 

Async & Non-Blocking

리액티브 프로그래밍은 비동기과 이벤트 호출(event-driven)인 논블럭 방식의 앱이며 확장을 위해서 적은 스레드만을 필요로합니다.
또 바로 위의 문제를 풀 열쇠를 가질 수 있는데, producer가 consumer에게 무리한 압박을 주지 않는 backpressure 개념을 제공합니다. 예를들어 데이터 소스에서 HTTP 소켓까지 이동하는 리액티브 컴포넌트로 만든 파이프라인에 클라이언트는 데이터의 크기를 다루지 못할 때, 데이터의 움직임은 클라이언트가 처리할 수 있을 때까지 속도를 낮추거나 멈춥니다.

 

 

Backpressure

Backpressure는 '배압'이라는 의미를 가집니다. 흔히 배압이라고 하면 유체 흐름의 저항력을 의미합니다. 물이 흐르는 배수관의 통로 크기가 갑자기 작아지는 구간을 생각해보세요. 가령, 10m 였던 배수관의 너비가 1m로 갑자기 작아지는 구간이 있을 때, 오른쪽으로 흐르던 물의 흐름은 해당 구간에서 왼쪽 방향으로 힘을 받게 될 것입니다. 이 힘을 받아 10m만큼의 물이 전부 흐르지 않고 1m만큼의 물이 흐를 수 있게 되죠. 일상 생활에서 세면대 아래의 구부러진 배수관도 이 원리를 이용합니다. 구부러지지 않고 직선으로 흐른다면 물은 굉장히 빠르게 지나갈테지만, 구부러진 구간에서 아래로 흐르던 물은 반대 방향의 힘을 받아 한 번에 너무 많은 물들이 내려가지 않도록 조절합니다.

 

소프트웨어에서의 Backpressure의 개념도 비슷합니다. Publisher(게시자)가 아주 많은 데이터를 한 번에 보내도 이런 '배압' 역할을 하는 무엇인가 존재한다면, 느린 Consumer가 작업할 때 무리를 덜 수 있게됩니다. 

 

 

 

https://impurepics.com/posts/2018-05-26-backpressure-origins.html

 

 

Backpressure가 존재한다면 느린 Consumer는 빠른 Producer의 작업을 수월하게 진행할 수 있겠지만, Backpressure이 없다면 Consumer는 큰 부하를 받아 결국 작동할 수 없게 됩니다.

 

 

https://impurepics.com/posts/2018-05-26-backpressure-origins.html

 

 

 

 

 

Reactive Programming

리액티브 프로그래밍은 데이터 스트림을 비동기 처리하는 선언형 프로그래밍입니다. 선언형 프로그래밍이란 기존의 명령형imperative 프로그래밍 방식과 대비되는 새로운 프로그래밍 패러다임으로, 라인 단위의 프로그래밍 과정과 달리 특정 목적과 같이 무엇을 하는 지를 명시하여 개발하는 과정입니다. 리액티브 프로그래밍은 아래의 3가지 측면으로 기존 프로그래밍 방식의 문제점들을 해결하기 위해 등장했습니다.

 

 

 

#1. Data Stream

: A flow of data objects from one function to the next

 

연쇄적인 여러 함수 사이에서 모든 데이터를 한 번에 처리하는 것이 아닌 시간에 따른 데이터의 흐름으로 처리합니다.

 

 

 

#2. Functional Programming

: Functions are meant to be pure and all functions have an input and output data types. No external state is modified.

 

선언형 프로그래밍의 방식으로써, 함수형 프로그래밍은 사이드 이펙트(외부 상태로 내부 상태 값이 변경되는 효과) 없는 순수함수 형태를 가집니다. 즉, 데이터들을 특정 절차로 처리할지를 생각하는 것(imperative)보다는 특정한 목적을 가진 묶음(functional)으로 데이터를 처리할지를 생각하는 것입니다.

 

가령, add(x, y) -> return x + y 와 같은 형식을 띕니다.

 

 

 

#3. Asynchronous observers

: Uses the Observer pattern with Subject data sources that notify Observers. Errors propagated throught the flow of data.

 

데이터 소스의 비동기 처리 과정을 수시로 확인(관찰자) 합니다. 오류가 발생하면 데이터 흐름을 통해 전파됩니다.

 

 

 

 

Principle

Reactive Programming 은 하나의 이론입니다. "내 코드에 어떻게 적용하지?" 를 고려할 때에는, Reactive Programming을 구현한 다양한 라이브러리 확장들을 사용합니다. ReactiveX의 languages 를 확인해보면 RxJava, RxSwift, TxJS 등의 언어 별 사용 기술이 명시되어 있습니다.

 

Reactive programming은 새로운 프로그래밍의 패러다임이며, 비동기 방식에 논블럭킹으로 처리합니다. 데이터는 이벤트/메세지로 발동되는driven 스트림으로 흐릅니다. 스레드는 Blocking 되지 않고 데이터가 사용될 수 있게되면 바로 스트림을 통하여 사용자에게 보내집니다. 한 번 모든 데이터가 반환될 때 성공적이라면 onComplete() 이벤트를 통해 얻을 수 있고, 만약 프로세스 중 예외가 발생한다면 onError() 이벤트로 받습니다. 또한 onNext() 이벤트를 통해 observer가 시퀀스 내 새로운 요소 발견하는 것을 알려notification줄 수 있습니다.

 

리액티브 프로그래밍 환경은 새롭게 다가올 텐데요. 이렇게 새로운 개념이 등장하면 항상 표준이 필요합니다. 그렇지 않으면 혼돈에 빠지기도 하고 제각기 다른 모습들로 관리하기가 어려워집니다. 때문에 Reactive Streams specifications 이 등장하면서 비동기와 Non-blocking Backpressure을 위한 표준 스펙을 정의합니다. 그리고, 이러한 표준 스펙을 기준으로Reactive Library들이 생기게끔 만듭니다.

 

 

 

API는 4개의 핵심 인터페이스로 구성됩니다: Publisher, Subscriber, Subscription, Processor

 

 

✔️ Publisher

Publisher는 잠재적으로 정해지지 않은 수의 연속적인sequenced 원소들의 데이터 자원(database, API call, etc...)이고 Subscriber의 수요를 통해서 Publish 됩니다.

public interface Publisher<T> {
  public void subscribe(Subscriber<? super T> s);
}

 

✔️ Subscriber

consumer라고도 하며, 이 인터페이스는 언제 그리고 어떻게 해서 Subscriber의 많은 원소들을 사용할 수 있으며, 데이터 스트림을 받아 들일 수 있는 준비가 되면 데이터를 받는 다는 책임을 결정establish한다는 목적을 가집니다. Subscriber 인터페이스는 4개의 추상 메소드를 가집니다.

 

public interface Subscriber<T> {
  public void onSubscribe(Subscription s);
  public void onNext(T t);
  public void onError(Throwable t);
  public void onComplete();
}

 

 

✔️ Subscription

Subscriber와 Publisher 사이의 단 하나 존재하며Unique 관계 를 나타내며 두 개의 메소드를 갖습니다다.

 

public interface Subscription {
  public void request(long n);
  public void cancel();
}

 

✔️ Processor

이름이 표현하듯이 하나의 프로세싱 단계를 의미하는데, Subscriber와 Publisher 사이의 중간에 위치하며 둘의 관계 하에서 실행obey됩니다.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

 

 

 

전체적인 그림을 담은 다이어그램을 보면 아래와 같습니다.

 

 

 

 

#1. Subscriber  : Publisher에 구현된 subscribe()를 호출

#2. Publisher    : Consumer로 Element(데이터) 전송을 승인하여 Subscription 전달

#3. Subscriber  : Subscriber는 Subscription 인스턴스를 얻어 request(n) 메소드로 Element를 요청

#4. Publisher     : Response의 크기를 확인하면서 더 이상 응답을 하지 않았다면 기본적으로 Publisher는 모든 데이터들을 반환

모든 데이터를 전송하기 위해서 onNext() 메소드를 사용합니다. response의 사이즈를 명시적으로 지정하는데, backpressure 를 구현하기 위한 예시로 볼 수 있습니다. Consumer가 response의 크기를 확인하며 전송을 조정한다는 의미입니다. 데이터가 한 번 반환되면 publisher는 onComplete() 이벤트를 전송합니다. Subscriber는 또한 데이터 흐름의 취소 옵션을 가지는데 바로 Subscription 인터페이스의 cancel() 메소드를 호출할 수 있습니다.

#5. Publisher     : 데이터가 모두 반환되면 Publisher는 onComplete() 이벤트를 전송

Subscriber는 또한 데이터 흐름의 취소 옵션을 가지는데 바로 Subscription 인터페이스의 cancel() 메소드를 호출할 수 있습니다. Subscription의 인스턴스 이후에 호출될 수 있는데, 이 subscription은 취소될 수 있으며 publisher는 더 이상의 이벤트를 송출하지 않습니다.

 

 

 

 

 

Project Reactor

Project Reactor는 위에서 언급한 말한 인터페이스 구현을 가지며 바로 JVM 위에서 Non-blocking을 구현한 Reactive library입니다. 공식문서를 보면 이 library는 다양한 모듈로 구성되었습니다. reactive-core 모듈에서 Flux와 Mono라는 단어를 찾을 수 있는데, 바로 Reactive Stream Publisher를 나타냅니다. 이 둘의 가장 다른 점은 원소를 방출emit하는 횟수입니다.

 

Flux는 0부터 N개의 원소를, Mono는 단 하나의 원소를 방출합니다.

이 두개는 Project Reactor되거나 혹은 에러를 가질 수 있습니다.

 

 

Flux

 

 

 

위 그림에 대해 설명하자면, 위에서 부터, 맨 위의 원들은 데이터 소스로 부터 제공되는 "검색한 데이터"를 나타내며, 중앙의 박스는 operator는 "데이터 처리"를 나타냅니다. 맨 아래의 원들은 "Subscriber에게 전송될 데이터"를 나타냅니다.

Mono는 데이터 소스에서 Element(데이터 구성 요소)를 단 하나만 원할 때 사용되는데요. 가령, 흔히 CRUD 구조에서 하나의 유일한 값을 가진 ID로 사용자를 검색하는 것을 생각해볼 수 있습니다.

 

Mono

 

 

 

FYI, Reactive Stream - reactive streams JVM Github 를 참고하시면 많은 도움이 될 듯합니다.

 

 

 

 

 

WebFlux

Spring을 사용하여 Reactive 웹 앱을 생성build하기 위해서는 Spring WebFlux가 필요하며, Spring WebFlux는 Spring 5에서 도입되어 웹앱을 위한 Reactive Programming 을 제공합니다. Spring Webflux는 내부적으로 Project Reactor를 사용하고 Publisher의 구현체인 Flux 와 Mono를 사용합니다. WebFlux는 아래 두 개의 프로그래밍 모델을 갖습니다.

 

✔️ Annotation based

✔️ Functional routing and handling

 

비동기이자 논블럭킹 앱을 구현하기 위해서 우리는 Servlet 컨테이너가 필요합니다. 만약 Spring WebFlux를 사용한다면 Spring Boot는 기본 서버로 Reactor Netty를 사용하며, Reactor Netty는 비동기의 Event-driven 의 네트워크 앱 프레임워크으로 높은 성능을 유지하는 서버 & 클라이언트 프로토콜입니다. 여기서도 async 와 event-driven라는 키워드가 보이죠?

 

 

 

 

Netty 서버에 요청이 있을 때, 클라이언트를 Blocking 하지 않는 방식으로 Future로 즉시 응답하기 때문에 대기 상태없이 작업을 진행할 수 있습니다. 또한 큰 장점 중 하나는 많은 connection을 처리할 수 있다는 점입니다. Netty로 전달되는 이벤트는 클라이언트의 요청을 의미할 수도, 혹은 Netty로 부터의 응답이 될 수도 있습니다.

 

클라이언트와 서버 사이의 Connection은 channel을 통해 생성됩니다. Netty의 이벤트 루프는 이벤트를 찾는데, 모든 이벤트를 처리할 때까지 FIFO 형식의 이벤트 루프와 이벤트를 저장하는 내부 이벤트 큐를 가집니다. ChannelHandler는 호출을 위한 정확한 endpoint를 찾는데 @RestController나 함수형 웹 endpoint의 어노테이션이 붙여진 클래스를 찾습니다. Netty 이벤트 루프는 각 요청을 Channel과 연결하는데, 이를 통해 응답을 전송해야 하는 채널을 알 수 있습니다.

 

 

 

 

 

Source:

- Intro to Reactive Programming by Jordan Jozwiak of Google - CS50 Tech Talk

- reactive-programming-with-spring-boot-and-webflux