@Pipeline defines a server-side aggregation pipeline sent to MongoDB when the Change Stream starts. Only matching events are transmitted over the network, reducing traffic and application load. The pipeline is evaluated once at stream startup — not per event.

Basic Usage

Annotate a method in your @ChangeStream class. At most one @Pipeline method per class. The method must take no parameters.
@ChangeStream(collection = "orders")
public class OrderHandler {

    @Pipeline
    List<Bson> pipeline() {
        return List.of(
            Aggregates.match(Filters.in("fullDocument.status", "PAID", "SHIPPED", "CANCELLED"))
        );
    }

    @OnInsert
    void handle(ChangeStreamContext<Order> ctx) {
        // only receives events where status is PAID, SHIPPED, or CANCELLED
    }
}

Supported Return Types

Return type                | Description
List<Bson>                 | Raw MongoDB BSON pipeline stages
List<AggregationOperation> | Spring Data aggregation operations
Aggregation                | Spring Data Aggregation object
// From PipelineBsonCapture sample
@Pipeline
List<Bson> pipeline() {
    return List.of(
        Aggregates.match(Filters.in("operationType", "insert", "update"))
    );
}
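The Spring Data return types express the same match with AggregationOperation objects. A minimal sketch, inside a @ChangeStream class (Aggregation.match returns a MatchOperation, which implements AggregationOperation):

```java
// Sketch: the same operation-type match, using the Spring Data variant.
@Pipeline
List<AggregationOperation> pipeline() {
    return List.of(
        Aggregation.match(Criteria.where("operationType").in("insert", "update"))
    );
}
```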

Combining with @Filter

@Pipeline can coexist with @Filter on the same @ChangeStream, forming a double funnel: pre-filter server-side, then refine with Java logic. The following example, from the PipelineFilterCapture sample, pre-filters by operation type server-side and then checks the order status application-side:
@ChangeStream(documentType = Order.class, fullDocument = FullDocumentMode.UPDATE_LOOKUP)
public class PipelineFilterCapture {

    @Pipeline
    Aggregation pipeline() {
        return Aggregation.newAggregation(
            Aggregation.match(Criteria.where("operationType").in("insert", "update"))
        );
    }

    @Filter
    Predicate<ChangeStreamContext<?>> filter() {
        return ctx -> ((ChangeStreamContext<Order>) ctx).getFullDocument(Order.class)
            .map(order -> "CONFIRMED".equals(order.getStatus()))
            .orElse(false);
    }

    @OnInsert
    void onInsert(Order doc, ChangeStreamContext<Order> ctx) { ... }
}
Use @Pipeline to eliminate events you never need (reduce network traffic), then @Filter for logic that requires Spring beans, external service calls, or dynamic conditions.

Checkpoint Interaction

When @Pipeline is present, dualCheckpoint=true (the default) is essential for fast restart.

The pipeline filters events server-side, creating a gap between the oplog position and the last event your handler processed (lastProcessedToken). Without dual checkpoint, a restart forces MongoDB to re-scan the entire oplog from lastProcessedToken — potentially tens of thousands of filtered events.

With dualCheckpoint=true, FlowWarden maintains a separate lastSeenToken (advanced via oplog sampling every 5 seconds) that tracks the actual oplog position, regardless of which events matched the pipeline.

A warning is logged at startup if @Pipeline is present with dualCheckpoint=false.
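Assuming dualCheckpoint is an attribute on @ChangeStream (verify the flag's exact location against your FlowWarden version), the recommended pairing looks like:

```java
// Sketch: keep dual checkpoint enabled (the default) whenever a
// @Pipeline is present. The attribute placement is an assumption.
@ChangeStream(collection = "orders", dualCheckpoint = true)
public class OrderHandler {

    @Pipeline
    List<Bson> pipeline() {
        return List.of(
            Aggregates.match(Filters.eq("fullDocument.status", "PAID"))
        );
    }
}
```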

Oplog Internals and Limitations

Change Streams Cannot Use Indexes

MongoDB does not support creating indexes on the oplog. All Change Stream pipelines perform a sequential scan (COLLSCAN) of the oplog to evaluate your $match filters. This is a fundamental MongoDB limitation, not a FlowWarden one.

Pushdown-Eligible Fields

Not all filters are equal. Empirical testing by the MongoDB community (this behavior is not officially documented by MongoDB) suggests that only a handful of fields allow optimized evaluation during the oplog scan:
Field                     | Pushdown
operationType             | Yes
ns.db                     | Yes
ns.coll                   | Yes
documentKey._id           | Yes
fullDocument.*            | No — forces full deserialization
updateDescription.*       | No — forces full deserialization
Computed or nested fields | No — forces full deserialization
Filtering on fullDocument fields (like fullDocument.status) forces MongoDB to deserialize and evaluate every oplog entry. This matters most during restarts, when MongoDB must scan the oplog forward from the resume token.

Best Practice: Layer Your Filters

Put pushdown-eligible filters first in your pipeline to reduce the number of entries that require full deserialization:
@Pipeline
List<Bson> pipeline() {
    return List.of(
        // Stage 1: pushdown-eligible — fast
        Aggregates.match(Filters.in("operationType", "insert", "update")),
        // Stage 2: requires deserialization — but fewer entries reach this stage
        Aggregates.match(Filters.eq("fullDocument.status", "PAID"))
    );
}

Performance Characteristics

  • Normal operation: No performance concern. MongoDB evaluates the pipeline on each new oplog entry as it arrives — there is no bulk scan of historical data.
  • At restart: MongoDB scans the oplog from the resume token forward. If the pipeline filters out most events, this scan can be slow — this is why dualCheckpoint=true exists.
  • Oplog retention: The oplog is a capped collection with a fixed size. If the application is down longer than the oplog retention window, the resume token expires and the stream cannot resume. Use @Checkpoint(onHistoryLost = ...) to handle this case.

Allowed Aggregation Stages

MongoDB restricts which aggregation stages can be used in a Change Stream pipeline:
Stage                       | Allowed
$match                      | Yes
$project / $addFields       | Yes
$replaceRoot / $replaceWith | Yes
$redact                     | Yes
$set / $unset               | Yes
$group                      | No
$sort                       | No
$limit / $skip              | No
$lookup                     | No
$out / $merge               | No
Using an unsupported stage causes a MongoDB error at stream startup (fail-fast).
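Beyond $match, an allowed $project stage can also shrink each transmitted event. The following sketch (the projected field names are illustrative) matches on a pushdown-eligible field first, then ships only the fields the handler needs; note that change stream events must keep their _id (the resume token), which Projections.include preserves by default:

```java
// Sketch: $match on a pushdown-eligible field, then $project to
// trim the payload. The projected field names are illustrative.
@Pipeline
List<Bson> pipeline() {
    return List.of(
        Aggregates.match(Filters.in("operationType", "insert", "update")),
        Aggregates.project(Projections.include(
            "operationType", "documentKey", "fullDocument.status"))
    );
}
```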

Constraints

Constraint     | Detail
One per class  | At most one @Pipeline method per @ChangeStream class
No parameters  | The method must take zero arguments
Evaluated once | The pipeline is resolved at stream startup, not per event
Immutable      | Changing the pipeline requires restarting the stream

See Also

@Filter

Application-side Java predicate filtering

Filtering Events

When to use @Pipeline vs @Filter

Checkpoint & Resume

How dual checkpoint interacts with @Pipeline

How it Works

Full event processing pipeline architecture