Aggregator

"How do we combine the results of individual, but related messages so that they can be processed as a whole?" — Hohpe & Woolf, Enterprise Integration Patterns, 2003

Intent

The EIP Aggregator is a stateful message-routing component that collects a set of related messages and combines them into a single aggregated message for downstream processing. A filter or router acts on one message at a time; an Aggregator cannot, so its design must settle three questions before it can release a result.

The three design decisions are: (1) Correlation — which messages belong together, identified by a correlation key such as orderId stamped on every message by an upstream Splitter or scatter step; (2) Completeness condition — when to release the assembled result, expressed as completionSize(N) (release when N messages arrive), completionTimeout(ms) (release after a deadline even if incomplete), or a custom predicate; (3) Aggregation algorithm — how individual message bodies or headers merge into one (collect into a list, sum a numeric field, merge JSON objects, pick the first non-null value).

The Aggregator holds state between messages: it maintains one bucket per correlation key until the completeness condition is met. This state is what sets it apart from stateless routing components, and it is the primary operational concern: the bucket store must survive restarts, or in-flight groups are lost. The Splitter pattern is the natural inverse: a Splitter decomposes one composite message into N individual messages, and an Aggregator reassembles N related messages into one. Hohpe & Woolf call a Splitter paired with an Aggregator a Composed Message Processor; when the N messages instead come from a broadcast to several recipients, the pairing is Scatter-Gather.

When NOT to Use

  • When all related messages arrive in a single composite message — use Splitter followed by direct processing instead; no reassembly step is needed.
  • When the aggregation window is unbounded and no completeness condition can be defined — an unbounded bucket store is a memory leak waiting to happen; define a timeout or reject the approach.
  • When processing is strictly sequential and parallelism is not required — the Aggregator adds coordination overhead (state management, completeness logic) that serves no purpose in a serial pipeline.
  • When exactly-once delivery semantics are needed at the aggregation layer — wrap the aggregated output with Idempotent-Consumer rather than embedding deduplication inside the Aggregator itself.

When to Use

  • Scatter-Gather workflows: fan-out requests to N services, collect all N responses, combine into one reply for the caller.
  • Batch assembly: an upstream Splitter decomposes a composite message into parts, parts are processed in parallel by different consumers, the Aggregator reassembles the processed parts before the next pipeline stage.
  • Event correlation: aggregate all domain events for a given saga step before advancing the Saga state machine — each event is a partial result; the Aggregator releases when the step is complete.
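The batch-assembly case can be sketched as a round trip: split, process the parts in any order, then reassemble. This is a minimal in-memory illustration, not a library API; the `Part` shape, the `split` function, and the `Reassembler` class are hypothetical names, and it assumes each part carries its position and the total part count.

```typescript
// Hypothetical message shape; the field names are illustrative assumptions.
interface Part {
  correlationId: string; // shared by every part of one composite message
  index: number;         // position of the part in the original message
  total: number;         // number of parts the Splitter produced
  body: string;
}

// Splitter: one composite message in, N correlated parts out.
function split(correlationId: string, lines: string[]): Part[] {
  return lines.map((body, index) => ({ correlationId, index, total: lines.length, body }));
}

// Aggregator: buffers parts per correlationId and releases them in original order.
class Reassembler {
  private readonly buckets = new Map<string, Part[]>();

  accept(part: Part): string[] | null {
    const bucket: Part[] = this.buckets.get(part.correlationId) ?? [];
    bucket.push(part);
    this.buckets.set(part.correlationId, bucket);
    if (bucket.length < part.total) return null; // still waiting for parts
    this.buckets.delete(part.correlationId);     // release and free the bucket
    return [...bucket].sort((a, b) => a.index - b.index).map((p) => p.body);
  }
}

// "Process" each part (upper-casing stands in for real work), then deliver
// the parts in reverse order to mimic parallel consumers finishing out of order.
const processed = split("order-42", ["a", "b", "c"]).map((p) => ({
  ...p,
  body: p.body.toUpperCase(),
}));
const reassembler = new Reassembler();
const results = [...processed].reverse().map((p) => reassembler.accept(p));
// results is [null, null, ["A", "B", "C"]]
```

Carrying `index` and `total` on every part lets the Aggregator restore the original order and decide completeness without any out-of-band configuration.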

How It Works

Three design decisions in detail:

  1. Correlation Expression — defines which messages belong to the same group. Typically a correlationId header set by the upstream Splitter (or scatter step). Every message in a group must carry the same correlationId.

  2. Completeness Condition — determines when to release the assembled result:

    • completionSize(N) — release when exactly N messages have arrived for this correlationId
    • completionTimeout(ms) — release after a deadline even if fewer than N parts arrived (partial results)
    • Custom predicate — release when an application-specific condition on the accumulated payload is true

  3. Aggregation Strategy — how individual message bodies and headers merge:

    • Collect into a list (most common default)
    • Sum or average a numeric field across all messages
    • Merge JSON objects (last-write-wins or deep merge)
    • Pick the first non-null value (priority aggregation)
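The four aggregation strategies listed above can each be written as a small function over the parts of one bucket. The sketch below is illustrative only: the function names and the `amount` field are assumptions, and the merge shown is a shallow last-write-wins merge rather than a deep merge.

```typescript
// A strategy folds the parts of one bucket into a single result.
type AggregationStrategy<T, R> = (parts: T[]) => R;

// Collect into a list (returns a copy so the bucket itself is not exposed).
function collect<T>(parts: T[]): T[] {
  return [...parts];
}

// Sum a numeric field; `amount` is a hypothetical field name.
const sumAmount: AggregationStrategy<{ amount: number }, number> = (parts) =>
  parts.reduce((acc, p) => acc + p.amount, 0);

// Shallow merge of JSON-like objects; later parts overwrite earlier keys.
function mergeLastWins(parts: Record<string, unknown>[]): Record<string, unknown> {
  return Object.assign({}, ...parts);
}

// Priority aggregation: the first part that is neither null nor undefined.
function firstNonNull<T>(parts: Array<T | null | undefined>): T | undefined {
  for (const p of parts) {
    if (p !== null && p !== undefined) return p;
  }
  return undefined;
}

console.log(collect([1, 2, 3]));                        // a new array holding 1, 2, 3
console.log(sumAmount([{ amount: 5 }, { amount: 7 }])); // 12
console.log(mergeLastWins([{ a: 1, b: 1 }, { b: 2 }])); // a stays 1, b becomes 2
console.log(firstNonNull([null, undefined, "x", "y"])); // x
```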

After the completeness condition fires, the Aggregator emits the assembled message and removes the bucket from its state store. In production, replace an in-memory store with a persistent store (database, Redis) to survive process restarts.
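A minimal sketch of the release-and-remove behaviour with both a size condition and a timeout follows. The names (`createAggregator`, `sweep`, `Released`) are hypothetical, the store is an in-memory `Map` for illustration only, and the deadline is assumed to be fixed when the first part of a group arrives. Time is passed in explicitly so the behaviour is deterministic; a real deployment would call `sweep` from a timer.

```typescript
interface Released<T> {
  correlationId: string;
  parts: T[];
  complete: boolean; // false when the timeout fired before all parts arrived
}

interface Bucket<T> {
  parts: T[];
  deadline: number; // milliseconds; fixed when the first part arrives (an assumption)
}

function createAggregator<T>(completionSize: number, completionTimeoutMs: number) {
  // In-memory store for illustration; a persistent store would replace this Map.
  const buckets = new Map<string, Bucket<T>>();

  return {
    // Add a part; returns the group once completionSize parts have arrived.
    add(correlationId: string, part: T, now: number): Released<T> | null {
      const bucket: Bucket<T> =
        buckets.get(correlationId) ?? { parts: [], deadline: now + completionTimeoutMs };
      bucket.parts.push(part);
      if (bucket.parts.length >= completionSize) {
        buckets.delete(correlationId); // emit, then remove the bucket
        return { correlationId, parts: bucket.parts, complete: true };
      }
      buckets.set(correlationId, bucket);
      return null;
    },

    // Release every bucket whose deadline has passed, as a partial result.
    sweep(now: number): Released<T>[] {
      const released: Released<T>[] = [];
      for (const [correlationId, bucket] of Array.from(buckets)) {
        if (now >= bucket.deadline) {
          buckets.delete(correlationId);
          released.push({ correlationId, parts: bucket.parts, complete: false });
        }
      }
      return released;
    },

    pendingCount(): number {
      return buckets.size;
    },
  };
}

const agg = createAggregator<string>(3, 5000);
agg.add("X", "a", 0);
agg.add("X", "b", 100);
const full = agg.add("X", "c", 200); // third part: group X is released complete
agg.add("Y", "a", 1000);             // deadline for group Y is 6000
const early = agg.sweep(5999);       // nothing released yet
const late = agg.sweep(6000);        // group Y released with complete: false
```

Marking timed-out groups with `complete: false` lets downstream consumers decide whether a partial result is acceptable instead of silently treating it as whole.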

Flow Diagram

flowchart TD
    M1["Message 1<br/>correlationId=X"]
    M2["Message 2<br/>correlationId=X"]
    M3["Message 3<br/>correlationId=X"]

    subgraph Aggregator
        CORR["Correlation<br/>group by correlationId"]
        BUCKET["Bucket<br/>accumulate parts"]
        CHECK{"Completeness?<br/>size=3 or timeout"}
        STRAT["Aggregation Strategy<br/>merge / collect / sum"]
    end

    M1 --> CORR
    M2 --> CORR
    M3 --> CORR
    CORR --> BUCKET
    BUCKET --> CHECK
    CHECK -->|"complete"| STRAT
    CHECK -->|"waiting"| BUCKET
    STRAT --> OUT["Assembled Message"]

    style CHECK fill:#fff3e0,stroke:#f57c00
    style OUT fill:#e8f5e9,stroke:#388e3c

Confusion with DDD Aggregate: The EIP Aggregator and the DDD Aggregate share a root word but are unrelated patterns. The EIP Aggregator is a message-routing component that collects and combines related messages (e.g., all responses to a scatter-gather request). The DDD Aggregate is a domain consistency boundary that enforces invariants over a cluster of domain objects. See Aggregate for the DDD pattern.


TypeScript Example

// Aggregator — TypeScript (stateful message accumulator)
// Source: Hohpe & Woolf, Enterprise Integration Patterns, 2003
const buckets = new Map<string, string[]>();
 
function aggregate(correlationId: string, part: string, totalParts: number): string[] | null {
  const parts = buckets.get(correlationId) ?? [];
  parts.push(part);
  buckets.set(correlationId, parts);
  if (parts.length === totalParts) {   // completeness condition: size-based
    buckets.delete(correlationId);
    return parts;                       // release aggregated result
  }
  return null;                          // still collecting
}
// Note: in production, replace Map with a persistent store to survive restarts

Java Example

// Aggregator — Apache Camel Java DSL
// Source: camel.apache.org/components/4.x/eips/aggregate-eip.html
from("direct:parts")
    .aggregate(header("correlationId"), new GroupedBodyAggregationStrategy())
    .completionSize(3)           // release when 3 parts arrive
    // .completionTimeout(5000)  // or release after 5 s with no new part for the group (inactivity timeout), whichever comes first
    .to("direct:assembled");

Lineage Backward

  • Composite-Pattern — Lineage chain 7: Composite → Aggregate (DDD) → Message Aggregator (EIP). The Composite pattern treats a tree of objects as a single object; the EIP Aggregator similarly combines multiple messages into one assembled result.
  • Aggregate — The DDD Aggregate is a consistency boundary enforcing invariants over a cluster of domain objects; the EIP Aggregator is a routing component that assembles correlated messages. Different concepts sharing a root word (see disambiguation callout above).

Lineage Forward

  • CQRS-Pattern (Phase 12) — Event projections aggregate domain events into read models: the projection handler is an Aggregator that correlates events by aggregate ID and builds the read model state incrementally.
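  • Splitter: Splitter and Aggregator are an inverse pair. A Splitter breaks one composite message into N messages, the parts are processed in parallel, and the Aggregator reassembles the N results into one. Hohpe & Woolf name this combination Composed Message Processor; Scatter-Gather is the variant in which a broadcast, rather than a Splitter, produces the N messages.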

| Pattern             | Relationship                                               |
|---------------------|------------------------------------------------------------|
| Splitter            | Inverse pattern (split, then reassemble)                   |
| Aggregate           | Naming collision; unrelated DDD pattern                    |
| Composite-Pattern   | Structural ancestor (combines many into one)               |
| CQRS-Pattern        | Event aggregation at read model layer                      |
| Idempotent-Consumer | Guard on aggregation output for exactly-once semantics     |

Sources