Tuesday, September 19, 2017

Java 8 - Parallel data processing

In Java 7 , If you want to process data in parallel, you may consider the ForkJoinPool which is one of the threadpool factory with number of initial thread equalising the number of procesor of the computer :


Runtime.getRuntime().available-Processors() // show initial number of thread

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); // change the number of initial thread



In current multi-core PC world, parallel processing will  mostly increase the performence. And the way to implement the parallelism is as simple as just adding one methond parallel()  into the normal Stream. This parallel() will automaticly make use of the ForkJoinPool behind and dynamicly assign the data into threads.


public static long parallelSum(long n) {
     return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
}

On the contrary , if you want to abandon parallel(), a sequential() will help you switch back smoothly. Well , In practice, whether to choose sequential() , parallel() or normal for loop , all depend on practical measurement, There is no one golden rule saying that parallel() is the best, you must measure the time and choose which one to apply.


long start = System.nanoTime();
// apply batch calculation here
long duration = (System.nanoTime() - start) / 1_000_000;

In most of the cases , the efficiency lost in the parallel processing is due to autobox problems,  one of the methods to increse the paralle() is to avoid the autobox using LongStream, IntStream... This is important!


public static long parallelRangedSum(long n) {
    return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
}

ForkJoinPool

Above, I have mentioned the ForkJoinPool in Java 7 and the parallel() in Java 8 is implemented by it . So how do people use this ForkJoinPool? Answer :  extends RecursiveTask<T>
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
       // constructors and fields .... 
@Override protected Long compute() { // rewrite abstract method if (length <= 1000) { // Don't use parallel } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(20000, 1, 10000); leftTask.fork(); // start left half tasks asyncronously in another ForkJoinPool ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(20000, 10001, 20000); Long rightResult = rightTask.compute(); // start the right half tasks Long leftResult = leftTask.join(); // read the left half tasks result. blocking return leftResult + rightResult; // combine } }

Parallel() is not only implemented by ForkJoinPool but also with the help of Spliterator. With these 2 weapons, parallel() in Java 8 can flexibly split big task ,compute and join together to return values.

Spliterator

Spliterator looks similar to iterator in versions before Java 8. It also helps to iterate data but in parallel way.


public interface Spliterator<T> {
   boolean tryAdvance(Consumer<? super T> action); // iterate all elements Consumer provides
   Spliterator<T> trySplit(); // split data into another Spliterator
   long estimateSize(); // return the size of collection
   int characteristics();
}

Ok, I am still working to produce an example , I will give it out later.

No comments:

Post a Comment

Add Loading Spinner for web request.

when web page is busily loading. normally we need to add a spinner for the user to kill their waiting impatience. Here, 2 steps we need to d...