Aggregator
"How do we combine the results of individual, but related messages so that they can be processed as a whole?" — Hohpe & Woolf, Enterprise Integration Patterns, 2003
Intent
The EIP Aggregator is a stateful message-routing component that collects a set of related messages and combines them into a single aggregated message for downstream processing. Unlike a filter or router, which act on one message at a time, the Aggregator requires three explicit design decisions before it can release a result.
The three design decisions are: (1) Correlation — which messages belong together, identified by a correlation key such as orderId stamped on every message by an upstream Splitter or scatter step; (2) Completeness condition — when to release the assembled result, expressed as completionSize(N) (release when N messages arrive), completionTimeout(ms) (release after a deadline even if incomplete), or a custom predicate; (3) Aggregation algorithm — how individual message bodies or headers merge into one (collect into a list, sum a numeric field, merge JSON objects, pick the first non-null value).
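The three decisions can be made explicit as three pluggable functions. The following is a minimal in-memory sketch, not a reference implementation: the names AggregatorConfig, Aggregator, accept, and OrderLine are hypothetical, and the example wiring (group by orderId, release after two lines, sum the amounts) is only an assumption chosen for illustration.

```typescript
// Hypothetical names for illustration: AggregatorConfig, Aggregator, accept.
interface AggregatorConfig<M, R> {
  correlate: (message: M) => string;              // 1. which bucket a message belongs to
  isComplete: (bucket: readonly M[]) => boolean;  // 2. when the bucket may be released
  combine: (bucket: readonly M[]) => R;           // 3. how the parts merge into one result
}

class Aggregator<M, R> {
  private readonly buckets = new Map<string, M[]>();
  private readonly config: AggregatorConfig<M, R>;

  constructor(config: AggregatorConfig<M, R>) {
    this.config = config;
  }

  // Returns the combined result once the bucket completes, otherwise undefined.
  accept(message: M): R | undefined {
    const key = this.config.correlate(message);
    const bucket = this.buckets.get(key) ?? [];
    bucket.push(message);
    this.buckets.set(key, bucket);
    if (!this.config.isComplete(bucket)) return undefined;
    this.buckets.delete(key); // release: the bucket leaves the state store
    return this.config.combine(bucket);
  }
}

interface OrderLine { orderId: string; amount: number; }

// Example wiring (assumed): group by orderId, release after 2 lines, sum the amounts.
const orderTotals = new Aggregator<OrderLine, number>({
  correlate: (line) => line.orderId,
  isComplete: (bucket) => bucket.length >= 2,
  combine: (bucket) => bucket.reduce((sum, line) => sum + line.amount, 0),
});
```

Keeping the three functions separate means a size-based condition can later be swapped for a timeout or a custom predicate without touching the bucket bookkeeping.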
The Aggregator holds state between messages: it maintains a bucket per correlation key until the completeness condition is met. This statefulness sets it apart from stateless routing components such as filters and routers, and it is the primary operational concern: the bucket store must survive restarts, or in-flight groups are lost. The Splitter pattern is the natural inverse: Splitter decomposes one composite message into N individual messages; Aggregator reassembles N related messages into one. Paired, they form the split, process, reassemble pipeline that Hohpe & Woolf call Composed Message Processor; Scatter-Gather uses the same Aggregator but fans out by broadcasting one request to several recipients.
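The inverse relationship can be sketched as a round trip. In this hedged example the Splitter stamps each part with the correlation key, its position, and the group size, so the Aggregator can detect completeness and restore the original order even when parts arrive shuffled. The Part shape and the split and reassemble functions are hypothetical names, not part of any library.

```typescript
// Hypothetical message shape: the Splitter stamps key, position, and group size.
interface Part { correlationId: string; index: number; total: number; body: string; }

// Splitter: one composite message becomes N correlated parts.
function split(correlationId: string, items: string[]): Part[] {
  return items.map((body, index) => ({ correlationId, index, total: items.length, body }));
}

const buckets = new Map<string, Part[]>();

// Aggregator: returns the reassembled bodies once all parts are in, otherwise null.
function reassemble(part: Part): string[] | null {
  const bucket = buckets.get(part.correlationId) ?? [];
  bucket.push(part);
  buckets.set(part.correlationId, bucket);
  if (bucket.length < part.total) return null; // still collecting
  buckets.delete(part.correlationId);
  // Restore the Splitter's original order regardless of arrival order.
  return [...bucket].sort((a, b) => a.index - b.index).map((p) => p.body);
}
```

Carrying the group size on every part is one design choice among several; it lets the Aggregator stay ignorant of how the Splitter decided to cut the message.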
When NOT to Use
- When all related messages arrive in a single composite message — use Splitter followed by direct processing instead; no reassembly step is needed.
- When the aggregation window is unbounded and no completeness condition can be defined — an unbounded bucket store is a memory leak waiting to happen; define a timeout or reject the approach.
- When processing is strictly sequential and parallelism is not required — the Aggregator adds coordination overhead (state management, completeness logic) that serves no purpose in a serial pipeline.
- When exactly-once delivery semantics are needed at the aggregation layer — wrap the aggregated output with Idempotent-Consumer rather than embedding deduplication inside the Aggregator itself.
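The last point above, guarding the aggregated output with an Idempotent-Consumer, might look like the following sketch. It assumes the correlationId is a suitable deduplication key and uses an in-memory Set purely for illustration; makeIdempotent and AggregatedMessage are hypothetical names.

```typescript
// Hypothetical shape of the Aggregator's output.
interface AggregatedMessage { correlationId: string; parts: string[]; }

// Wraps a handler so that each correlationId is processed at most once.
// Returns true when the handler ran, false when the message was a duplicate.
function makeIdempotent(
  handler: (message: AggregatedMessage) => void,
  seen: Set<string> = new Set<string>(), // assumption: in-memory; use a durable store in production
): (message: AggregatedMessage) => boolean {
  return (message) => {
    if (seen.has(message.correlationId)) return false; // duplicate: skip
    seen.add(message.correlationId);
    handler(message);
    return true;
  };
}

const delivered: string[] = [];
const consume = makeIdempotent((message) => {
  delivered.push(message.correlationId);
});
```

The Aggregator stays unaware of redelivery; the guard sits downstream, so the deduplication policy can change without touching the bucket logic.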
When to Use
- Scatter-Gather workflows: fan-out requests to N services, collect all N responses, combine into one reply for the caller.
- Batch assembly: an upstream Splitter decomposes a composite message into parts, parts are processed in parallel by different consumers, the Aggregator reassembles the processed parts before the next pipeline stage.
- Event correlation: aggregate all domain events for a given saga step before advancing the Saga state machine — each event is a partial result; the Aggregator releases when the step is complete.
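For the event-correlation case above, the completeness condition is typically a custom predicate rather than a count. The sketch below releases a saga step once every required event type has been seen for that saga. The event names PaymentAuthorized and StockReserved, and the SagaEvent shape, are assumptions made up for the example.

```typescript
interface SagaEvent { sagaId: string; type: string; }

// Hypothetical event types that together complete one saga step.
const REQUIRED_TYPES = ["PaymentAuthorized", "StockReserved"];

const pending = new Map<string, SagaEvent[]>();

// Returns all events for the step once every required type has arrived, otherwise null.
function onSagaEvent(event: SagaEvent): SagaEvent[] | null {
  const events = pending.get(event.sagaId) ?? [];
  events.push(event);
  pending.set(event.sagaId, events);

  // Custom completeness predicate: every required type seen at least once.
  const seenTypes = new Set(events.map((e) => e.type));
  const complete = REQUIRED_TYPES.every((type) => seenTypes.has(type));
  if (!complete) return null;

  pending.delete(event.sagaId);
  return events;
}
```

A duplicate event of an already-seen type does not release the step early, which a plain size-based condition would do.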
How It Works
Three design decisions in detail:
- Correlation Expression: defines which messages belong to the same group. Typically a correlationId header set by the upstream Splitter (or scatter step). Every message in a group must carry the same correlationId.
- Completeness Condition: determines when to release the assembled result:
  - completionSize(N): release when exactly N messages have arrived for this correlationId
  - completionTimeout(ms): release after a deadline even if fewer than N parts arrived (partial results)
  - Custom predicate: release when an application-specific condition on the accumulated payload is true
- Aggregation Strategy: how individual message bodies and headers merge:
  - Collect into a list (most common default)
  - Sum or average a numeric field across all messages
  - Merge JSON objects (last-write-wins or deep merge)
  - Pick the first non-null value (priority aggregation)
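The aggregation strategies listed above are each small pure functions over the collected parts. A hedged sketch follows; the function names are hypothetical, and the merge shown is the shallow last-write-wins variant only (a deep merge would need recursion that is omitted here).

```typescript
// Collect into a list: copy the parts so the caller cannot mutate the bucket.
function collect<T>(parts: readonly T[]): T[] {
  return [...parts];
}

// Sum a numeric field selected by the caller.
function sumBy<T>(parts: readonly T[], field: (part: T) => number): number {
  return parts.reduce((total, part) => total + field(part), 0);
}

// Shallow merge of JSON-like objects: later parts overwrite earlier keys.
function mergeLastWriteWins(parts: readonly Record<string, unknown>[]): Record<string, unknown> {
  return Object.assign({}, ...parts);
}

// Priority aggregation: the first part that is neither null nor undefined.
function firstNonNull<T>(parts: readonly (T | null | undefined)[]): T | undefined {
  for (const part of parts) {
    if (part !== null && part !== undefined) return part;
  }
  return undefined;
}
```

For last-write-wins and first-non-null the result depends on arrival order, so those two strategies are only deterministic if the bucket preserves or restores a defined ordering.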
After the completeness condition fires, the Aggregator emits the assembled message and removes the bucket from its state store. In production, replace an in-memory store with a persistent store (database, Redis) to survive process restarts.
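One way to keep the option of a persistent store open is to put the bucket store behind a small interface and drive timeouts from it. The sketch below is an assumption-laden illustration: BucketStore, InMemoryBucketStore, addPart, and releaseExpired are hypothetical names, the deadline is measured from the first part of each group, and the interface is synchronous for brevity, whereas a database or Redis client would normally be asynchronous.

```typescript
interface Bucket { parts: string[]; firstSeenAt: number; } // firstSeenAt: epoch millis

// Seam for swapping the in-memory store for a database or Redis-backed one.
interface BucketStore {
  load(key: string): Bucket | undefined;
  save(key: string, bucket: Bucket): void;
  remove(key: string): void;
  keys(): string[];
}

class InMemoryBucketStore implements BucketStore {
  private readonly buckets = new Map<string, Bucket>();
  load(key: string): Bucket | undefined { return this.buckets.get(key); }
  save(key: string, bucket: Bucket): void { this.buckets.set(key, bucket); }
  remove(key: string): void { this.buckets.delete(key); }
  keys(): string[] { return Array.from(this.buckets.keys()); }
}

// Adds a part, creating the bucket (and starting its clock) on first sight of the key.
function addPart(store: BucketStore, key: string, part: string, now: number): void {
  const bucket = store.load(key) ?? { parts: [], firstSeenAt: now };
  bucket.parts.push(part);
  store.save(key, bucket);
}

// Releases every bucket at least timeoutMs old, complete or not (partial results).
function releaseExpired(store: BucketStore, now: number, timeoutMs: number): Map<string, string[]> {
  const released = new Map<string, string[]>();
  for (const key of store.keys()) {
    const bucket = store.load(key);
    if (bucket !== undefined && now - bucket.firstSeenAt >= timeoutMs) {
      released.set(key, bucket.parts);
      store.remove(key);
    }
  }
  return released;
}
```

Passing the current time in as a parameter, rather than reading the clock inside the function, keeps the timeout logic testable; a periodic sweep calling releaseExpired also bounds how long any bucket can occupy the store.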
Flow Diagram
flowchart TD
M1["Message 1<br/>correlationId=X"]
M2["Message 2<br/>correlationId=X"]
M3["Message 3<br/>correlationId=X"]
subgraph Aggregator
CORR["Correlation<br/>group by correlationId"]
BUCKET["Bucket<br/>accumulate parts"]
CHECK{"Completeness?<br/>size=3 or timeout"}
STRAT["Aggregation Strategy<br/>merge / collect / sum"]
end
M1 --> CORR
M2 --> CORR
M3 --> CORR
CORR --> BUCKET
BUCKET --> CHECK
CHECK -->|"complete"| STRAT
CHECK -->|"waiting"| BUCKET
STRAT --> OUT["Assembled Message"]
style CHECK fill:#fff3e0,stroke:#f57c00
style OUT fill:#e8f5e9,stroke:#388e3c
Confusion with DDD Aggregate: The EIP Aggregator and the DDD Aggregate share a root word but are unrelated patterns. The EIP Aggregator is a message-routing component that collects and combines related messages (e.g., all responses to a scatter-gather request). The DDD Aggregate is a domain consistency boundary that enforces invariants over a cluster of domain objects. See Aggregate for the DDD pattern.
TypeScript Example
// Aggregator — TypeScript (stateful message accumulator)
// Source: Hohpe & Woolf, Enterprise Integration Patterns, 2003
const buckets = new Map<string, string[]>();
function aggregate(correlationId: string, part: string, totalParts: number): string[] | null {
  const parts = buckets.get(correlationId) ?? [];
  parts.push(part);
  buckets.set(correlationId, parts);
  if (parts.length === totalParts) { // completeness condition: size-based
    buckets.delete(correlationId);
    return parts; // release aggregated result
  }
  return null; // still collecting
}
// Note: in production, replace Map with a persistent store to survive restarts
Java Example
// Aggregator — Apache Camel Java DSL
// Source: camel.apache.org/components/4.x/eips/aggregate-eip.html
from("direct:parts")
    .aggregate(header("correlationId"), new GroupedBodyAggregationStrategy())
        .completionSize(3) // release when 3 parts arrive
        // .completionTimeout(5000) // or release once no new part has arrived for this key for 5 seconds, whichever comes first
    .to("direct:assembled");
Lineage Backward
- Composite-Pattern — Lineage chain 7: Composite → Aggregate (DDD) → Message Aggregator (EIP). The Composite pattern treats a tree of objects as a single object; the EIP Aggregator similarly combines multiple messages into one assembled result.
- Aggregate — The DDD Aggregate is a consistency boundary enforcing invariants over a cluster of domain objects; the EIP Aggregator is a routing component that assembles correlated messages. Different concepts sharing a root word (see disambiguation callout above).
Lineage Forward
- CQRS-Pattern (Phase 12) — Event projections aggregate domain events into read models: the projection handler is an Aggregator that correlates events by aggregate ID and builds the read model state incrementally.
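- Splitter: Splitter and Aggregator are an inverse pair. Splitter fans out one composite message into N messages, parallel processing occurs, and Aggregator reassembles the N results into one (the Composed Message Processor arrangement in Hohpe & Woolf). Scatter-Gather reuses the Aggregator but broadcasts one request to N recipients instead of splitting a composite.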
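The CQRS projection described in this section can be read as an Aggregator whose correlation key is the aggregate ID and whose strategy is an incremental fold. The sketch below is illustrative only: AccountEvent, BalanceView, and project are hypothetical names, and the account domain is an assumption. Unlike the message Aggregator, the projection has no completeness condition here: the bucket is never released or deleted, it is the read model.

```typescript
// Hypothetical domain events and read model for illustration.
interface AccountEvent { aggregateId: string; type: "Deposited" | "Withdrawn"; amount: number; }
interface BalanceView { aggregateId: string; balance: number; eventCount: number; }

const readModel = new Map<string, BalanceView>();

// Correlates by aggregateId and folds each event into the current view.
function project(event: AccountEvent): BalanceView {
  const current: BalanceView =
    readModel.get(event.aggregateId) ??
    { aggregateId: event.aggregateId, balance: 0, eventCount: 0 };
  const delta = event.type === "Deposited" ? event.amount : -event.amount;
  const next: BalanceView = {
    aggregateId: event.aggregateId,
    balance: current.balance + delta,
    eventCount: current.eventCount + 1,
  };
  readModel.set(event.aggregateId, next);
  return next;
}
```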
Related Concepts
| Pattern | Relationship |
|---|---|
| Splitter | Inverse: split and reassemble pair (Composed Message Processor) |
| Aggregate | Naming collision — unrelated DDD pattern |
| Composite-Pattern | Structural ancestor (combines many into one) |
| CQRS-Pattern | Event aggregation at read model layer |
| Idempotent-Consumer | Guard on aggregation output for exactly-once semantics |
Sources
- Hohpe, G. & Woolf, B. (2003). Enterprise Integration Patterns. Addison-Wesley.
- https://www.enterpriseintegrationpatterns.com/Aggregator.html
- https://camel.apache.org/components/4.x/eips/aggregate-eip.html