Skip to main content

Overview

@Filter performs application-side filtering in Java. All events arrive from MongoDB, and a predicate decides which ones are forwarded to handler methods. Events that don’t pass the filter are silently skipped — but their resume token is still checkpointed as lastSeenToken. Unlike @Pipeline which runs a MongoDB aggregation pipeline once at stream startup to reduce network traffic, @Filter executes on every event and can leverage Spring beans, service calls, and any Java logic.
At most one @Filter method is allowed per @ChangeStream class.

Supported Signatures

@Filter
Predicate<ChangeStreamContext<?>> myFilter() {
    return ctx -> "PAID".equals(
        ctx.getFullDocument(Order.class).getStatus()
    );
}
Return typeParametersDescription
Predicate<ChangeStreamContext<?>>NoneReturns a reusable predicate. Called once, predicate tested per event.
booleanChangeStreamContext<?>Direct evaluation. Called on every event.

Basic Example

@ChangeStream(collection = "orders", documentType = Order.class)
public class PaidOrderHandler {

    @Filter
    boolean onlyPaid(ChangeStreamContext<?> ctx) {
        Order order = ctx.getFullDocument(Order.class);
        return order != null && "PAID".equals(order.getStatus());
    }

    @OnInsert
    void handle(Order order, ChangeStreamContext<Order> ctx) {
        // Only receives events where status == PAID
        log.info("Paid order: {}", order.getId());
    }
}

Startup Validation (Fail-Fast)

FlowWarden validates @Filter compatibility at application startup and rejects invalid configurations immediately with a clear error message. The application will fail to start if @Filter is combined with handlers that cover operations where MongoDB does not provide a fullDocument.

Why?

@Filter predicates typically call ctx.getFullDocument(), which returns null for DELETE, DROP, and INVALIDATE operations. Running the filter on these events would be error-prone at runtime — so FlowWarden rejects the combination at startup instead.

Rules

The following combinations are rejected:
These configurations cause a BeanCreationException at startup — the application will not start.
1. @Filter + typed handler for a no-fullDocument operation
// REJECTED: @OnDelete handles DELETE which has no fullDocument
@ChangeStream(collection = "orders")
public class InvalidHandler {

    @OnDelete
    void onDelete(ChangeStreamContext<?> ctx) { }

    @Filter
    boolean filter(ChangeStreamContext<?> ctx) { return true; }
}
The error message:
@ChangeStream class InvalidHandler declares @Filter and @OnDelete, which is not allowed.
DELETE events have no fullDocument, so the @Filter predicate cannot safely access the document.
Use a server-side @Pipeline to filter these events, or move the filtering logic into the
handler method.
2. @Filter + @OnChange without operationTypes restriction
// REJECTED: @OnChange covers ALL operations including DELETE/DROP/INVALIDATE
@ChangeStream(collection = "orders")
public class InvalidHandler2 {

    @OnChange
    void handle(ChangeStreamContext<?> ctx) { }

    @Filter
    boolean filter(ChangeStreamContext<?> ctx) { return true; }
}
3. @Filter + @OnChange with operationTypes that includes DELETE, DROP, or INVALIDATE
// REJECTED: operationTypes includes DELETE
@ChangeStream(collection = "orders")
public class InvalidHandler3 {

    @OnChange(operationTypes = {OperationType.INSERT, OperationType.DELETE})
    void handle(ChangeStreamContext<?> ctx) { }

    @Filter
    boolean filter(ChangeStreamContext<?> ctx) { return true; }
}

Valid Combinations

The following combinations are accepted:
// OK: @Filter + typed handlers for operations WITH fullDocument
@ChangeStream(collection = "orders")
public class ValidHandler {

    @OnInsert
    void onInsert(ChangeStreamContext<?> ctx) { }

    @OnUpdate
    void onUpdate(ChangeStreamContext<?> ctx) { }

    @Filter
    boolean filter(ChangeStreamContext<?> ctx) { return true; }
}
// OK: @Filter + @OnChange restricted to fullDocument operations
@ChangeStream(collection = "orders")
public class ValidHandler2 {

    @OnChange(operationTypes = {OperationType.INSERT, OperationType.UPDATE, OperationType.REPLACE})
    void handle(ChangeStreamContext<?> ctx) { }

    @Filter
    boolean filter(ChangeStreamContext<?> ctx) { return true; }
}

Quick Reference

Handler combination@Filter allowed?
@OnInsertYes
@OnUpdateYes
@OnReplaceYes
@OnInsert + @OnUpdateYes
@OnDeleteNo
@OnChange (no restriction)No
@OnChange(operationTypes = {INSERT, UPDATE})Yes
@OnChange(operationTypes = {INSERT, DELETE})No
@OnInsert + @OnDeleteNo
If you need to handle both DELETE and other operations while filtering, use @Pipeline for server-side filtering instead, or move the filter logic directly into your handler method.

Checkpoint Interaction

Events rejected by @Filter are not forwarded to the handler, but their resume token is still checkpointed as lastSeenToken. This prevents reprocessing filtered events after a restart. When @Filter is used alone (without @Pipeline), there is no gap between lastSeenToken and lastProcessedToken — the checkpoint behavior is neutral.

Combining with @Pipeline

@Filter can coexist with @Pipeline on the same @ChangeStream, forming a double filtering funnel: Typical use case: pre-filter operationType = insert | update and status = PAID server-side with @Pipeline, then verify application-side with @Filter that the tenant is active via a Spring service call.
@ChangeStream(collection = "orders", documentType = Order.class)
public class PaidOrderHandler {

    @Pipeline
    List<Bson> pipeline() {
        return List.of(
            Aggregates.match(Filters.and(
                Filters.in("operationType", "insert", "update"),
                Filters.eq("fullDocument.status", "PAID")
            ))
        );
    }

    @Filter
    boolean filter(ChangeStreamContext<?> ctx) {
        // @Pipeline already filtered server-side; refine with Java logic
        return tenantService.isActive(ctx.getFullDocument(Order.class).getTenantId());
    }

    @OnInsert
    void onNewPaidOrder(Order order, ChangeStreamContext<Order> ctx) {
        // Only receives PAID orders from active tenants
    }
}
When combining @Pipeline + @Filter, setting dualCheckpoint = true on @Checkpoint is recommended to keep lastSeenToken advancing independently.

See Also

@Pipeline

Server-side aggregation pipeline filtering

Filtering Events Guide

Complete guide combining @Pipeline and @Filter

@Checkpoint

Resume token persistence and dual checkpoint

Event Handlers

@OnInsert, @OnUpdate, @OnDelete, @OnChange