FlowWarden wraps MongoDB Change Streams into a declarative, annotation-driven programming model managed by Spring Boot auto-configuration. This page explains what happens under the hood — from application startup to handler invocation.

The Big Picture


Phase 1 — Startup: Bean Discovery

When the application context boots, Spring initializes all beans. ChangeStreamBeanPostProcessor is a BeanPostProcessor registered by the auto-configuration — it intercepts every bean after initialization and inspects its class for the @ChangeStream annotation. For each discovered @ChangeStream class, the processor:
  1. Resolves the stream name and target collection — taken from the name, value, or collection attributes, or inferred from documentType via the Spring Data @Document annotation.
  2. Discovers handler methods — scans declared methods for @OnChange, @OnInsert, @OnUpdate, @OnDelete, and @OnReplace, validates their signatures and return types.
  3. Discovers the pipeline — finds the optional @Pipeline method and inspects its return type (List<Bson>, List<AggregationOperation>, or Aggregation).
  4. Discovers the filter — finds the optional @Filter method and verifies that it either returns Predicate<ChangeStreamContext<?>> or matches the signature boolean filter(ChangeStreamContext<?> ctx).
  5. Validates the configuration — catches misconfigurations early (e.g., @Filter combined with @OnDelete, missing handlers, wrong return types for the configured mode).
  6. Registers a ChangeStreamDefinition in the StreamRegistry.
All validation happens at startup, not at runtime. Misconfigured streams fail fast with a clear BeanCreationException before the application finishes booting.
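Taken together, a stream class exercising every discovered element might look like the sketch below. The Order type, handler body, and context accessor names are illustrative assumptions, not a documented API surface:

```java
// Sketch: one class combining the pieces the post-processor discovers.
// Order is an assumed Spring Data @Document-mapped type.
@ChangeStream(name = "order-stream", collection = "orders", documentType = Order.class)
public class OrderStream {

    @OnInsert                                  // step 2: handler method
    void onInsert(Order order, ChangeStreamContext<Order> ctx) {
        // react to new orders
    }

    @Pipeline                                  // step 3: server-side pipeline
    List<Bson> pipeline() {
        return List.of(Aggregates.match(Filters.eq("fullDocument.status", "PAID")));
    }

    @Filter                                    // step 4: application-side filter
    boolean filter(ChangeStreamContext<?> ctx) {
        return ctx.getDocumentKey() != null;   // illustrative check; accessor name assumed
    }
}
```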

Phase 2 — Startup: Stream Launch

Once the application context is fully ready (ApplicationReadyEvent), the StreamManager iterates the StreamRegistry and starts every stream where autoStart = true and enabled = true.
// ImperativeStreamManager — uses Spring Data's MessageListenerContainer
MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate);

ChangeStreamRequest<Document> request = ChangeStreamRequest.builder(listener)
    .collection(def.collection())
    .filter(pipeline)            // @Pipeline stages, evaluated once here
    .resumeToken(lastToken)      // loaded from _fw_checkpoints on restart; skipped on first start
    .build();

container.register(request, Document.class);
container.start();
If a checkpoint was previously saved in _fw_checkpoints, the resume token is loaded and passed to MongoDB so the stream resumes exactly where it left off — no events are missed or replayed.
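A minimal sketch of that lookup, assuming a _fw_checkpoints document keyed by stream name with a resumeToken field (these field names are illustrative, not the documented schema):

```java
// Load the last saved resume token for this stream, if any.
Document checkpoint = mongoTemplate.getCollection("_fw_checkpoints")
        .find(Filters.eq("streamName", def.name()))
        .first();

// On first start there is no checkpoint, so the stream begins at "now".
BsonValue lastToken = checkpoint != null
        ? checkpoint.get("resumeToken", Document.class).toBsonDocument()
        : null;
```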

Phase 3 — Runtime: Event Processing Pipeline

Every Change Stream event flows through the following stages before reaching your handler:

@Pipeline vs @Filter

@Pipeline stages are pushed to MongoDB at stream subscription time. MongoDB evaluates the aggregation pipeline against the oplog and only sends matching events over the wire. This is the most efficient approach, since it reduces both network traffic and application load.
@Pipeline
List<Bson> pipeline() {
    return List.of(
        Aggregates.match(Filters.in("operationType", "insert", "update")),
        Aggregates.match(Filters.eq("fullDocument.status", "PAID"))
    );
}
Change Stream pipelines scan the oplog, not a collection. Unlike regular MongoDB queries, aggregation stages in a Change Stream pipeline cannot use indexes — MongoDB performs a COLLSCAN on the oplog to evaluate your $match filters. This is expected behavior, not a performance issue. Keep your pipeline stages lightweight ($match, $project) and avoid heavy operations like $lookup or $group. For complex filtering logic, use @Filter on the application side instead.
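By contrast, @Filter runs inside the application after the event has crossed the wire, so it can use arbitrary Java logic. A sketch, in which the fraudChecker collaborator and the fullDocument() accessor are illustrative assumptions:

```java
@Filter
boolean filter(ChangeStreamContext<Order> ctx) {
    // Logic that is awkward or impossible to express as a $match stage,
    // e.g. consulting another Spring bean or applying business rules.
    Order order = ctx.fullDocument();
    return order != null && fraudChecker.isClean(order.getCustomerId());
}
```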

Dual Execution Mode

FlowWarden supports two execution modes, configured via flowwarden.default-mode.
| Mode | Infrastructure | Handler return type | Use with |
|------|----------------|---------------------|----------|
| IMPERATIVE | MongoTemplate + MessageListenerContainer | void | Spring MVC, blocking I/O |
| REACTIVE | ReactiveMongoTemplate + Project Reactor | Mono<Void> | Spring WebFlux, non-blocking I/O |
The mode is validated at startup: a void handler in REACTIVE mode or a Mono<Void> handler in IMPERATIVE mode causes a BeanCreationException.
@ChangeStream(collection = "orders")
public class OrderStream {

    @OnInsert
    void onInsert(Order order, ChangeStreamContext<Order> ctx) {
        // blocking code — fine in IMPERATIVE mode
        repository.save(order);
    }
}
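The reactive counterpart returns Mono<Void> and must avoid blocking calls. A sketch, assuming a ReactiveCrudRepository-style reactiveRepository bean:

```java
@ChangeStream(collection = "orders")
public class ReactiveOrderStream {

    @OnInsert
    Mono<Void> onInsert(Order order, ChangeStreamContext<Order> ctx) {
        // non-blocking code, as required in REACTIVE mode
        return reactiveRepository.save(order).then();
    }
}
```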

Internal MongoDB Collections

FlowWarden uses three internal collections in your database. You do not need to create them — they are managed automatically.
| Collection | Purpose |
|------------|---------|
| _fw_checkpoints | Stores resume tokens for each stream. Loaded on startup to resume from the last processed event. |
| _fw_dlq | Dead Letter Queue. Failed events that exhausted all retry attempts are written here. |
| _fw_locks | Distributed locks for SINGLE_LEADER deployment mode. Used for leader election and heartbeat coordination between instances. |
All internal collections are prefixed with _fw_ to avoid collisions with your application data. They are created with appropriate indexes on first use.
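Because they are plain collections, they can be inspected directly. For example, listing recent DLQ entries for one stream (the streamName and failedAt field names are assumptions, not a documented schema):

```java
// Sketch: print the ten most recent dead-lettered events for one stream.
mongoTemplate.getCollection("_fw_dlq")
        .find(Filters.eq("streamName", "order-stream"))
        .sort(Sorts.descending("failedAt"))
        .limit(10)
        .forEach(doc -> System.out.println(doc.toJson()));
```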

Handler Signatures

FlowWarden resolves the correct invocation strategy at startup based on the method signature.
| Signature | Description |
|-----------|-------------|
| void handle(ChangeStreamContext<T> ctx) | Full event context — operation type, document key, update description, resume token |
| void handle(T doc) | Document directly — requires documentType on @ChangeStream |
| void handle(T doc, ChangeStreamContext<T> ctx) | Both document and context |
| Mono<Void> handle(ChangeStreamContext<T> ctx) | Reactive variant (REACTIVE mode only) |
@OnChange handles all operation types not covered by a specific typed handler (or all types if no typed handlers are declared). It always receives a ChangeStreamContext<?>.
@OnChange
void handle(ChangeStreamContext<?> ctx) {
    // handles INSERT, UPDATE, DELETE, REPLACE, DROP, INVALIDATE, ...
}
Use operationTypes to restrict which operations this handler covers:
@OnChange(operationTypes = { OperationType.INSERT, OperationType.REPLACE })
void handle(ChangeStreamContext<?> ctx) { ... }

See Also

@ChangeStream

Full annotation reference with all attributes

Checkpoint & Resume

How resume tokens are stored and replayed on restart

Filtering Events

When to use @Pipeline vs @Filter

Imperative vs Reactive

Choosing the right execution mode