Threading
Reactor and threading
This is an advanced topic that assumes you have read the Threading and Schedulers section of the Reactor reference. Read it first to familiarize yourself with its API and usage.
Reactor is a concurrency-agnostic runtime: operators have no fixed thread affinity, which leaves users in control of the thread on which operations happen. The framework provides many Scheduler implementations and helper methods that give you that control, and Discord4J uses them throughout its modules.
Threading model
Discord4J sets reasonable threading defaults that cover most use cases without compromising performance. First, since our network runtime is the Reactor Netty project, every HTTP (API and websocket) and UDP interaction runs on Reactor Netty's default scheduling approach: an event loop whose thread count equals the number of available processors, with a minimum of 4.
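The sizing rule above can be sketched in plain Java. This is an illustration only: `EventLoopSizing` and `workerCount` are hypothetical names, and the snippet restates the rule from this page rather than calling into Reactor Netty.

```java
final class EventLoopSizing {
    // Hypothetical helper: one thread per available processor, but never fewer than 4.
    static int workerCount(int availableProcessors) {
        return Math.max(availableProcessors, 4);
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("processors=" + cpus + ", event loop threads=" + workerCount(cpus));
    }
}
```

On a machine with 2 cores the rule still yields 4 threads, while on larger machines the thread count follows the core count.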
Because Reactor Netty event loop threads do not allow blocking operations, we use a ForkJoinPoolScheduler to dispatch every event (through EventDispatcher + publishOn).
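To see why this handoff matters, here is a minimal plain-JDK sketch of the same idea: an event arrives on a single event-loop-like thread and is immediately re-published to a ForkJoinPool, so the handler never runs on the receiving thread. `EventHandoff`, the `event-loop` thread name and the pool size of 2 are assumptions for illustration; Discord4J performs this step with Reactor's publishOn, not with CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

final class EventHandoff {
    // Returns {receiving thread name, handling thread name} for one simulated event.
    static String[] dispatch() {
        // Stand-in for a network event loop thread that must never block.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        // Stand-in for the pool that runs user event handlers.
        ForkJoinPool eventPool = new ForkJoinPool(2);
        try {
            return CompletableFuture
                    // The event is received on the event loop thread...
                    .supplyAsync(() -> Thread.currentThread().getName(), eventLoop)
                    // ...and handled on the pool, which keeps the event loop free.
                    .thenApplyAsync(
                            receivedOn -> new String[] {receivedOn, Thread.currentThread().getName()},
                            eventPool)
                    .join();
        } finally {
            eventLoop.shutdown();
            eventPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = dispatch();
        System.out.println("received on " + threads[0] + ", handled on " + threads[1]);
    }
}
```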
Every HTTP API response is handed back to you on the Bounded Elastic Scheduler, which supports blocking operations downstream.
Some operations create exclusive Schedulers to serialize calls, such as emitting permits for rate-limited operations: the GlobalRateLimiter, the Gateway IDENTIFY limiter and the per-session outbound limiter.
Customization
Since Reactor gives its users control over threading, our goal is to extend that flexibility to you as well. You remain in control through operators such as publishOn and subscribeOn, and you can also override some of the defaults described above.
ReactorResources
Starting with v3.1.0, a new class groups the application-wide resources used by Discord4J. ReactorResources is the place to customize a set of Scheduler instances and other Reactor-related resources, including the schedulers used by the HTTP/Websocket and UDP clients.
An instance is created per DiscordClient, and instances can be shared if created externally:
ReactorResources reactorResources = ReactorResources.builder()
        .timerTaskScheduler(Schedulers.newParallel("my-scheduler"))
        .blockingTaskScheduler(Schedulers.boundedElastic())
        .build();

DiscordClient discordClient = DiscordClientBuilder.create("TOKEN")
        .setReactorResources(reactorResources)
        .build();
Replacing Schedulers
ReactorResources reactorResources = ReactorResources.builder()
        .timerTaskScheduler(Schedulers.newParallel("my-scheduler"))
        .blockingTaskScheduler(Schedulers.boundedElastic())
        .build();
timerTaskScheduler
Dedicated to all tasks that involve delays or other time-sensitive operations, which means blocking is forbidden on this scheduler. Defaults to a parallel scheduler with threads named d4j-parallel-N; Reactor blocking calls such as block() throw an exception when run on it. It is used for:
- Delaying Gateway identify attempts
- Delaying outbound Gateway payloads under per-session rate limit
- Scheduling Gateway periodic heartbeat
- Delaying REST API requests under Discord global rate limit
- Delaying REST API requests under Discord per-bucket rate limit
- Scheduling Voice Gateway periodic heartbeat
- Delaying connection attempts for Gateway and Voice Gateway
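The delays listed above are scheduled on a timer rather than slept through on a worker thread. A rough plain-JDK analogy, assuming a `ScheduledExecutorService` in place of Reactor's parallel scheduler (`DelayedRequest` and `runAfter` are hypothetical names, and the "request" is only a timestamp measurement):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayedRequest {
    // Schedules a simulated request after delayMillis and returns how long it waited, in ms.
    static long runAfter(long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long start = System.nanoTime();
        // The task itself never blocks or sleeps: the timer simply fires it once the delay elapses.
        Callable<Long> request = () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        try {
            // get() blocks the calling thread only so this demo can report the result.
            return timer.schedule(request, delayMillis, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("request ran after " + runAfter(100) + " ms");
    }
}
```

Because the timer thread only fires short tasks, one thread can serve many pending delays. This is also why blocking work must stay off such a scheduler.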
blockingTaskScheduler
Dedicated to all tasks where blocking is possible. Defaults to Reactor's global boundedElastic scheduler. It is used for:
- Scheduling early event listeners registered through GatewayBootstrap::withEventDispatcher
- Publishing responses from the REST API operations
Specific resources for Gateway and Voice
A ReactorResources instance will be used for REST API operations, and reused for Gateway and Voice operations, unless overridden by the user.
You can customize the resources used exclusively for Gateway through the bootstrap:
GatewayDiscordClient client = DiscordClient.create("TOKEN")
        .gateway()
        .setGatewayReactorResources(reactorResources -> new GatewayReactorResources(...))
        .login()
        .block();
A GatewayReactorResources has an extra scheduler dedicated to sending Gateway payloads. If not overridden, it uses a dedicated single scheduler called d4j-gateway, which can be created through GatewayReactorResources::DEFAULT_PAYLOAD_SENDER_SCHEDULER.
The same can be done for voice:
GatewayDiscordClient client = DiscordClient.create("TOKEN")
        .gateway()
        .setVoiceReactorResources(reactorResources -> new VoiceReactorResources(...))
        .login()
        .block();
A VoiceReactorResources requires extra parameters because of all the concurrent tasks it needs to keep. Among them:
- Scheduling Voice send task (defaults to timerTaskScheduler if not replaced)
- Scheduling Voice receive task (defaults to timerTaskScheduler if not replaced)
EventDispatcher
The EventDispatcher is a key element in the Discord4J Core architecture. Since v3.1.0 a wide range of customization is possible on this component, in particular: the backing Processor, its back-pressure (flow control) strategy, and the Scheduler used for publishing events.
Replacing thread model
Acquire a builder using EventDispatcher::builder and customize its scheduler using eventScheduler:
EventDispatcher customDispatcher = EventDispatcher.builder()
        .eventScheduler(Schedulers.boundedElastic())
        .build();
The current default is a ForkJoinPool-based scheduler called d4j-events. If you decide to change it, these are our recommendations:
- If you perform a lot of blocking and slow operations, switch to Schedulers.boundedElastic().
- If you never call block, never perform operations that park threads (such as calls through blocking HTTP libraries) and never do blocking I/O in general, try using Schedulers.immediate() to minimize thread switching.
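The trade-off behind the second recommendation can be sketched with plain JDK executors. An executor that runs tasks inline behaves roughly like Schedulers.immediate(), with no thread switch, while a pooled executor moves the handler to a worker thread. `SchedulerChoice`, its methods and the `pooled-worker` thread name are hypothetical; this is an analogy, not Discord4J's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SchedulerChoice {
    // Runs a trivial handler on the given executor and reports which thread executed it.
    static String handlerThread(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    // Same handler, but on a pooled executor that always switches to a worker thread.
    static String pooledHandlerThread() {
        ExecutorService pool = Executors.newCachedThreadPool(r -> new Thread(r, "pooled-worker"));
        try {
            return handlerThread(pool);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Runnable::run executes the task inline on the calling thread (no thread switch).
        System.out.println("inline handler ran on: " + handlerThread(Runnable::run));
        System.out.println("pooled handler ran on: " + pooledHandlerThread());
    }
}
```

The inline variant is only safe when handlers never block, because any blocking would stall the thread that delivered the event.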