Parallel Mutable Reduction

Figure 16.14 shows the stream pipeline from Figure 16.13, where the sequential stream (2a) has been replaced by a parallel stream (2b); in other words, the collect() method is called on a parallel stream. One possible parallel execution of the pipeline is also depicted in Figure 16.14b, where five instances of the pipeline are executed in parallel. The supplier creates five empty ArrayLists that the accumulator uses as partial result containers, and that the combiner later merges into a final result container. The containers created by the supplier are thus mutated by both the accumulator and the combiner to perform mutable reduction. The combiner also merges the partial result containers in parallel. It is instructive to contrast this combiner with the combiner for parallel functional reduction that is illustrated in Figure 16.12, p. 963.

Figure 16.14 Parallel Mutable Reduction

In Example 16.12, the stream pipeline at (7) also creates a list containing the number of tracks on each CD, but here the stream is parallel, and the lambda expressions implementing the argument functions of the collect() method are augmented with print statements so that the actions of the functions can be logged. The output from this parallel mutable reduction shows that the combiner is executed multiple times to merge partial result lists. The actions of the argument functions shown in the output are the same as those illustrated in Figure 16.14b. Of course, multiple runs of the pipeline can show different sequences of operations in the output, but the final result is the same. Also note that the elements retain their relative position in the partial result lists as these are combined, preserving the encounter order of the stream.
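The order-preserving behavior described above can be checked without the CD class. The following is a minimal sketch, not part of Example 16.12: the class name ParallelCollectOrder, the method collectInParallel(), and the hard-coded list of track counts are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ParallelCollectOrder {

  // Collects the elements of a parallel stream into an ArrayList using the
  // three-argument collect() method (supplier, accumulator, combiner).
  static List<Integer> collectInParallel(List<Integer> trackCounts) {
    return trackCounts.parallelStream()
        .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for the track counts obtained from CD.cdList.
    List<Integer> trackCounts = List.of(8, 6, 10, 8, 10);

    // The source list is ordered, so the partial lists are combined in
    // encounter order no matter how the work is split between threads.
    System.out.println(collectInParallel(trackCounts)); // [8, 6, 10, 8, 10]
  }
}
```

Running the sketch repeatedly should always print the same list, even though the interleaving of supplier, accumulator, and combiner calls may vary between runs.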

Although the stream is executed in parallel to perform mutable reduction, merging the partial result containers in the combiner can hurt performance if the merge operation is costly. For example, merging mutable maps can be costly compared to merging mutable lists. This issue is further explored for parallel streams in §16.9, p. 1009.
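To make the remark about maps more concrete, here is a hedged sketch of a map-based mutable reduction. It is not taken from the example program: the class MapMergeSketch and the method countOccurrences() are hypothetical, and the query (counting how often each track count occurs) is chosen only to give the combiner something to merge.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MapMergeSketch {

  // Counts how often each value occurs, using a HashMap as the mutable
  // result container of the three-argument collect() method.
  static Map<Integer, Integer> countOccurrences(List<Integer> trackCounts) {
    return trackCounts.parallelStream()
        .collect(
            () -> new HashMap<Integer, Integer>(),           // Supplier
            (map, n) -> map.merge(n, 1, Integer::sum),       // Accumulator
            (map1, map2) ->                                  // Combiner
                map2.forEach((key, count) ->
                    map1.merge(key, count, Integer::sum)));
  }

  public static void main(String[] args) {
    // Prints a map containing the entries 8=2, 6=1 and 10=2.
    System.out.println(countOccurrences(List.of(8, 6, 10, 8, 10)));
  }
}
```

In this sketch the combiner visits every entry of the second map and performs a lookup in the first map for each one, so each merge involves hashing and possibly resolving key collisions, whereas the list combiner in Example 16.12 is a single addAll() call.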

Example 16.12 Implementing Mutable Reductions

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
public final class Collecting {
  public static void main(String[] args) {
    // Query: Create a list with the number of tracks on each CD.
    System.out.println("Sequential Mutable Reduction:");
    List<Integer> tracks = CD.cdList                          // (1)
        .stream()                                             // (2a)
//      .parallelStream()                                     // (2b)
        .map(CD::noOfTracks)                                  // (3)
        .collect(() -> new ArrayList<>(),                     // (4) Supplier
                 (cont, noOfTracks) -> cont.add(noOfTracks),  // (5) Accumulator
                 (cont1, cont2) -> cont1.addAll(cont2));      // (6) Combiner
//      .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); // (6a)
//      .toList();
    System.out.println("Number of tracks on each CD (sequential): " + tracks);
    System.out.println();
    System.out.println("Parallel Mutable Reduction:");
    List<Integer> tracks1 = CD.cdList                         // (7)
//      .stream()                                             // (8a)
        .parallelStream()                                     // (8b)
        .map(CD::noOfTracks)                                  // (9)
        .collect(                                             // (10)
            () -> {                                           // (11) Supplier
              System.out.println("Supplier: Creating an ArrayList");
              return new ArrayList<>();
            },
            (cont, noOfTracks) -> {                           // (12) Accumulator
              System.out.printf("Accumulator: cont:%s, noOfTracks:%s",
                                 cont, noOfTracks);
              cont.add(noOfTracks);
              System.out.printf(", mutCont:%s%n", cont);
            },
            (cont1, cont2) -> {                               // (13) Combiner
              System.out.printf("Combiner: con1:%s, cont2:%s", cont1, cont2);
              cont1.addAll(cont2);
              System.out.printf(", mutCont:%s%n", cont1);
            });
    System.out.println("Number of tracks on each CD (parallel): " + tracks1);
    System.out.println();

    // Query: Create an ordered set with CD titles, according to natural order.
    Set<String> cdTitles = CD.cdList                          // (14)
        .stream()
        .map(CD::title)
        .collect(TreeSet::new, TreeSet::add, TreeSet::addAll);// (15)
    System.out.println("CD titles: " + cdTitles);
    System.out.println();

    // Query: Go bananas.
    StringBuilder goneBananas = Stream                        // (16)
        .iterate("ba", b -> b + "na")                         // (17)
        .limit(5)
        .peek(System.out::println)
        .collect(StringBuilder::new,                          // (18)
                 StringBuilder::append,
                 StringBuilder::append);
    System.out.println("Go bananas: " + goneBananas);
  }
}

Possible output from the program:

Sequential Mutable Reduction:
Number of tracks on each CD (sequential): [8, 6, 10, 8, 10]

Parallel Mutable Reduction:
Supplier: Creating an ArrayList
Accumulator: cont:[], noOfTracks:8, mutCont:[8]
Supplier: Creating an ArrayList
Accumulator: cont:[], noOfTracks:6, mutCont:[6]
Combiner: con1:[8], cont2:[6], mutCont:[8, 6]
Supplier: Creating an ArrayList
Accumulator: cont:[], noOfTracks:10, mutCont:[10]
Supplier: Creating an ArrayList
Accumulator: cont:[], noOfTracks:8, mutCont:[8]
Combiner: con1:[10], cont2:[8], mutCont:[10, 8]
Supplier: Creating an ArrayList
Accumulator: cont:[], noOfTracks:10, mutCont:[10]
Combiner: con1:[10, 8], cont2:[10], mutCont:[10, 8, 10]
Combiner: con1:[8, 6], cont2:[10, 8, 10], mutCont:[8, 6, 10, 8, 10]
Number of tracks on each CD (parallel): [8, 6, 10, 8, 10]

CD titles: [Hot Generics, Java Jam, Java Jive, Keep on Erasing, Lambda Dancing]
ba
bana
banana
bananana
banananana
Go bananas: babanabananabanananabanananana

Example 16.12 also shows how other kinds of containers can be used for mutable reduction. The stream pipeline at (14) performs mutable reduction to create an ordered set with CD titles. The supplier is implemented by the constructor reference TreeSet::new. The constructor will create a container of type TreeSet<String> that will maintain the CD titles according to the natural order for Strings. The accumulator and the combiner are implemented by the method references TreeSet::add and TreeSet::addAll, respectively. The accumulator will add a title to a container of type TreeSet<String> and the combiner will merge the contents of two containers of type TreeSet<String>.
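The TreeSet-based reduction can also be tried in isolation. The sketch below assumes a plain list of title strings in place of CD.cdList; the class name SortedTitles, the method sortedTitles(), and the order of the input list are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class SortedTitles {

  // Collects the titles into a TreeSet, which keeps them in the natural
  // order for strings regardless of the order in which they are added.
  static Set<String> sortedTitles(List<String> titles) {
    return titles.stream()
        .collect(TreeSet::new, TreeSet::add, TreeSet::addAll);
  }

  public static void main(String[] args) {
    // Hypothetical input order; the titles are those shown in the output
    // of Example 16.12.
    List<String> titles = List.of("Java Jive", "Java Jam", "Lambda Dancing",
                                  "Keep on Erasing", "Hot Generics");
    System.out.println(sortedTitles(titles));
    // [Hot Generics, Java Jam, Java Jive, Keep on Erasing, Lambda Dancing]
  }
}
```

Because the container itself imposes the ordering, the result does not depend on the encounter order of the stream.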

In Example 16.12, the mutable reduction performed by the stream pipeline at (16) uses a mutable container of type StringBuilder. The output from the peek() method shows that the strings produced by the iterate() method start with the initial string “ba” and are iteratively concatenated with the postfix “na”. The limit() intermediate operation truncates the infinite stream to five elements. The collect() method appends the strings to a StringBuilder. The supplier creates an empty StringBuilder. The accumulator and the combiner append a CharSequence to a StringBuilder. In the case of the accumulator, the CharSequence is a String—that is, a stream element—in the call to the append() method. But in the case of the combiner, the CharSequence is a StringBuilder—that is, a partial result container when the stream is parallel. One might be tempted to use a string instead of a StringBuilder, but that would not be a good idea as a string is immutable.
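The pipeline at (16) is sequential, so its combiner is never called. As a sketch of the parallel case mentioned above, the following variant runs the same reduction on either a sequential or a parallel stream; the class BananaBuilder and the method goBananas() are assumptions made for illustration.

```java
import java.util.stream.Stream;

final class BananaBuilder {

  // Builds the "bananas" string with a StringBuilder as the mutable
  // container, optionally switching the stream to parallel execution.
  static String goBananas(boolean parallel) {
    Stream<String> strings = Stream.iterate("ba", b -> b + "na").limit(5);
    if (parallel) {
      strings = strings.parallel();
    }
    return strings
        .collect(StringBuilder::new,     // Supplier: empty StringBuilder
                 StringBuilder::append,  // Accumulator: appends a String
                 StringBuilder::append)  // Combiner: appends a StringBuilder
        .toString();
  }

  public static void main(String[] args) {
    System.out.println(goBananas(false)); // babanabananabanananabanananana
    System.out.println(goBananas(true));  // babanabananabanananabanananana
  }
}
```

Both calls should produce the same string, since the stream is ordered and the partial StringBuilders are appended in encounter order.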

Note that the accumulator and combiner of the collect() method do not return a value. The collect() method does not terminate if applied to an infinite stream, as the method will never finish processing all the elements in the stream.

Because mutable reduction uses the same mutable result container for accumulating new results by changing the state of the container, it is more efficient than a functional reduction where a new partial result always replaces the previous partial result.
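The contrast between the two kinds of reduction can be sketched with string concatenation. The class ReductionContrast and its two methods are hypothetical names, and the sketch makes no claim about actual timings; it only shows where the new objects are created.

```java
import java.util.List;

final class ReductionContrast {

  // Functional reduction: each step creates a new String that replaces
  // the previous partial result.
  static String functional(List<String> words) {
    return words.stream()
        .reduce("", (s1, s2) -> s1 + s2);
  }

  // Mutable reduction: each step changes the state of the same
  // StringBuilder; a String is created only once at the end.
  static String mutable(List<String> words) {
    return words.stream()
        .collect(StringBuilder::new,
                 StringBuilder::append,
                 StringBuilder::append)
        .toString();
  }

  public static void main(String[] args) {
    List<String> words = List.of("ba", "na", "na");
    System.out.println(functional(words)); // banana
    System.out.println(mutable(words));    // banana
  }
}
```

Both methods return the same result; the difference lies in how many intermediate objects are created along the way.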
