Devoxx UK Deep Dive: Streams and CompletableFutures

On day one of Devoxx UK I attended a Deep Dive about Streams and CompletableFutures. These are my notes.

Marius Quadflieg

Last week I attended the Deep Dive session "Parallel and Asynchronous Programming with Streams and CompletableFuture" held by Venkat Subramaniam (@venkat_s). I took a lot of notes, so here they are :D I will update this post if and when a YouTube video of the event is posted.

Intro

Parallel Streams

  • Collection Pipeline Pattern

    • Functional Composition
  • Imperative style

    • has accidental complexity
    • Synchronize and suffer model
    • Structure of sequential code is very different from the structure of concurrent code
  • Functional style

    • has less complexity and easier to parallelize
    • Structure of sequential code is identical to the structure of concurrent code
    • "Think" when using parallelStream
    • Parallelize with .parallel() if we are not the owner of the stream.
    • If .parallel() is followed by .sequential() (or the other way round), the last call wins for the entire pipeline
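
A minimal sketch of the last points, assuming a hypothetical list of the numbers 1 to 10 (the class and method names are mine, not from the talk). The pipeline has the same structure in both modes; only the stream source changes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ParallelSketch {
    // Hypothetical sample data, not from the talk.
    static final List<Integer> NUMBERS = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    // Same pipeline structure for sequential and parallel execution.
    static int sumOfDoubledEvens(boolean parallel) {
        Stream<Integer> stream = parallel ? NUMBERS.parallelStream() : NUMBERS.stream();
        return stream
            .filter(e -> e % 2 == 0)
            .mapToInt(e -> e * 2)
            .sum();
    }

    // parallel() followed by sequential(): the last call decides for the whole pipeline.
    static boolean isParallelAfterBoth() {
        return NUMBERS.stream().parallel().sequential().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(sumOfDoubledEvens(false)); // 60
        System.out.println(sumOfDoubledEvens(true));  // 60
        System.out.println(isParallelAfterBoth());    // false
    }
}
```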

Example Imperative

Streams vs Reactive Streams

Streams                                                   | Reactive Streams
sequential vs parallel                                    | sync vs async
entire pipeline is sequential or parallel -> no segments  | depends: subscribeOn -> no segments, observeOn -> segments

We solve one set of problems only to create a new set of problems

Example

  • Java 1 - Threads
  • Java 5 - ExecutorService
    • Pool induced deadlock
  • Java 7 - Fork Join Pattern
    • Workstealing

Common FJP

  • Default for parallelStreams
  • Part of the work is executed by the calling (main) thread itself
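
To see this for yourself, the following sketch records which threads process the elements of a parallel stream. The class name and the range of 1,000 elements are my own choices for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class CommonPoolThreads {
    // Collects the names of all threads that processed at least one element.
    static Set<String> threadNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 1_000)
            .parallel()
            .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Typically the calling thread plus some "ForkJoinPool.commonPool-worker-N" threads;
        // the exact set varies between runs and machines.
        System.out.println(threadNames());
    }
}
```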

Ordering a stream

  • Some methods are inherently ordered
  • Some methods are unordered but may have an ordered counterpart
    • e.g. forEachOrdered -> the pipeline still runs in parallel, but the elements are delivered in encounter order at the end
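
A small sketch of the ordered counterpart, with hypothetical input data and names of my own choosing. The mapping step may run on several threads, while forEachOrdered hands the results over in encounter order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OrderingSketch {
    static List<Integer> doubledInEncounterOrder(List<Integer> numbers) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        numbers.parallelStream()
            .map(e -> e * 2)            // may run on several threads, in any order
            .forEachOrdered(seen::add); // delivered one by one, in encounter order
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(doubledInEncounterOrder(Arrays.asList(1, 2, 3, 4, 5))); // [2, 4, 6, 8, 10]
    }
}
```

With a plain forEach in place of forEachOrdered, the order of the collected elements would not be guaranteed.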

.reduce in streams

  • aggregate in other languages
  • to do it in parallel it's broken on "rows" and then the "rows" are aggregated
  • reduce does not take an initial value, it takes an "identity" value
  • For int addition the identity is 0: x + 0 = x
  • For int multiplication the identity is 1: x * 1 = x
  • What we work with should be a monoid
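
As a sketch of why the identity matters (class and method names are my own): in a parallel reduce, every chunk starts from the identity value, so a value that is not a true identity would be folded in once per chunk and the result would depend on how the stream was split.

```java
import java.util.stream.IntStream;

class ReduceSketch {
    // Addition with identity 0.
    static int sum(int n) {
        return IntStream.rangeClosed(1, n).parallel().reduce(0, Integer::sum);
    }

    // Multiplication with identity 1.
    static int product(int n) {
        return IntStream.rangeClosed(1, n).parallel().reduce(1, (a, b) -> a * b);
    }

    public static void main(String[] args) {
        System.out.println(sum(10));     // 55
        System.out.println(product(10)); // 3628800
    }
}
```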

How many threads

How many threads should I create?
- Computation intensive
  - Threads <= number of cores
- IO intensive
  - Threads may be greater than number of cores

Formula

$$T \le \frac{\text{number of cores}}{1 - \text{blocking factor}}, \qquad 0 \le \text{blocking factor} < 1$$
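
A worked example of the formula as a small helper. The method name and the sample values (8 cores, blocking factors 0, 0.5 and 0.75) are assumptions for illustration only.

```java
class ThreadCountSketch {
    // Upper bound for the thread count: cores / (1 - blockingFactor).
    static int maxThreads(int cores, double blockingFactor) {
        if (cores < 1 || blockingFactor < 0.0 || blockingFactor >= 1.0) {
            throw new IllegalArgumentException("need cores >= 1 and 0 <= blockingFactor < 1");
        }
        return (int) Math.floor(cores / (1.0 - blockingFactor));
    }

    public static void main(String[] args) {
        System.out.println(maxThreads(8, 0.0));  // 8  (pure computation)
        System.out.println(maxThreads(8, 0.5));  // 16 (blocked half of the time)
        System.out.println(maxThreads(8, 0.75)); // 32 (mostly waiting on IO)
    }
}
```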

Fork Join Pool Thread Count

    Runtime.getRuntime().availableProcessors() // 8 cores
    ForkJoinPool.commonPool() // parallelism is just 7 (7 workers + the main thread => 8 threads)

Configure the number of threads

JVM flag

-Djava.util.concurrent.ForkJoinPool.common.parallelism=<number of threads>
Do not use this flag!

Programmatic

ForkJoinPool pool = new ForkJoinPool(100);
pool.submit(() -> stream.forEach(e -> {}));
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);

Quote

Parallel will probably use more resources but will deliver faster


CompletableFuture

  • Non blocking

Bad example

Future<?> future = call();
future.get();

Callbacks

  • lacks consistency
  • hard to compose
  • hard to deal with errors

Promises

  • may resolve, reject or be pending
  • data channel and error channel
    • errors are first class citizens
  • reactive streams offer a third channel: result channel
  • failure / error is like data
  • easy to compose

Promises Bad

CompletableFuture in Java is what Promises are in JavaScript

Stages

  • One stage completes and another stage may run (CompletableFuture after CompletableFuture)

Famous or popular functional interfaces

// Used in Stream API
Supplier<T>    T get()         // factories
Predicate<T>   boolean test(T) // filter
Function<T,R>  R apply(T)      // map
Consumer<T>    void accept(T)  // forEach
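
To make the mapping concrete, here is a sketch that uses all four interfaces in one stream pipeline. The counter-based supplier and the labels are hypothetical, chosen only to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

class FunctionalInterfacesSketch {
    static List<String> run() {
        AtomicInteger counter = new AtomicInteger();
        Supplier<Integer> next = counter::incrementAndGet;   // factory: 1, 2, 3, ...
        Predicate<Integer> isEven = n -> n % 2 == 0;         // used by filter
        Function<Integer, String> label = n -> "even: " + n; // used by map
        List<String> out = new ArrayList<>();
        Consumer<String> collect = out::add;                 // used by forEach

        Stream.generate(next)
            .limit(6)
            .filter(isEven)
            .map(label)
            .forEach(collect);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [even: 2, even: 4, even: 6]
    }
}
```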

Example

CompletableFuture Example

public static CompletableFuture<Integer> create() {
    return CompletableFuture.supplyAsync(() -> compute());
}

CompletableFuture<Integer> future = create();
CompletableFuture<Void> future2 = future.thenAccept(data -> System.out.println(data));

Easier

create()
    .thenAccept(data -> System.out.println(data))
    .thenRun(() -> System.out.println("This never dies")) // Runnable interface
    .thenRun(() -> System.out.println("Really, this never dies"))
    [...]

Bad idea

create().get() // blocking call!

The best thing to do with get() is to forget!

getNow(0) returns the result if it is already available, otherwise the given fallback value (here 0). It is a non-blocking call. In general, don't bother with it.
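
A minimal sketch of that behaviour, using a manually completed future so the outcome is deterministic (the class name and the value 42 are my own):

```java
import java.util.concurrent.CompletableFuture;

class GetNowSketch {
    static int[] beforeAndAfter() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        int before = future.getNow(0); // still pending -> fallback value 0
        future.complete(42);
        int after = future.getNow(0);  // completed -> actual result 42
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] values = beforeAndAfter();
        System.out.println(values[0] + " " + values[1]); // 0 42
    }
}
```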

thenAccept Example

And the result looks like this:

thenAccept Result

Define a custom FJP

ForkJoinPool pool = new ForkJoinPool(10);
CompletableFuture.supplyAsync(() -> compute(), pool);
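
The sketch below checks which thread runs the supplier when a pool is passed as the second argument. The class name is hypothetical, and join() is used only to make the result observable in a demo; it blocks, which is what the notes above advise against in real code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

class CustomPoolSketch {
    static String workerThreadName() {
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join(); // blocking, demo only
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // A name like "ForkJoinPool-1-worker-1" rather than "ForkJoinPool.commonPool-worker-1";
        // the exact numbers may differ.
        System.out.println(workerThreadName());
    }
}
```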

Comparison between Stream and CompletableFuture

Stream                   | CompletableFuture
pipeline                 | pipeline
lazy                     | lazy
zero, one, or more data  | zero or one
only data channel        | data and error channel
forEach                  | thenAccept
map                      | thenApply
exceptions - oops        | error channel
(((zip)))                | thenCombine
flatMap                  | thenCompose

Type inference automatically selects the right return type for each stage -> Pipeline

create()
    .thenApply(data -> data * 10) // CompletableFuture<Integer>
    .thenAccept(data -> System.out.println(data)) // CompletableFuture<Void>
    .thenRun(() -> System.out.println("That went well")); // CompletableFuture<Void>

Pipeline example

CompletableFuture<Integer> future = new CompletableFuture<Integer>()
    .thenApply(data -> data * 2)
    .thenApply(data -> data + 1); // future refers to this last stage, not to the source
    
System.out.println("built the pipeline");

future.thenAccept(data -> System.out.println(data));

System.out.println("prepare pipeline");

sleep(1000);

future.complete(2);

sleep(1000);

Result: 2

Refactoring delivers a different result

CompletableFuture<Integer> future = new CompletableFuture<>();
future.thenApply(data -> data * 2) // future stays the source stage
    .thenApply(data -> data + 1)
    .thenAccept(data -> System.out.println(data));
    
System.out.println("built the pipeline");

System.out.println("prepare pipeline");

sleep(1000);

future.complete(2);

sleep(1000);

Result: 5
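
The two results differ because of which stage the variable refers to. The sketch below reproduces both cases side by side; the class and method names are mine, and the printed value is collected into a list instead of being written to the console.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WhichStageSketch {
    // The variable holds the LAST stage: complete(2) bypasses both thenApply steps.
    static List<Integer> completeLastStage() {
        List<Integer> printed = new ArrayList<>();
        CompletableFuture<Integer> last = new CompletableFuture<Integer>()
            .thenApply(data -> data * 2)
            .thenApply(data -> data + 1);
        last.thenAccept(printed::add);
        last.complete(2);
        return printed;
    }

    // The variable holds the SOURCE stage: complete(2) flows through the whole pipeline.
    static List<Integer> completeSource() {
        List<Integer> printed = new ArrayList<>();
        CompletableFuture<Integer> source = new CompletableFuture<>();
        source.thenApply(data -> data * 2)
            .thenApply(data -> data + 1)
            .thenAccept(printed::add);
        source.complete(2);
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(completeLastStage()); // [2]
        System.out.println(completeSource());    // [5]
    }
}
```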

Error handling and recovery

public static int compute() {
    // throw new RuntimeException("error");
    return 2;
}

public static CompletableFuture<Integer> create() {
    return CompletableFuture.supplyAsync(() -> compute());
}

public static Void handleException(Throwable throwable) {
    System.out.println("ERROR: " + throwable);
    throw new RuntimeException("it is beyond any hope");
}

public static int handleException2(Throwable throwable) {
    System.out.println("ERROR: " + throwable);
    return -1; // recover from error track
}

public static void main(String[] args) {
    create()
        .thenApply(data -> data * 2)
        .exceptionally(throwable -> handleException2(throwable))
        .thenAccept(data -> System.out.println(data))
        .exceptionally(throwable -> handleException(throwable));
}

If everything is OK, execution continues with the next thenXXX stage. If an error occurs, the following thenXXX stages are skipped until the nearest exceptionally() downstream is reached.
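
A deterministic sketch of the two tracks, using a manually completed future (class and method names are hypothetical). The recovery value -1 mirrors handleException2 above.

```java
import java.util.concurrent.CompletableFuture;

class ErrorTrackSketch {
    static int run(boolean fail) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Integer> result = source
            .thenApply(data -> data * 2)    // data track: skipped if the source failed
            .exceptionally(throwable -> -1) // error track: recover and return to the data track
            .thenApply(data -> data + 1);   // runs in both cases

        if (fail) {
            source.completeExceptionally(new RuntimeException("my error"));
        } else {
            source.complete(2);
        }
        return result.join(); // already completed at this point, so join() does not wait
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // 5
        System.out.println(run(true));  // 0
    }
}
```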

Error handling

completeExceptionally

public static int handleException(Throwable throwable) {
    System.out.println("ERROR: " + throwable);
    return 1;
}

CompletableFuture<Integer> future = new CompletableFuture<>();
future.thenApply(data -> data * 2)
    .exceptionally(throwable -> handleException(throwable))
    .thenApply(data -> data + 1)
    .thenAccept(data -> System.out.println(data));
    
System.out.println("built the pipeline");

sleep(1000);

if (Math.random() > 0.75) {
    future.completeExceptionally(new RuntimeException("my error")); // Change here
} else {
    future.complete(2);
}

sleep(1000);

System.out.println("DONE");

States
- Pending
- Resolved (final)
- Rejected (final)

How long are we in the pending state?

Quote:

Both in life and programming never do something without timeout

  • Java 8 has no timeout methods
  • Java 9 has methods to do a timeout:
    future.completeOnTimeout(0, 1, TimeUnit.SECONDS);
    future.orTimeout(2, TimeUnit.SECONDS);
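
A short sketch of both Java 9 methods on a future that nobody ever completes. The 100 ms timeout and the fallback values are arbitrary choices for the demo, and join() blocks here only so the outcome can be shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {
    // completeOnTimeout: resolves with a default value once the timeout expires.
    static int withDefault() {
        return new CompletableFuture<Integer>()
            .completeOnTimeout(0, 100, TimeUnit.MILLISECONDS)
            .join();
    }

    // orTimeout: rejects with a TimeoutException, which exceptionally() turns into -1 here.
    static int withError() {
        return new CompletableFuture<Integer>()
            .orTimeout(100, TimeUnit.MILLISECONDS)
            .exceptionally(throwable -> -1)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(withDefault()); // 0
        System.out.println(withError());   // -1
    }
}
```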

combine Method

public static CompletableFuture<Integer> create(int number) {
    return CompletableFuture.supplyAsync(() -> number);
}

public static void main(String[] args) {
    create(2)
        .thenCombine(create(3), (result1, result2) -> result1 + result2) // Joins the results from two Futures
        .thenAccept(data -> System.out.println(data));
}

compose Method

Stream example

public static int func1(int number) {
    return number *2;
}

public static int[] func2(int number) {
    return new int[] { number -1, number +1};
}

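public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3); // sample data, assumed for illustration

    numbers.stream()
        //.map(e -> func1(e)) // func1 is a one-to-one mapping function
        //.map(e -> func2(e)) // func2 is a one-to-many mapping function
        .flatMap(e -> Arrays.stream(func2(e)).boxed()) // Stream.of(int[]) would yield a Stream<int[]>
        .forEach(System.out::println);
}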

map      one-to-one    Stream<T> => Stream<R>
map      one-to-many   Stream<T> => Stream<List<R>>
flatMap  one-to-many   Stream<T> => Stream<R>

Function returns data -> map
Function returns collection -> flatMap

Function returns data -> thenApply
Function returns CompletableFuture -> thenCompose

public static CompletableFuture<Integer> create(int number) {
    return CompletableFuture.supplyAsync(() -> number);
}

public static CompletableFuture<Integer> inc(int number) {
    return CompletableFuture.supplyAsync(() -> number + 1);
}

public static void main(String[] args) {
    create(2)
        //.thenApply(data -> inc(data))
        .thenCompose(data -> inc(data))
        .thenAccept(result -> System.out.println(result));
}

If your function returns a CompletableFuture use thenCompose
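
The difference shows up in the types. In this sketch (class and method names are mine), thenApply with a function that returns a CompletableFuture produces a nested future, while thenCompose flattens it, just like map and flatMap on streams. The join() calls block and are there only to print the values.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {
    static CompletableFuture<Integer> inc(int number) {
        return CompletableFuture.supplyAsync(() -> number + 1);
    }

    // thenApply wraps the returned future: a future of a future.
    static int viaThenApply() {
        CompletableFuture<CompletableFuture<Integer>> nested =
            CompletableFuture.completedFuture(2).thenApply(data -> inc(data));
        return nested.join().join();
    }

    // thenCompose flattens the returned future into a single stage.
    static int viaThenCompose() {
        CompletableFuture<Integer> flat =
            CompletableFuture.completedFuture(2).thenCompose(data -> inc(data));
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());   // 3
        System.out.println(viaThenCompose()); // 3
    }
}
```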