We can again augment the accumulator with print statements, as shown at (16) in Example 16.11. The output at (5) shows that the number of tracks from the first CD is used as the initial value before the accumulator is applied repeatedly to the remaining values.
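The behavior described above can be reproduced without the CD class. The following is a minimal sketch, assuming the track counts 8, 6, 10, 8, and 10 taken from the program output; the class name OneArgReduceSketch and the method sumWithLog() are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.IntStream;

final class OneArgReduceSketch {

  // Sums the values with the one-argument reduce() method and records
  // the arguments of every accumulator call in the given log.
  static OptionalInt sumWithLog(int[] values, List<String> log) {
    return IntStream.of(values)
        .reduce((sum, n) -> {
          log.add("sum=" + sum + ", n=" + n);
          return sum + n;
        });
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    // Track counts assumed from the output of Example 16.11.
    OptionalInt total = sumWithLog(new int[] {8, 6, 10, 8, 10}, log);
    log.forEach(System.out::println);
    System.out.println("Total: " + total.orElse(0));

    // With no elements there is no first value to start from,
    // so the result is an empty OptionalInt.
    OptionalInt none = sumWithLog(new int[0], new ArrayList<>());
    System.out.println("Empty stream has a result: " + none.isPresent());
  }
}
```

For a sequential stream of five values, the accumulator is called four times, and the first call receives the first value as its sum argument. This is also why the one-argument method returns an OptionalInt rather than an int.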

Example 16.11 Implementing Functional Reductions

import static java.lang.System.out;
import java.util.Comparator;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.BinaryOperator;
public final class FunctionalReductions {
  public static void main(String[] args) {
// Two-argument reduce() method:
  {
    out.println("(1) Find total number of tracks (loop-based version):");
    int sum = 0;                           // (1) Initialize the partial result.
    for (CD cd : CD.cdList) {              // (2) Iterate over the list.
      int numOfTracks = cd.noOfTracks();   // (3) Get the next value.
      sum = sum + numOfTracks;             // (4) Calculate new partial result.
    }
    out.println("Total number of tracks: " + sum);
  }
    out.println("(2) Find total number of tracks (stream-based version):");
    int totNumOfTracks = CD.cdList                         // (5)
        .stream()                                          // (6)
        .mapToInt(CD::noOfTracks)                          // (7)
        .reduce(0,                                         // (8)
                (sum, numOfTracks) -> sum + numOfTracks);  // (9)
    //  .reduce(0, (sum, noOfTracks) -> Integer.sum(sum, noOfTracks));
    //  .reduce(0, Integer::sum);
    //  .sum();
    out.println("Total number of tracks: " + totNumOfTracks);
    out.println();
    out.println("(3) Find total number of tracks (accumulator logging): ");
    int totNumOfTracks1 = CD.cdList                        // (10)
        .stream()
        .mapToInt(CD::noOfTracks)
        .reduce(0,                                         // (11)
            (sum, noOfTracks) -> {                         // (12)
                int newSum = sum + noOfTracks;
                out.printf("Accumulator: sum=%2d, noOfTracks=%2d, newSum=%2d%n",
                            sum, noOfTracks, newSum);
                return newSum;
            }
         );
    out.println("Total number of tracks: " + totNumOfTracks1);
    out.println();
// One-argument reduce() method:
    out.println("(4) Find total number of tracks (stream-based version):");
    OptionalInt optSumTracks0 = CD.cdList                  // (13)
        .stream()
        .mapToInt(CD::noOfTracks)
        .reduce(Integer::sum);                             // (14)
    out.println("Total number of tracks: " + optSumTracks0.orElse(0));
    out.println();
    out.println("(5) Find total number of tracks (accumulator logging): ");
    OptionalInt optSumTracks1 = CD.cdList                  // (15)
        .stream()
        .mapToInt(CD::noOfTracks)
        .reduce((sum, noOfTracks) -> {                     // (16)
           int newSum = sum + noOfTracks;
           out.printf("Accumulator: sum=%2d, noOfTracks=%2d, newSum=%2d%n",
                       sum, noOfTracks, newSum);
           return newSum;
         });
    out.println("Total number of tracks: " + optSumTracks1.orElse(0));
    out.println();
// Three-argument reduce() method:
    out.println("(6) Find total number of tracks (accumulator + combiner): ");
    Integer sumTracks5 = CD.cdList                         // (17)
    //  .stream()                                          // (18a)
        .parallelStream()                                  // (18b)
        .reduce(Integer.valueOf(0),                        // (19) Initial value
                (sum, cd) -> sum + cd.noOfTracks(),        // (20) Accumulator
                (sum1, sum2) -> sum1 + sum2);              // (21) Combiner
    out.println("Total number of tracks: " + sumTracks5);
    out.println();
    out.println("(7) Find total number of tracks (accumulator + combiner): ");
    Integer sumTracks6 = CD.cdList                         // (22)
//      .stream()                                          // (23a)
        .parallelStream()                                  // (23b)
        .reduce(0,
               (sum, cd) -> {                              // (24) Accumulator
                 Integer noOfTracks = cd.noOfTracks();
                 Integer newSum = sum + noOfTracks;
                 out.printf("Accumulator: sum=%2d, noOfTracks=%2d, "
                            + "newSum=%2d%n", sum, noOfTracks, newSum);
                 return newSum;
               },
               (sum1, sum2) -> {                           // (25) Combiner
                 Integer newSum = sum1 + sum2;
                 out.printf("Combiner: sum1=%2d, sum2=%2d, newSum=%2d%n",
                            sum1, sum2, newSum);
                 return newSum;
               }
         );
    out.println("Total number of tracks: " + sumTracks6);
    out.println();
    // Compare by CD title.
    Comparator<CD> cmpByTitle = Comparator.comparing(CD::title);    // (26)
    BinaryOperator<CD> maxByTitle =
        (cd1, cd2) -> cmpByTitle.compare(cd1, cd2) > 0 ? cd1 : cd2; // (27)
    // Query: Find maximum Jazz CD by title:
    Optional<CD> optMaxJazzCD = CD.cdList                  // (28)
        .stream()
        .filter(CD::isJazz)
        .reduce(BinaryOperator.maxBy(cmpByTitle));         // (29a)
    //  .reduce(maxByTitle);                               // (29b)
    //  .max(cmpByTitle);                                  // (29c)
    optMaxJazzCD.map(CD::title).ifPresent(out::println);   // Keep on Erasing
  }
}

Possible output from the program:

(1) Find total number of tracks (loop-based version):
Total number of tracks: 42
(2) Find total number of tracks (stream-based version):
Total number of tracks: 42
(3) Find total number of tracks (accumulator logging):
Accumulator: sum= 0, noOfTracks= 8, newSum= 8
Accumulator: sum= 8, noOfTracks= 6, newSum=14
Accumulator: sum=14, noOfTracks=10, newSum=24
Accumulator: sum=24, noOfTracks= 8, newSum=32
Accumulator: sum=32, noOfTracks=10, newSum=42
Total number of tracks: 42
(4) Find total number of tracks (stream-based version):
Total number of tracks: 42
(5) Find total number of tracks (accumulator logging):
Accumulator: sum= 8, noOfTracks= 6, newSum=14
Accumulator: sum=14, noOfTracks=10, newSum=24
Accumulator: sum=24, noOfTracks= 8, newSum=32
Accumulator: sum=32, noOfTracks=10, newSum=42
Total number of tracks: 42
(6) Find total number of tracks (accumulator + combiner):
Total number of tracks: 42
(7) Find total number of tracks (accumulator + combiner):
Accumulator: sum= 0, noOfTracks=10, newSum=10
Accumulator: sum= 0, noOfTracks=10, newSum=10
Accumulator: sum= 0, noOfTracks= 8, newSum= 8
Combiner: sum1= 8, sum2=10, newSum=18
Combiner: sum1=10, sum2=18, newSum=28
Accumulator: sum= 0, noOfTracks= 6, newSum= 6
Accumulator: sum= 0, noOfTracks= 8, newSum= 8
Combiner: sum1= 8, sum2= 6, newSum=14
Combiner: sum1=14, sum2=28, newSum=42
Total number of tracks: 42

Keep on Erasing

The single-argument and two-argument reduce() methods accept a binary operator as the accumulator, whose arguments and result are of the same type. The three-argument reduce() method is more flexible, as its result type can differ from the element type, but it is available only for streams of objects and not for the numeric streams. The stream pipeline below computes the total number of tracks on CDs using the three-argument reduce() method.

Integer sumTracks5 = CD.cdList                           // (17)
    .stream()                                            // (18a)
//  .parallelStream()                                    // (18b)
    .reduce(Integer.valueOf(0),                          // (19) Initial value
            (sum, cd) -> sum + cd.noOfTracks(),          // (20) Accumulator
            (sum1, sum2) -> sum1 + sum2);                // (21) Combiner

The reduce() method above accepts the following arguments:

  • An identity value: Its type is U. In this case, it is an Integer that wraps the value 0. As before, it is used as the initial value. The type of the value returned by the reduce() method is also U.
  • An accumulator: It is a BiFunction<U,T,U>; that is, it is a binary function that accepts an object of type U and an object of type T and produces a result of type U. In this case, type U is Integer and type T is CD. The lambda expression implementing the accumulator first reads the number of tracks from the current CD before the addition operator is applied. Thus the accumulator will calculate the sum of Integers which are, of course, unboxed and boxed to do the calculation. As we have seen earlier, the accumulator is repeatedly applied to sum the tracks on the CDs. Only this time, the mapping of a CD to an Integer is done when the accumulator is evaluated.
  • A combiner: It is a BinaryOperator<U>; that is, it is a binary operator whose arguments and result are of the same type U. In this case, type U is Integer. Thus the combiner will calculate the sum of Integers which are unboxed and boxed to do the calculation.
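The three arguments listed above can be written out with explicit types to make the roles of U and T visible. The following is a minimal sketch, not the code from Example 16.11: the record Album is a hypothetical stand-in for the CD class, and the titles and track counts are made up for illustration.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

final class ThreeArgReduceSketch {

  // Hypothetical stand-in for the CD class, which is not defined here.
  record Album(String title, int noOfTracks) {}

  static Integer totalTracks(List<Album> albums) {
    // Identity value: type U is Integer.
    Integer identity = 0;
    // Accumulator: (U, T) -> U, where type T is Album.
    // The mapping from Album to its track count happens inside the accumulator.
    BiFunction<Integer, Album, Integer> accumulator =
        (sum, album) -> sum + album.noOfTracks();
    // Combiner: (U, U) -> U, used to merge partial sums.
    BinaryOperator<Integer> combiner = Integer::sum;

    return albums.stream().reduce(identity, accumulator, combiner);
  }

  public static void main(String[] args) {
    // Made-up sample data.
    List<Album> albums = List.of(
        new Album("Album A", 8),
        new Album("Album B", 6),
        new Album("Album C", 10));
    System.out.println("Total number of tracks: " + totalTracks(albums));
  }
}
```

Declaring the functions as local variables is only for readability. Passing the lambda expressions directly to reduce(), as in the pipeline at (17), lets the compiler infer the same types.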

In the code above, the combiner is not executed if the reduce() method is applied to a sequential stream. However, there is no guarantee that this is always the case for a sequential stream. If we uncomment (18b) and remove (18a), the combiner will be executed on the parallel stream.
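One way to observe this difference is to count how often the combiner is called. The following is a minimal sketch, assuming a plain list of track counts instead of the CD list; the class name CombinerCallSketch and the method sum() are hypothetical. The counts it prints depend on the stream implementation and the runtime, so no particular numbers should be expected.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

final class CombinerCallSketch {

  // Sums the values with the three-argument reduce() method and
  // counts the number of times the combiner is called.
  static int sum(Stream<Integer> values, AtomicInteger combinerCalls) {
    return values.reduce(0,
        (partial, n) -> partial + n,          // Accumulator
        (left, right) -> {                    // Combiner
          combinerCalls.incrementAndGet();    // Thread-safe counter
          return left + right;
        });
  }

  public static void main(String[] args) {
    // Track counts assumed from the output of Example 16.11.
    List<Integer> tracks = List.of(8, 6, 10, 8, 10);

    AtomicInteger seqCalls = new AtomicInteger();
    int seqSum = sum(tracks.stream(), seqCalls);
    System.out.println("Sequential: sum=" + seqSum
        + ", combiner calls=" + seqCalls.get());

    AtomicInteger parCalls = new AtomicInteger();
    int parSum = sum(tracks.parallelStream(), parCalls);
    System.out.println("Parallel:   sum=" + parSum
        + ", combiner calls=" + parCalls.get());
  }
}
```

Both pipelines should produce the same sum, since addition is associative and 0 is its identity value. Only the number of combiner calls is expected to differ between the two runs.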

The stream pipeline at (22) in Example 16.11, which has been augmented with print statements, illustrates that the combiner in the three-argument reduce() method is executed for a parallel stream. There is no output from the combiner when the stream is sequential. The output at (7) in Example 16.11 shows that the combiner merges the partial sums created by the accumulator when the stream is parallel.
