Reactive JEE

Improving Your JEE Monolith with Reactive Techniques

So you have a JEE monolith…

Monolith

Is it bad?

  • Does it need to scale?
  • Is the codebase growing fast?
  • Is it badly modularized?

Microservices Time!

microservices

Reactive Manifesto

http://www.reactivemanifesto.org/

  • Responsive
  • Resilient
  • Elastic
  • Message Driven

Plenty of options

But, I don't want a rewrite!

Is there anything for me?

Reactive JEE

Code for this talk under https://github.com/jlprat/reactive-jee

https://github.com/jlprat/reactive-jee

3 main areas

  1. Process Asynchronously
  2. Embrace Failure
  3. Decouple via Messages

1. Process Asynchronously

threads

Free the Threads

Database and remote server calls are typically synchronous. This means that every time one of those methods is called it takes some time to return, with a thread left waiting for the answer. Why not model it accordingly?

Asynchronous Servlet

How a synchronous one looks like

@WebServlet(urlPatterns = "/servlet/users")
public class UserServlet extends HttpServlet {
    ...
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        final List<Author> authors = authorService.getAuthors();
        final List<Reader> readers = readerService.getReaders();
        final List<Person> users = new ArrayList<>(authors.size() + readers.size());
        users.addAll(authors);
        users.addAll(readers);
        final ServletOutputStream out = resp.getOutputStream();
        for (Person user : users) {
            out.println(user.toString());
        }
    }
}

Asynchronous Servlet

Making it asynchronous

@WebServlet(urlPatterns = "/servlet/users", asyncSuppported = true)
public class UserServlet extends HttpServlet {
    ...
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        final AsyncContext asyncCtx = req.startAsync();
        asyncCtx.start(() -> {
            final List<Author> authors = authorService.getAuthors();
            final List<Reader> readers = readerService.getReaders();
            final List<Person> users = mergeUsers(authors, readers);
            printUsers(users, asyncCtx.getResponse().getOutputStream());
            asyncCtx.complete();
        });
    }
}

But who writes Servlets nowadays?

Asynchronous JAX-RS

How a synchronous one looks like

@Path("/users")
public class UserResource {
    ...
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getUsers() {
        final List<Author> authors = authorService.getAuthors();
        final List<Reader> readers = readerService.getReaders();
        final List<Person> users = mergeUsers(authors, readers);
        return Response.ok(users).build();
    }
}

Asynchronous JAX-RS

Making it asynchronous

@Stateless
@Path("/users")
public class UserResource {
    ...
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Asynchronous
    public void getUsers(@Suspended AsyncResponse response) {
        final List<Author> authors = authorService.getAuthors();
        final List<Reader> readers = readerService.getReaders();
        final List<Person> users = mergeUsers(authors, readers);
        response.resume(Response.ok(users).build());
    }
}

Asynchronous JAX-RS

Never wait forever!

@Stateless
@Path("/users")
public class UserResource {
    ...
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Asynchronous
    public void getUsers(@Suspended AsyncResponse response) {
        response.setTimeout(2, TimeUnit.SECONDS);
        response.setTimeoutHandler(resp ->
            resp.resume(Response.status(Status.REQUEST_TIMEOUT).build()));
        final List<Author> authors = authorService.getAuthors();
        final List<Reader> readers = readerService.getReaders();
        final List<Person> users = mergeUsers(authors, readers);
        response.resume(Response.ok(users).build());
    }
}

Behind the Scenes

Request is accepted by a thread belonging to the http-thread-pool but the processing it is handled by a thread from the ejb-thread-pool. This way, the http-thread is free to accept new incoming request.

Behind the Scenes

Last thing you want is waiting forever to complete a request from a client. Always specify timeouts when doing asynchronous tasks.

Asynchronous Servlet & JAX-RS

Use them when you have a combination of heavy and light loaded queries. You can get a higher throughput and your server can be better fine-tuned.

Asynchronous Session Beans

How a synchronous one looks like

@Stateless
public class AuthorService {
    public Author createAuthor(final String name, final String surname) {
        final Author author = new Author(UUID.randomUUID(), name, surname);
        em.persist(author);
        return author;
    }
}

Asynchronous Session Beans

Making it asynchronous (I)

@Stateless
public class AuthorService {
    @Asynchronous
    public Future<Author> createAuthor(final String name, final String surname) {
        final Author author = new Author(UUID.randomUUID(), name, surname);
        em.persist(author);
        return new AsyncResult<>(author);
    }
}

Asynchronous Session Beans

The Client (I)

