Devoxx UK Deep Dive: Streams and CompletableFutures

A few weeks ago I took part in the DevOxx UK Deep Dive Session Parallel and Asynchronous Programming with Streams and CompletableFuture hold by Venkat Subramaniam (@venkat_s). I did write a lot of notes. So here they are 😀

There also is a video of the full session on Youtube

Parallel Streams

  • Collection Pipeline Pattern
  • Functional Composition

  • Imperative style

  • has accidental complexity
  • Synchronize and suffer model
  • Structure of sequential code very different to structure of concurent code

  • Functional style

  • has less complexity and easier to parallelize
  • Structure of sequential code is identical to structure of concurent code
  • „Think“ when using parallelStream
  • Parallelyze with .parallel() if we are not the owner of the stream.
  • If parallel is followed by sequential, the last one wins

Streams vs Reactive Streams

Streams Reactive Streams
sequential vs parallel sync vs async
entire pipelin is sequential or parallel -> no segments Depends
subscibeOn -> no Segments
observeOn -> segments

We solve one set of problems only to create a new set of problems

Example

  • Java 1 – Threads
  • Java 5 – ExecutorService
  • Pool induced deadlock
  • Java 7 – Fork Join Pattern
  • Workstealing

Common FJP

  • Default for parallelStreams
  • one Process is executed by the main-Thread

Ordering a stream

  • Some methods are inherantly ordered
  • Some methods are unordered but may have an ordered counterpart
  • e.x. forEachOrdered -> Is parallel but sorts at the end

.reduce in streams

  • aggregate in other languages
  • to do it in parallel it’s broken on „rows“ and then the „rows“ are aggregated
  • reduce does not take an inital value, it takes an „identity“ value
  • For int + identity the identity is 0! x+0 = x
  • For int * identity the identity is 1! x*1 = x
  • What we work with should be a monoid

How many threads

How many threads should I create?
– Computation intensiv
– Threads <= number of cores
– IO intensiv
– Threads may be greater than number of cores

Formula

Number of Cores

T <= ————————- > 1 – blocking factor

0 <= blocking factor < 1 #### Fork Join Pool Thread Count „`java Runtime.getRuntime().availableProcessors() // 8 Cores ForkJoinPool.commonPool() // Just 7 (its 7 + the Main Thread => 8 Threads)


### Configure the number of threads #### JVM flag <img class="alignnone size-full wp-image-32" src="https://blog.qfotografie.de/wp-content/uploads/2018/10/4config-jvm.jpg" alt="" width="858" height="482" /> Do not use this flag! #### Programatic ```java ForkJoinPool pool = new ForkJoinPool(100); pool.submit(() -&gt; stream.forEach(e -&gt; {})); pool.shutdown(); pool.awaitTermination(10, TimeUnit.SECONDS);

Quote

Parallel will probably use more resources but will deliver faster


CompletableFuture

  • Non blocking

Bad example

Future&lt;?&gt; future = call();
future.get();

Callbacks

  • lacks consistency
  • hard to compose
  • hard to deal with errors

Promises

  • may resolve, reject or be pending
  • data channel and error channel -> FOTO
  • errors are first class citizens
  • reactive streams offer a third channel: result channel
  • failure / error is like data
  • easy to compose

CompletableFuture in Java is Promises in JavaScript

Stages

  • One stages completes and another stage may run (CompletableFuture after CompletableFuture)

Famous or popular functional interfaces

// Used in Stream API
Supplier T get() // factories
Predicat boolean test(T) // filter
Function<T,R> R apply(T) // map
Consumer void accept(T) // forEach

Example

public static CompletableFuture create() {
return CompletableFuture.supplyAsync(() -> compute());
}

CompletableFuture future = create();
CompletableFuture future2 = future.thenAccept(data -> System.out.println(data));

Easier

create()
.thenAccept(data -> System.out.println(data))
.thenRun(() -> System.out.println("This never dies")) // Runable interface
.thenRun(() -> System.out.println("Realy, this never dies"))
[...]

Bad idea

create().get() // blocking call!

The best thing to do with get() is to forget!

