본문 바로가기
잡다구리

reactor: publishOn, subscribeOn()

by Growing! 2022. 8. 13.

publishOn(scheduler)

  • onNext, onComplete, onError가 지정된 SchedulerWorker(thread)에서 실행된다.
  • publishOn을 지정한 이후의 operator들이 수행될 스레딩 컨텍스가 결정된다. 다음 publishOn을 지정하기 전까지 계속 유지된다.
  • 일반적으로 빠른 publisher와 느린 consumer로 구성된 시나리오에서 사용된다.

    javadoc link

예제

final Flux<String> flux = Flux
    .range(1, 2)
    .doOnNext(i -> log.info("range: {}", i))
    .publishOn(Schedulers.newSingle("ooo"))
    .map(i -> 10 + i)
    .doOnNext(i -> log.info("map1: {}", i))
    .publishOn(Schedulers.newSingle("***"))
    .map(i -> "value " + i)
    .doOnNext(i -> log.info("map2: {}", i));

결과

publishOn()을 만날 때 마다 operator가 생산하는 item의 next가 실행되는 스레드가 변경되는 것을 확인할 수 있다.

// publishOn을 만날 때 마다 실행 스레드가 변경되었다.
[main]  INFO  my.ex.SchedulerEx - range: 1
[main]  INFO  my.ex.SchedulerEx - range: 2
[ooo-2] INFO  my.ex.SchedulerEx - map1: 11
[ooo-2] INFO  my.ex.SchedulerEx - map1: 12
[***-1] INFO  my.ex.SchedulerEx - map2: value 11
[***-1] INFO  my.ex.SchedulerEx - value 11
[***-1] INFO  my.ex.SchedulerEx - map2: value 12
[***-1] INFO  my.ex.SchedulerEx - value 12

subscribeOn(scheduler)

  • subscribe, onSubscribe, request가 지정된 SchedulerWorker(thread)에서 실행된다.
  • 이 연산자가 체인의 어디에 있더라도 체인 전체의 onNext/onError/onComplete 시그널의 실행 컨텍스트에 영향을 준다. publishOn을 만나면 더 이상 영향을 주지 않는다. publishOn은 실행 컨텍스트를 변경하기 때문이다.
  • 체인에서 가장 먼저 나오는 subscribeOn이 적용되며, 나머지 subscribeOn은 무시된다.
  • 일반적으로 느린 publisher(예: blocking IO)와 빠른 consumer로 구성된 시나리오에서 사용된다.

    javadoc link

예제

final Flux<String> flux2 = Flux
    .range(1, 2)
    .doOnNext(i -> log.info("range: {}", i))
    .subscribeOn(Schedulers.newSingle("@@@1"))
    .map(i -> 10 + i)
    .doOnNext(i -> log.info("map1: {}", i))
    .subscribeOn(Schedulers.newSingle("@@@2"))
    .map(i -> "value " + i)
    .doOnNext(i -> log.info("map2: {}", i))
    .subscribeOn(Schedulers.newSingle("@@@3"));

결과

subscribeOn()이 여러 번 호출되어도 최초 호출만 적용되고 나머지 호출은 무시된다.

// 체인 전체가 "@@@1" 스케줄러에서 실행되었다.
[@@@1-4] INFO  my.ex.SchedulerEx - range: 1
[@@@1-4] INFO  my.ex.SchedulerEx - map1: 11
[@@@1-4] INFO  my.ex.SchedulerEx - map2: value 11
[@@@1-4] INFO  my.ex.SchedulerEx - value 11
[@@@1-4] INFO  my.ex.SchedulerEx - range: 2
[@@@1-4] INFO  my.ex.SchedulerEx - map1: 12
[@@@1-4] INFO  my.ex.SchedulerEx - map2: value 12
[@@@1-4] INFO  my.ex.SchedulerEx - value 12

subscribeOn(scheduler) + publishOn(scheduler)

첫 번째 subscribeOn에 지정된 Scheduler Worker가 체인 전체에 적용된다. 이후 publishOn을 만날 때 마다 Scheduler Worker가 변경되어 적용된다.

예제

final Flux<String> flux3 = Flux
    .range(1, 2)
    .doOnNext(i -> log.info("range: {}", i))
    .subscribeOn(Schedulers.newSingle("###1"))
    .publishOn(Schedulers.newSingle("ooo"))
    .map(i -> 10 + i)
    .doOnNext(i -> log.info("map1: {}", i))
    .subscribeOn(Schedulers.newSingle("###2"))
    .publishOn(Schedulers.newSingle("***"))
    .map(i -> "value " + i)
    .doOnNext(i -> log.info("map2: {}", i))
    .subscribeOn(Schedulers.newSingle("###3"));

결과

최초 시작은 subscribeOn 스케줄러에서 지정한 스레드로 실행되었고, pusblishOn을 만날 때 마다 스케줄러가 변경되었다.

[###1-8] INFO  my.ex.SchedulerEx - range: 1
[###1-8] INFO  my.ex.SchedulerEx - range: 2
[ooo-7] INFO  my.ex.SchedulerEx - map1: 11
[ooo-7] INFO  my.ex.SchedulerEx - map1: 12
[***-5] INFO  my.ex.SchedulerEx - map2: value 11
[***-5] INFO  my.ex.SchedulerEx - value 11
[***-5] INFO  my.ex.SchedulerEx - map2: value 12
[***-5] INFO  my.ex.SchedulerEx - value 12
 
 

댓글