@Path("/users/authors")
public class AuthorResource {
    @POST
    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createAuthor(@FormParam("name") final String name, @FormParam("surname") final String surname) {
        final Future<Author> authorFuture =
                authorService.createAuthor(name, surname);
        try {
            final Author author = authorFuture.get(2, TimeUnit.SECONDS);
            return Response.ok(author).build();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return Response.serverError().build();
        }
    }
}

Asynchronous Session Beans

Making it asynchronous (II)

@Stateless
public class AuthorService {
    @Asynchronous
    public CompletableFuture<Author> createAuthor(final String name, final String surname) {
        final Author author = new Author(UUID.randomUUID(), name, surname);
        em.persist(author);
        return CompletableFuture.completedFuture(author);
    }
}

Asynchronous Session Beans

The Client (II)

@Stateless
@Path("/users/authors")
public class AuthorResource {
    @POST
    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
    @Produces(MediaType.APPLICATION_JSON)
    @Asynchronous
    public void createAuthor(@FormParam("name") final String name, @FormParam("surname") final String surname,
                                 @Suspended AsyncResponse response) {
        response.setTimeout(2, TimeUnit.SECONDS);
        response.setTimeoutHandler(resp -> {
            resp.resume(Response.status(Response.Status.REQUEST_TIMEOUT).build());
        });
        authorService.createAuthor(name, surname).thenApply(response::resume);
    }
}

This does not work!

JEE specification doesn't allow this.

Async method exposed through no-interface view must have return type void or java.lang.concurrent.Future<V>

Asynchronous Session Beans

Making it asynchronous (III)

@Stateless
public class AuthorService {
    @Asynchronous
    public void createAuthor(final String name, final String surname,
                final CompletableFuture<Author> promise) {
        final Author author = new Author(UUID.randomUUID(), name, surname);
        em.persist(author);
        promise.complete(author);
    }
}

Asynchronous Session Beans

The Client (III)

@Stateless
@Path("/users/authors")
public class AuthorResource {
    @POST
    @Consumes(MediaType.APPLICATION_FORM_URLENCODED)
    @Produces(MediaType.APPLICATION_JSON)
    @Asynchronous
    public void createAuthor(@FormParam("name") final String name, @FormParam("surname") final String surname,
                             @Suspended AsyncResponse response) {
        CompletableFuture<Author> promise = new CompletableFuture<>();
        authorService.createAuthor(name, surname, promise);
        promise.thenApply(response::resume);
    }
}

Asynchronous Session Beans

The Client (III bis)

@GET
@Produces(MediaType.APPLICATION_JSON)
@Asynchronous
public void getUsers(@Suspended AsyncResponse response) {
    final CompletableFuture<List<Author>> authors = new CompletableFuture<>();
    final CompletableFuture<List<Reader>> readers = new CompletableFuture<>();
    authorService.getAuthors(authors);
    readerService.getReaders(readers);
    authors.thenCombine(readers, (a, r) -> {
        final List<Person> list = new ArrayList<>(a.size() + r.size());
        list.addAll(a);
        list.addAll(r);
        return list;
    }).thenApply(response::resume);
}

Behind the Scenes

Any time an asynchronous method is invoked, it is served by another thread-pool, freeing the caller one to continue with their tasks.

Asynchronous Session Beans

Use them when you can benefit of calling different Session Beans at the same time. Throughput can be incremented this way.

Future vs CompletableFuture

Futures are currently the main actor for Asynchronicity in JEE. However, they have some limitations as they can not compose and make use of the new lambdas like CompletableFuture does.

2. Embrace Failure

broken

Model failure

If things can go wrong, model failure in your API's. In the same way, do not expect users of your methods to check if your returned object is null.

Optional

Introduced in Java 8, its only purpose is to express that the return value of a method might not exist. It is not intended to be used in fields.

Optional

Example code

@Stateless
public class ReaderService {
    public Optional<Reader> getReader(final String id) {
        return Optional.ofNullable(em.find(Reader.class, id));
    }
}
...
@Path("/users/readers")
public class ReaderResource {
    public Response getReader(@PathParam("id") final String id) {
        final Optional<Reader> reader = readerService.getReader(id);
        return reader
           .map(Response::ok)
           .orElseGet(() -> Response.status(Response.Status.NOT_FOUND))
           .build();
    }
}

DISCLAIMER!

Optional is NOT serializable. It can not be used in remote Session Beans! Use it only on local methods.

Try

This class doesn't exist in Java (currently). It can be found, for example, in the javaslang library. The Try object represents a computation that might complete or fail with an exception.

Try

Possible Implementation

