publishOn(scheduler)
onNext
,onComplete
,onError
가 지정된Scheduler
의Worker
(thread)에서 실행된다.- 이
publishOn
을 지정한 이후의 operator들이 수행될 스레딩 컨텍스가 결정된다. 다음publishOn
을 지정하기 전까지 계속 유지된다. - 일반적으로 빠른 publisher와 느린 consumer로 구성된 시나리오에서 사용된다.
javadoc link
예제
final Flux<String> flux = Flux
.range(1, 2)
.doOnNext(i -> log.info("range: {}", i))
.publishOn(Schedulers.newSingle("ooo"))
.map(i -> 10 + i)
.doOnNext(i -> log.info("map1: {}", i))
.publishOn(Schedulers.newSingle("***"))
.map(i -> "value " + i)
.doOnNext(i -> log.info("map2: {}", i));
결과
publishOn()을 만날 때 마다 operator가 생산하는 item의 next가 실행되는 스레드가 변경되는 것을 확인할 수 있다.
// publishOn을 만날 때 마다 실행 스레드가 변경되었다.
[main] INFO my.ex.SchedulerEx - range: 1
[main] INFO my.ex.SchedulerEx - range: 2
[ooo-2] INFO my.ex.SchedulerEx - map1: 11
[ooo-2] INFO my.ex.SchedulerEx - map1: 12
[***-1] INFO my.ex.SchedulerEx - map2: value 11
[***-1] INFO my.ex.SchedulerEx - value 11
[***-1] INFO my.ex.SchedulerEx - map2: value 12
[***-1] INFO my.ex.SchedulerEx - value 12
subscribeOn(scheduler)
subscribe
,onSubscribe
,request
가 지정된Scheduler
의Worker
(thread)에서 실행된다.- 이 연산자가 체인의 어디에 있더라도 체인 전체의
onNext
/onError
/onComplete
시그널의 실행 컨텍스트에 영향을 준다.publishOn
을 만나면 더 이상 영향을 주지 않는다.publishOn
은 실행 컨텍스트를 변경하기 때문이다. - 체인에서 가장 먼저 나오는
subscribeOn
이 적용되며, 나머지subscribeOn
은 무시된다. - 일반적으로 느린 publisher(예: blocking IO)와 빠른 consumer로 구성된 시나리오에서 사용된다.
javadoc link
예제
final Flux<String> flux2 = Flux
.range(1, 2)
.doOnNext(i -> log.info("range: {}", i))
.subscribeOn(Schedulers.newSingle("@@@1"))
.map(i -> 10 + i)
.doOnNext(i -> log.info("map1: {}", i))
.subscribeOn(Schedulers.newSingle("@@@2"))
.map(i -> "value " + i)
.doOnNext(i -> log.info("map2: {}", i))
.subscribeOn(Schedulers.newSingle("@@@3"));
결과
subscribeOn()이 여러 번 호출되어도 최초 호출만 적용되고 나머지 호출은 무시된다.
// 체인 전체가 "@@@1" 스케줄러에서 실행되었다.
[@@@1-4] INFO my.ex.SchedulerEx - range: 1
[@@@1-4] INFO my.ex.SchedulerEx - map1: 11
[@@@1-4] INFO my.ex.SchedulerEx - map2: value 11
[@@@1-4] INFO my.ex.SchedulerEx - value 11
[@@@1-4] INFO my.ex.SchedulerEx - range: 2
[@@@1-4] INFO my.ex.SchedulerEx - map1: 12
[@@@1-4] INFO my.ex.SchedulerEx - map2: value 12
[@@@1-4] INFO my.ex.SchedulerEx - value 12
subscribeOn(scheduler) + publishOn(scheduler)
첫 번째 subscribeOn
에 지정된 Scheduler
Worker
가 체인 전체에 적용된다. 이후 publishOn
을 만날 때 마다 Scheduler
Worker
가 변경되어 적용된다.
예제
final Flux<String> flux3 = Flux
.range(1, 2)
.doOnNext(i -> log.info("range: {}", i))
.subscribeOn(Schedulers.newSingle("###1"))
.publishOn(Schedulers.newSingle("ooo"))
.map(i -> 10 + i)
.doOnNext(i -> log.info("map1: {}", i))
.subscribeOn(Schedulers.newSingle("###2"))
.publishOn(Schedulers.newSingle("***"))
.map(i -> "value " + i)
.doOnNext(i -> log.info("map2: {}", i))
.subscribeOn(Schedulers.newSingle("###3"));
결과
최초 시작은 subscribeOn
스케줄러에서 지정한 스레드로 실행되었고, pusblishOn
을 만날 때 마다 스케줄러가 변경되었다.
[###1-8] INFO my.ex.SchedulerEx - range: 1
[###1-8] INFO my.ex.SchedulerEx - range: 2
[ooo-7] INFO my.ex.SchedulerEx - map1: 11
[ooo-7] INFO my.ex.SchedulerEx - map1: 12
[***-5] INFO my.ex.SchedulerEx - map2: value 11
[***-5] INFO my.ex.SchedulerEx - value 11
[***-5] INFO my.ex.SchedulerEx - map2: value 12
[***-5] INFO my.ex.SchedulerEx - value 12
'잡다구리' 카테고리의 다른 글
What is Upstream and Downstream in Software Development (0) | 2022.08.13 |
---|---|
reactor: Handle (0) | 2022.08.13 |
reactor: Synchronous generate (0) | 2022.08.13 |
reactor: 동기적인 블로킹 호출을 감싸기 (0) | 2022.08.13 |
docker-compose에서 다른 서비스가 healthy 상태여야 특정 서비스를 기동하여야 할 때 (0) | 2022.08.13 |
스레드 상태 (Thread State) (0) | 2022.08.12 |
Redis Cluster 구성 테스트 (0) | 2022.08.12 |
redis docker (0) | 2021.08.24 |
댓글