In a recent article, I mentioned my 2020 New Year's resolution: no more loops in Java. In that article, I chose a common (and simplified) forest management calculation—determining whether an area is forested, based on a legal definition, by calculating the proportion of ground shaded by tree canopies.
From a data collection point of view, this requires sampling the area and then estimating the proportion covered by tree canopies from that sample. Traditionally, sampling is conducted first by reviewing the area in aerial photographs or satellite images and dividing the area into units that appear to have approximately uniform vegetation characteristics. These units are called strata (plural of stratum). Then a collection of randomly located points is generated within each stratum. At each point is located a sample, typically a circle or rectangle of specific dimensions, and all trees within each sample are measured in the field. Then, back in the office, sample values are totaled, stratum averages are calculated, and these averages are weighted into a total average for the area.
In traditional imperative Java programming style, this would have required several loops: one to read the stratum definitions, another to read in the field sample data, another to sum the area covered by tree canopies in the samples, another to calculate the stratum averages of those samples, and a final one to calculate the weighted averages of the strata for the total area.
In my previous article, I explained how to use Java Streams to replace each one of those loops with a sequence of map and reduce function calls. The interface java.util.stream.Stream (in the package java.util.stream) defines two distinct kinds of reduce functions (which, in my sample calculation, take the form of accumulators):
- reduce(), which yields an immutable partial accumulation upon consuming each item in the stream
- collect(), which yields a mutable partial accumulation upon consuming each item in the stream
The advantage to using collect() is that there is less overhead: a new immutable partial result is not generated and then discarded in each step of the accumulation; instead, the existing partial result has the new data element accumulated into it.
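To make the distinction concrete, here is a minimal sketch that totals the same values both ways. The names ReduceVersusCollect and Stats are invented for illustration and are not part of my forest calculation; only reduce() and the three-argument collect() come from java.util.stream.Stream.

```java
import java.util.List;

class ReduceVersusCollect {

    // Hypothetical mutable accumulator holding a running sum and count.
    static class Stats {
        double sum;
        long count;

        void accept(double value) { sum += value; count++; }

        void combine(Stats other) { sum += other.sum; count += other.count; }
    }

    // reduce(): each step returns a new (boxed) Double partial result.
    static double sumWithReduce(List<Double> values) {
        return values.stream().reduce(0d, Double::sum);
    }

    // collect(): a Stats container is supplied and then mutated in place.
    static Stats statsWithCollect(List<Double> values) {
        return values.stream().collect(
            Stats::new,                              // supplier
            (stats, value) -> stats.accept(value),   // accumulator
            Stats::combine);                         // combiner, used to merge partial results
    }
}
```

Note that the collect() version can track the count alongside the sum at no extra cost, which is the kind of "more than one item at a time" accumulation the rest of this article is about.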
As I worked on my sample calculation, I found myself learning about collect() in what seems to me to be a common and unsatisfying way: all the examples and tutorials I could find were based on toy problems that accumulate one data item at a time; moreover, all were structured as little recipes that use existing predefined functionality that seemed to be useful only in this limited case of "accumulating one data item at a time." I kept getting in deeper and deeper water as I proceeded through the programming until I wasn't sure that I understood the whole Java Streams framework sufficiently to really be able to use it.
So I decided to revisit my code, trying to understand in detail what was going on "under the hood" and to expose a bit more of the mechanisms involved in a more consistent and coherent fashion. Read on for a summary of the revisions I made.
Collecting maps of complex things
Previously, I used a call to collect() to convert input lines containing stratum number in the first column and stratum area in the second column to a Map<Integer,Double>:
final Map<Integer,Double> stratumDefinitionTable = inputLineStream
    .skip(1)                   // skip the column headings
    .map(l -> l.split("\\|"))  // split the line into String[] fields
    .collect(
        Collectors.toMap(
            a -> Integer.parseInt(a[0]),   // (1)
            a -> Double.parseDouble(a[1])  // (2)
        )
    );
Code comment (1) above marks the definition of the key (the integer stratum number) and comment (2) marks the definition of the value (the double stratum area).
In more detail, the (static) convenience method java.util.stream.Collectors.toMap() creates a Collector that initializes the map and populates it with the map entries while processing the input data. Strictly speaking, this isn't accumulation… but anyway.
But what if there was more info to collect than just the stratum area? For example, what if I want to include a text label along with the area to use in the output?
To solve this problem, I might first define a class like this, which would hold all of the information about the stratum:
class StratumDefinition {
    private int number;
    private double ha;
    private String label;

    public StratumDefinition(int number, double ha, String label) {
        this.number = number;
        this.ha = ha;
        this.label = label;
    }

    public int getNumber() { return this.number; }
    public double getHa() { return this.ha; }
    public String getLabel() { return this.label; }
}
Then, once StratumDefinition is declared, I can use code similar to the following to carry out the "accumulation" (only the map's value type and the second argument to toMap() change):
final Map<Integer,StratumDefinition> stratumDefinitionTable = inputLineStream
    .skip(1)                   // skip the column headings
    .map(l -> l.split("\\|"))  // split the line into String[] fields
    .collect(
        Collectors.toMap(
            a -> Integer.parseInt(a[0]),
            a -> new StratumDefinition(Integer.parseInt(a[0]), Double.parseDouble(a[1]), a[2])
        )
    );
Now the code is much more general-purpose, as I can change the columns in the stratum definition file and the fields and methods in the StratumDefinition class to match without needing to change the Streams processing logic.
Note that I probably don't need to keep the stratum number both as the key and in the value stored in each map entry; however, this way if I decide later to process the map entry values as a stream, I get the stratum number for free, without any gymnastics to fetch the key.
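As a rough, self-contained sketch of that last point, the following parses a few lines into the map and then streams the map's values, using getNumber() directly instead of going back to the key. The wrapper class StratumTableSketch, the describe() method, and the sample lines are assumptions made up for this illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StratumTableSketch {

    static class StratumDefinition {
        private final int number;
        private final double ha;
        private final String label;

        StratumDefinition(int number, double ha, String label) {
            this.number = number;
            this.ha = ha;
            this.label = label;
        }

        int getNumber() { return number; }
        double getHa() { return ha; }
        String getLabel() { return label; }
    }

    // Same pipeline as above, fed from a List<String> instead of a file.
    static Map<Integer, StratumDefinition> parse(List<String> lines) {
        return lines.stream()
            .skip(1)                   // skip the column headings
            .map(l -> l.split("\\|"))  // split the line into String[] fields
            .collect(Collectors.toMap(
                a -> Integer.parseInt(a[0]),
                a -> new StratumDefinition(
                    Integer.parseInt(a[0]), Double.parseDouble(a[1]), a[2])));
    }

    // Streams the values only; the stratum number comes from the value itself.
    static List<String> describe(Map<Integer, StratumDefinition> table) {
        return table.values().stream()
            .sorted(Comparator.comparingInt(StratumDefinition::getNumber))
            .map(d -> d.getNumber() + ": " + d.getLabel() + " (" + d.getHa() + " ha)")
            .collect(Collectors.toList());
    }
}
```

One thing to keep in mind: the two-argument toMap() throws an IllegalStateException if two input lines produce the same key, so a duplicated stratum number in the definition file fails loudly rather than silently overwriting an entry.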
Collecting subtotals of several data items by group and subgroup
Previously, I used a call to collect() to accumulate each individual tree canopy area into the total proportion covered for each sample in each stratum, a map of maps Map<Integer, Map<Integer,Double>>:
final Map<Integer,Map<Integer,Double>> sampleValues = inputLineStream
    .skip(1)
    .map(l -> l.split("\\|"))
    .collect(
        Collectors.groupingBy(a -> Integer.parseInt(a[0]),      // (1)
            Collectors.groupingBy(b -> Integer.parseInt(b[1]),  // (2)
                Collectors.summingDouble(                       // (3)
                    c -> {
                        double rm = (Double.parseDouble(c[5]) + Double.parseDouble(c[6]))/4d;
                        return rm*rm * Math.PI / 500d;          // (4)
                    })
            )
        )
    );
Code comment (1) above marks where the top-level key is defined—the stratum number. Comment (2) marks the definition of the second-level key—the sample number, and comment (3) accumulates the stream of double values calculated in (4).
In more detail, the (static) convenience method java.util.stream.Collectors.groupingBy() creates a Collector that subsets the stream according to the value returned by the first argument and applies the Collector given as the second argument. In the above example, there are two levels of grouping, first by stratum, then by sample (within stratum). The inner groupingBy() uses java.util.stream.Collectors.summingDouble() to create a Collector that initializes the sum and accumulates each tree's proportional contribution to the total coverage within the sample.
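For anyone who wants to try the two-level grouping without my data files, here is a small runnable sketch. The class name CanopyCoverSketch and the column layout noted in the comment are assumptions; the layout was chosen only so that columns 5 and 6 hold the two crown diameters, as in the code above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CanopyCoverSketch {

    // Assumed column layout (hypothetical):
    // stratum|sample|tree|species|basalDiameter|crownDiameter1|crownDiameter2|height
    static Map<Integer, Map<Integer, Double>> coverBySample(List<String> lines) {
        return lines.stream()
            .skip(1)                   // skip the column headings
            .map(l -> l.split("\\|"))
            .collect(
                Collectors.groupingBy(a -> Integer.parseInt(a[0]),      // stratum
                    Collectors.groupingBy(b -> Integer.parseInt(b[1]),  // sample
                        Collectors.summingDouble(c -> {
                            // mean crown radius, then crown area over the divisor used above
                            double rm = (Double.parseDouble(c[5])
                                + Double.parseDouble(c[6])) / 4d;
                            return rm * rm * Math.PI / 500d;
                        }))));
    }
}
```

Calling coverBySample() with a header line followed by a few tree records yields one inner map per stratum, each keyed by sample number and holding the summed proportion for that sample.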
Notice in the above that summingDouble() is a handy shortcut if you want to sum up just one number. But, remembering that I have recorded the species, trunk diameter, crown diameter, and height for each tree measured, what if I want to accumulate figures related to all of those measurements?
To solve this problem, I need to define a pair of classes, one to wrap the measurement information, which might look something like this:
class Measurement {
    private int stratum, sample, tree;
    private String species;
    private double ha, basalDiameter, crownArea, height;

    public Measurement(int stratum, int sample, double ha, int tree, String species,
            double basalDiameter, double crownDiameter1, double crownDiameter2, double height) {
        ...
    }

    public int getStratum() { return this.stratum; }
    public int getSample() { return this.sample; }
    public double getHa() { return this.ha; }
    public int getTree() { return this.tree; }
    public String getSpecies() { return this.species; }
    public double getBasalDiameter() { return this.basalDiameter; }
    public double getCrownArea() { return this.crownArea; }
    public double getHeight() { return this.height; }
}
and one to accumulate the information into the sample totals, which might look something like this:
class SampleAccumulator implements Consumer<Measurement> {
    private double ...;

    public SampleAccumulator() {
        ...
    }

    public void accept(Measurement m) {
        ...
    }

    public void combine(SampleAccumulator other) {
        ...
    }
    ...
}
Note that the SampleAccumulator implements the interface java.util.function.Consumer<T>. This isn't strictly necessary; I could design this class "freehand" as long as I end up providing functionality similar to that required to build my Collector, which I will show below.
Then I could use code similar to the original to carry out the accumulation into instances of SampleAccumulator:
final Map<Integer,Map<Integer,SampleAccumulator>> sampleAccumulatorTable = inputLineStream
    .skip(1)
    .map(l -> l.split("\\|"))
    .map(a -> new Measurement(Integer.parseInt(a[0]), Integer.parseInt(a[1]),
        Double.parseDouble(a[2]), Integer.parseInt(a[3]), a[4], Double.parseDouble(a[5]),
        Double.parseDouble(a[6]), Double.parseDouble(a[7]), Double.parseDouble(a[8])))
    .collect(
        Collectors.groupingBy(Measurement::getStratum,
            Collectors.groupingBy(Measurement::getSample,
                Collector.of(
                    SampleAccumulator::new,
                    (smpAcc, msrmt) -> smpAcc.accept(msrmt),
                    (smpAcc1, smpAcc2) -> {
                        smpAcc1.combine(smpAcc2);
                        return smpAcc1;
                    },
                    Collector.Characteristics.UNORDERED
                )
            )
        )
    );
Note the two big changes that follow from using the two new classes above:
- The pipeline gains a second call to java.util.stream.Stream.map(), which uses a lambda to create a new instance of Measurement from the values parsed out of the String array of data fields.
- The call to java.util.stream.Collectors.summingDouble(), which created a "collector of doubles" that accumulates only one number at a time, gives way to java.util.stream.Collector.of(), which creates a "collector of SampleAccumulators" that accumulates an arbitrary number of numbers at a time.
Once again, the resulting code is much more general purpose: I can change the sample data file and the fields in the Measurement and SampleAccumulator classes to manage different input data items without having to mess with the stream-processing code.
Perhaps I'm slow, but it took me a while to get my head around the correspondence between the types of arguments to the of() method and the actual lambda parameters. For example, the third argument to of() defines the "combiner" function, which is of type BinaryOperator<A>. Although the name of the type is suggestive, it's important to actually look up the definition to learn that it takes two arguments of type A and returns a value of type A, which is the combination of the arguments. In passing, I should emphasize that this is different behavior than the combine() method of my accumulator classes, which takes one argument, merges it into the instance, and returns nothing. (java.util.function.Consumer<T> itself declares only accept() and the default method andThen(); combine() is my own addition.)
Once I figured this out, I realized that I had essentially defined a version of Collector.of() that takes a Consumer as an argument… too bad that this isn't built into the java.util.stream.Collector interface; it (now) seems like an obvious omission to me.
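Here is roughly what such a helper could look like, as a sketch rather than a finished design. Everything named here (CombinableConsumer, ConsumerCollectors, Tree, CrownTotals, ConsumerCollectorDemo) is hypothetical; the only library calls are Collector.of() and Collectors.groupingBy(). The explicit type arguments on the call to of() are there to keep the example unambiguous.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical interface: a Consumer that can also absorb another accumulator.
interface CombinableConsumer<T, A> extends Consumer<T> {
    void combine(A other);
}

class ConsumerCollectors {
    // Wraps the void combine() in the BinaryOperator that Collector.of() expects.
    static <T, A extends CombinableConsumer<T, A>> Collector<T, A, A> of(Supplier<A> supplier) {
        return Collector.of(
            supplier,
            (acc, item) -> acc.accept(item),
            (left, right) -> { left.combine(right); return left; },
            Collector.Characteristics.UNORDERED);
    }
}

// Hypothetical stand-in for a measurement record.
class Tree {
    final int sample;
    final double crownArea;

    Tree(int sample, double crownArea) {
        this.sample = sample;
        this.crownArea = crownArea;
    }
}

// Hypothetical accumulator tracking two figures at once.
class CrownTotals implements CombinableConsumer<Tree, CrownTotals> {
    double crownArea;
    int trees;

    @Override
    public void accept(Tree tree) { crownArea += tree.crownArea; trees++; }

    @Override
    public void combine(CrownTotals other) { crownArea += other.crownArea; trees += other.trees; }
}

class ConsumerCollectorDemo {
    static Map<Integer, CrownTotals> totalsBySample(List<Tree> trees) {
        return trees.stream().collect(
            Collectors.groupingBy(tree -> tree.sample,
                ConsumerCollectors.<Tree, CrownTotals>of(CrownTotals::new)));
    }
}
```

With a helper like this, the lambda that adapts combine() to a BinaryOperator is written once instead of at every call to collect().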
The rest of the code
The remaining code from the previous example uses the version of collect() that takes three arguments: a supplier, an accumulator, and a combiner. The StratumAccumulator and TotalAccumulator classes both implement the interface java.util.function.Consumer<T>, which gives them accept(); together with a constructor and a combine() method of my own, they provide these three functions.
In the case of StratumAccumulator, I see:
.collect(
    () -> new StratumAccumulator(stratumDefinitionTable.get(e.getKey()).getHa()),
    StratumAccumulator::accept,
    StratumAccumulator::combine)
and in the case of TotalAccumulator:
.collect(
    TotalAccumulator::new,
    TotalAccumulator::accept,
    TotalAccumulator::combine)
For both of these, the only work necessary is to further elaborate the StratumAccumulator and TotalAccumulator classes to incorporate the additional fields and accumulation steps.
However, for symmetry, it's also possible to rewrite those to use Collector.of() as the argument for the collect() calls (for those of us who like to apply a common approach when possible).
Then, for StratumAccumulator, I see:
.collect(
    Collector.of(
        () -> new StratumAccumulator(stratumDefinitionTable.get(e.getKey()).getHa()),
        (strAcc, smpAcc) -> strAcc.accept(smpAcc),
        (strAcc1, strAcc2) -> {
            strAcc1.combine(strAcc2);
            return strAcc1;
        },
        Collector.Characteristics.UNORDERED
    )
)
and for TotalAccumulator:
.collect(
    Collector.of(
        TotalAccumulator::new,
        (totAcc, strAcc) -> totAcc.accept(strAcc),
        (totAcc1, totAcc2) -> {
            totAcc1.combine(totAcc2);
            return totAcc1;
        },
        Collector.Characteristics.UNORDERED
    )
)
Is this better? Well, maybe, since it uses the same pattern for each call to collect(), but it's also wordier. You be the judge. Maybe I should bite the bullet and implement java.util.stream.Collector instead of java.util.function.Consumer.
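For a sense of what implementing java.util.stream.Collector directly involves, here is a deliberately small sketch, unrelated to my accumulator classes: a collector that averages a stream of Double values. The names MeanCollector and RunningMean are invented for the example, and returning 0 for an empty stream is an arbitrary choice on my part.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

// Hypothetical mutable container for the partial result.
class RunningMean {
    double sum;
    long count;
}

// Hypothetical collector: T = Double (input), A = RunningMean (container), R = Double (result).
class MeanCollector implements Collector<Double, RunningMean, Double> {

    @Override
    public Supplier<RunningMean> supplier() {
        return RunningMean::new;
    }

    @Override
    public BiConsumer<RunningMean, Double> accumulator() {
        return (running, value) -> { running.sum += value; running.count++; };
    }

    @Override
    public BinaryOperator<RunningMean> combiner() {
        return (left, right) -> {
            left.sum += right.sum;
            left.count += right.count;
            return left;
        };
    }

    @Override
    public Function<RunningMean, Double> finisher() {
        // Assumption: an empty stream averages to 0 rather than NaN.
        return running -> running.count == 0 ? 0d : running.sum / running.count;
    }

    @Override
    public Set<Characteristics> characteristics() {
        // No IDENTITY_FINISH here, because the finisher converts the container.
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
    }
}
```

It would be used as stream.collect(new MeanCollector()). Compared with Collector.of(), this is five methods instead of three lambdas, but the finisher makes it possible to return something other than the accumulator itself.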
In conclusion
When I turned my single-purpose application into something more general that processed all the available data, I found myself learning a lot more about both collect() and Collectors. In particular, the need to accumulate more than one value as I processed the input streams meant I had to throw out those handy and tempting special-purpose Collectors defined in java.util.stream.Collectors and learn how to build my own. In the end, I suppose it wasn't that hard, but the jump from (for example) using java.util.stream.Collectors.summingDouble() to accumulate a stream of double values to rolling my own Collector with Collector.of() in order to accumulate a stream of tuples was, well, a real jump, at least for me.
I think there are at least two things that would make life a lot easier for Java Streams users:
- A version of java.util.stream.Collectors.groupingBy() that accepts a "classifier" and three more arguments corresponding to the "supplier," "accumulator," and "combiner" that the three-argument collect() takes
- A version of java.util.stream.Collector.of() that takes those same three arguments, with a "combiner" that merges its second argument into its first rather than returning a result (as does collect()), although perhaps this would be best served with a different name than of().
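The first wish can be approximated today with a small wrapper. The following is only a sketch: GroupingSketch, its nested Tree and Totals classes, and totalsBySample() are hypothetical names, and the helper is not part of java.util.stream.Collectors. It simply adapts the void combiner to the BinaryOperator that Collector.of() needs and hands the result to the real groupingBy().

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class GroupingSketch {

    // Hypothetical helper: classifier plus the three functions collect() takes.
    static <T, K, A> Collector<T, ?, Map<K, A>> groupingBy(
            Function<? super T, ? extends K> classifier,
            Supplier<A> supplier,
            BiConsumer<A, T> accumulator,
            BiConsumer<A, A> combiner) {
        return Collectors.groupingBy(
            classifier,
            Collector.of(
                supplier,
                accumulator,
                (left, right) -> { combiner.accept(left, right); return left; },
                Collector.Characteristics.UNORDERED));
    }

    // Hypothetical input and accumulator types for the demonstration.
    static class Tree {
        final int sample;
        final double crownArea;

        Tree(int sample, double crownArea) {
            this.sample = sample;
            this.crownArea = crownArea;
        }
    }

    static class Totals {
        double crownArea;
        int trees;
    }

    static Map<Integer, Totals> totalsBySample(List<Tree> trees) {
        // Explicit type arguments keep the example unambiguous.
        return trees.stream().collect(
            GroupingSketch.<Tree, Integer, Totals>groupingBy(
                tree -> tree.sample,
                Totals::new,
                (totals, tree) -> { totals.crownArea += tree.crownArea; totals.trees++; },
                (left, right) -> {
                    left.crownArea += right.crownArea;
                    left.trees += right.trees;
                }));
    }
}
```

The same adapter lambda, lifted out on its own, would also cover the second wish.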
Perhaps one day, when I have a deeper understanding of all this, I'll be clear as to why we really need a Consumer and a Collector that serve such similar purposes.
And perhaps my next learning effort will be to replace my use of Consumer<T> with full-fledged Collector<T,A,R>.
In any event, I hope that by detailing my learning pathway, I can help others as they travel toward the same destination.
What's your experience with Java Streams? Have you found yourself struggling with moving from toy examples to more complicated real-world applications?