Reactor, 제대로 이해하기, Flux Create

2022. 12. 8. 23:58Spring/Java

반응형

Reactor Publisher인 Flux를 생성하는 방법에 대해 이해하고 예제 코드로 직접 코드를 확인하는 것이 해당 포스팅의 목적입니다.

 

 

| Reactor Series | 

Reactive Programming, 제대로 이해하기

Reactor 제대로 이해하기, Marble Diagram

👉🏻 Reactor, 제대로 이해하기 - Flux Create

 

.from()

🔗 Reactor Link

: Publisher로 부터 새로운 Flux 인자 방출emit

 

Flux<T> from(Publisher<? extends T>  source)

 

Flux.from() 메소드는 특정 Publisher로 부터 Flux를 생성하는 API 입니다.

이를 특정 Publisher를 Decorate 한다고 표현합니다.

 

가령 기존의 Mono형태의 데이터 소스를 Flux로 재생성한다거나, 

Flux였던 데이터 소스를 원하는 양식의 Flux로 생성할 수 있습니다.

 

 

 

 

✔️ Type - T  : 파라미터 값인 source와 output sequences의 타입

✔️ Params - Publisher source : Flux로 Decorate된 기존의 Publisher source

✔️ Returns  : 새로운 Flux 객체

 

 

👉🏻 Example

// from Flux source
Flux<Integer> m = Flux.wrap(Flux.just(4123));
StepVerifier.create(m).expectNext(4123).verifyComplete();

// from Mono source
Flux<Integer> m = Flux.from(Mono.just(1));
StepVerifier.create(m).expectNext(1).verifyComplete();

// from empty
Flux<Integer> m = Flux.from(Mono.empty());
assertThat(m).isSameAs(Flux.<Integer>empty());
StepVerifier.create(m).verifyComplete();

// from error
Flux<Integer> m = Flux.from(Mono.error(new Exception("test")));
StepVerifier.create(m).verifyErrorMessage("test");

 

 

 

.fromArray(T[])

🔗 Reactor Link

: 파라미터로 입력된 배열에 포함된 아이템을 새로운 Flux 인자로 방출emit

public static <T> Flux fromArray(T[] array)
 
 
 

 

 

 

✔️ Type Parameters T  : 파라미터 값인 input source 배열과 output Flux의 공통 타입

✔️ Parameters source : 변환하고자 하는 배열 값

✔️ Returns                     : 새로운 Flux 객체

 

 

👉🏻  Example

// from string array
String source = "Santa tell me";
Flux<String> st = Flux.fromArray(source.split(" "));

StepVerifier.create(st)
	.expectNext("Santa")
	.expectNext("tell")
	.expectNext("me")
	.verifyComplete();            


// from empty array
assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
	Flux.fromArray((Integer[]) null);
});

 

 

 

.fromIterable(Iterable)

🔗 Reactor Link

: 파라미터로 입력된 Iterable에 포함된 아이템들을 새로운 Flux 인자로 방출emit

Flux<T> fromIterable(Iterable<? extends T> it)

 

순회 Operator인 Iterable.iterator() 는 각 구독자로부터 최소 한 번 최대 두 번 호출될 수 있습니다.

 
 
 

 

 

✔️ Type Parameters T  : 파라미터 값인 input source Iterator output Flux의 공통 타입

✔️ Parameters source : 변환하고자 하는 Iterator 값

✔️ Returns                     : 새로운 Flux 객체

 

 

Reactor project의 공식 문서를 보면, 아래와 같이 메소드의 동작이 discard 될 때 주의사항을 명시하고 있습니다.

Discard Support: Upon cancellation, this operator attempts to discard the remainder of the Iterable if it can safely ensure the iterator is finite. Note that this means the Iterable.iterator() method could be invoked twice.

 

Discard : 취소 시, iterator에 남은 순회 요소가 있다는 것이 안정적으로 확인된다면, 나머지 요소를 제거합니다. 이는 Itable.iterator() 메서드가 두 번 호출될 수 있음을 의미합니다.

해당 내용은 Github Issue #2014Github Issue #2021 를 참고할 수 있습니다.

 

👉🏻  Example

// from basic Iterable
Flux<Integer> flux = Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6));
StepVerifier.create(flux).expectNext(1, 2, 3, 4, 5, 6).verifyComplete();

