A few weeks ago I took part in the Devoxx UK Deep Dive Session "Parallel and Asynchronous Programming with Streams and CompletableFuture" held by Venkat Subramaniam (@venkat_s). I took a lot of notes, so here they are 😀
There is also a video of the full session on YouTube.
Parallel Streams
- Collection Pipeline Pattern
- Functional Composition
- Imperative style
  - has accidental complexity
  - Synchronize and suffer model
  - Structure of sequential code is very different from the structure of concurrent code
- Functional style
  - has less complexity and is easier to parallelize
  - Structure of sequential code is identical to the structure of concurrent code
- „Think“ when using parallelStream
- Parallelize with `.parallel()` if we are not the owner of the stream.
- If `parallel` is followed by `sequential`, the last one wins
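To make the "last one wins" rule concrete, here is a minimal sketch. The class and method names are made up for illustration; `isParallel()` reports the mode the whole pipeline would run in once a terminal operation is called.

```java
import java.util.stream.Stream;

class LastOneWins {
    // parallel() first, sequential() last: the whole pipeline is sequential
    static boolean parallelThenSequential() {
        return Stream.of(1, 2, 3)
                .parallel()
                .map(e -> e * 2)
                .sequential()
                .isParallel();
    }

    // sequential() first, parallel() last: the whole pipeline is parallel
    static boolean sequentialThenParallel() {
        return Stream.of(1, 2, 3)
                .sequential()
                .map(e -> e * 2)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // false
        System.out.println(sequentialThenParallel()); // true
    }
}
```

There is no way to run only the `map` step in parallel: the mode is a property of the entire stream pipeline.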
Streams vs Reactive Streams
Streams | Reactive Streams |
---|---|
sequential vs parallel | sync vs async |
entire pipeline is sequential or parallel -> no segments | depends: `subscribeOn` -> no segments, `observeOn` -> segments |
We solve one set of problems only to create a new set of problems
Example
- Java 1 – Threads
- Java 5 – ExecutorService
- Pool induced deadlock
- Java 7 – Fork Join Pattern
- Workstealing
Common FJP
- Default for parallelStreams
- one part of the work is executed by the `main` thread
Ordering a stream
- Some methods are inherently ordered
- Some methods are unordered but may have an ordered counterpart
  - e.g. `forEachOrdered` -> runs in parallel but hands the elements over in encounter order at the end
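A small sketch of the difference between `forEach` and `forEachOrdered` on a parallel stream. The class and method names are only for illustration, and the results are collected into a synchronized list because `forEach` may call the consumer from several threads at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

class OrderedDemo {
    // forEachOrdered: the mapping may run in parallel, but the consumer sees encounter order
    static List<Integer> viaForEachOrdered() {
        List<Integer> result = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, 20).parallel()
                .map(e -> e * 2)
                .boxed()
                .forEachOrdered(result::add);
        return result;
    }

    // forEach: same elements, but the order in which they arrive is not guaranteed
    static List<Integer> viaForEach() {
        List<Integer> result = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, 20).parallel()
                .map(e -> e * 2)
                .boxed()
                .forEach(result::add);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(viaForEachOrdered()); // always 2, 4, 6, ... 40
        System.out.println(viaForEach());        // same values, order may differ per run
    }
}
```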
`.reduce` in streams
- aggregate in other languages
- to do it in parallel, the data is broken into „rows“ and then the „rows“ are aggregated
- reduce does not take an initial value, it takes an „identity“ value
- For `int` addition the identity is 0: x + 0 = x
- For `int` multiplication the identity is 1: x * 1 = x
- What we work with should be a monoid
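Why the identity matters can be shown with a short sketch (the class and method names are made up). With a real identity, a parallel `reduce` gives the same answer as a sequential one. With a value that is not an identity, even the sequential result is off, and in parallel that value is applied once per chunk, so the error depends on how the data happens to be split.

```java
import java.util.Arrays;
import java.util.List;

class ReduceIdentity {
    // 0 is the identity for +, so the result is stable in parallel
    static int sum(List<Integer> numbers) {
        return numbers.parallelStream().reduce(0, Integer::sum);
    }

    // 1 is the identity for *
    static int product(List<Integer> numbers) {
        return numbers.parallelStream().reduce(1, (a, b) -> a * b);
    }

    // 1 is NOT an identity for +: already wrong sequentially (one too many)
    static int sumWithWrongIdentity(List<Integer> numbers) {
        return numbers.stream().reduce(1, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> oneToTen = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sum(oneToTen));                         // 55
        System.out.println(product(Arrays.asList(1, 2, 3, 4, 5))); // 120
        System.out.println(sumWithWrongIdentity(oneToTen));        // 56
    }
}
```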
How many threads
How many threads should I create?
- Computation intensive
  - Threads <= number of cores
- IO intensive
  - Threads may be greater than number of cores
Formula
$$T \le \frac{\text{number of cores}}{1 - \text{blocking factor}}, \qquad 0 \le \text{blocking factor} < 1$$
#### Fork Join Pool Thread Count
```java
Runtime.getRuntime().availableProcessors() // 8 Cores
ForkJoinPool.commonPool() // Just 7 (it's 7 + the main thread => 8 threads)
```
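The thread-count formula is easy to turn into a helper. This is only a sketch: `maxThreads` is a made-up name, and the blocking factor (the fraction of time a task spends waiting) is something you have to estimate yourself.

```java
import java.util.concurrent.ForkJoinPool;

class ThreadCount {
    // Upper bound from the formula: T <= cores / (1 - blockingFactor)
    static int maxThreads(int cores, double blockingFactor) {
        if (cores < 1 || blockingFactor < 0 || blockingFactor >= 1) {
            throw new IllegalArgumentException("need cores >= 1 and 0 <= blockingFactor < 1");
        }
        return (int) Math.floor(cores / (1 - blockingFactor));
    }

    public static void main(String[] args) {
        System.out.println(maxThreads(8, 0.0));  // 8  (pure computation)
        System.out.println(maxThreads(8, 0.5));  // 16 (blocked half of the time)
        System.out.println(maxThreads(8, 0.75)); // 32

        // What this machine reports; the values depend on the hardware
        System.out.println(Runtime.getRuntime().availableProcessors());
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```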
### Configure the number of threads
#### JVM flag
<img class="alignnone size-full wp-image-32" src="https://blog.qfotografie.de/wp-content/uploads/2018/10/4config-jvm.jpg" alt="" width="858" height="482" />
Do not use this flag!
#### Programmatic
```java
ForkJoinPool pool = new ForkJoinPool(100);
// the parallel stream runs in the pool that executes the submitted task
pool.submit(() -> stream.forEach(e -> {}));
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
```
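Here is a self-contained version of the same trick, as a sketch with made-up names. Note the assumption: the fact that a parallel stream started from inside a `ForkJoinPool` task uses that pool is observed behaviour of the JDK implementation, not something the stream API documents.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolStream {
    // Runs a parallel stream from within a task of a dedicated pool
    static List<Integer> doubledInPool(int parallelism, int upTo) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() ->
                    IntStream.rangeClosed(1, upTo).parallel()
                            .map(e -> e * 2)
                            .boxed()
                            .collect(Collectors.toList())
            ).join(); // wait for the task; join() throws no checked exceptions
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(doubledInPool(4, 5)); // [2, 4, 6, 8, 10]
    }
}
```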
Quote
Parallel will probably use more resources but will deliver faster
CompletableFuture
- Non blocking
Bad example
Future<?> future = call();
future.get();
Callbacks
- lacks consistency
- hard to compose
- hard to deal with errors
Promises
- may resolve, reject or be pending
- data channel and error channel
- errors are first class citizens
- reactive streams offer a third channel: result channel
- failure / error is like data
- easy to compose
`CompletableFuture` in Java is what Promises are in JavaScript
Stages
- One stage completes and another stage may run (CompletableFuture after CompletableFuture)
Famous or popular functional interfaces
// Used in Stream API
Supplier<T> T get() // factories
Predicate<T> boolean test(T) // filter
Function<T,R> R apply(T) // map
Consumer<T> void accept(T) // forEach
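All four interfaces can be seen in one small stream pipeline. A sketch with made-up names; normally you would pass the lambdas inline, they are only pulled out into variables here to show which interface each one is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

class FunctionalInterfaces {
    static List<Integer> evenDoubled(int limit) {
        Supplier<Stream<Integer>> source = () -> Stream.iterate(1, e -> e + 1).limit(limit); // factory
        Predicate<Integer> isEven = e -> e % 2 == 0;   // used by filter
        Function<Integer, Integer> doubleIt = e -> e * 2; // used by map
        List<Integer> result = new ArrayList<>();
        Consumer<Integer> collect = result::add;       // used by forEach

        source.get().filter(isEven).map(doubleIt).forEach(collect);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(evenDoubled(6)); // [4, 8, 12]
    }
}
```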
Example
public static CompletableFuture<Integer> create() {
    return CompletableFuture.supplyAsync(() -> compute());
}
CompletableFuture<Integer> future = create();
CompletableFuture<Void> future2 = future.thenAccept(data -> System.out.println(data));
Easier
create()
.thenAccept(data -> System.out.println(data))
.thenRun(() -> System.out.println("This never dies")) // Runnable interface
.thenRun(() -> System.out.println("Really, this never dies"))
[...]
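A runnable sketch of such a chain. Instead of printing, the stages write into a list so the order can be inspected; the class name and the constant `2` standing in for `compute()` are assumptions. The final `join()` is only there so the demo waits for the asynchronous work before it returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenChain {
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture.supplyAsync(() -> 2)
                .thenAccept(data -> log.add("data: " + data))      // Consumer
                .thenRun(() -> log.add("This never dies"))         // Runnable
                .thenRun(() -> log.add("Really, this never dies")) // Runnable
                .join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each stage only starts after the previous one has completed, so the three entries always appear in this order.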
Bad idea
create().get(); // blocking call!
The best thing to do with `get()` is to forget!
`getNow(0)` returns the result if it is already available, otherwise the given default (here 0). It is a non-blocking call. In general, don't bother with it.
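The behaviour of `getNow` is easy to check with a future that is completed by hand. A minimal sketch with made-up names:

```java
import java.util.concurrent.CompletableFuture;

class GetNowDemo {
    // Still pending: getNow returns the fallback without blocking
    static int pendingValue() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        return future.getNow(0);
    }

    // Already completed: getNow returns the real result
    static int completedValue() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.complete(5);
        return future.getNow(0);
    }

    public static void main(String[] args) {
        System.out.println(pendingValue());   // 0
        System.out.println(completedValue()); // 5
    }
}
```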
And the result looks like this:
Define a custom FJP
ForkJoinPool pool = new ForkJoinPool(10);
CompletableFuture.supplyAsync(() -> compute(), pool);
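To check that the supplier really runs in the pool that was passed in, the task can look at its own thread. This is a sketch with made-up names; it relies on `ForkJoinWorkerThread.getPool()` to identify the owning pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

class CustomPoolFuture {
    static boolean runsInGivenPool() {
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            return CompletableFuture.supplyAsync(() -> {
                Thread current = Thread.currentThread();
                // true only if this thread is a worker of our own pool
                return current instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) current).getPool() == pool;
            }, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runsInGivenPool()); // true
    }
}
```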
Comparison between Stream and CompletableFuture
Stream | CompletableFuture |
---|---|
pipeline | pipeline |
lazy | lazy |
zero, one, or more data | zero or one |
only data channel | data and error channel |
forEach | thenAccept |
map | thenApply |
exceptions – oops | error channel |
(((zip))) | thenCombine |
flatMap | thenCompose |
Type inference automatically selects the right return type -> Pipeline
create()
    .thenApply(data -> data * 10) // Int
    .thenAccept(data -> System.out.println(data)) // Int
    .thenRun(() -> System.out.println("That went well")); // Void
Pipeline example
CompletableFuture<Integer> future = new CompletableFuture<Integer>()
    .thenApply(data -> data * 2)
    .thenApply(data -> data + 1);
System.out.println("built the pipeline");
future.thenAccept(data -> System.out.println(data));
System.out.println("prepare pipeline");
sleep(1000);
future.complete(2); // future is the LAST stage here, so both thenApply steps are bypassed
sleep(1000);
Result: 2
Refactoring delivers a different result
CompletableFuture<Integer> future = new CompletableFuture<>();
future
    .thenApply(data -> data * 2)
    .thenApply(data -> data + 1)
    .thenAccept(data -> System.out.println(data));
System.out.println("built the pipeline");
System.out.println("prepare pipeline");
sleep(1000);
future.complete(2); // future is the SOURCE stage here, so the whole pipeline runs: 2 * 2 + 1
sleep(1000);
Result: 5
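The difference between the two results comes down to which stage the variable points to. The following sketch (made-up names, results collected in a list instead of printed) completes once the tail of the pipeline and once its source:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PipelineCompletion {
    // The variable holds the stage returned by the last thenApply
    static List<Integer> completeTail() {
        List<Integer> seen = new ArrayList<>();
        CompletableFuture<Integer> tail = new CompletableFuture<Integer>()
                .thenApply(data -> data * 2)
                .thenApply(data -> data + 1);
        tail.thenAccept(seen::add);
        tail.complete(2); // completes the tail directly, the thenApply steps never run
        return seen;
    }

    // The variable holds the source stage
    static List<Integer> completeSource() {
        List<Integer> seen = new ArrayList<>();
        CompletableFuture<Integer> source = new CompletableFuture<>();
        source.thenApply(data -> data * 2)
                .thenApply(data -> data + 1)
                .thenAccept(seen::add);
        source.complete(2); // the value flows through the whole pipeline
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(completeTail());   // [2]
        System.out.println(completeSource()); // [5]
    }
}
```

Every `thenXXX` call returns a new `CompletableFuture`, so chaining directly onto `new CompletableFuture<>()` loses the reference to the source.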
Error handling and recovery
public static int compute() {
    // throw new RuntimeException("error");
    return 2;
}
public static CompletableFuture<Integer> create() {
    return CompletableFuture.supplyAsync(() -> compute());
}
public static Void handleException(Throwable throwable) {
    System.out.println("ERROR: " + throwable);
    throw new RuntimeException("it is beyond any hope");
}
public static int handleException2(Throwable throwable) {
    System.out.println("ERROR: " + throwable);
    return -1; // recover from error track
}
public static void main(String[] args) {
    create()
        .thenApply(data -> data * 2)
        .exceptionally(throwable -> handleException2(throwable))
        .thenAccept(data -> System.out.println(data))
        .exceptionally(throwable -> handleException(throwable));
}
If everything is OK, it jumps to the next `thenXXX` method; if an error occurs, it skips ahead to the next `exceptionally()` further down the pipeline.
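The two tracks can be watched with a small sketch. The `fail` flag and the class name are assumptions added for the demo; the pipeline itself mirrors the notes.

```java
import java.util.concurrent.CompletableFuture;

class ErrorRecovery {
    static int compute(boolean fail) {
        if (fail) {
            throw new RuntimeException("error");
        }
        return 2;
    }

    static int run(boolean fail) {
        return CompletableFuture.supplyAsync(() -> compute(fail))
                .thenApply(data -> data * 2)   // skipped while on the error track
                .exceptionally(throwable -> -1) // recover: back on the data track
                .thenApply(data -> data + 1)   // runs in both cases
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // 5  (2 * 2 + 1)
        System.out.println(run(true));  // 0  (-1 + 1)
    }
}
```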
completeExceptionally
public static int handleException(Throwable throwable) {
    System.out.println("ERROR: " + throwable);
    return 1;
}
CompletableFuture<Integer> future = new CompletableFuture<>();
future
    .thenApply(data -> data * 2)
    .exceptionally(throwable -> handleException(throwable))
    .thenApply(data -> data + 1)
    .thenAccept(data -> System.out.println(data));
System.out.println("built the pipeline");
sleep(1000);
if (Math.random() > 0.75) {
    future.completeExceptionally(new RuntimeException("my error")); // Change here
} else {
    future.complete(2);
}
sleep(1000);
System.out.println("DONE");
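A deterministic variant of the same pipeline, as a sketch: the random check is replaced by a `fail` flag (an assumption for the demo) so both outcomes can be reproduced.

```java
import java.util.concurrent.CompletableFuture;

class CompleteExceptionallyDemo {
    static int run(boolean fail) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Integer> result = source
                .thenApply(data -> data * 2)
                .exceptionally(throwable -> 1) // recover with 1
                .thenApply(data -> data + 1);

        if (fail) {
            source.completeExceptionally(new RuntimeException("my error")); // reject
        } else {
            source.complete(2); // resolve
        }
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // 5  (2 * 2 + 1)
        System.out.println(run(true));  // 2  (1 + 1)
    }
}
```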
States
– Pending
– Resolve state (final)
– Reject (final)
How long are we in the pending state?
Quote:
Both in life and programming never do something without timeout
- Java 8 has no timeout methods
- Java 9 has methods to do a timeout:
future.completeOnTimeout(0, 1, TimeUnit.SECONDS);
future.orTimeout(2, TimeUnit.SECONDS);
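Both timeout methods can be tried on a future that nobody ever completes. A minimal sketch, assuming Java 9 or newer; the class name and the 100 ms timeouts are chosen for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class Timeouts {
    // completeOnTimeout: resolve with a default value when time runs out
    static int withDefault() {
        return new CompletableFuture<Integer>()
                .completeOnTimeout(0, 100, TimeUnit.MILLISECONDS)
                .join();
    }

    // orTimeout: reject with a TimeoutException when time runs out
    static boolean timesOut() {
        try {
            new CompletableFuture<Integer>()
                    .orTimeout(100, TimeUnit.MILLISECONDS)
                    .join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        System.out.println(withDefault()); // 0
        System.out.println(timesOut());    // true
    }
}
```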
combine Method
public static CompletableFuture<Integer> create(int number) {
    return CompletableFuture.supplyAsync(() -> number);
}
public static void main(String[] args) {
    create(2)
        .thenCombine(create(3), (result1, result2) -> result1 + result2) // Joins the results from two Futures
        .thenAccept(data -> System.out.println(data));
}
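A runnable sketch of `thenCombine`. The helper `sum` and the class name are made up; `join()` is used only to get the value out at the end of the demo.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    static CompletableFuture<Integer> create(int number) {
        return CompletableFuture.supplyAsync(() -> number);
    }

    // Both futures run independently; the function is called once both are done
    static int sum(int a, int b) {
        return create(a)
                .thenCombine(create(b), (result1, result2) -> result1 + result2)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(sum(2, 3)); // 5
    }
}
```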
compose Method
Stream example
public static int func1(int number) {
    return number * 2;
}
public static List<Integer> func2(int number) {
    return Arrays.asList(number - 1, number + 1);
}
public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3); // sample input, not part of the original notes
    numbers.stream()
        //.map(e -> func1(e)) // func1 is a one-to-one mapping function
        //.map(e -> func2(e)) // func2 is a one-to-many mapping function
        .flatMap(e -> func2(e).stream())
        .forEach(System.out::println);
}
map one-to-one Stream<T> => Stream<R>
map one-to-many Stream<T> => Stream<List<R>>
flatMap one-to-many Stream<T> => Stream<R>
Function returns data -> map
Function returns collection -> flatMap
Function returns data -> thenApply
Function returns CompletableFuture -> thenCompose
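The stream half of these rules as a runnable sketch. `neighbours` is a made-up one-to-many function in the spirit of `func2`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapVsFlatMap {
    // one-to-many: every number yields two numbers
    static List<Integer> neighbours(int number) {
        return Arrays.asList(number - 1, number + 1);
    }

    // map with a one-to-many function: a list per element
    static List<List<Integer>> mapped(List<Integer> numbers) {
        return numbers.stream()
                .map(MapVsFlatMap::neighbours)
                .collect(Collectors.toList());
    }

    // flatMap with the same function: one flat list
    static List<Integer> flatMapped(List<Integer> numbers) {
        return numbers.stream()
                .flatMap(e -> neighbours(e).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        System.out.println(mapped(numbers));     // [[0, 2], [1, 3], [2, 4]]
        System.out.println(flatMapped(numbers)); // [0, 2, 1, 3, 2, 4]
    }
}
```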
public static CompletableFuture<Integer> create(int number) {
    return CompletableFuture.supplyAsync(() -> number);
}
public static CompletableFuture<Integer> inc(int number) {
    return CompletableFuture.supplyAsync(() -> number + 1);
}
public static void main(String[] args) {
    create(2)
        //.thenApply(data -> inc(data)) // would give a CompletableFuture<CompletableFuture<Integer>>
        .thenCompose(data -> inc(data))
        .thenAccept(result -> System.out.println(result));
}
If your function returns a CompletableFuture use thenCompose
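What goes wrong with `thenApply` here shows up in the types. A sketch with made-up method names `nested` and `composed`:

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {
    static CompletableFuture<Integer> create(int number) {
        return CompletableFuture.supplyAsync(() -> number);
    }

    static CompletableFuture<Integer> inc(int number) {
        return CompletableFuture.supplyAsync(() -> number + 1);
    }

    // thenApply wraps the returned future: a future of a future
    static CompletableFuture<CompletableFuture<Integer>> nested(int number) {
        return create(number).thenApply(data -> inc(data));
    }

    // thenCompose flattens it, like flatMap does for streams
    static int composed(int number) {
        return create(number).thenCompose(data -> inc(data)).join();
    }

    public static void main(String[] args) {
        System.out.println(composed(2));             // 3
        System.out.println(nested(2).join().join()); // 3, but needs two joins
    }
}
```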