Sometimes it is not enough for something to work - it has to work under load. In modern applications that process large amounts of data, the Streams API in Java gives developers an elegant, declarative tool to transform, filter, and aggregate data in pipelines. Describing complex data operations in just a few lines is tempting, and entirely realistic. But what happens when these operations encounter millions of entries? When should execution be spread across multiple threads in parallel to save time and make effective use of multi-core systems?
- Basics: Collector and concurrency
- Criteria for parallelizable collectors
- Examples from the Java standard library
- Own parallel collector implementations
- Best practices for productive use
- Application scenario: License analysis in the benchmark
- Comparison with alternative parallelism concepts
- Conclusion and outlook
At this point, a concept that often receives too little attention comes into play: the collector. This element at the end of a stream pipeline determines what should happen with the processed data. And even though the API seems simple – collect(Collectors.toList()) – there is an architecture behind it that brings its own challenges when executed in parallel.
This text is, therefore, not just about the syntax or mechanics of collectors but rather a deep understanding of the conditions under which they can be used correctly and efficiently in parallel. We look at standard JDK solutions, discuss our implementations, show typical errors, and ultimately ask ourselves: How parallel can a collector be without becoming dangerous?
Basics: Collector and concurrency#
At first glance, Java’s Streams API suggests that collecting results—the so-called terminal aggregation—can easily be carried out in parallel. But behind the method collect(…) hides more than just syntactic convenience. It is a coordinated collaboration between a data stream and a so-called collector—an object that forms a whole from individual parts.
A collector essentially consists of four functional components: the supplier, which provides a new buffer for each subtask; the accumulator, which feeds elements into this buffer; the combiner, which merges multiple buffers into one; and finally, the finisher, which produces the result. While supplier and accumulator are also essential in sequential streams, the combiner only comes into action when several threads have collected independently of each other, i.e., in a parallelStream().
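In code, the four components can be sketched with Collector.of. The following charCount collector is a made-up example for illustration, not part of the JDK:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorParts {
    // A minimal illustration of the four functional components of a Collector.
    // Generic parameters: element type (String), buffer type (List<String>),
    // and result type (Integer, the total character count).
    static Collector<String, List<String>, Integer> charCount() {
        return Collector.of(
            ArrayList::new,                                        // supplier: fresh buffer per subtask
            List::add,                                             // accumulator: feed one element in
            (left, right) -> { left.addAll(right); return left; }, // combiner: merge two buffers
            list -> list.stream().mapToInt(String::length).sum()   // finisher: produce the result
        );
    }

    public static void main(String[] args) {
        int total = Stream.of("a", "bb", "ccc").parallel().collect(charCount());
        System.out.println(total); // 6
    }
}
```

In a sequential stream only supplier, accumulator, and finisher are exercised; the combiner becomes relevant as soon as the stream is parallel.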
Here lies the first fundamental difference between sequential and parallel processing: in a sequential stream, it is sufficient to accumulate step by step into a single memory. In the parallel variant, on the other hand, several buffers are created that are isolated from one another, the contents of which must later be merged into a final result without conflict. This merging happens through the combiner, and it is precisely at this point that it is decided whether a collector is suitable for parallel use or not.
This suitability depends on several properties: The operations must be associative, i.e., deliver the same result regardless of the combination of the intermediate results. In addition, there must not be a shared state without synchronization. Last but not least, the individual steps must remain deterministic and free of side effects—otherwise, parallelisation quickly becomes a source of subtle errors.
Knowing these structural requirements is the first step toward consciously using parallel processing. Only if you understand how the collector and stream work together can you estimate when a performance gain is possible and when you will get unstable or simply wrong results instead.
Criteria for parallelizable collectors#
Let’s imagine a stream running in parallel—perhaps over a large dataset divided into several segments. Each segment is now processed independently. What sounds trivial has profound implications: As soon as several threads collect at the same time, their intermediate results must not get in the way. The responsibility for correctness lies with the collector—or, more precisely, with its structural and functional design.
The first fundamental property is associativity. A combiner must produce consistent results regardless of how intermediate results are grouped: combine(combine(a, b), c) and combine(a, combine(b, c)) must produce equivalent results. This is necessary because the order in which segments are combined in a parallel context depends on the scheduler and is, therefore, unpredictable.
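The difference between an associative and a non-associative combiner can be checked directly on the functions themselves. This small sketch uses invented sample values:

```java
import java.util.function.BinaryOperator;

public class AssocCheck {
    public static void main(String[] args) {
        BinaryOperator<Long> good = Long::sum;        // associative: safe under any grouping
        BinaryOperator<Long> bad  = (a, b) -> a - b;  // not associative: grouping changes the result

        long a = 1, b = 2, c = 3;
        // combine(combine(a, b), c) must equal combine(a, combine(b, c)) for a valid combiner
        System.out.println(good.apply(good.apply(a, b), c).equals(good.apply(a, good.apply(b, c)))); // true
        System.out.println(bad.apply(bad.apply(a, b), c).equals(bad.apply(a, bad.apply(b, c))));     // false
    }
}
```

A collector built on the second operator would return different results from run to run of a parallel stream, depending purely on how the scheduler split the data.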
A second central point concerns access to memory structures. A potential hotspot for race conditions arises when a collector uses shared, mutable state during accumulation, such as an unsynchronized list or map. The collector must either work exclusively with local, thread-isolated buffers or rely on concurrent data structures such as ConcurrentHashMap, LongAdder, or explicitly synchronized wrappers.
Determinism is also an essential criterion: parallel execution must not lead to different results, neither in content nor in structure. Caution is advised especially with structures that have no defined iteration order, such as HashSet or HashMap; collectors like Collectors.joining() or Collectors.toMap() then become problematic whenever the application relies on a specific order.
These requirements—associativity, isolated state, and determinism—form the technical touchstone for parallel collectors. They are not optional but fundamental. Anyone who ignores them risks difficult-to-reproduce errors, incomplete results, or high-performance but semantically incorrect output.
Concurrency in Java Streams is powerful but not trivial. Only if a collector meets these criteria does a parallelStream() become more than a placebo.
Examples from the Java standard library#
An obvious way to make the abstract concept of parallel collectors tangible is through the collectors already included in the Java standard library. Many developers use Collectors.toList(), toSet(), or joining() almost every day—but rarely with knowledge of whether and how these collectors behave in a parallel context.
A simple example: The Collector Collectors.toList() uses an internal ArrayList. This is not thread-safe; therefore, the result when used in parallel is potentially inconsistent unless the buffers are isolated internally.
```java
public static <T>
Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>(ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}
```

In fact, this collector still works correctly in parallel streams because the Streams API allocates each thread its own accumulation area and merges the partial lists only at the end via the combiner. The crucial point lies not in the data structure itself but in its controlled isolation.
Less robust in parallel use is Collectors.groupingBy(…). This variant is based on a HashMap and is not designed for concurrent access. It still produces correct results in a parallelStream(), because each thread accumulates into its own map, but merging many intermediate maps is expensive, and any shared, unprotected use of the map would risk race conditions. The standard alternative is Collectors.groupingByConcurrent(…), which uses a single internal ConcurrentHashMap designed for simultaneous access.
```java
public static <T, K>
Collector<T, ?, ConcurrentMap<K, List<T>>>
groupingByConcurrent(Function<? super T, ? extends K> classifier) {
    return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
}
```

A look at the signature of this method already shows the intention:
```java
Map<Integer, List<String>> result = namen.parallelStream()
    .collect(Collectors.groupingByConcurrent(String::length));
```

In this example, strings are grouped by their length in a way that can be processed in parallel. Crucially, both the map implementation and the accumulation process are thread-safe.
Equally interesting is Collectors.toConcurrentMap(…), which is explicitly intended to aggregate large sets of key-value pairs in parallel. Particularly interesting here is the handling of key conflicts through an explicit merge function.
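A sketch of how such a merge function comes into play. The word list here is invented sample data; Long::sum resolves the key conflicts that arise when the same word appears more than once:

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ToConcurrentMapDemo {
    public static void main(String[] args) {
        List<String> words = List.of("java", "stream", "java", "collector", "stream", "java");

        // Key conflicts (the same word appearing more than once) are resolved
        // by the merge function, here Long::sum, which adds the partial counts.
        ConcurrentMap<String, Long> counts = words.parallelStream()
            .collect(Collectors.toConcurrentMap(
                w -> w,          // key mapper: the word itself
                w -> 1L,         // value mapper: one occurrence
                Long::sum));     // merge function: add counts on key collision

        System.out.println(counts.get("java")); // 3
    }
}
```

Without the merge function, a duplicate key would cause an IllegalStateException, so for any data set with repeated keys the three-argument variant is the one to use.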
These examples show that not every standard collector is suited to parallel use per se. Just because a method comes from the Collectors toolkit does not mean it will work correctly in every execution configuration. The context decides - and with it the data structure used, the behavior of the combiner, and the type of accumulation.
So, if you want to get more than one result from a stream—a correct and high-performance result—you should choose your collector just as carefully as you choose the filter criterion at the beginning of the pipeline.
Own parallel collector implementations#
As powerful as the Java Standard Library’s prebuilt collectors are, they are sometimes not sufficient for specific needs. Especially when domain-specific aggregations, specialized data structures, or non-trivial reduction logic are required, it is worth considering the possibility of creating your own collector implementations.
A custom collector is usually created using the static method Collector.of(…). This method expects up to five parameters: a Supplier<A>, which creates a new accumulator; a BiConsumer<A, T>, which inserts an element into the accumulator; a BinaryOperator<A> for combining two accumulators; optionally a Function<A, R> to convert the result; and a Collector.Characteristics… varargs array providing meta information such as CONCURRENT or UNORDERED.
A simple but meaningful collector could, for example, collect strings in parallel into a ConcurrentLinkedQueue:
```java
Collector<String, ?, Queue<String>> toConcurrentQueue() {
    return Collector.of(
        ConcurrentLinkedQueue::new,
        Queue::add,
        (left, right) -> { left.addAll(right); return left; },
        Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED
    );
}
```

This collector is both CONCURRENT and UNORDERED, which means multiple threads may write to it simultaneously without any guarantee of insertion order. What is important is that ConcurrentLinkedQueue is a thread-safe data structure, so the addAll in the combiner is likewise uncritical.
However, more complex scenarios are also conceivable, such as the parallel determination of statistical key figures (minimum, maximum, average) across a data set. In such cases, a small accumulator type can encapsulate all the required partial states. The combiner then only has to consolidate these structures field by field.
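A possible sketch of such a statistics collector over int values. The Stats class and its field names are illustrative, not a fixed API:

```java
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class StatsCollector {
    // Mutable per-thread accumulator; the combiner merges two of them field by field.
    static class Stats {
        long count, sum;
        int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE;

        void add(int v) {
            count++; sum += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
        }

        Stats merge(Stats o) {            // consolidate partial states field by field
            count += o.count; sum += o.sum;
            min = Math.min(min, o.min);
            max = Math.max(max, o.max);
            return this;
        }

        double avg() { return count == 0 ? 0 : (double) sum / count; }
    }

    public static void main(String[] args) {
        Stats s = IntStream.rangeClosed(1, 100).parallel().boxed()
            .collect(Collector.of(Stats::new, Stats::add, Stats::merge));
        System.out.println(s.min + " " + s.max + " " + s.avg()); // 1 100 50.5
    }
}
```

Because each thread mutates only its own Stats instance, no synchronization is needed inside add; all coordination happens in the combiner.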
Own collector implementations force you to deal intensively with the parallelizability of the data structures used and the combinability of the aggregation logic. This is not a disadvantage, but a valuable learning effect, because only those who understand what a collector does inside can use it consciously and safely.
Best practices for productive use#
Anyone who wants to use collectors productively in a parallel context should consider some proven strategies - not as rigid rules, but as a framework for robust and efficient implementations.
A first principle is: Only parallelize if there is real benefit to be expected. Small amounts of data, trivial transformations or IO-bound processes usually do not benefit from parallelStream(). On the contrary: the overhead of thread management can even exceed the potential performance gain. Parallelization is only worthwhile if the amount of data to be processed is sufficiently large and the operations are CPU-intensive.
Second: Use only thread-safe or isolated data structures. Either each thread uses its own accumulator, which the Streams API supports internally, or explicitly concurrent data structures such as ConcurrentHashMap, ConcurrentLinkedQueue, or atomic wrappers are used.
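The second route, genuinely shared concurrent structures, might look like the following sketch. countBy is a hypothetical helper that counts elements per key; because every mutation goes through ConcurrentHashMap and LongAdder, it may safely declare the CONCURRENT characteristic:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collector;

public class ConcurrentCounting {
    // Safe to declare CONCURRENT: all threads may share one accumulator,
    // since ConcurrentHashMap and LongAdder handle the synchronization.
    static Collector<String, ?, ConcurrentMap<String, LongAdder>> countBy() {
        return Collector.of(
            ConcurrentHashMap::new,
            (map, key) -> map.computeIfAbsent(key, k -> new LongAdder()).increment(),
            (left, right) -> {
                right.forEach((k, v) -> left.computeIfAbsent(k, x -> new LongAdder()).add(v.sum()));
                return left;
            },
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.UNORDERED
        );
    }

    public static void main(String[] args) {
        ConcurrentMap<String, LongAdder> counts =
            List.of("DE", "US", "DE", "FR", "DE").parallelStream().collect(countBy());
        System.out.println(counts.get("DE").sum()); // 3
    }
}
```

LongAdder is preferable to AtomicLong under heavy contention because it stripes its internal counters across threads.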
Third: Select collectors deliberately. The standard library provides powerful tools such as groupingByConcurrent, toConcurrentMap, or mapping that are specifically designed for parallel use. Anyone developing their own solutions should pay particular attention to the combiner and the associativity of its logic.
Fourth: Validate results, especially for new or complex pipelines. Parallel streams do not behave deterministically in execution order, so tests in different utilization scenarios and under varying loads are necessary. This is especially true if collectors were developed or adapted in-house.
Last but not least: Measure instead of assume. Tools such as JMH (Java Microbenchmark Harness), Flight Recorder, or async-profiler help make realistic statements about performance advantages. Parallelization without metrics is like flying blind with a tailwind—it may be faster, but possibly in the wrong direction.
In summary: parallel collectors are powerful tools, but they only develop their potential if they are used consciously, on a well-founded basis, and with an eye on data structure, semantics, and execution context.
Application scenario: License analysis in the benchmark#
In our example, we will now look at the following scenario: we evaluate fictitious license information. Each record consists of several attributes that lend themselves to automated analysis: Is the license valid? Is it a trial version? Which country does the license come from? How long was it valid?
Instead of answering these questions individually and sequentially, the Java Streams API allows us to view the data as a stream and convert it into a consolidated analysis in a collector. But what happens if we analyse not 100, but 100,000 or 1,000,000 such license objects? Is a simple stream() enough, or is the parallel variant, parallelStream(), worth it?
This is precisely where our experiment comes in: We want to measure the amount of data for which a parallel collector is noticeably worthwhile, both in terms of runtime and resource utilization. We use a specially developed collector that aggregates various metrics:
- Total number of valid licenses
- Share of test licenses
- Average duration in days
- Distribution of licenses by country
- The oldest and youngest licenses issued
To capture these values as realistically as possible, we use a benchmark with JMH, the official microbenchmark tool of the OpenJDK community. The aim is to develop a sense of when parallel collectors actually offer added value and under which conditions they may even prove inefficient.
The data model#
The benchmark’s data model is based on a practical license management system, such as those typically used in software products, platform services, or license servers. It consists of a compact but expressive Record class called LicenseInfo, which encapsulates all the essential information of a single software license.
At the center is the precise product assignment via the productId field, which allows conclusions about the licensed application or component to be drawn. The license itself is represented by a unique licenseKey, simulated here in the example by a randomly generated UUID. These two fields handle the formal identification of each license instance.
Two further fields describe temporal aspects: issuedAt indicates the date and time of issuance, and validUntil marks the expiry time. This not only allows checking whether a license is currently valid, but also for which period it was originally created. Based on this period, the average useful life can be calculated, a central metric for license analyses in companies.
The structure is complemented by the field country, which documents the geographical origin or scope of the license. This is relevant for compliance issues, regional licensing models or sales evaluations. The information is intended as an ISO country abbreviation, but can be flexibly adapted to actual requirements.
Finally, the field isTrial indicates whether the license is a trial license or a regular one. This boolean field enables differentiated evaluations by usage type—an important aspect if, for example, a distinction is to be made between active customers and evaluations.
The record definition remains deliberately slim but provides a solid basis for various analysis methods: from counting and grouping to time series and country statistics. Its immutability and precise semantics make it ideal for use in parallel streams and collecting aggregations. In combination with a user-defined collector, the model allows a precise and at the same time high-performance evaluation of extensive license stocks, with full parallelizability.
```java
import java.time.LocalDateTime;

public record LicenseInfo(
    String productId,
    String licenseKey,
    LocalDateTime issuedAt,
    LocalDateTime validUntil,
    String country,
    boolean isTrial
) {}
```

The test setup#
The structure of the benchmark for license analysis follows a systematic pattern, which is typical for microbenchmark-based performance tests with JMH. The aim is to measure under controlled conditions how the running time of a specially developed collector changes as the amount of data increases, both in serial and in parallel execution. The focus is not on comparing different algorithms, but rather on the stream processing strategy’s effect on the aggregation’s overall duration.
The test focuses on the benchmark class LicenseCollectorBenchmark, which is equipped with the annotations of the JMH library. It defines two measuring points: serialCollector() for serial evaluation using stream() and parallelCollector() for parallel evaluation using parallelStream(). Both methods process identical data but use different execution strategies.
Before each benchmark run, the @Setup(Level.Iteration)-annotated method setUp() generates a consistent test data set. This data set - by default 100,000 randomly generated LicenseInfo instances - is built fresh with each iteration to eliminate possible caching effects or side effects between benchmarks. Values such as product ID, license key, timestamps, country code, and trial status are generated randomly, creating realistic heterogeneity.
Both benchmark methods then fully aggregate the license data, each using the same custom collector. This collector consolidates the data into a final LicenseStats object that summarizes several metrics, including the number of valid licenses, the number of trial licenses, the average term, the country-specific distribution, and the earliest and latest issuance time. This breadth of aggregation ensures that the collector in the benchmark is sufficiently loaded both computationally and in terms of memory.
The actual measurement is carried out via the JMH runtime mechanism with a predefined number of warm-up and measurement cycles. The average execution time per benchmark method is expressed in milliseconds per operation (ms/op), outputting an easily interpretable value representing the total stream processing duration, including accumulation and reduction.
This setup makes it possible to quantify the efficiency advantages – or potential disadvantages – of parallel stream processing under practical conditions. The comparability between serial and parallel is complete because the data is identical, the collector is the same, and only the execution strategy differs.
```java
import org.openjdk.jmh.annotations.*;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collector;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(MILLISECONDS)
@State(Scope.Thread)
public class LicenseCollectorBenchmark {

    static final int DATA_SIZE = 100_000;
    List<LicenseInfo> licenseData;

    @Setup(Level.Iteration)
    public void setUp() {
        Random rand = new Random();
        licenseData = new ArrayList<>(DATA_SIZE);
        for (int i = 0; i < DATA_SIZE; i++) {
            licenseData.add(new LicenseInfo(
                    "product-" + rand.nextInt(100),
                    UUID.randomUUID().toString(),
                    LocalDateTime.now().minusDays(rand.nextInt(1000)),
                    LocalDateTime.now().plusDays(rand.nextInt(1000)),
                    "country-" + rand.nextInt(20),
                    rand.nextBoolean()
            ));
        }
    }

    @Benchmark
    public LicenseStats serialCollector() {
        return licenseData.stream()
                .collect(LicenseCollectors.stats());
    }

    @Benchmark
    public LicenseStats parallelCollector() {
        return licenseData.parallelStream()
                .collect(LicenseCollectors.stats());
    }

    // --- data model ---
    public record LicenseInfo(
            String productId,
            String licenseKey,
            LocalDateTime issuedAt,
            LocalDateTime validUntil,
            String country,
            boolean isTrial
    ) {}

    public record LicenseStats(
            long totalValid,
            long totalTrial,
            double avgDurationDays,
            Map<String, Long> byCountry,
            LocalDateTime oldestIssued,
            LocalDateTime newestIssued
    ) {}

    // --- collector factory ---
    static class LicenseCollectors {
        static Collector<LicenseInfo, ?, LicenseStats> stats() {
            class Acc {
                long valid = 0;
                long trial = 0;
                long total = 0;
                long sumDays = 0;
                final Map<String, Long> byCountry = new HashMap<>();
                LocalDateTime oldest = null;
                LocalDateTime newest = null;

                void add(LicenseInfo li) {
                    if (li.validUntil().isAfter(LocalDateTime.now())) valid++;
                    if (li.isTrial()) trial++;
                    sumDays += Duration.between(li.issuedAt(), li.validUntil()).toDays();
                    total++;
                    byCountry.merge(li.country(), 1L, Long::sum);
                    if (oldest == null || li.issuedAt().isBefore(oldest)) oldest = li.issuedAt();
                    if (newest == null || li.issuedAt().isAfter(newest)) newest = li.issuedAt();
                }

                Acc merge(Acc other) {
                    valid += other.valid;
                    trial += other.trial;
                    sumDays += other.sumDays;
                    total += other.total;
                    other.byCountry.forEach((k, v) -> byCountry.merge(k, v, Long::sum));
                    if (oldest == null || (other.oldest != null && other.oldest.isBefore(oldest)))
                        oldest = other.oldest;
                    if (newest == null || (other.newest != null && other.newest.isAfter(newest)))
                        newest = other.newest;
                    return this;
                }

                LicenseStats finish() {
                    double avg = total == 0 ? 0 : (double) sumDays / total;
                    return new LicenseStats(valid, trial, avg, byCountry, oldest, newest);
                }
            }

            // Deliberately NOT marked CONCURRENT: Acc is not thread-safe (plain
            // long fields, unsynchronized HashMap). Each thread therefore gets
            // its own Acc instance, and the combiner merges the isolated partial
            // results. Declaring CONCURRENT would let all threads share a single
            // Acc and introduce exactly the race conditions discussed above.
            return Collector.of(
                    Acc::new,
                    Acc::add,
                    Acc::merge,
                    Acc::finish,
                    Collector.Characteristics.UNORDERED
            );
        }
    }
}
```

Comparison with alternative parallelism concepts#
While parallel streams with collectors offer a declarative and comparatively accessible way to use multi-core processors, there are other parallelisation models in the Java platform, each with its own strengths and areas of application.
An obvious comparison is with the ForkJoinPool-API. This allows recursive tasks to be broken down and carried out efficiently in parallel, for example, when searching or sorting large amounts of data. However, unlike streams, fork/join structures are explicit: the developer manually controls the division of tasks and the combination of results. This creates more flexibility, but also more complexity and potential for errors.
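For comparison, a minimal fork/join sketch that sums an array. The class name and the threshold value are illustrative; the point is that splitting, forking, and joining are all written out by hand:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final long[] data;
    private final int from, to;

    ForkJoinSum(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {             // small enough: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) >>> 1;              // split the task manually
        ForkJoinSum left = new ForkJoinSum(data, from, mid);
        ForkJoinSum right = new ForkJoinSum(data, mid, to);
        left.fork();                              // run the left half asynchronously
        return right.compute() + left.join();     // combine the results explicitly
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long sum = ForkJoinPool.commonPool().invoke(new ForkJoinSum(data, 0, data.length));
        System.out.println(sum); // 50005000
    }
}
```

What a parallel stream with a collector expresses in one line takes an explicit task class here; in return, the developer fully controls the splitting strategy.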
Another approach is to use CompletableFuture – especially in combination with methods like supplyAsync() or thenCombineAsync(). The focus here is not on aggregating data but on coordinating asynchronous, non-blocking tasks. While streams need all data to be available up front, CompletableFutures are well suited for I/O-heavy or delayed processing.
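A minimal sketch of this coordination style; the two asynchronous suppliers here stand in for independent lookups and simply return fixed values:

```java
import java.util.concurrent.CompletableFuture;

public class FutureCombine {
    public static void main(String[] args) {
        // Two independent (here: simulated) lookups running asynchronously...
        CompletableFuture<Integer> validLicenses = CompletableFuture.supplyAsync(() -> 42);
        CompletableFuture<Integer> trialLicenses = CompletableFuture.supplyAsync(() -> 7);

        // ...coordinated without blocking until the final join()
        int total = validLicenses
                .thenCombine(trialLicenses, Integer::sum)
                .join();
        System.out.println(total); // 49
    }
}
```

Unlike a stream pipeline, each stage can start as soon as its inputs are ready, which is precisely what makes this model attractive for I/O-bound work.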
Newer Java versions also provide structures such as StructuredTaskScope - a still experimental but promising approach to organising parallel tasks and explicitly controlling their lifetime. Unlike streams, this API allows structured concurrency with clearly defined error handling and cancellation semantics.
Parallel collectors position themselves within this spectrum as a compact, declarative mechanism focusing on data-centric, memory-bound processing. They offer low barriers to entry, but are limited in terms of expressiveness and controllability. They reach their natural limits in more complex scenarios - with dependencies between subtasks, adaptive scheduling or concurrent I/O operations.
Therefore, the choice of the appropriate parallelism model should always be based on the task’s requirements: Where pure data aggregation is involved, streams and parallel collectors are ideal. However, where control flow, resilience, or the ability to integrate into distributed systems is required, other APIs are more effective.
Conclusion and outlook#
Parallel collectors are not a sideshow to the Java Streams API - they are at the heart of whether modern data processing can truly scale. Anyone who understands their structure quickly realizes that correct aggregation in a parallel context is not a coincidence, but the result of precise planning. Associativity, deterministic processing, and thread-safe data structures are not academic footnotes, but central pillars of this architecture.
The concepts shown here—from the structural basis to best practices to your own implementations—are intended not only to convey the functionality but also to create awareness of the responsibility that comes with parallelism. What is harmless in a single-threaded scenario can lead to serious errors in a parallel environment.
The question for the future is how the Streams API will continue to develop. More structured parallelism models, which are based on concepts such as structured concurrency or data flow graphs, are conceivable. Integration with reactive streams or declarative parallelism à la Fork/Join-DSL would also be a logical next step.
One thing is certain: Anyone who learns to use parallel collectors correctly and efficiently today will lay the foundation for high-performance and scalable data processing in the Java platform of tomorrow.
Happy Coding
Sven