// from Iterator
final int max = 10;
Iterable<Integer> iterable = () -> new Iterator<Integer>() {
	int i = 0;

	@Override
	public boolean hasNext() {
		return i < max;
	}

	@Override
	public Integer next() {
		return i++;
	}
};

StepVerifier.create(Flux.fromIterable(iterable), 0)
	.expectSubscription()
	.thenRequest(5).expectNext(0, 1, 2, 3, 4)
	.thenRequest(5).expectNext(5, 6, 7, 8, 9)
	.verifyComplete();


// occurs error from Iterable
Flux<Integer> flux = Flux.fromIterable(Arrays.asList(1, 2, 3, null, 5, 6));
StepVerifier.create(flux).expectNext(1, 2, 3).expectError().verify();

 

 

 

 

.fromStream(Stream)

🔗 Reactor Link

: 파라미터로 입력된 Stream에 포함된 아이템들을 새로운 Flux 인자로 방emit

Flux<T> fromStream(Stream<? extends T> s)

 

Stream은 재사용할 수 없으며, 구독을 여러 번하거나(repeat()) 재구독(retry()) 시 문제가 될 수 있음을 명심하세요.

Stream은 취소, 에러, 정상 완료 시 자동으로 닫힙closed니다.

 

 

 

✔️ Type Parameters T  : 파라미터 값인 input source Stream output Flux의 공통 타입

✔️ Parameters source : 변환하고자 하는 Stream 값

✔️ Returns                     : 새로운 Flux 객체

 

 

Reactor project의 공식 문서를 보면, 아래와 같이 메소드의 동작이 discard 될 때 주의사항을 명시하고 있습니다.

 

Discard Support: Upon cancellation, this operator attempts to discard remainder of the Stream through its open Spliterator, if it can safely ensure it is finite (see Operators.onDiscardMultiple(Iterator, boolean, Context)).

 

Iterator에 남은 순회 요소가 있다는 것이 안정적으로 확인된다면, 

Discard : 해당 메소드는 취소 상태가 발생할 때 남은 순회 요소가 있다는 것이 안정적으로 확인된다면, Spliterator를 통해 나머지 요소를 제거를 시도합니다. Operator.onDiscardMultiple(Iterator, boolean, Context) 참고하세요.

 

해당 내용은 Github Issue #2014  Github Issue #2021 를 참고할 수 있습니다.

 

👉🏻  Example

// from Stream
Flux<Integer> flux = Flux.fromStream(Stream.iterate(0, i -> i + 1)).take(5, false);
StepVerifier.create(flux).expectNext(0, 1, 2, 3, 4).verifyComplete();

// occurs error from Stream
final List<Integer> source = Arrays.asList(1, 2, 3, null, 5);
Flux<Integer> flux2 = Flux.fromStream(source2.stream());
StepVerifier.create(flux2).expectNext(1, 2, 3).expectError().verify();

// NullPointException occurs error from Stream
assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
	Flux.fromStream((Stream<?>) null);
});

 

 

 

 

.generate()

: Programmatically Consumer Callback 을 사용해서 하나씩 Signal 을 생성하여 Flux를 생성하는 방법

 

Flux.generate() 메서드는 동기 방식으로 한 번에 1개의 데이터를 생성할 때 사용합니다.

generate()의 Signal 생성 시의 실행 순서를 살펴보면 다음과 같습니다.

 

가장 먼저,Subscriber의 구독이 시작되고 generator() 메소드에서 request 호출 시 Signal를 생성합니다.

generator를 실행할 때 인자로 SynchronousSink를 전달합니다.

generator는 전달받은 SynchronousSink를 사용해서 next, complete, error 신호를 발생합니다.

 

sink.next( nextElement ) : 다음 발행할 요소를 생성합니다.

sink.complete() : 스트림 요소 발행을 정상 완료로 종료합니다.

sink.error( ) : 스트림 요소 발행을 오류로 종료합니다.

 

그럼 지금부터 여러 인자를 받는 generate() 메소드를 확인해봅시다.

 

 

 

.generate(Consumer)

🔗 Reactor Link

: Consumer 콜백을 통해 Signal을 하나씩 생성하여 프로그래밍 방식Programmatically으로 Flux를 생성

 

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

 

 

 

