The Threading Model

The Scheduler abstraction

interface Scheduler extends Disposable {
    
  Disposable schedule(Runnable task);
  Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
  Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  
  long now(TimeUnit unit);
  
  Worker createWorker();
  
  interface Worker extends Disposable {
    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  }
}
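To make the shape of this contract more concrete, here is a minimal plain-JDK sketch of what an executor-backed implementation of the scheduling methods could look like. It is not Reactor's actual implementation: `SchedulerSketch`, `SimpleDisposable` and `ExecutorBackedScheduler` are hypothetical names introduced for illustration, and `Worker` creation is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class SchedulerSketch {

  // Hypothetical stand-in for Reactor's Disposable, reduced to a single method.
  interface SimpleDisposable {
    void dispose();
  }

  // Hypothetical scheduler that delegates every task to a ScheduledExecutorService.
  static final class ExecutorBackedScheduler implements SimpleDisposable {
    private final ScheduledExecutorService executor;

    ExecutorBackedScheduler(String threadName) {
      // One named daemon thread, so the thread is easy to spot in output.
      this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, threadName);
        t.setDaemon(true);
        return t;
      });
    }

    SimpleDisposable schedule(Runnable task) {
      Future<?> f = executor.submit(task);
      return () -> f.cancel(true); // disposing cancels the pending task
    }

    SimpleDisposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit) {
      Future<?> f = executor.schedule(task, initialDelay, delayUnit);
      return () -> f.cancel(true);
    }

    SimpleDisposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
      Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
      return () -> f.cancel(true);
    }

    long now(TimeUnit unit) {
      return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void dispose() {
      executor.shutdownNow(); // disposing the scheduler stops its thread
    }
  }

  // Schedules one task and returns the name of the thread that executed it.
  static String threadUsedBy(ExecutorBackedScheduler scheduler) {
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<String> name = new AtomicReference<>();
    scheduler.schedule(() -> {
      name.set(Thread.currentThread().getName());
      done.countDown();
    });
    try {
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return name.get();
  }

  public static void main(String[] args) {
    ExecutorBackedScheduler scheduler = new ExecutorBackedScheduler("sketch-1");
    System.out.println("task ran on " + threadUsedBy(scheduler));
    scheduler.dispose();
  }
}
```

The point of the sketch is only that each `schedule` variant hands the task to some execution resource and returns a handle that cancels it.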

Rules of thumb for typical usage of the Schedulers factory methods:

Are Schedulers Always Backed by an ExecutorService?

Going from the main thread to a scheduler is obviously possible, but going from an arbitrary thread back to the main thread is not.
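One way to see why is the following plain-JDK sketch (not Reactor code; `BackToCallerSketch` and its queue are hypothetical names for illustration). A worker thread cannot force the calling thread to run anything. It can only enqueue a task, and the calling thread has to cooperate by blocking on that queue and running what arrives, which an ordinary main thread does not do on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class BackToCallerSketch {

  // Returns {thread that ran the offloaded work, thread that ran the follow-up}.
  static String[] offloadThenReturn() {
    BlockingQueue<Runnable> callerQueue = new LinkedBlockingQueue<>();
    String[] seen = new String[2];
    ExecutorService worker = Executors.newSingleThreadExecutor();
    try {
      // Hopping TO another thread is easy: hand the task to an executor.
      worker.execute(() -> {
        seen[0] = Thread.currentThread().getName();
        // Hopping BACK is not: the worker can only enqueue a follow-up task.
        callerQueue.add(() -> seen[1] = Thread.currentThread().getName());
      });
      // The caller must cooperate by waiting on its own queue and running the task.
      callerQueue.take().run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      worker.shutdown();
    }
    return seen;
  }

  public static void main(String[] args) {
    String[] seen = offloadThenReturn();
    System.out.println("offloaded work ran on " + seen[0] + ", follow-up ran on " + seen[1]);
  }
}
```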

Applying Schedulers to Operators

The publishOn(Scheduler s) operator

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

As a consequence, all these URLs are processed sequentially on the main thread:

main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E

If we introduce publishOn, the blocking calls move off the main thread, so that the two Flux no longer block each other:

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

Which could give us something like the following output:

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

The subscribeOn(Scheduler s) operator

//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .map(url -> blockingWebClient.get(url)); //oops!
}

//your code:
fetchUrls(Arrays.asList(A, B, C))
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

fetchUrls(Arrays.asList(D, E))
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

This could again give us something like the following output:

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

It is important to distinguish the act of subscribing from the lambda passed to the subscribe() method. The method subscribes to its source Flux, but the lambda is executed at the end of processing, once the data has flowed through all the steps (including steps that hop to another thread). So the thread on which the lambda is executed might differ from the subscription thread, i.e. the thread on which the subscribe method is called.
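The same distinction exists outside Reactor, and a plain-JDK analogy may help. In the sketch below (a hypothetical `CallbackThreadSketch` class using `CompletableFuture`, not Reactor code), the callback is registered on the calling thread but executed on the thread that produces the value. The latch is only there to guarantee that the value is produced after the callback has been registered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackThreadSketch {

  // Returns {thread that registered the callback, thread that executed it}.
  static String[] registerThenObserve() {
    ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
    CountDownLatch registered = new CountDownLatch(1);
    AtomicReference<String> callbackThread = new AtomicReference<>();
    try {
      CompletableFuture<Void> done = CompletableFuture
          .supplyAsync(() -> {
            awaitQuietly(registered); // produce the value only once the callback is attached
            return "body";
          }, worker)
          // Registered here, on the calling thread, but run by the completing thread.
          .thenAccept(body -> callbackThread.set(Thread.currentThread().getName()));
      String registeringThread = Thread.currentThread().getName();
      registered.countDown();
      done.join();
      return new String[] {registeringThread, callbackThread.get()};
    } finally {
      worker.shutdown();
    }
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    String[] threads = registerThenObserve();
    System.out.println("registered on " + threads[0] + ", callback ran on " + threads[1]);
  }
}
```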

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url ->
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in a boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}
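The underlying idea, wrapping each blocking call separately so that the calls can overlap instead of queueing behind each other, can be sketched with plain JDK classes as well. This is an analogy under stated assumptions, not the Reactor pipeline above: `PerCallOffloadSketch`, `blockingGet` and the pool size of 4 are all hypothetical, and unlike flatMap this version returns results in input order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class PerCallOffloadSketch {

  // Hypothetical blocking call standing in for blockingWebClient.get(url).
  static String blockingGet(String url) {
    try {
      Thread.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "body of " + url;
  }

  static List<String> fetchAll(List<String> urls) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      // Start one task per URL first, so the blocking calls can overlap...
      List<CompletableFuture<String>> inFlight = urls.stream()
          .map(url -> CompletableFuture.supplyAsync(() -> blockingGet(url), pool))
          .collect(Collectors.toList());
      // ...and only then wait for each result.
      return inFlight.stream()
          .map(CompletableFuture::join)
          .collect(Collectors.toList());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(fetchAll(Arrays.asList("A", "B", "C")));
  }
}
```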

And What If I Mix the Two?

Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Schedulers.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));

This will print:

just elastic-1
publish boundedElastic-1
hello delayed parallel-1

Let's unpack what happened step by step:

Reference