@Pipeline defines a server-side aggregation pipeline sent to MongoDB when the Change Stream starts. Only matching events are transmitted over the network, reducing traffic and application load.
The pipeline is evaluated once at stream startup — not per event.
Basic Usage
Annotate a method in your @ChangeStream class with @Pipeline. At most one @Pipeline method is allowed per class, and the method must take no parameters.
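A minimal sketch of the shape this takes. The @ChangeStream and @Pipeline annotations are FlowWarden's as described above, but the collection attribute, class name, and package layout are illustrative assumptions, and the Bson/Document types come from the MongoDB Java driver:

```java
// Minimal sketch, assuming FlowWarden's @ChangeStream and @Pipeline annotations;
// the "collection" attribute and class name are illustrative assumptions, and
// Bson/Document come from the MongoDB Java driver (org.bson).
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

@ChangeStream(collection = "orders") // attribute name assumed
public class OrderInsertCapture {

    // Resolved once at stream startup; MongoDB then ships only matching events.
    @Pipeline
    public List<Bson> pipeline() {
        return List.of(
            new Document("$match", new Document("operationType", "insert"))
        );
    }
}
```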
Supported Return Types
| Return type | Description |
|---|---|
| List<Bson> | Raw MongoDB BSON pipeline stages |
| List<AggregationOperation> | Spring Data aggregation operations |
| Aggregation | Spring Data Aggregation object |
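For the two Spring Data shapes, the aggregation builder API looks like this. Aggregation, AggregationOperation, and Criteria are real Spring Data MongoDB types; the class and field values are illustrative, and the two methods are shown side by side only to compare shapes (a real class may declare at most one @Pipeline method):

```java
// Sketch of the two Spring Data return shapes. Note: a real @ChangeStream
// class may declare at most one @Pipeline method; two appear here only to
// compare the shapes side by side.
import java.util.List;

import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.query.Criteria;

public class ReturnShapes {

    // Shape 1: a full Aggregation object
    @Pipeline
    public Aggregation asAggregation() {
        return Aggregation.newAggregation(
            Aggregation.match(Criteria.where("operationType").is("update"))
        );
    }

    // Shape 2: a list of individual operations
    @Pipeline
    public List<AggregationOperation> asOperations() {
        return List.of(
            Aggregation.match(Criteria.where("ns.coll").is("orders"))
        );
    }
}
```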
Combining with @Filter
@Pipeline can coexist with @Filter on the same @ChangeStream, forming a double funnel — pre-filter server-side, then refine with Java logic:
The following example is from the PipelineFilterCapture sample — it pre-filters by operation type server-side, then checks the order status application-side:
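The sample code itself is not reproduced here; the sketch below reconstructs the described shape. The @Filter method's parameter type and all names are assumptions, not copied from the sample:

```java
// Reconstruction of the described double funnel, not the actual sample code.
// The @Filter parameter shape and all names below are assumptions.
import java.util.List;

import org.bson.Document;
import org.bson.conversions.Bson;

@ChangeStream(collection = "orders") // attribute name assumed
public class PipelineFilterCapture {

    // Server-side funnel: only insert and update events cross the network.
    @Pipeline
    public List<Bson> pipeline() {
        return List.of(new Document("$match",
            new Document("operationType",
                new Document("$in", List.of("insert", "update")))));
    }

    // Application-side funnel: refine with Java logic.
    @Filter
    public boolean accept(Document fullDocument) { // parameter shape assumed
        return "PAID".equals(fullDocument.getString("status"));
    }
}
```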
Checkpoint Interaction
Oplog Internals and Limitations
How Change Stream pipelines interact with the oplog
Change Streams Cannot Use Indexes
MongoDB does not support creating indexes on the oplog. All Change Stream pipelines perform a sequential scan (COLLSCAN) of the oplog to evaluate your $match filters. This is a fundamental MongoDB limitation, not a FlowWarden one.
Pushdown-Eligible Fields
Not all filters are equal. Through empirical testing by the MongoDB community (this is not officially documented by MongoDB), only a handful of fields allow an optimized evaluation during the oplog scan:
| Field | Pushdown |
|---|---|
| operationType | Yes |
| ns.db | Yes |
| ns.coll | Yes |
| documentKey._id | Yes |
| fullDocument.* | No — forces full deserialization |
| updateDescription.* | No — forces full deserialization |
| Computed or nested fields | No — forces full deserialization |
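A layered pipeline puts the pushdown-eligible $match first so the expensive fullDocument predicate only runs on entries that survive the cheap stage. Sketched with plain java.util Maps standing in for BSON documents (real code would build org.bson.Document instances):

```java
import java.util.List;
import java.util.Map;

// Layering sketch: stage 1 uses only pushdown-eligible fields, stage 2 the
// expensive fullDocument.* predicate. Plain Maps stand in for BSON documents.
public class LayeredPipeline {

    static List<Map<String, Object>> pipeline() {
        return List.of(
            // Stage 1: pushdown-eligible, evaluated cheaply during the oplog scan
            Map.<String, Object>of("$match", Map.of(
                "operationType", "insert",
                "ns.coll", "orders")),
            // Stage 2: forces full deserialization, so it sees fewer entries
            Map.<String, Object>of("$match",
                Map.of("fullDocument.status", "PAID"))
        );
    }

    public static void main(String[] args) {
        System.out.println(pipeline().size()); // prints 2
    }
}
```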
Best Practice: Layer Your Filters
Put pushdown-eligible filters first in your pipeline to reduce the number of entries that require full deserialization.
Performance Characteristics
- Normal operation: No performance concern. MongoDB evaluates the pipeline on each new oplog entry as it arrives — there is no bulk scan of historical data.
- At restart: MongoDB scans the oplog from the resume token forward. If the pipeline filters out most events, this scan can be slow — this is why dualCheckpoint=true exists.
- Oplog retention: The oplog is a capped collection with a fixed size. If the application is down longer than the oplog retention window, the resume token expires and the stream cannot resume. Use @Checkpoint(onHistoryLost = ...) to handle this case.
Allowed aggregation stages in Change Stream pipelines
MongoDB restricts which aggregation stages can be used in a Change Stream pipeline:
Using an unsupported stage causes a MongoDB error at stream startup (fail-fast).
| Stage | Allowed |
|---|---|
| $match | Yes |
| $project / $addFields | Yes |
| $replaceRoot / $replaceWith | Yes |
| $redact | Yes |
| $set / $unset | Yes |
| $group | No |
| $sort | No |
| $limit / $skip | No |
| $lookup | No |
| $out / $merge | No |
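As a shape check, here is a pipeline built only from allowed stages; swapping any of them for a stage from the "No" rows (say $sort) would trigger the fail-fast startup error. Plain Maps stand in for org.bson.Document instances:

```java
import java.util.List;
import java.util.Map;

// A Change-Stream-legal pipeline shape: every stage comes from the "Allowed"
// rows above. Plain Maps stand in for org.bson.Document instances.
public class AllowedStages {

    static List<Map<String, Object>> pipeline() {
        return List.of(
            Map.<String, Object>of("$match", Map.of("operationType", "insert")),
            Map.<String, Object>of("$project",
                Map.of("documentKey", 1, "fullDocument", 1)),
            Map.<String, Object>of("$addFields",
                Map.of("capturedBy", "flowwarden"))
        );
    }

    public static void main(String[] args) {
        // Each stage's single key is one of the allowed operators
        pipeline().forEach(stage -> System.out.println(stage.keySet()));
    }
}
```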
Constraints
| Constraint | Detail |
|---|---|
| One per class | At most one @Pipeline method per @ChangeStream class |
| No parameters | The method must take zero arguments |
| Evaluated once | The pipeline is resolved at stream startup, not per event |
| Immutable | Changing the pipeline requires restarting the stream |
See Also
- @Filter — Application-side Java predicate filtering
- Filtering Events — When to use @Pipeline vs @Filter
- Checkpoint & Resume — How dual checkpoint interacts with @Pipeline
- How it Works — Full event processing pipeline architecture

