Links: Table of Contents | Single HTML

Chapter 6. Reactive Jersey Client API

Reactive Jersey Client API is quite a generic API allowing end users to utilize the popular reactive programming model when using Jersey Client. Several extensions come out of the box with Jersey that bring support for several existing 3rd party libraries for reactive programming. Along with describing the API itself, this section also covers existing extension modules and provides hints to implement a custom extension if needed.

If you are not familiar with the JAX-RS Client API, it is recommended that you see Chapter 5, Client API where the basics of JAX-RS Client API along with some advanced techniques are described.

6.1. Motivation for Reactive Client Extension

The Problem

Imagine a travel agency whose information system consists of multiple basic services. These services might be built using different technologies (JMS, EJB, WS, ...). For simplicity we presume that the services can be consumed using REST interface via HTTP method calls (e.g. using a JAX-RS Client). We also presume that the basic services we need to work with are:

  • Customers service – provides information about customers of the travel agency.

  • Destinations service – provides a list of visited and recommended destinations for an authenticated customer.

  • Weather service – provides weather forecast for a given destination.

  • Quoting service – provides price calculation for a customer to travel to a recommended destination.

The task is to create a publicly available feature that would, for an authenticated user, display a list of 10 last visited places and also display a list of 10 new recommended destinations including weather forecast and price calculations for the user. Notice that some of the requests (to retrieve data) depend on results of previous requests. E.g. getting recommended destinations depends on obtaining information about the authenticated user first. Obtaining weather forecast depends on destination information, etc. This relationship between some of the requests is an important part of the problem and an area where you can take a real advantage of the reactive programming model.

One way how to obtain data is to make multiple HTTP method calls from the client (e.g. mobile device) to all services involved and combine the retrieved data on the client. However, since the basic services are available in the internal network only we'd rather create a public orchestration layer instead of exposing all internal services to the outside world. The orchestration layer would expose only the desired operations of the basic services to the public. To limit traffic and achieve lower latency we'd like to return all the necessary information to the client in a single response.

The orchestration layer is illustrated in the Figure 6.1. The layer accepts requests from the outside and is responsible of invoking multiple requests to the internal services. When responses from the internal services are available in the orchestration layer they're combined into a single response that is sent back to the client.

Figure 6.1. Travel Agency Orchestration Service

Travel Agency Orchestration Service


The next sections describe various approaches (using JAX-RS Client) how the orchestration layer can be implemented.

A Naive Approach

The simplest way to implement the orchestration layer is to use synchronous approach. For this purpose we can use JAX-RS Client Sync API (see Example 6.1, “Excerpt from a synchronous approach while implementing the orchestration layer”). The implementation is simple to do, easy to read and straightforward to debug.

Example 6.1. Excerpt from a synchronous approach while implementing the orchestration layer

final WebTarget destination = ...;
final WebTarget forecast = ...;

// Obtain recommended destinations.
List<Destination> recommended = Collections.emptyList();
try {
    recommended = destination.path("recommended").request()
            // Identify the user.
            .header("Rx-User", "Sync")
            // Return a list of destinations.
            .get(new GenericType<List<Destination>>() {});
} catch (final Throwable throwable) {
    errors.offer("Recommended: " + throwable.getMessage());
}

// Forecasts. (depend on recommended destinations)
final Map<String, Forecast> forecasts = new HashMap<>();
for (final Destination dest : recommended) {
    try {
        forecasts.put(dest.getDestination(),
                forecast.resolveTemplate("destination", dest.getDestination()).request().get(Forecast.class));
    } catch (final Throwable throwable) {
        errors.offer("Forecast: " + throwable.getMessage());
    }
}


The downside of this approach is it's slowness. You need to sequentially process all the independent requests which means that you're wasting resources. You are needlessly blocking threads, that could be otherwise used for some real work.

If you take a closer look at the example you can notice that at the moment when all the recommended destinations are available for further processing we try to obtain forecasts for these destinations. Obtaining a weather forecast can be done only for a single destination with a single request, so we need to make 10 requests to the Forecast service to get all the destinations covered. In a synchronous way this means getting the forecasts one-by-one. When one response with a forecast arrives we can send another request to obtain another one. This takes time. The whole process of constructing a response for the client can be seen in Figure 6.2.