getNow(0) returns the result or Zero. Non-blocking call. In general don`t bother with it.

And the result looks like this:

Define a custom FJP

ForkJoinPool pool = new ForkJoinPool(10);
supplyAsync() -> compute(), pool

Comparision between Stream and CompletableFuture

Stream CompletableFuture
pipeline pipeline
lazy lazy
zero, one, or more data zero or one
only data channel data and error channel
forEach thenAccept
map thenApply
exceptions – oops error channel
(((zip))) thenCombine
flatMap thenCompose

Type interference automatically selects the right return value -> Pipeline

create()
.thenApply(data -> data * 10) // Int
.thenAccept(data -> System.out.println(data)) // Int
.thenRun(() -> System.out.println("That went well"); // Void

Pipeline example

CompletableFuture future = new CompletableFuture()
.thenApply(data -> data * 2)
.thenApply(data -> data + 1)

System.out.println("built the pipeline");

future.thenAccept(data -> System.out.println(data));

System.out.println("prepare pipeline");

sleep(1000);

future.complete(2);

sleep(1000);

Result: 2

Refactoring deliveres different result

CompletableFuture future = new CompletableFuture()
.thenApply(data -> data * 2)
.thenApply(data -> data + 1)
.thenAccept(data -> System.out.println(data));

System.out.println("built the pipeline");

System.out.println("prepare pipeline");

sleep(1000);

future.complete(2);

sleep(1000);

Result: 5

Error handling and recovery

public static int compute() {
// throw new RuntimeException("error");
return 2;
}

public static CompletableFuture create() {
return CompletableFuture.supplyAsync(() -> compute());
}

public static Void handleException(Throwable throwable) {
System.out.println("ERROR: " + throwable);
throw new RuntimeException("it is beyond any hope");
}

public static int handleException2(Throwable throwable) {
System.out.println("ERROR: " + throwable);
return -1; // recover from error track
}

public void static main(String args[]){
create()
.thenApply(data -> data *2)
.exceptionally(throwable -> handleException2(throwable)
.thenAccept(data -> System.out.println(data))
.exceptionally(throwable -> handleException(throwable);
}

If everything is ok it jumps to the next thenXXX method, if an error occurs it will call the nearest exceptionally()

completeExceptionally

public static int handleException(Throwable throwable) {
System.out.println("ERROR: " + throwable)
return 1
}

CompletableFuture future = new CompletableFuture()
.thenApply(data -> data * 2)
.exceptionally(throwable -> handleException(throwable))
.thenApply(data -> data + 1)
.thenAccept(data -> System.out.println(data));

System.out.println("built the pipeline");

sleep(1000);

if (Math.random() > 0,75) {
future.completeExceptionally(new RuntimeException("my error"); // Change here
} else {
future.complete(2);
}

sleep(1000);

System.out.println("DONE");

States
– Pending
– Resolve state (final)
– Reject (final)

How long are we in the pending state?

Quote:

Both in life and programming never do something without timeout

  • Java 8 has no timeout methods
  • Java 9 has methods to do a timeout:
    future.completeOnTimeout(0, 1, TimeUnit.SECONDS);
    future.orTimeout(2, TimeUnit.SECONDS);

combine Method

public static CompletableFuture create(int number) {
return CompletableFuture.supplyAsync(() -> number);
}

public main() {
create(2)
.thenCombine(create(3), (resualt1, result2) -> result1 + result2); // Joins the results from two Futures
.thenAccept(data -> System.out.println(data));
}

compose Method

Stream example

public static int func1(int number) {
return number *2;
}

public static int[] func2(int number) {
return new int[] { number -1, number +1};
}

public main() {
numbers.stream()
//.map(e -> func(e)) // func is a one to one mapping function
//.map(e -> func2(e)) // func is a one to many mapping function
.flatMap(e -> Stream.of(func2(e))
.forEach(System.Out::println);
}

map one-to-one Stream => Stream
map one-to-many Stream => Stream<List>
flatMap one-to-many Stream => Stream ??

Function returns data -> map
Function returns collection -> flatMap

Function returns data -> thenAccept
Function returns CompletableFuture -> thenCompose

public static CompletableFuture create(int number) {
return CompletableFuture.supplyAsync(() -> number;
}

public static CompletableFuture inc(int number) {
return CompletableFuture.supplyAsync(() -> number +1;
}

public main() {
create(2)
//.thenApply(data -> inc(data))
.thenCompose(data -> inc(data))
.thenAccept(result -> System.out.println(result));
}

If your function returns a CompletableFuture use thenCompose

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.