RxJava schedulers

Egmont-Petersen

Introduction

RxJava has gained popularity and is used by a growing number of organizations whose Java backend applications receive sequences of events.

Rx is well suited to composing and consuming sequences of events, and it is also widely used in JavaScript and TypeScript. Rx is used for:

  • Passing on UI events such as mouse movement and button clicks (front-end programming)
  • Data-update events like property changed, collection updated, case entered and the like
  • Backend triggered events like file-upload-ready, user logged on/off
  • Broadcasting messages that originate from an ESB
  • and many more

Rx is easy to learn, especially for Java 8+ developers who regularly write more complex lambda logic.

The central concept in RxJava is the ‘Observable’, which can be considered a source of sequentially occurring events to which observers subscribe.

A specific part of RxJava is its ‘schedulers’, which are used in multi-threaded environments together with Observable operators. A scheduler determines on which threads the operators in a chain are executed.

By default, an Observable and the chain of operators applied to it do their work, and notify observers, on the thread on which subscribe() is called. The subscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The observeOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.
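The effect of the two operators can be made visible by recording thread names. The following is a minimal sketch against the RxJava 1.x API; the class name SubscribeOnObserveOnSketch and the choice of Schedulers.io() and Schedulers.computation() are assumptions made for illustration only.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

public class SubscribeOnObserveOnSketch {

    // No scheduler: the source runs on whichever thread subscribes (here, the caller).
    static String withoutSchedulers() {
        return Observable
                .fromCallable(() -> Thread.currentThread().getName())
                .toBlocking()
                .single();
    }

    // Returns {thread that ran the source, thread that ran the downstream map}.
    static String[] withSchedulers() {
        return Observable
                .fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())          // moves the source off the caller thread
                .observeOn(Schedulers.computation())   // operators below this line run here
                .map(sourceThread -> new String[] {
                        sourceThread, Thread.currentThread().getName()})
                .toBlocking()
                .single();
    }

    public static void main(String[] args) {
        System.out.println("caller:     " + Thread.currentThread().getName());
        System.out.println("no sched.:  " + withoutSchedulers());
        String[] names = withSchedulers();
        System.out.println("source:     " + names[0]);
        System.out.println("downstream: " + names[1]);
    }
}
```

Without schedulers the source reports the caller's thread. With both operators in place, the source and the downstream map should each report a different thread, neither of which is the caller.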

In the following, we give some running examples of how Schedulers can be used in RxJava.

 

Schedulers in Rx

In this technical note, we describe how schedulers work in RxJava. With schedulers you can specify where (and when) to execute predefined tasks when certain events occur in an observable chain.

A basic example of an immediate scheduler is:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    System.out.println("Start");
    worker.schedule(() -> System.out.println("Inner"));
    System.out.println("End");
});

This scheduler gives the console output:

Start
Inner
End

As can be seen, an immediate scheduler is obtained from Schedulers (the scheduler factory). ‘Schedulers.immediate’ is a special scheduler that runs a task on the calling thread in a blocking way instead of asynchronously, and returns control to the caller once the action has completed. This is why "Inner" is printed between "Start" and "End".

 

Some characteristics

By default Rx is single-threaded: an Observable and its operators notify observers on the thread from which the subscribe() method is called.

The methods observeOn and subscribeOn each take a Scheduler as argument; a Scheduler creates workers, which schedule individual actions.

In the code fragment above, a worker accepts actions and executes them sequentially on one thread.

 

A larger example with schedulers

A larger example of schedulers in RxJava is shown below:

public void testThreadingInSchedulers() throws InterruptedException {
   List<String> execution = new ArrayList<>();
   List<Thread> threads = new ArrayList<>();
   // Starts at -2, so acquire() succeeds only after all three tasks have called release()
   Semaphore workfinished = new Semaphore(-2);

   Scheduler scheduler = Schedulers.newThread();
   Scheduler.Worker worker = scheduler.createWorker();
   // First task: records its thread and schedules an inner task on the same worker
   worker.schedule(() -> {
      threads.add(Thread.currentThread());
      execution.add("Start");
      worker.schedule(() -> {
         execution.add("Inner");
         workfinished.release();
      });
      execution.add("End");
      workfinished.release();
   });
   // Second task, scheduled from the test thread on the same worker
   worker.schedule(() -> {
      threads.add(Thread.currentThread());
      workfinished.release();
   });

   // Block the test thread until all three tasks have finished
   workfinished.acquire();

   assertEquals("Same worker schedules on the same thread", threads.get(0), threads.get(1));
   assertEquals("New thread used as trampoline", Arrays.asList("Start", "End", "Inner"), execution);
}

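All tasks scheduled on the same worker run on the same Java thread. Because that thread handles one task at a time, the inner task runs only after the task that scheduled it has finished, which is why "Inner" comes after "End".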

Schedulers.trampoline() creates and returns a Scheduler that queues work on the current thread, to be executed after the current work completes. This type of scheduler is very similar to the immediate one because it likewise runs tasks on the calling thread, thereby blocking that thread.

 

Trampoline scheduler

Contrary to the immediate scheduler, the trampoline scheduler runs a newly scheduled task only after all previously scheduled tasks have finished. Immediate invokes a given task right away, whereas trampoline waits for the current task to finish. The trampoline’s worker executes every task on the thread that scheduled the first task. The first call to schedule blocks until the queue is empty.

public void testTrampoline() {
   List<String> execution = new ArrayList<>();

   Scheduler scheduler = Schedulers.trampoline();
   Scheduler.Worker worker = scheduler.createWorker();
   worker.schedule(() -> {
      execution.add("Start");
      worker.schedule(() -> execution.add("Inner"));
      execution.add("End");
   });

   assertEquals(Arrays.asList("Start", "End", "Inner"), execution);
}

The above example illustrates exactly this: the inner task is queued and runs last, after the outer task has finished.
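The difference between the immediate and the trampoline strategy can also be reproduced without RxJava. The following plain-Java sketch is not RxJava's implementation; the classes ImmediateWorker and TrampolineWorker are hypothetical names introduced here, and the sketch assumes that everything runs on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TrampolineSketch {

    /** Immediate strategy: run the task inline, right where it is scheduled. */
    static final class ImmediateWorker {
        void schedule(Runnable task) {
            task.run();
        }
    }

    /** Trampoline strategy: queue the task; only the outermost call drains the queue. */
    static final class TrampolineWorker {
        private final Deque<Runnable> queue = new ArrayDeque<>();
        private boolean draining = false;

        void schedule(Runnable task) {
            queue.addLast(task);
            if (draining) {
                return; // nested call: the task waits until the current one has finished
            }
            draining = true;
            try {
                Runnable next;
                while ((next = queue.pollFirst()) != null) {
                    next.run();
                }
            } finally {
                draining = false;
            }
        }
    }

    static List<String> runImmediate() {
        List<String> log = new ArrayList<>();
        ImmediateWorker worker = new ImmediateWorker();
        worker.schedule(() -> {
            log.add("Start");
            worker.schedule(() -> log.add("Inner"));
            log.add("End");
        });
        return log;
    }

    static List<String> runTrampoline() {
        List<String> log = new ArrayList<>();
        TrampolineWorker worker = new TrampolineWorker();
        worker.schedule(() -> {
            log.add("Start");
            worker.schedule(() -> log.add("Inner"));
            log.add("End");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runImmediate());   // [Start, Inner, End]
        System.out.println(runTrampoline());  // [Start, End, Inner]
    }
}
```

The only difference between the two workers is what happens on a nested call to schedule: the immediate worker runs the task on the spot, while the trampoline worker appends it to a queue that the outermost call is still draining.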

 

Maven dependency used in the examples

<dependency>
   <groupId>io.reactivex</groupId>
   <artifactId>rxjava</artifactId>
   <version>1.3.8</version>
</dependency>

 

– –

 

Our commercial web service, Insight Classifiers, uses RxJava to process incoming requests for data analyses.