http://www.reactivemanifesto.org/
Code for this talk under https://github.com/jlprat/reactive-jee
Database and remote server calls are typically synchronous: every call takes some time to return, and a thread is left waiting for the answer. Why not model that explicitly?
What a synchronous one looks like
@WebServlet(urlPatterns = "/servlet/users")
public class UserServlet extends HttpServlet {
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
final List<Author> authors = authorService.getAuthors();
final List<Reader> readers = readerService.getReaders();
final List<Person> users = new ArrayList<>(authors.size() + readers.size());
users.addAll(authors);
users.addAll(readers);
final ServletOutputStream out = resp.getOutputStream();
for (Person user : users) {
out.println(user.toString());
}
}
}
Making it asynchronous
@WebServlet(urlPatterns = "/servlet/users", asyncSupported = true)
public class UserServlet extends HttpServlet {
...
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
final AsyncContext asyncCtx = req.startAsync();
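asyncCtx.start(() -> {
try {
final List<Author> authors = authorService.getAuthors();
final List<Reader> readers = readerService.getReaders();
final List<Person> users = mergeUsers(authors, readers);
printUsers(users, asyncCtx.getResponse().getOutputStream());
} catch (IOException e) {
// getOutputStream() throws IOException, which a Runnable lambda cannot propagate.
log("Could not write the user list", e);
} finally {
// Always complete the context, even if writing failed.
asyncCtx.complete();
}
});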
}
}
What a synchronous one looks like
@Path("/users")
public class UserResource {
...
@GET
@Produces(MediaType.APPLICATION_JSON)
public Response getUsers() {
final List<Author> authors = authorService.getAuthors();
final List<Reader> readers = readerService.getReaders();
final List<Person> users = mergeUsers(authors, readers);
return Response.ok(users).build();
}
}
Making it asynchronous
@Stateless
@Path("/users")
public class UserResource {
...
@GET
@Produces(MediaType.APPLICATION_JSON)
@Asynchronous
public void getUsers(@Suspended AsyncResponse response) {
final List<Author> authors = authorService.getAuthors();
final List<Reader> readers = readerService.getReaders();
final List<Person> users = mergeUsers(authors, readers);
response.resume(Response.ok(users).build());
}
}
Never wait forever!
@Stateless
@Path("/users")
public class UserResource {
...
@GET
@Produces(MediaType.APPLICATION_JSON)
@Asynchronous
public void getUsers(@Suspended AsyncResponse response) {
response.setTimeout(2, TimeUnit.SECONDS);
response.setTimeoutHandler(resp ->
resp.resume(Response.status(Status.REQUEST_TIMEOUT).build()));
final List<Author> authors = authorService.getAuthors();
final List<Reader> readers = readerService.getReaders();
final List<Person> users = mergeUsers(authors, readers);
response.resume(Response.ok(users).build());
}
}
The request is accepted by a thread from the http-thread-pool, but it is processed by a thread from the ejb-thread-pool. This frees the http thread to accept new incoming requests.
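The handoff between pools can be illustrated outside a container. The following is a minimal plain-JDK sketch, not container code: the class `HandoffSketch`, its `handle()` method and the thread names are assumptions made for illustration, with two single-thread executors standing in for the http-thread-pool and the ejb-thread-pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK sketch: the two pools and their thread names are hypothetical
// stand-ins for the container's http-thread-pool and ejb-thread-pool.
final class HandoffSketch {

    // Returns {name of the accepting thread, name of the processing thread}.
    static String[] handle() {
        ExecutorService httpPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "http-thread"));
        ExecutorService ejbPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "ejb-thread"));
        try {
            return CompletableFuture
                    // The "http" thread accepts the request...
                    .supplyAsync(() -> Thread.currentThread().getName(), httpPool)
                    // ...and the actual work is handed over to the "ejb" pool.
                    .thenApplyAsync(acceptedBy ->
                            new String[] {acceptedBy, Thread.currentThread().getName()}, ejbPool)
                    .join();
        } finally {
            httpPool.shutdown();
            ejbPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = handle();
        System.out.println("accepted by " + threads[0] + ", processed by " + threads[1]);
    }
}
```

Once the work is handed over, the accepting thread has nothing left to do for this request and can take the next one.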
The last thing you want is to wait forever to complete a request from a client. Always specify timeouts when doing asynchronous tasks.
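The same rule applies to any asynchronous call, not only to `AsyncResponse`. As a rough plain-JDK sketch (the class `TimeoutSketch`, the method `callWithTimeout` and the returned marker strings are hypothetical), a bounded `Future.get` gives up after the deadline instead of blocking indefinitely:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK sketch: "timeout" and "error" are hypothetical marker values.
final class TimeoutSketch {

    static String callWithTimeout(Callable<String> task, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(task);
        try {
            // Wait for the answer, but never longer than the given timeout.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that took too long
            return "timeout";
        } catch (ExecutionException e) {
            return "error";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "error";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(() -> "fast answer", 1000));
        System.out.println(callWithTimeout(() -> {
            Thread.sleep(5000);
            return "slow answer";
        }, 100));
    }
}
```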
Use them when you have a mix of heavy and light queries. You can get higher throughput, and your server can be tuned more finely.
What a synchronous one looks like
@Stateless
public class AuthorService {
public Author createAuthor(final String name, final String surname) {
final Author author = new Author(UUID.randomUUID(), name, surname);
em.persist(author);
return author;
}
}
Making it asynchronous (I)
@Stateless
public class AuthorService {
@Asynchronous
public Future<Author> createAuthor(final String name, final String surname) {
final Author author = new Author(UUID.randomUUID(), name, surname);
em.persist(author);
return new AsyncResult<>(author);
}
}
The Client (I)
@Path("/users/authors")
public class AuthorResource {
@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces(MediaType.APPLICATION_JSON)
public Response createAuthor(@FormParam("name") final String name, @FormParam("surname") final String surname) {
final Future<Author> authorFuture =
authorService.createAuthor(name, surname);
try {
final Author author = authorFuture.get(2, TimeUnit.SECONDS);
return Response.ok(author).build();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Response.serverError().build();
}
}
}
Making it asynchronous (II)
@Stateless
public class AuthorService {
@Asynchronous
public CompletableFuture<Author> createAuthor(final String name, final String surname) {
final Author author = new Author(UUID.randomUUID(), name, surname);
em.persist(author);
return CompletableFuture.completedFuture(author);
}
}
The Client (II)
@Stateless
@Path("/users/authors")
public class AuthorResource {
@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces(MediaType.APPLICATION_JSON)
@Asynchronous
public void createAuthor(@FormParam("name") final String name, @FormParam("surname") final String surname,
@Suspended AsyncResponse response) {
response.setTimeout(2, TimeUnit.SECONDS);
response.setTimeoutHandler(resp -> {
resp.resume(Response.status(Response.Status.REQUEST_TIMEOUT).build());
});
authorService.createAuthor(name, surname).thenApply(response::resume);
}
}
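The JEE specification doesn't allow this: an @Asynchronous method must return void or Future<V>, and the container rejects anything else.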
Async method exposed through no-interface view must have return type void or java.lang.concurrent.Future<V>
Making it asynchronous (III)
@Stateless
public class AuthorService {
@Asynchronous
public void createAuthor(final String name, final String surname,
final CompletableFuture<Author> promise) {
final Author author = new Author(UUID.randomUUID(), name, surname);
em.persist(author);
promise.complete(author);
}
}
The Client (III)
@Stateless
@Path("/users/authors")
public class AuthorResource {
@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces(MediaType.APPLICATION_JSON)
@Asynchronous
public void createAuthor(@FormParam("name") final String name, @FormParam("surname") final String surname,
@Suspended AsyncResponse response) {
CompletableFuture<Author> promise = new CompletableFuture<>();
authorService.createAuthor(name, surname, promise);
promise.thenApply(response::resume);
}
}
The Client (III bis)
@GET
@Produces(MediaType.APPLICATION_JSON)
@Asynchronous
public void getUsers(@Suspended AsyncResponse response) {
final CompletableFuture<List<Author>> authors = new CompletableFuture<>();
final CompletableFuture<List<Reader>> readers = new CompletableFuture<>();
authorService.getAuthors(authors);
readerService.getReaders(readers);
authors.thenCombine(readers, (a, r) -> {
final List<Person> list = new ArrayList<>(a.size() + r.size());
list.addAll(a);
list.addAll(r);
return list;
}).thenApply(response::resume);
}
Any time an asynchronous method is invoked, it is served by a different thread pool, freeing the calling thread to continue with its own tasks.
Use them when you can benefit from calling different Session Beans at the same time. This can increase throughput.
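To illustrate calling two services at the same time without a container, here is a minimal plain-JDK sketch. The class `ParallelCallsSketch`, the method `loadUsers` and the sample data are assumptions; the two `supplyAsync` calls stand in for asynchronous Session Bean invocations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK sketch: the two lookups are hypothetical stand-ins for
// asynchronous calls to two different Session Beans.
final class ParallelCallsSketch {

    static List<String> loadUsers() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Both lookups are submitted before either result is awaited.
            CompletableFuture<List<String>> authors =
                    CompletableFuture.supplyAsync(() -> Arrays.asList("author-1", "author-2"), pool);
            CompletableFuture<List<String>> readers =
                    CompletableFuture.supplyAsync(() -> Arrays.asList("reader-1"), pool);
            // Merge the two lists once both lookups have completed.
            return authors.thenCombine(readers, (a, r) -> {
                List<String> users = new ArrayList<>(a.size() + r.size());
                users.addAll(a);
                users.addAll(r);
                return users;
            }).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadUsers());
    }
}
```

With real services, the total time tends toward that of the slower call rather than the sum of both.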
Futures are currently the main vehicle for asynchronicity in JEE. However, they are limited: unlike CompletableFuture, they cannot be composed and do not work with the new lambdas.
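The difference can be sketched with plain JDK classes. In this hypothetical example (`FutureCompositionSketch` and its methods are illustrative names, and the numbers are arbitrary), the `Future` variant has to block on `get` before it can do anything with the value, while the `CompletableFuture` variant chains and combines steps with lambdas.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK sketch with arbitrary sample values.
final class FutureCompositionSketch {

    // Plain Future: the caller must block on get() before using the value.
    static int withFuture(ExecutorService pool) {
        Future<Integer> value = pool.submit(() -> 20);
        try {
            return value.get(5, TimeUnit.SECONDS) + 1;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // CompletableFuture: follow-up steps are attached as lambdas and
    // two independent results are combined without manual blocking in between.
    static int withCompletableFuture(ExecutorService pool) {
        CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 20, pool);
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 21, pool);
        return first.thenApply(x -> x + 1)
                .thenCombine(second, Integer::sum)
                .join();
    }

    // Returns {result via Future, result via CompletableFuture}.
    static int[] run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return new int[] {withFuture(pool), withCompletableFuture(pool)};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = run();
        System.out.println(results[0] + " " + results[1]);
    }
}
```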
If things can go wrong, model failure in your APIs. In the same way, do not expect users of your methods to check whether the returned object is null.
Optional was introduced in Java 8. Its only purpose is to express that the return value of a method might not exist. It is not intended to be used in fields.
Example code
@Stateless
public class ReaderService {
public Optional<Reader> getReader(final String id) {
return Optional.ofNullable(em.find(Reader.class, id));
}
}
...
@Path("/users/readers")
public class ReaderResource {
@GET
@Path("{id}")
public Response getReader(@PathParam("id") final String id) {
final Optional<Reader> reader = readerService.getReader(id);
return reader
.map(Response::ok)
.orElseGet(() -> Response.status(Response.Status.NOT_FOUND))
.build();
}
}
Optional is NOT serializable. It cannot be used in remote Session Beans! Use it only in local methods.
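Both points can be checked with plain JDK code. The following sketch assumes a hypothetical in-memory lookup (`OptionalSketch`, `findReader`, `describe` and the sample reader are made up for illustration); the strings "200" and "404" merely mimic the HTTP statuses used in the resource above.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-JDK sketch: the reader store and the status strings are hypothetical.
final class OptionalSketch {

    private static final Map<String, String> READERS = new HashMap<>();
    static {
        READERS.put("r1", "Ada");
    }

    // The return type tells callers that the reader might not exist.
    static Optional<String> findReader(String id) {
        return Optional.ofNullable(READERS.get(id));
    }

    // Callers handle both cases without a null check.
    static String describe(String id) {
        return findReader(id)
                .map(name -> "200 " + name)
                .orElseGet(() -> "404");
    }

    // Optional does not implement Serializable, so this returns false.
    static boolean optionalIsSerializable() {
        return Serializable.class.isAssignableFrom(Optional.class);
    }

    public static void main(String[] args) {
        System.out.println(describe("r1"));
        System.out.println(describe("unknown"));
        System.out.println(optionalIsSerializable());
    }
}
```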
Try doesn't exist in Java (currently). It can be found, for example, in the javaslang library. A Try object represents a computation that might complete or fail with an exception.
Possible Implementation
interface Try<T> {
//Consumes the throwable if this is a Failure.
Try<T> onFailure(Consumer<? super Throwable> action);
//Consumes the value if this is a Success.
Try<T> onSuccess(Consumer<? super T> action);
//Creates a Try of a supplier that might throw (body omitted).
static <T> Try<T> of(Try.CheckedSupplier<? extends T> supplier);
}
Example code
Try<String> attempt = getURL("https://devnull-as-a-service.com/");
attempt.onFailure(e -> logger.log(Level.SEVERE, "Error occurred", e))
.onSuccess(logger::info);
Try<String> getURL(String url) {
return Try.of(() -> {
StringBuilder result = new StringBuilder();
HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
conn.setRequestMethod("GET");
String line;
try (BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
while ((line = rd.readLine()) != null) {
result.append(line);
}
} // try-with-resources closes the reader even if reading fails
return result.toString();
});
}
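For readers who want to experiment without adding a library, a Try along these lines can be sketched in a few lines of plain Java. This is a hypothetical minimal version, not javaslang's implementation: the names `SimpleTry` and `CheckedSupplier` are assumptions, and it only catches `Exception`.

```java
import java.util.function.Consumer;

// Hypothetical minimal Try; javaslang's real implementation is much richer.
final class SimpleTry<T> {

    // A supplier that is allowed to throw checked exceptions.
    interface CheckedSupplier<R> {
        R get() throws Exception;
    }

    private final T value;           // set when the computation succeeded
    private final Throwable failure; // set when the computation failed

    private SimpleTry(T value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    // Runs the supplier and captures either its value or its exception.
    static <T> SimpleTry<T> of(CheckedSupplier<? extends T> supplier) {
        try {
            return new SimpleTry<>(supplier.get(), null);
        } catch (Exception e) {
            return new SimpleTry<>(null, e);
        }
    }

    boolean isSuccess() {
        return failure == null;
    }

    // Consumes the value if this is a success.
    SimpleTry<T> onSuccess(Consumer<? super T> action) {
        if (isSuccess()) {
            action.accept(value);
        }
        return this;
    }

    // Consumes the throwable if this is a failure.
    SimpleTry<T> onFailure(Consumer<? super Throwable> action) {
        if (!isSuccess()) {
            action.accept(failure);
        }
        return this;
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        SimpleTry<Integer> parsed = SimpleTry.of(() -> Integer.parseInt("42"));
        parsed.onSuccess(v -> log.append("ok:").append(v))
                .onFailure(e -> log.append("fail"));
        SimpleTry<Integer> broken = SimpleTry.of(() -> Integer.parseInt("not a number"));
        broken.onSuccess(v -> log.append("|ok:").append(v))
                .onFailure(e -> log.append("|fail:").append(e.getClass().getSimpleName()));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller never sees a thrown exception; success and failure are both ordinary values that can be passed around and inspected.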
Provide loosely coupled communication between services while keeping a certain degree of delivery guarantees and fault tolerance.
The oldest of the APIs introduced in this talk (2001). It is loosely coupled and transactional, and it can be durable, fault tolerant and clustered. Messages can be ObjectMessage, MapMessage, TextMessage, StreamMessage or BytesMessage.
Coupled code
public class LendingService {
public BookAvailability lendBook(final Book book, final Reader reader) {
final BookAvailability bookAvailability =
em.find(BookAvailability.class, book.getIsbn());
if (!bookAvailability.isAvailable()) {
throw new BookNotAvailableException();
} else {
reader.loanBook(book);
bookAvailability.lend();
em.merge(reader);
em.merge(bookAvailability);
return bookAvailability;
}
}
}
The Producer
public class LendingService {
@Inject JMSContext jmsContext;
@Resource(lookup = "jms/BookLendingQueue") Destination queue;
public BookAvailability lendBook(Book book, Reader reader) {
...
sendLoanBookJMS(reader, book);
bookAvailability.lend();
em.merge(bookAvailability);
...
}
private void sendLoanBookJMS(Reader reader, Book book) {
final Map<String, Object> message = new HashMap<>();
message.put("readerId", reader.getId());
message.put("bookIsbn", book.getIsbn());
jmsContext.createProducer().send(queue, message);
}
}
The Consumer
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType",
propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destinationLookup",
propertyValue = "jms/BookLendingQueue")})
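public class ReaderMDB implements MessageListener {
...
@Override
public void onMessage(Message message) {
try {
MapMessage msg = (MapMessage) message;
Reader reader = em.find(Reader.class, msg.getString("readerId"));
Book book = em.find(Book.class, msg.getString("bookIsbn"));
reader.loanBook(book);
em.merge(reader);
} catch (JMSException e) {
// MapMessage getters throw JMSException, which onMessage cannot declare.
throw new IllegalStateException("Could not read loan message", e);
}
}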
}
Use JMS when you want to decouple services from each other. Messages are processed asynchronously. You can add delivery guarantees, or even send messages to other machines. It is also useful when you want to broadcast information.
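The decoupling idea itself can be tried with an in-memory queue. The sketch below is only an analogy in plain JDK code: a `BlockingQueue` has none of the durability, transactions or delivery guarantees of JMS, and the class `QueueDecouplingSketch`, the sample ids and the empty-map stop signal are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only: no durability, transactions or delivery guarantees.
final class QueueDecouplingSketch {

    static List<String> run() {
        BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<>();
        List<String> processed = new CopyOnWriteArrayList<>();

        // Consumer: knows the message format, but nothing about the producer.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Map<String, String> msg = queue.take();
                    if (msg.isEmpty()) {
                        return; // hypothetical stop signal: an empty message
                    }
                    processed.add(msg.get("readerId") + " borrowed " + msg.get("bookIsbn"));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        // Producer: puts a message on the queue and carries on immediately.
        Map<String, String> loan = new HashMap<>();
        loan.put("readerId", "reader-1");
        loan.put("bookIsbn", "isbn-1");
        queue.add(loan);
        queue.add(new HashMap<>()); // tell the consumer to stop

        try {
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```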
CDI events have been around since JEE 6. They allow beans to communicate without any compile-time dependency. By default, events are processed synchronously.
Producer
public class LendingService {
@Inject @ReturnedBook Event<BookLoan> returnedBook;
public BookAvailability returnBook(final Book book, final Reader reader) {
...
if (reader.getBookShelf().contains(book)) {
bookAvailability.returned();
em.merge(bookAvailability);
returnedBook.fire(new BookLoan(book, reader));
return bookAvailability;
}
...
}
}
Qualifier
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
public @interface ReturnedBook {
}
Consumer
@Stateless
public class ReaderService {
public void updateBookShelf(@Observes @ReturnedBook BookLoan bookLoan) {
bookLoan.reader.returnBook(bookLoan.book);
em.merge(bookLoan.reader);
}
}
Asynchronous Consumer
@Stateless
public class ReaderService {
@Asynchronous
public void updateBookShelf(@Observes @ReturnedBook BookLoan bookLoan) {
bookLoan.reader.returnBook(bookLoan.book);
em.merge(bookLoan.reader);
}
}
Producing and consuming CDI events is easier than the JMS way, but it comes with some limitations. If you need to process events asynchronously, you have to do it manually and take thread safety into account.
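The difference between synchronous and manually asynchronous delivery can be sketched without CDI. The tiny event bus below is a hypothetical plain-JDK illustration (`EventBusSketch`, `fire`, `fireAsync` and the thread name are made-up names, not CDI APIs): `fire` runs observers on the caller's thread, while `fireAsync` hands them to an executor, which is why the observer's shared state uses a thread-safe list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical event bus for illustration; this is not the CDI API.
final class EventBusSketch {

    private final List<Consumer<String>> observers = new CopyOnWriteArrayList<>();

    void observe(Consumer<String> observer) {
        observers.add(observer);
    }

    // Synchronous delivery: observers run on the caller's thread.
    void fire(String event) {
        for (Consumer<String> observer : observers) {
            observer.accept(event);
        }
    }

    // Manual asynchronous delivery: observers run on the given executor.
    CompletableFuture<Void> fireAsync(String event, Executor executor) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (Consumer<String> observer : observers) {
            pending.add(CompletableFuture.runAsync(() -> observer.accept(event), executor));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }

    // Returns the names of the threads that ran the observer: sync first, async second.
    static List<String> demo() {
        EventBusSketch bus = new EventBusSketch();
        List<String> seenOn = new CopyOnWriteArrayList<>(); // written from two threads
        bus.observe(event -> seenOn.add(Thread.currentThread().getName()));
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-worker"));
        try {
            bus.fire("book returned");
            bus.fireAsync("book returned", pool).join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(seenOn);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```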
Josep Prat - @jlprat - Berlin Expert Days - 16/09/2016