Flux를 프로그래밍 방식으로 생성하는 가장 간단한 방법은 generator 함수를 사용하는 generate 메서드를 사용하는 것이다.
generate는 동기적으로 한번에 하나씩 발행(emission)한다. 여기서 사용되는 싱크(sink)는 SynchronousSink이며 콜백 함수가 호출될 때에 최대 한 번 싱크의 next() 메소드를 호출할 수 있다. 추가적으로 error(Throwable) 이나 complete()를 호출할 수 도 있다.
가장 유용한 메소드의 변형은 다음번에 무엇을 발행할지 결정하기 위해 싱크가 참조할 수 있는 state를 유지할 수 있게 해주는 변형일 것이다. generator 함수는 BiFunction<S, SynchronousSink<T>, S>이다.<S>는 state 객체의 타입이고, <T>는 발행할 데이터의 타입이고, BiFunction은 state 객체를 반환해야 한다. 초기 상태를 위한 Supplier<S>를 제공해야 하며 generator 함수는 이제 각 라운드에서 내부에서 데이터를 발행하고 새 state를 반환한다.
int를 state로 사용하는 "구구단" 예제이다.
Flux<String> flux = Flux.generate(
() -> 0, // 1
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); // 2
if (state == 10) sink.complete(); // 3
return state + 1; // 4
}
);
(1) 초기 state를 0으로 제공한다.
(2) state를 사용해서 무엇을 발행할지 결정한다.
(3) 종료해야 할지 판단하는데 사용한다.
(4) 다음 번 호출에 사용할 새로운 state를 반환한다.
결과는 다음과 같다.
3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30
<S>로 가변 객체를 사용할 수도 있다. 위와 동일한 예제를 AtomicLong을 사용하여 재작성하였다.
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
}
);
state 객체를 사용해서 리소스 정리 작업을 수행해야 한다면 generate(Supplier<S>, BiFunction, Consumer<S>)를 사용하면 된다. 마지막 state 객체가 Consumer에게 전달된다.
위의 예제에 Consumer를 추가한 버전이다.
Flux<String> flux = Flux.generate(
AtomicLong::new,
(state, sink) -> {
long i = state.getAndIncrement();
sink.next("3 x " + i + " = " + 3*i);
if (i == 10) sink.complete();
return state;
},
(state) -> System.out.println("state: " + state)
);
Consumer에는 마지막 state인 11이 전달된다. 이 람다식은 generate 처리가 완료될 때 한 번만 실행된다.
'잡다구리' 카테고리의 다른 글
Hexagonal Architecture with Java and Spring (0) | 2022.08.13 |
---|---|
Building a Multi-Module Spring Boot Application with Gradle (0) | 2022.08.13 |
What is Upstream and Downstream in Software Development (0) | 2022.08.13 |
reactor: Handle (0) | 2022.08.13 |
reactor: 동기적인 블로킹 호출을 감싸기 (0) | 2022.08.13 |
reactor: publishOn, subscribeOn() (0) | 2022.08.13 |
docker-compose에서 다른 서비스가 healthy 상태여야 특정 서비스를 기동하여야 할 때 (0) | 2022.08.13 |
스레드 상태 (Thread State) (0) | 2022.08.12 |
댓글