Let's try to quantify this with assigning an approximate time to every request we make to the internal services. This way we can easily compute the time needed to complete a response for the client. For example, obtaining

  • Customer details takes 150 ms

  • Recommended destinations takes 250 ms

  • Price calculation for a customer and destination takes 170 ms (each)

  • Weather forecast for a destination takes 330 ms (each)

When summed up, 5400 ms is approximately needed to construct a response for the client.

Figure 6.2. Time consumed to create a response for the client – synchronous way

Time consumed to create a response for the client – synchronous way


Synchronous approach is better to use for lower number of requests (where the accumulated time doesn't matter that much) or for a single request that depends on the result of previous operations.

Optimized Approach

The amount of time needed by the synchronous approach can be lowered by invoking independent requests in parallel. We're going to use JAX-RS Client Async API to illustrate this approach. The implementation in this case is slightly more difficult to get right because of the nested callbacks and the need to wait at some points for the moment when all partial responses are ready to be processed. The implementation is also a little bit harder to debug and maintain. The nested calls are causing a lot of complexity here. An example of concrete Java code following the asynchronous approach can be seen in Example 6.2, “Excerpt from an asynchronous approach while implementing the orchestration layer”.

Example 6.2. Excerpt from an asynchronous approach while implementing the orchestration layer

final WebTarget destination = ...;
final WebTarget forecast = ...;

// Obtain recommended destinations. (does not depend on visited ones)
destination.path("recommended").request()
        // Identify the user.
        .header("Rx-User", "Async")
        // Async invoker.
        .async()
        // Return a list of destinations.
        .get(new InvocationCallback<List<Destination>>() {
            @Override
            public void completed(final List<Destination> recommended) {
                final CountDownLatch innerLatch = new CountDownLatch(recommended.size());

                // Forecasts. (depend on recommended destinations)
                final Map<String, Forecast> forecasts = Collections.synchronizedMap(new HashMap<>());
                for (final Destination dest : recommended) {
                    forecast.resolveTemplate("destination", dest.getDestination()).request()
                            .async()
                            .get(new InvocationCallback<Forecast>() {
                                @Override
                                public void completed(final Forecast forecast) {
                                    forecasts.put(dest.getDestination(), forecast);
                                    innerLatch.countDown();
                                }

                                @Override
                                public void failed(final Throwable throwable) {
                                    errors.offer("Forecast: " + throwable.getMessage());
                                    innerLatch.countDown();
                                }
                            });
                }

                // Have to wait here for dependent requests ...
                try {
                    if (!innerLatch.await(10, TimeUnit.SECONDS)) {
                        errors.offer("Inner: Waiting for requests to complete has timed out.");
                    }
                } catch (final InterruptedException e) {
                    errors.offer("Inner: Waiting for requests to complete has been interrupted.");
                }

                // Continue with processing.
            }

            @Override
            public void failed(final Throwable throwable) {
                errors.offer("Recommended: " + throwable.getMessage());
            }
        });


The example is a bit more complicated from the first glance. We provided an InvocationCallback to async get method. One of the callback methods (completed or failed) is called when the request finishes. This is a pretty convenient way to handle async invocations when no nested calls are present. Since we have some nested calls (obtaining weather forecasts) we needed to introduce a CountDownLatch synchronization primitive as we use asynchronous approach in obtaining the weather forecasts as well. The latch is decreased every time a request, to the Forecasts service, completes successfully or fails. This indicates that the request actually finished and it is a signal for us that we can continue with processing (otherwise we wouldn't have all required data to construct the response for the client). This additional synchronization is something that was not present when taking the synchronous approach, but it is needed here.

Also the error processing can not be written as it could be in an ideal case. The error handling is scattered in too many places within the code, that it is quite difficult to create a comprehensive response for the client.

On the other hand taking asynchronous approach leads to code that is as fast as it gets. The resources are used optimally (no waiting threads) to achieve quick response time. The whole process of constructing the response for the client can be seen in Figure 6.3. It only took 730 ms instead of 5400 ms which we encountered in the previous approach.

Figure 6.3. Time consumed to create a response for the client – asynchronous way

Time consumed to create a response for the client – asynchronous way


As you can guess, this approach, even with all it's benefits, is the one that is really hard to implement, debug and maintain. It's a safe bet when you have many independent calls to make but it gets uglier with an increasing number of nested calls.

Reactive Approach

Reactive approach is a way out of the so-called Callback Hell which you can encounter when dealing with Java's Futures or invocation callbacks. Reactive approach is based on a data-flow concept and the execution model propagate changes through the flow. An example of a single item in the data-flow chain can be a JAX-RS Client HTTP method call. When the JAX-RS request finishes then the next item (or the user code) in the data-flow chain is notified about the continuation, completion or error in the chain. You're more describing what should be done next than how the next action in the chain should be triggered. The other important part here is that the data-flows are composable. You can compose/transform multiple flows into the resulting one and apply more operations on the result.

An example of this approach can be seen in Example 6.3, “Excerpt from a reactive approach while implementing the orchestration layer”. The APIs would be described in more detail in the next sections.

Example 6.3. Excerpt from a reactive approach while implementing the orchestration layer

final WebTarget destination = ...;
final WebTarget forecast = ...;

// Recommended places.
final Observable<Destination> recommended = RxObservable.from(destination).path("recommended").request()
        // Identify the user.
        .header("Rx-User", "RxJava")
        // Reactive invoker.
        .rx()
        // Return a list of destinations.
        .get(new GenericType<List<Destination>>() {})
        // Handle Errors.
        .onErrorReturn(throwable -> {
            errors.offer("Recommended: " + throwable.getMessage());
            return Collections.emptyList();
        })
        // Emit destinations one-by-one.
        .flatMap(Observable::from)
        // Remember emitted items for dependant requests.
        .cache();

// Forecasts. (depend on recommended destinations)
final RxWebTarget<RxObservableInvoker> rxForecast = RxObservable.from(forecast);
final Observable<Forecast> forecasts = recommended.flatMap(destination ->
        rxForecast
                .resolveTemplate("destination", destination.getDestination()).request().rx().get(Forecast.class)
                .onErrorReturn(throwable -> {
                    errors.offer("Forecast: " + throwable.getMessage());
                    return new Forecast(destination.getDestination(), "N/A");
                }));

final Observable<Recommendation> recommendations = Observable.zip(recommended, forecasts, Recommendation::new);


As you can see the code achieves the same work as the previous two examples. It's more readable than the pure asynchronous approach even though it's equally fast. It's as easy to read and implement as the synchronous approach. The error processing is also better handled in this way than in the asynchronous approach.

When dealing with a large amount of requests (that depend on each other) and when you need to compose/combine the results of these requests, the reactive programming model is the right technique to use.

6.2. Usage and Extension Modules

Reactive Jersey Client API tries to bring a similar experience you have with the existing JAX-RS Client API. It builds on it with extending these JAX-RS APIs with a few new methods.

When you compare synchronous invocation of HTTP calls ( Example 6.4, “Synchronous invocation of HTTP requests”)

Example 6.4. Synchronous invocation of HTTP requests

Response response = ClientBuilder.newClient()
        .target("http://example.com/resource")
        .request()
        .get();


with asynchronous invocation (Example 6.5, “Asynchronous invocation of HTTP requests”)

Example 6.5. Asynchronous invocation of HTTP requests

Future<Response> response = ClientBuilder.newClient()
        .target("http://example.com/resource")
        .request()
        .async()
        .get();


it is apparent how to pretty conveniently modify the way how a request is invoked (from sync to async) only by calling async method on an Invocation.Builder.

Naturally, it'd be nice to copy the same pattern to allow invoking requests in a reactive way. Just instead of async you'd call rx on an extension of Invocation.Builder, like in Example 6.6, “Reactive invocation of HTTP requests”.

Example 6.6. Reactive invocation of HTTP requests

Observable<Response> response = Rx.newClient(RxObservableInvoker.class)
        .target("http://example.com/resource")
        .request()
        .rx()
        .get();


To achieve this a few new interfaces had to be introduced in the Reactive Jersey Client API. The first new interface is RxInvoker which is very similar to SyncInvoker and AsyncInvoker. It contains all methods present in the two latter JAX-RS interfaces but the RxInvoker interface is more generic, so that it can be extended and used in particular implementations taking advantage of various reactive libraries. Extending this new interface in a particular implementation also preserves type safety which means that you're not loosing type information when a HTTP method call returns an object that you want to process further.

As a user of the Reactive Jersey Client API you only need to keep in mind that you won't be working with RxInvoker directly. You'd rather be working with an extension of this interface created for a particular implementation and you don't need to be bothered much with why are things designed the way they are.

Note

To see how the RxInvoker should be extended, refer to Section 6.4, “Implementing Support for Custom Reactive Libraries (SPI)”.

The important thing to notice here is that an extension of RxInvoker holds the type information and the Reactive Jersey Client needs to know about this type to properly propagate it among the method calls you'll be making. This is the reason why other interfaces (described bellow) are parametrized with this type.

In addition to having a concrete RxInvoker implementation ready there is also a need to have an implementation of new reactive methods, rx() and rx(ExecutorService). They're defined in RxInvocationBuilder which extends the Invocation.Builder from JAX-RS. Using the first method you can simply access the reactive request invocation interface to invoke the built request and the second allows you to specify the executor service to execute the current reactive request (and only this one).

To access the RxInvocationBuilder we needed to also extend JAX-RS Client (RxClient) and WebTarget (RxWebTarget) to preserve the fluent Client API introduced in JAX-RS.

With all these interfaces ready the only question left behind is the way how to create an instance of Reactive Jersey Client. This functionality is beyond the actual JAX-RS API. It is not possible to create such a client via the standard ClientBuilder entry point. To resolve this, we introduced a new helper class, Rx, which does the job. This class contains factory methods to create a new (reactive) client from scratch

and it also contains methods to enhance an existing JAX-RS Client and WebTarget

It's possible to provide an ExecutorService instance to tell the reactive client that all requests should be invoked using this particular executor. This behaviour can be suppressed by providing another ExecutorService instance for a particular request.

Similarly to the RxInvoker interface the Rx class is general and does not stick to any conrete implementation (to see a list of supported reactive libraries, refer to Section 6.3, “Supported Reactive Libraries”). When Reactive Clients are created using Rx factory methods, the actual invoker type parameter has to be provided (this is not the case with similar helper classes created for particular reactive libraries).

Dependencies

The Reactive Jersey Client is implemented as an extension module in Jersey. For Maven users, simply add the following dependency to your pom.xml:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client</artifactId>
    <version>2.24.1</version>
</dependency>

With this dependency only the basic classes would be added to your class-path without any support for any reactive library. To add support for a particular library, see the Section 6.3, “Supported Reactive Libraries”.

Note

If you're not using Maven (or other dependency management tool) make sure to add also all the transitive dependencies of this extension module (see jersey-rx-client) on the class-path.

6.3. Supported Reactive Libraries

There are already some available reactive (or reactive-like) libraries out there and Jersey brings support for some of them out of the box. Jersey currently supports:

6.3.1. RxJava – Observable

RxJava, contributed by Netflix, is probably the most advanced reactive library for Java at the moment. It's used for composing asynchronous and event-based programs by using observable sequences. It uses the observer pattern to support these sequences of data/events via it's Observable entry point class which implements the Reactive Pattern. Observable is actually the parameter type in the RxJava's extension of RxInvoker, called RxObservableInvoker. This means that the return type of HTTP method calls is Observable in this case (accordingly parametrized).

Requests are by default invoked at the moment when a subscriber is subscribed to an observable (it's a cold Observable). If not said otherwise a separate thread (JAX-RS Async Client requests) is used to obtain data. This behavior can be overridden by providing an ExecutorService when a reactive Client or WebTarget is created or when a particular requests is about to be invoked.

Usage

A JAX-RS Client or WebTarget aware of reactive HTTP calls, Jersey's RxClient or RxWebTarget parametrized by RxObservableInvoker, can be created either via the generic Rx entry point or the customized RxObservable one.

When using the generic entry point you need to specify the RxObservableInvoker invoker type to obtain an appropriate instance of the client or the web target.

Example 6.7. Creating Jersey/RxJava Client and WebTarget – Using Rx

// New Client
RxClient<RxObservableInvoker> newRxClient = Rx.newClient(RxObservableInvoker.class);

// From existing Client
RxClient<RxObservableInvoker> rxClient = Rx.from(client, RxObservableInvoker.class);

// From existing WebTarget
RxTarget<RxObservableInvoker> rxWebTarget = Rx.from(target, RxObservableInvoker.class);


You can skip specifying the invoker type when you use RxObservable entry point.

Example 6.8. Creating Jersey/RxJava Client and WebTarget – Using RxObservable

// New Client
RxClient<RxObservableInvoker> newRxClient = RxObservable.newClient();

// From existing Client
RxClient<RxObservableInvoker> rxClient = RxObservable.from(client);

// From existing WebTarget
RxTarget<RxObservableInvoker> rxWebTarget = RxObservable.from(target);


In addition to specifying the invoker type and client/web-target instances, when using the factory methods in the entry points mentioned above, an ExecutorService can be specified that will be used to execute requests on separate threads. In the case of RxJava the executor service is utilized to create a Scheduler that is later leveraged in both Observable#observeOn(rx.Scheduler) and Observable#subscribeOn(rx.Scheduler).

An example of obtaining Observable with JAX-RS Response from a remote service can be seen in Example 6.9, “Obtaining Observable<Response> from Jersey/RxJava Client”.

Example 6.9. Obtaining Observable<Response> from Jersey/RxJava Client

Observable<Response> observable = RxObservable.newClient()
        .target("http://example.com/resource")
        .request()
        .rx()
        .get();


Dependencies

The Reactive Jersey Client with RxJava support is available as an extension module in Jersey. For Maven users, simply add the following dependency to your pom.xml:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-rxjava</artifactId>
    <version>2.24.1</version>
</dependency>

After this step you can use the extended client right away. The dependency transitively adds the following dependencies to your class-path as well: org.glassfish.jersey.ext.rx:jersey-rx-client and io.reactivex:rxjava.

Note

If you're not using Maven (or other dependency management tool) make sure to add also all the transitive dependencies of this extension module (see jersey-rx-client-rxjava) on the class-path.

6.3.2. Java 8 – CompletionStage and CompletableFuture

Java 8 natively contains asynchronous/event-based completion aware types, CompletionStage and CompletableFuture. These types can be then combined with Streams to achieve similar functionality as provided by RxJava (see Section 6.3.1, “RxJava (Observable)” for more information). CompletionStage is the parameter type in the Java 8 extension of RxInvoker, called RxCompletionStageInvoker. This means that the return type of HTTP method calls is CompletionStage in this case (accordingly parametrized).

Requests are by default invoked immediately. If not said otherwise the ForkJoinPool#commonPool() pool is used to obtain a thread which processed the request. This behavior can be overridden by providing an ExecutorService when a reactive Client or WebTarget is created or when a particular request is about to be invoked.

Usage

A JAX-RS Client or WebTarget aware of reactive HTTP calls, Jersey's RxClient or RxWebTarget parametrized by RxCompletionStageInvoker, can be created either via the generic Rx entry point or the customized RxCompletionStage one.

When using the generic entry point you need to specify the RxCompletionStage invoker type to obtain an appropriate instance of the client or the web target.

Example 6.10. Creating Jersey/Java8 Client and WebTarget – Using Rx

// New Client
RxClient<RxCompletionStageInvoker> newRxClient = Rx.newClient(RxCompletionStageInvoker.class);

// From existing Client
RxClient<RxCompletionStageInvoker> rxClient = Rx.from(client, RxCompletionStageInvoker.class);

// From existing WebTarget
RxTarget<RxCompletionStageInvoker> rxWebTarget = Rx.from(target, RxCompletionStageInvoker.class);


You can skip specifying the invoker type when you use the RxCompletionStage entry point.

Example 6.11. Creating Jersey/Java 8 Client and WebTarget – Using RxCompletionStage

// New Client
RxClient<RxCompletionStageInvoker> newRxClient = RxCompletionStage.newClient();

// From existing Client
RxClient<RxCompletionStageInvoker> rxClient = RxCompletionStage.from(client);

// From existing WebTarget
RxTarget<RxCompletionStageInvoker> rxWebTarget = RxCompletionStage.from(target);


In addition to specifying the invoker type and client/web-target instances, when using the factory methods in the entry points mentioned above, an ExecutorService instance could be specifies that should be used to execute requests on a separate thread.

An example of obtaining CompletionStage with JAX-RS Response from a remote service can be seen in Example 6.12, “Obtaining CompletionStage<Response> from Jersey/Java 8 Client”.

Example 6.12. Obtaining CompletionStage<Response> from Jersey/Java 8 Client

CompletionStage<Response> stage = RxCompletionStage.newClient()
        .target("http://example.com/resource")
        .request()
        .rx()
        .get();


Dependencies

Important

To use this module the application has to be compiled (with javac -target option set to 1.8) and run in a Java 8 environment. If you want to use Reactive Jersey Client with CompletableFuture in pre-Java 8 environment, see Section 6.3.4, “JSR-166e (CompletableFuture)”.

The Reactive Jersey Client with Java 8 support is available as an extension module in Jersey. For Maven users, simply add the following dependency to your pom.xml:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-java8</artifactId>
    <version>2.24.1</version>
</dependency>

After this step you can use the extended client right away. The dependency transitively adds the following dependency to your class-path as well: org.glassfish.jersey.ext.rx:jersey-rx-client.

Note

If you're not using Maven (or other dependency management tool) make sure to add also all the transitive dependencies of this extension module (see jersey-rx-client-java8) on the class-path.

6.3.3. Guava – ListenableFuture and Futures

Guava, contributed by Google, also contains a type, ListenableFuture, which can be decorated with listeners that are notified when the future completes. The ListenableFuture can be combined with Futures to achieve asynchronous/event-based completion aware processing. ListenableFuture is the parameter type in the Guava's extension of RxInvoker, called RxListenableFutureInvoker. This means that the return type of HTTP method calls is ListenableFuture in this case (accordingly parametrized).

Requests are by default invoked immediately. If not said otherwise the Executors#newCachedThreadPool() pool is used to obtain a thread which processed the request. This behavior can be overridden by providing a ExecutorService when a reactive Client or WebTarget is created or when a particular requests is about to be invoked.

Usage

A JAX-RS Client or WebTarget aware of reactive HTTP calls, Jersey's RxClient or RxWebTarget parametrized by RxListenableFutureInvoker, can be created either via the generic Rx entry point or the customized RxListenableFuture one.

When using the generic entry point you need to specify the RxListenableFutureInvoker invoker type to obtain an appropriate instance of the client or the web target.

Example 6.13. Creating Jersey/Guava Client and WebTarget – Using Rx

// New Client
RxClient<RxListenableFutureInvoker> newRxClient = Rx.newClient(RxListenableFutureInvoker.class);

// From existing Client
RxClient<RxListenableFutureInvoker> rxClient = Rx.from(client, RxListenableFutureInvoker.class);

// From existing WebTarget
RxTarget<RxListenableFutureInvoker> rxWebTarget = Rx.from(target, RxListenableFutureInvoker.class);


You can skip specifying the invoker type when you use RxListenableFuture entry point.

Example 6.14. Creating Jersey/Guava Client and WebTarget – Using RxListenableFuture

// New Client
RxClient<RxListenableFutureInvoker> newRxClient = RxListenableFuture.newClient();

// From existing Client
RxClient<RxListenableFutureInvoker> rxClient = RxListenableFuture.from(client);

// From existing WebTarget
RxTarget<RxListenableFutureInvoker> rxWebTarget = RxListenableFuture.from(target);


In addition to specifying the invoker type and client/web-target instances, when using the factory methods in the entry points mentioned above, an ExecutorService can be specified that will be used to execute requests on a separate thread.

An example of obtaining ListenableFuture with JAX-RS Response from a remote service can be seen in Example 6.15, “Obtaining ListenableFuture<Response> from Jersey/Guava Client”.

Example 6.15. Obtaining ListenableFuture<Response> from Jersey/Guava Client

ListenableFuture<Response> stage = RxListenableFuture.newClient()
        .target("http://example.com/resource")
        .request()
        .rx()
        .get();


Dependencies

The Reactive Jersey Client with Guava support is available as an extension module in Jersey. For Maven users, simply add the following dependency to your pom.xml:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-guava</artifactId>
    <version>2.24.1</version>
</dependency>

After this step you can use the extended client right away. The dependency transitively adds the following dependencies to your class-path as well: org.glassfish.jersey.ext.rx:jersey-rx-client and com.google.guava:guava.

Note

If you're not using Maven (or other dependency management tool) make sure to add also all the transitive dependencies of this extension module (see jersey-rx-client-guava) on the class-path.

6.3.4. JSR-166e – CompletableFuture

When Java 8 is not an option but the functionality of CompletionStage and CompletableFuture is required a JSR 166 library can be used. It's a back-port of classes from java.util.concurrent package added to Java 8. Contributed and maintained by Doug Lea. CompletableFuture is the parameter type in the JSR-166e's extension of RxInvoker, called RxCompletableFutureInvoker. This means that the return type of HTTP method calls is CompletableFuture in this case (accordingly parametrized).

Requests are by default invoked immediately. If not said otherwise the ForkJoinPool.html#commonPool() pool is used to obtain a thread which processed the request. This behavior can be overridden by providing an ExecutorService when a reactive Client or WebTarget is created or when a particular requests is about to be invoked.

Usage

A JAX-RS Client or WebTarget aware of reactive HTTP calls, Jersey's RxClient or RxWebTarget parametrized by RxCompletableFutureInvoker, can be created either via the generic Rx entry point or the customized RxCompletableFuture one.

When using the generic entry point you need to specify the RxCompletableFutureInvoker invoker type to obtain an appropriate instance of the client or the web target.

Example 6.16. Creating Jersey/JSR-166e Client and WebTarget – Using Rx

// New Client
RxClient<RxCompletableFutureInvoker> newRxClient = Rx.newClient(RxCompletableFutureInvoker.class);

// From existing Client
RxClient<RxCompletableFutureInvoker> rxClient = Rx.from(client, RxCompletableFutureInvoker.class);

// From existing WebTarget
RxTarget<RxCompletableFutureInvoker> rxWebTarget = Rx.from(target, RxCompletableFutureInvoker.class);


You can skip specifying the invoker type when you use RxCompletableFuture entry point.

Example 6.17. Creating Jersey/JSR-166e Client and WebTarget – Using RxCompletableFuture

// New Client
RxClient<RxCompletableFutureInvoker> newRxClient = RxCompletableFuture.newClient();

// From existing Client
RxClient<RxCompletableFutureInvoker> rxClient = RxCompletableFuture.from(client);

// From existing WebTarget
RxTarget<RxCompletableFutureInvoker> rxWebTarget = RxCompletableFuture.from(target);


In addition to specifying the invoker type and client/web-target instances, when using the factory methods in the entry points mentioned above, an ExecutorService can be specified that is further used to execute requests on a separate thread.

An example of obtaining CompletableFuture with JAX-RS Response from a remote service can be seen in Example 6.18, “Obtaining CompletableFuture<Response> from Jersey/JSR-166e Client”.

Example 6.18. Obtaining CompletableFuture<Response> from Jersey/JSR-166e Client

CompletableFuture<Response> stage = RxCompletableFuture.newClient()
        .target("http://example.com/resource")
        .request()
        .rx()
        .get();


Dependencies

Important

If you're compiling and running your application in Java 8 environment consider using Reactive Jersey Client with CompletableFuture with Section 6.3.2, “Java 8 (CompletionStage and CompletableFuture)” instead.

The Reactive Jersey Client with JSR-166e support is available as an extension module in Jersey. For Maven users, simply add the following dependency to your pom.xml:

<dependency>
    <groupId>org.glassfish.jersey.ext.rx</groupId>
    <artifactId>jersey-rx-client-jsr166e</artifactId>
    <version>2.24.1</version>
</dependency>

After this step you can use the extended client right away. The dependency transitively adds the following dependencies to your class-path as well: org.glassfish.jersey.ext.rx:jersey-rx-client and org.glassfish.jersey.bundles.repackaged:jersey-jsr166e. The later is the JSR-166e library repackaged by Jersey to make sure the OSGi headers are correct and the library can be used in OSGi environment.

Note

If you're not using Maven (or other dependency management tool) make sure to add also all the transitive dependencies of this extension module (see jersey-rx-client-jsr166e) on the class-path.

6.4. Implementing Support for Custom Reactive Libraries (SPI)

In case you want to bring support for some other library providing Reactive Programming Model into your application you can extend functionality of Reactive Jersey Client by implementing SPI available in jersey-rx-client module. Steps to do such a thing are as follows.

Extend RxInvoker interface

Even though not entirely intuitive this step is required when a support for a custom reactive library is needed. As mentioned above few JAX-RS Client interfaces had to be modified in order to make possible to invoke HTTP calls in a reactive way. All of them except the RxInvoker extend the original interfaces from JAX-RS (e.g. Client). RxInvoker is a brand new interface (very similar to SyncInvoker and AsyncInvoker) that actually lets you to invoke HTTP methods in the reactive way.

Example 6.19. RxInvoker snippet

public interface RxInvoker<T> {

    public <T> get();

    public <R> <T> get(Class<R> responseType);

    // ...

}


As you can notice it's too generic as it's designed to support various reactive libraries without bringing any additional abstractions and restrictions. The first type parameter, T, is the asynchronous/event-based completion aware type (e.g. Observable). The given type should be parametrized with the actual response type. And since it's not possible to parametrize type parameter it's an obligation of the extension of RxInvoker to do that. That applies to simpler methods, such as get(), as well as to more advanced methods, for example get(Class).

In the first case it's enough to parametrize the needed type with Response, e.g. Observable<Response> get(). The second case uses the type parameter from the parameter of the method. To accordingly extend the get(Class<R>) method you need to parametrize the needed type with R type parameter, e.g. <T> Observable<T> get(Class<T> responseType).

To summarize the requirements above and illustrate them in one code snippet the Example 6.20, “Extending RxInvoker - RxObservableInvoker” is an excerpt from RxObservableInvoker that works with RxJava's Observable.

Example 6.20. Extending RxInvoker - RxObservableInvoker

public interface RxObservableInvoker extends RxInvoker<Observable> {

    @Override
    public Observable<Response> get();

    @Override
    public <T> Observable<T> get(Class<T> responseType);

    // ...

}


Implement the extended interface

Either you can implement the extension of RxInvoker from scratch or it's possible to extend from AbstractRxInvoker abstract class which serves as a default implementation of the interface. In the later case only #method(...) methods are needed to be implemented as the default implementation of other methods (HTTP calls) delegates to these methods.

Implement and register RxInvokerProvider

To create an instance of particular RxInvoker an implementation of RxInvokerProvider SPI interface is needed. When a concrete RxInvoker is requested the runtime goes through all available providers and finds one which supports the given invoker type. It is expected that each provider supports mapping for distinct set of types and subtypes so that different providers do not conflict with each other.

Example 6.21. Example of RxInvokerProvider - RxObservableInvokerProvider

public final class RxObservableInvokerProvider implements RxInvokerProvider {

    @Override
    public <T> T getInvoker(final Class<T> invokerType, final Invocation.Builder builder, final ExecutorService executor) {
        if (RxObservableInvoker.class.isAssignableFrom(invokerType)) {
            return invokerType.cast(new JerseyRxObservableInvoker(builder, executor));
        }
        return null;
    }
}


Reactive Jersey Client looks for all available RxInvokerProviders via the standard META-INF/services mechanism. It's enough to bundle org.glassfish.jersey.client.rx.spi.RxInvokerProvider file with your library and reference your implementation (by fully qualified class name) from it.

Example 6.22. META-INF/services/org.glassfish.jersey.client.rx.spi.RxInvokerProvider

org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider


6.5. Examples

To see a complete working examples of various approaches using JAX-RS Client API (Sync and Async) and Reactive Jersey Client APIs feature refer to the: