B.1. How Do I Wrap a Synchronous, Blocking Call?
정보의 소스가 동기적이고 블로킹 방식인 경우가 많이 있다. Reactor 애플리케이션에서 이러한 소스를 처리하려면 다음 패턴을 적용한다.
Mono blockingWrapper = Mono.fromCallable(() -> { // 1
return /* make a remote synchronous call */ // 2
})
.subscribeOn(Schedulers.boundedElastic()); // 3
blockingWrapper.subscribe(...);
(1)
fromCallable
을 사용하여 새로운Mono
로 만든다. 소스가 하나의 값을 반환하기 때문에Mono
를 사용해야 한다.
(2) 동기적인 블로킹 호출의 결과를 반환하는Callable
람다를 작성한다.
(3) 구독할 때 마다 전용 단일 스레드에서 실행되도록 한다.Schedulers.boundedElastic()
의 워커를 지정한다.subscribeOn
은Mono
를 구독하지 않으며, 구독이 실행되는Scheduler
의 종류를 지정하는데 사용된다.
Scheduler
로는 boundedElastic
을 사용해야 한다. 다음과 같은 이유 때문이다.
- 전용 스레드를 사용하여 블로킹 리소스가 처리되기까지 대기하며, 이는 다른 논블로킹 처리에 영향을 주지 않는다.
- 생성할 수 있는 스레드의 갯수에 제한이 있어 너무 많은 스레드가 생성되지 않게 해준다.
- 블로킹 태스크가 급증하면 큐잉되어 처리 시점이 미뤄질 수 있게 해준다.
예제
Mono<Long> blockingLong = Mono
.fromCallable(
() -> {
log.info("blocking start");
Thread.sleep(1000);
log.info("blocking end");
return System.currentTimeMillis() % 100;
}
)
.subscribeOn(Schedulers.boundedElastic());
blockingLong
.doOnSubscribe(s -> log.info("onSubscribe"))
.subscribe(l -> log.info("result: " + l));
결과
18:56:10.908 [main] INFO my.ex.GenerateFlux - onSubscribe
18:56:10.910 [boundedElastic-1] INFO my.ex.GenerateFlux - blocking start
18:56:11.912 [boundedElastic-1] INFO my.ex.GenerateFlux - blocking end // 1000ms의 차이가 있다.
18:56:11.913 [boundedElastic-1] INFO my.ex.GenerateFlux - result: 12
'잡다구리' 카테고리의 다른 글
Building a Multi-Module Spring Boot Application with Gradle (0) | 2022.08.13 |
---|---|
What is Upstream and Downstream in Software Development (0) | 2022.08.13 |
reactor: Handle (0) | 2022.08.13 |
reactor: Synchronous generate (0) | 2022.08.13 |
reactor: publishOn, subscribeOn() (0) | 2022.08.13 |
docker-compose에서 다른 서비스가 healthy 상태여야 특정 서비스를 기동하여야 할 때 (0) | 2022.08.13 |
스레드 상태 (Thread State) (0) | 2022.08.12 |
Redis Cluster 구성 테스트 (0) | 2022.08.12 |
댓글