Use Promise To Combine Async Requests

In Part II we talked about Future vs CompletableFuture(Java’s version of Promise). Now let’s take a look at some examples of how we can actually use them.

Some of the most common real world scenarios that keeps popping up, have to deal with multiple async requests.

  • One is serial: we need the result of “request 1” to feed into “request 2”, and the result of “request 2” into “request 3”, and so on. So request 1,2,3 have to be run in that order, and one can not start before the prior one ended.

  • The other one is parallel: we need all the results from “request 1.1”, “request 1.2”, … “request 1.N” before we could proceed to request 2, while “1.1”, “1.2”, … “1.N” are independent of each other, thus can be executed in parallel.

For each of these two challenges, we will first take a look at how we used to solve them without Promise, and then, with.

 

Multiple Async Requests - Serial

 

Setup

For this post, let’s use a simple setup: we have three slow requests, each one being slow by doing a Thread.sleep(2000); Also I have intentionally made the input & output of each request different and chain-able:

  • request 1 output a String,
  • request 2 take a String and output a int (length of input),
  • and request 3 take a int, and output a String.
public class AsyncSerial {
  private static String slow_request_1() {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "result from request 1";
  }

  private static int slow_request_2(final String input) {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) { e.printStackTrace(); }
    return input==null ? 0 : input.length();
  }

  private static String slow_request_3(int input) {
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) { e.printStackTrace(); }
    return "the length of request 1 result ="+input;
  }

}

 

Blocking version

The blocking version is the easiest:

public class AsyncSerial {
  . . . 

  public static void main(String[] args) {
    // // blocking version
    String result1 = slow_request_1();
    int result2 = slow_request_2(result1);
    String result3 = slow_request_3(result2);
    System.out.println( result3 );
  }
}

and after about 6 seconds, we see the output the length of request 1 result =21

 

Callback version

It’s at this stage that we realized that Future is not very helpful in this scenario. Because of the serial nature of the call chain, we need one request to finish before calling the next. So if we want to do this in anything that resemble some kind of async fashion, we could use ‘callback-style’: pass in the ‘next request when this one is done’ as an argument to the current request.

Unfortunately we need to modify our existing requests with the additional callback as a parameter. And here is where it gets ugly:

  private static String slow_request_1_withCB(
        final BiFunction<String, Function<Integer, String>, String> cb1,
        Function<Integer, String> cb2) {
    String request1Output = slow_request_1();
    return cb1.apply( request1Output, cb2 );
  }

  private static String slow_request_2_withCB(final String input, final Function<Integer, String> callback) {
    int request2Output = slow_request_2(input);
    return callback.apply( request2Output );
  }

  private static String slow_request_3_withCB(int input) {
    String request3Output = slow_request_3(input);
    return request3Output;
  }

  public static void main(String[] args) {
    // callback (hell) version
    System.out.println(slow_request_1_withCB(AsyncSerial::slow_request_2_withCB, AsyncSerial::slow_request_3_withCB));
  }

As you can see, the problem is that the chaining of the calls have to be written into the logic. When we call ‘request 1’ not only do we have to pass in ‘request 2’ as a parameter, we also have to supply ‘request 3’ as a parameter to ‘request 1’! This is because at the end of request 1, when 1 calls 2, 1 needs to tell 2 that when it finishes it needs to call 3. In other words ‘request 1’ needs to know not only its immediate successor, but the whole chain.

 

Promise (CompletableFuture) version

The good news is, the CompletableFuture version is pretty much as simple as the blocking version, while remain async.

  public static void main(String[] args) {
    // promise version
    CompletableFuture.supplyAsync( AsyncSerial::slow_request_1 )
        .thenApply( AsyncSerial::slow_request_2 )
        .thenApply( AsyncSerial::slow_request_3 )
        .thenAccept( System.out::println );
    System.out.println("to prove it is async, this line should be printed before the result from above is printed");
  }

As we can see, the power & flexibility of CompletableFuture is in full display here. The CompletableFuture.supplyAsync turns a regular blocking call into an Async Promise, and you can easily chain multiple requests by .thenApply and .thenAccept.

