Thread on which their onNext method was invoked.

Scheduler abstraction

A Scheduler is an abstraction that gives the user control over threading. A Scheduler can spawn Workers, which are conceptually Threads but are not necessarily backed by one. A Scheduler also includes the notion of a clock, whereas a Worker is purely about scheduling tasks.
interface Scheduler extends Disposable {

    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);

    long now(TimeUnit unit);

    Worker createWorker();

    interface Worker extends Disposable {
        Disposable schedule(Runnable task);
        Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
        Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
    }
}
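To make the contract concrete, here is a minimal, simplified sketch of what a Scheduler backed by a single-threaded ScheduledExecutorService could look like, using only the JDK. This is not Reactor's actual implementation; the Disposable interface is reduced to a bare dispose() method and the Worker side is omitted for brevity.

```java
import java.util.concurrent.*;

public class SingleThreadScheduler {

    // Simplified stand-in for Reactor's Disposable
    interface Disposable { void dispose(); }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Immediate scheduling: wrap the Future so callers can cancel the task
    public Disposable schedule(Runnable task) {
        Future<?> f = executor.submit(task);
        return () -> f.cancel(false);
    }

    // Delayed scheduling
    public Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit) {
        Future<?> f = executor.schedule(task, initialDelay, delayUnit);
        return () -> f.cancel(false);
    }

    // The Scheduler's notion of a clock
    public long now(TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public void shutdown() { executor.shutdown(); }

    public static void main(String[] args) throws Exception {
        SingleThreadScheduler scheduler = new SingleThreadScheduler();
        CountDownLatch latch = new CountDownLatch(1);
        scheduler.schedule(() -> {
            System.out.println("ran on " + Thread.currentThread().getName());
            latch.countDown();
        });
        latch.await();
        scheduler.shutdown();
    }
}
```

Note how every schedule call returns a handle that lets the caller cancel the task, mirroring the Disposable return types in the interface above.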
The Schedulers factory methods and their typical usage:

- Schedulers.immediate() can be used as a null object for when an API requires a Scheduler but you don't want to change threads
- Schedulers.single() is for one-off tasks that can be run on a unique ExecutorService
- Schedulers.parallel() is good for CPU-intensive but short-lived tasks. It can execute N such tasks in parallel (by default N == number of CPUs)
- Schedulers.elastic() and Schedulers.boundedElastic() are good for more long-lived tasks (eg. blocking IO tasks). The elastic one spawns threads on demand without a limit, while the more recently introduced boundedElastic does the same with a ceiling on the number of created threads.
The elastic flavor is also backed by workers based on ScheduledExecutorService, except it creates these workers on demand and pools them. The boundedElastic flavor is very similar in concept, except it places an upper bound on the number of ScheduledExecutorService-backed Workers it creates; once that ceiling is reached, its createWorker() method returns a facade Worker that will enqueue tasks instead of submitting them immediately. A special case is the immediate() Scheduler: this one doesn't modify which Thread the code is running on. Note that going from the main thread to a scheduler is obviously possible, but going from an arbitrary thread to the main thread is not.
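The "cap the threads, enqueue the excess" behavior can be approximated with plain JDK tools: a fixed-size pool backed by an unbounded queue accepts extra tasks by enqueueing them rather than spawning more threads. This is only an analogy for the boundedElastic idea, not its implementation.

```java
import java.util.concurrent.*;

public class BoundedEnqueueDemo {
    public static void main(String[] args) throws Exception {
        // Cap of 2 threads; tasks submitted beyond the cap wait in the queue
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch done = new CountDownLatch(4);
        for (int i = 0; i < 4; i++) {
            int id = i;
            pool.submit(() -> {
                // At most 2 of these run concurrently; the rest were enqueued
                System.out.println("task " + id + " on " + Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
    }
}
```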
publishOn(Scheduler s) operator

The publishOn(Scheduler s) operator replays signals from its source on a Worker from the provided Scheduler, effectively switching threads to one of that scheduler's workers. Everything below the publishOn then runs on the Scheduler s, until another operator switches again (eg. another publishOn). Consider the following example:
Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));
Since subscribe happens on the main thread, Flux.fromIterable emits the content of its List on that same Thread. Each emission goes to the map that fetches the body of each url, and the map "inherits" that thread (and thus blocks it). The lambda passed to subscribe is thus also running on the main thread. As a consequence, all these urls are processed sequentially on the main thread:
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
If we introduce publishOn, we can make this code more performant, so that the two Flux don't block each other:
Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));
Which could give us something like the following output:
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
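Under the hood, publishOn is conceptually a queue-based handoff: the upstream thread enqueues signals, and a worker thread from the target Scheduler drains them and runs the downstream operators. A rough JDK-only analogy of that handoff (the names here are illustrative, not Reactor code):

```java
import java.util.List;
import java.util.concurrent.*;

public class PublishOnAnalogy {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // "Worker": drains the queue on a dedicated thread, like publishOn's target Scheduler
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(3);
        worker.submit(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    String url = queue.take(); // downstream work happens here
                    System.out.println(Thread.currentThread().getName() + " processing " + url);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // "Upstream": the main thread only enqueues the signals
        for (String url : List.of("A", "B", "C")) {
            queue.put(url);
        }
        done.await();
        worker.shutdown();
    }
}
```

The main thread never blocks on the processing itself; it only hands signals over, which is exactly why the two Flux in the example above stop blocking each other.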
Here publishOn is used to offload blocking work onto a separate Thread, by switching the publication of the triggers for that blocking work (the urls to fetch) onto a provided Scheduler. Since the map operator runs on its source thread, switching that source thread by putting a publishOn before the map works as intended.

subscribeOn(Scheduler s) operator

But what if the blocking call happens in a chain of operators you cannot modify, above any publishOn you could add? Is there a way to influence the Thread upstream? In a way, there is. That's where subscribeOn can come in handy.
//code provided in a library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .map(url -> blockingWebClient.get(url)); //oops!
}

//your code:
fetchUrls(firstListOfUrls) //contains A, B and C
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));

fetchUrls(secondListOfUrls) //contains D and E
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
The subscribe calls are still running on the main thread, but they propagate a subscribe signal to their source, subscribeOn. In turn, subscribeOn propagates that same signal to its own source, the Flux from fetchUrls, but on a boundedElastic Worker.

It is important to distinguish the act of subscribing from the lambda passed to the subscribe() method. This method subscribes to its source Flux, but the lambda is executed at the end of processing, when the data has flown through all the steps (including steps that hop to another thread). So the Thread on which the lambda is executed might be different from the subscription Thread, ie. the thread on which the subscribe method was called.
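This distinction can be shown with a plain CompletableFuture, outside of Reactor entirely: the callback is registered on the main thread, but it runs on whichever thread delivers the data. (JDK-only analogy; the class and names are illustrative.)

```java
import java.util.concurrent.*;

public class CallbackThreadDemo {
    public static void main(String[] args) throws Exception {
        System.out.println("registering on " + Thread.currentThread().getName());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "hello", pool);
        // The callback is registered here, on main, but executes on the pool
        // thread where the data is delivered
        future.thenApplyAsync(v -> {
            System.out.println("callback on " + Thread.currentThread().getName());
            return v;
        }, pool).get();
        pool.shutdown();
    }
}
```

Registering the callback and executing it are two separate events, potentially on two separate threads, just like subscribing to a Flux versus running the lambda passed to subscribe().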
If we were the authors of the fetchUrls library, we could make the code even more performant by letting each fetch run on its own Worker, by leveraging subscribeOn:
final Flux<String> betterFetchUrls(List<String> urls) {
    return Flux.fromIterable(urls)
        .flatMap(url ->
            //wrap the blocking call in a Mono
            Mono.fromCallable(() -> blockingWebClient.get(url))
                //ensure that Mono is subscribed on a boundedElastic Worker
                .subscribeOn(Schedulers.boundedElastic())
        ); //each individual URL fetch runs in its own thread!
}
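In plain-executor terms, this flatMap + subscribeOn pattern amounts to submitting each blocking fetch as its own task on an elastic pool, so the fetches run concurrently instead of one after another. A JDK-only sketch of that idea (blockingGet is a hypothetical stand-in for blockingWebClient.get):

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class ParallelFetchAnalogy {

    // Hypothetical stand-in for blockingWebClient.get(url)
    static String blockingGet(String url) {
        return "body of " + url;
    }

    public static void main(String[] args) throws Exception {
        // Analogous to boundedElastic: threads created on demand and reused
        ExecutorService elastic = Executors.newCachedThreadPool();
        List<String> urls = List.of("A", "B", "C");

        // Each URL fetch becomes its own task, so fetches run concurrently
        List<Future<String>> futures = urls.stream()
                .map(url -> elastic.submit(() -> blockingGet(url)))
                .collect(Collectors.toList());

        for (Future<String> f : futures) {
            System.out.println(f.get());
        }
        elastic.shutdown();
    }
}
```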
To sum up, subscribeOn will act throughout the subscribe phase, from bottom to top, then on the data path until it encounters a publishOn (or a time-based operator). Consider the following example:
Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Schedulers.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
This will print:
just elastic-1
publish boundedElastic-1
hello delayed parallel-1
Let's unpack what happens here:

- subscribe is called on the main thread, but subscription is rapidly switched to the elastic scheduler due to the subscribeOn immediately above it.
- The subscription signal keeps propagating upstream on elastic, from bottom to top.
- just emits its value on the elastic scheduler.
- The first doOnNext receives that value on the same thread and prints it out: "just elastic-1".
- Then the data reaches the publishOn: data from doOnNext is propagated downstream on the boundedElastic scheduler.
- The second doOnNext receives its data on boundedElastic and prints "publish boundedElastic-1" accordingly.
- delayElements is a time operator, so by default it publishes data on the Schedulers.parallel() scheduler.
- On the data path, subscribeOn does nothing but propagate the signal on the same thread.
- The lambdas passed to subscribe(...) are executed on the thread on which data signals are received, so the lambda prints "hello delayed parallel-1".