✔️ Type T  : Flux로 생성할 인자 타입

✔️ Param generator : 각 구독자가 제공하는 SynchronousSink를 소비consume하며, 1 회 호출 시 하나의 Signal을 생성

✔️ Returns   : 새로운 Flux 객체

 

👉🏻 Example

AtomicInteger at = new AtomicInteger(0);

// flux generate - complete
Flux<Integer> flux = Flux.generate(sink -> {
    int i = at.getAndIncrement();
    if (i == 3) {
        sink.next(i);
        sink.complete();
    }
    sink.next(i);
});

StepVerifier.create(flux)
    .expectNext(0).expectNext(1).expectNext(2).expectNext(3)
    .verifyComplete();
            

// flux generate - error
Flux<Integer> flux = Flux.generate(sink -> {
    int i = at.getAndIncrement();
    if (i % 2 == 0) {
        sink.next(i);
    } else {
        sink.error(new IllegalStateException("Error at " + i));
    }
});

StepVerifier.create(flux).expectNext(0).expectError().verify();

 

 

 

.generate(Callable, BiFunction)

🔗 Reactor Link

: Consumer 콜백을 통해 Signal을 하나씩 생성하여 프로그래밍 방식Programmatically으로 Flux를 생성

 

public static <T,S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)

 

 

 

 

✔️ Type

     - T : Flux로 생성할인자 타입

     - S  : 구독자의 상태 타입

✔️ Parameters

     - Callable  stateSupplier : generator 에게 제공할 초기 state를 제공하기 위해 Subscriber가 각 요청마다 호출

     - BiFunction  generator : 각 구독자가 제공하는 SynchronousSink를 소비consume하며, 1 회 호출 시 하나의 Signal을 생성

✔️ Returns   : 새로운 Flux 객체

 

👉🏻 Example

Flux<String> flux = Flux.generate(
	AtomicLong::new, // generate a mutable object as the state.
	(state, sink) -> {
		long i = state.getAndIncrement(); // mutate the state here.
		sink.next("3 x " + i + " = " + 3 * i);
		if (i == 3) sink.complete();
			return state; // return the same instance as the new state.
	});

StepVerifier.create(flux)
    .expectNext("3 x 0 = 0")
    .expectNext("3 x 1 = 3")
    .expectNext("3 x 2 = 6")
    .expectNext("3 x 3 = 9")
    .verifyComplete();

 

 

 

.generate(Callable, BiFunction,  stateConsumer)

🔗 Reactor Link

: Consumer 콜백을 통해 Signal을 하나씩 생성하여 프로그래밍 방식Programmatically으로 Flux를 생성

 

public static <T,S> Flux<T> generate(Callable<S> stateSupplier,
                                                                 BiFunction<S,SynchronousSink<T>,S> generator,
                                                                 Consumer<? super S> stateConsumer)

 

 

 

✔️ Type

     - T : Flux로 생성할인자 타입

     - S  : 구독자의 상태 타입

✔️ Parameters

     - Callable  stateSupplier : generator 에게 제공할 초기 state를 제공하기 위해 Subscriber가 각 요청마다 호출

     - BiFunction  generator : 각 구독자가 제공하는 SynchronousSink를 소비consume하며, 1 회 호출 시 하나의 Signal을 생성

     - Comsumer stateConsumer: final cleanup 종료되었거나 다운스트림된 후 호출되어, 마지막 상태를 처리할 수 있게끔 받음 (i.e. resources cleanup)

 

✔️ Returns   : 새로운 Flux 객체

 

👉🏻 Example

Flux<String> flux = Flux.generate(
	AtomicLong::new,
	(state, sink) -> { // generate a mutable object as the state.
		long i = state.getAndIncrement(); // mutate the state here.
		sink.next("3 x " + i + " = " + 3 * i);
		if (i == 3) sink.complete();
		return state; // return the same instance as the new state.
	}, (state) -> System.out.println("state: " + state)); // We see the last state value (4) as the output of this `Consumer` lambda.

StepVerifier.create(flux)
	.expectNext("3 x 0 = 0")
	.expectNext("3 x 1 = 3")
	.expectNext("3 x 2 = 6")
	.expectNext("3 x 3 = 9")
	.verifyComplete();

 

 

반응형