interface Try<T> {
    //Consumes the throwable if this is a Failure.
    Try<T> onFailure(Consumer<? super Throwable> action);
    //Consumes the value if this is a Success.
    Try<T> onSuccess(Consumer<? super T> action);
    //Creates a Try of a supplier that might throw.
    static <T> Try<T> of(Try.CheckedSupplier<? extends T> supplier)
}

Try

Example code

Try<String> attempt = getURL("https://devnull-as-a-service.com/");
attempt.onFailure(e -> logger.log(Level.SEVERE, "Error occurred", e))
       .onSuccess(logger::info);
Try<String> getURL(String url) {
    return Try.of(() -> {
        StringBuilder result = new StringBuilder();
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
        String line;
        while ((line = rd.readLine()) != null) {
           result.append(line);
        }
        rd.close();
        return result.toString();
    }):
}

3. Decouple via Messages

messaging

Decouple services

Provide loosely coupled communication between services while keeping certain degree of guarantees of delivery and fault tolerance.

JMS

The oldest of the API's introduced in this talk (2001). It is loosely coupled, transactional and it can be durable, fault tolerant and clustered. Messages can be ObjectMessage, MapMessage, TextMessage, StreamMessage or BytesMessage.

JMS

Coupled code

public class LendingService {
    public BookAvailability lendBook(final Book book, final Reader reader) {
        final BookAvailability bookAvailability =
                em.find(BookAvailability.class, book.getIsbn());
        if (!bookAvailability.isAvailable()) {
            throw new BookNotAvailableException();
        } else {
            reader.loanBook(book);
            bookAvailability.lend();
            em.merge(reader);
            em.merge(bookAvailability);
            return bookAvailability;
        }
    }
}

JMS

The Producer

public class LendingService {
    @Inject JMSContext jmsContext;
    @Resource(lookup = "jms/BookLendingQueue") Destination queue;
    public BookAvailability lendBook(Book book, Reader reader) {
        ...
        sendLoanBookJMS(reader, book);
        bookAvailability.lend();
        em.merge(bookAvailability);
        ...
    }
    private void sendLoanBookJMS(Reader reader, Book book) {
        final Map<String, Object> message = new HashMap<>();
        message.put("readerId", reader.getId());
        message.put("bookIsbn", book.getIsbn());
        jmsContext.createProducer().send(queue, message);
    }
}

JMS

The Consumer

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType",
                propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destinationLookup",
                propertyValue = "jms/BookLendingQueue")})
@Stateless
public class ReaderMDB implements MessageListener {
    ...
    public void onMessage(Message message) {
        MapMessage msg = (MapMessage) message;
        Reader reader = em.find(Reader.class, msg.getString("readerId"));
        Book book = em.find(Book.class, msg.getString("bookIsbn"));
        reader.loanBook(book);
        em.merge(reader);
    }
}

JMS

Use JMS when you want to decouple services from each other. The processing of the message will be done in asynchronous fashion. You can add delivery guarantees, or even send messages to other machines. Also useful when you want to broadcast some information.

CDI Events

CDI events are around since JEE 6. They allow beans to communicate without any compile-time dependency. By default events are processed in a synchronous way.

CDI Events

Producer

public class LendingService {
    @Inject @ReturnedBook Event<BookLoan> returnedBook;
    public BookAvailability returnBook(final Book book, final Reader reader) {
        ...
        if (reader.getBookShelf().contains(book)) {
            bookAvailability.returned();
            em.merge(bookAvailability);
            returnedBook.fire(new BookLoan(book, reader));
            return bookAvailability;
        }
        ...
    }
}

CDI Events

Qualifier

@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
public @interface ReturnedBook {
}

CDI Events

Consumer

@Stateless
public class ReaderService {
    public void updateBookShelf(@Observes @ReturnedBook BookLoan bookLoan) {
        bookLoan.reader.returnBook(bookLoan.book);
        em.merge(bookLoan.reader);
    }
}

CDI Events

Asynchronous Consumer

@Stateless
public class ReaderService {
    @Asynchronous
    public void updateBookShelf(@Observes @ReturnedBook BookLoan bookLoan) {
        bookLoan.reader.returnBook(bookLoan.book);
        em.merge(bookLoan.reader);
    }
}

CDI Events

Producing and consuming CDI events is easier than the JMS way, but it comes with some limitations. If you need asynchronously process the events, it needs to be done manually and thread-safety needs to be taken into account.

Wrap Up

  • Use asynchronous methods when the computation is long lasting
  • Help callers with expressive types
  • Use CDI Events or JMS to decouple your services

#reactiveJEE

https://github.com/jlprat/reactive-jee

Josep Prat - @jlprat - Berlin Expert Days - 16/09/2016

Further Information

We are hiring!

inside.gameduell.com
www.techtalk-berlin.de