The difference between .thenApply and .thenAccept is that

  • .thenApply takes a Function< T,R > which has both an input and an output. So it’s usually meant for a middle transformation.
  • .thenAccept takes a Consumer< T > which just has an input. So it’s usually meant for the last transformation.

A complete Java source code can be found here.

 

Multiple Async Requests - Parallel

 

Setup

Let’s again use a simple setup: we have three slow requests, this time each one being slow by doing a Thread.sleep(n); but sleep for a random 1 - 2 seconds. After the sleep, request 1, 2, 3 will return String “1”, “2”, “3”. This time we have to wait for all three requests to complete, before proceeding.

public class AsyncParallel {
  private static SecureRandom secureRandom = new SecureRandom();
  static {
    secureRandom.setSeed( System.currentTimeMillis() );
  }
  private static void waitForAWhile() {
    try {
      long sleepTimeInMilli = 1000L + 100L*secureRandom.nextInt(10);  // 1-2 seconds
      Thread.sleep(sleepTimeInMilli);
    } catch (InterruptedException ie) { ie.printStackTrace(); }
  }

  private static String slow_request_1() {
    waitForAWhile();
    return "1";
  }
  private static String slow_request_2() {
    waitForAWhile();
    return "2";
  }
  private static String slow_request_3() {
    waitForAWhile();
    return "3";
  }
}

 

Blocking version

Again, the blocking version is simple:

public class AsyncParallel {
  . . . 

  public static void main(String[] args) {
    // // blocking version
    String s1 = slow_request_1();
    String s2 = slow_request_2();
    String s3 = slow_request_3();
    System.out.println( s1+s2+s3 )
  }
}

and after about 3-6 seconds, we see the output 123. Of course though simple, this code is really bad. We are wasting a lot of time waiting for each request to wake up from their sleep. In fact we are waiting for the sum of their sleep time.

 

CompletionService version

Since Java 7, Oracle introduced CompletionService, where we can submit multiple requests, and these requests can run in parallel, and we can get the result back not in the order of submission but in the order of who finishes first.

  private static final ExecutorService THREAD_POOL = Executors.newCachedThreadPool();

  public static void main(String[] args) {
    // completionService version
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(THREAD_POOL);
    completionService.submit( AsyncParallel::slow_request_1 );
    completionService.submit( AsyncParallel::slow_request_2 );
    completionService.submit( AsyncParallel::slow_request_3 );
    int numFetched = 0;
    StringBuilder buf = new StringBuilder();
    while (numFetched<3) {
      try {
        final Future<String> future = completionService.take();
        buf.append(future.get());
        ++numFetched;
      } catch (InterruptedException ie) {
        break;
      }
      catch (Throwable e) {
        ++numFetched;
      }
    }
    System.out.println( buf.toString() );
  }

If we run the above code several times, we will see that sometimes the printout is “123”, sometimes it could be “132”, “312”, and so on. This is because completionService.take() will return in the order of who finishes first.

Though the above code is longer than the blocking version, we now run requests in parallel, and the total wait time is the slowest request, instead of the sum of all requests. Much more efficient!

 

Promise version

As usual we saved the best for last. With CompletableFuture we can achieve the efficiency of the CompletionService version, while keeping the blocking version’s simplicity:

public static void main(String[] args) {
  // promise version
  CompletableFuture<String> cf1 = CompletableFuture.supplyAsync( AsyncParallel::slow_request_1 );
  CompletableFuture<String> cf2 = CompletableFuture.supplyAsync( AsyncParallel::slow_request_2 );
  CompletableFuture<String> cf3 = CompletableFuture.supplyAsync( AsyncParallel::slow_request_3 );
  CompletableFuture.allOf(cf1, cf2, cf3).thenApply(v -> {
    System.out.println( cf1.join()+cf2.join()+cf3.join() );
    return null;
  });
  System.out.println("to prove it is async, this line should be printed before the result from above is printed");
}

A complete Java source code can be found here.