The Big Picture
Phase 1 — Startup: Bean Discovery
When the application context boots, Spring initializes all beans. ChangeStreamBeanPostProcessor is a BeanPostProcessor registered by the auto-configuration; it intercepts every bean after initialization and inspects its class for the @ChangeStream annotation.
For each discovered @ChangeStream class, the processor:
- Resolves the stream name and target collection: from name, value, collection, or inferred from documentType via the Spring Data @Document annotation.
- Discovers handler methods: scans declared methods for @OnChange, @OnInsert, @OnUpdate, @OnDelete, and @OnReplace, and validates their signatures and return types.
- Discovers the pipeline: finds the optional @Pipeline method and inspects its return type (List<Bson>, List<AggregationOperation>, or Aggregation).
- Discovers the filter: finds the optional @Filter method and checks it returns Predicate<ChangeStreamContext<?>> or boolean filter(ChangeStreamContext<?> ctx).
- Validates the configuration: catches misconfigurations early (e.g., @Filter combined with @OnDelete, missing handlers, wrong return types for the configured mode).
- Registers a ChangeStreamDefinition in the StreamRegistry.
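The discovery steps above can be sketched with plain reflection. This is a minimal illustration, not FlowWarden's implementation: the annotations are local stand-ins declared only so the example compiles on its own, DiscoveredStream and DiscoverySketch are hypothetical names, the name resolution is reduced to "name, else collection", and IllegalStateException stands in for the BeanCreationException a Spring post-processor would raise.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-ins that only mimic the annotation names used above.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
@interface ChangeStream { String name() default ""; String collection() default ""; }

@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
@interface OnInsert {}

@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
@interface OnDelete {}

// Hypothetical, simplified counterpart of a registered stream definition.
record DiscoveredStream(String name, String collection, List<String> handlers) {}

final class DiscoverySketch {
    /** Returns null for beans without @ChangeStream; throws on a misconfigured stream. */
    static DiscoveredStream inspect(Object bean) {
        Class<?> type = bean.getClass();
        ChangeStream cs = type.getAnnotation(ChangeStream.class);
        if (cs == null) {
            return null; // not a stream bean, nothing to register
        }
        // Collect the names of all annotated handler methods.
        List<String> handlers = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(OnInsert.class) || m.isAnnotationPresent(OnDelete.class)) {
                handlers.add(m.getName());
            }
        }
        Collections.sort(handlers); // reflection order is unspecified
        if (handlers.isEmpty()) {
            // Fail fast: a stream without handlers is a configuration error.
            throw new IllegalStateException(
                "@ChangeStream class " + type.getSimpleName() + " declares no handler methods");
        }
        // Simplified resolution (assumption): explicit name first, else the collection.
        String name = cs.name().isEmpty() ? cs.collection() : cs.name();
        return new DiscoveredStream(name, cs.collection(), handlers);
    }
}

@ChangeStream(name = "orders", collection = "orders")
class OrderStream {
    @OnInsert void onInsert(Object doc) {}
}

@ChangeStream(collection = "users")
class EmptyStream {} // no handlers: inspect(...) rejects it
```

Calling DiscoverySketch.inspect(new OrderStream()) yields a definition named "orders" with one handler, while inspect(new EmptyStream()) throws immediately, which mirrors the fail-fast behavior described in this phase.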
All validation happens at startup, not at runtime. Misconfigured streams fail fast with a clear BeanCreationException before the application finishes booting.
Phase 2 — Startup: Stream Launch
Once the application context is fully ready (ApplicationReadyEvent), the StreamManager iterates the StreamRegistry and starts every stream where autoStart = true and enabled = true.
If a checkpoint exists for a stream in _fw_checkpoints, the resume token is loaded and passed to MongoDB so the stream resumes exactly where it left off: no events are missed or replayed.
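The launch decision can be sketched as a small pure function. This is an illustration under stated assumptions: StreamDef, LaunchPlan and LaunchSketch are hypothetical names, the registry is a plain list, and an in-memory map keyed by stream name stands in for the _fw_checkpoints collection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, reduced view of a registered stream.
record StreamDef(String name, boolean autoStart, boolean enabled) {}

// What the launcher decided for one stream: its name and the token to resume from, if any.
record LaunchPlan(String name, Optional<String> resumeToken) {}

final class LaunchSketch {
    /** Selects the streams to start and attaches a stored resume token when one exists. */
    static List<LaunchPlan> plan(List<StreamDef> registry, Map<String, String> checkpoints) {
        List<LaunchPlan> plans = new ArrayList<>();
        for (StreamDef def : registry) {
            if (!def.autoStart() || !def.enabled()) {
                continue; // skipped at boot; such a stream is not launched automatically
            }
            // No checkpoint means the stream starts fresh rather than resuming.
            Optional<String> token = Optional.ofNullable(checkpoints.get(def.name()));
            plans.add(new LaunchPlan(def.name(), token));
        }
        return plans;
    }
}
```

With a registry of orders (auto-start), audit (autoStart = false) and users (auto-start) and a single checkpoint for orders, the plan contains orders with its token and users with an empty token; audit is left out.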
Phase 3 — Runtime: Event Processing Pipeline
Every Change Stream event flows through a series of stages before reaching your handler.
@Pipeline vs @Filter
- @Pipeline (server-side): Pushed to MongoDB at stream subscription time. MongoDB evaluates the aggregation pipeline against the oplog and only sends matching events over the wire. This is the most efficient approach: it reduces network traffic and application load.
- @Filter (application-side)
- Combined (funnel approach)
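The funnel idea can be illustrated without a database. In this sketch, which assumes nothing about FlowWarden's actual types, Event and FunnelSketch are hypothetical names and two Predicate instances stand in for the server-side pipeline and the application-side filter. In a real deployment the first stage runs inside MongoDB, so events it rejects never cross the network.

```java
import java.util.List;
import java.util.function.Predicate;

// Minimal stand-in for a change event; a real event context carries much more.
record Event(String operationType, int amount) {}

final class FunnelSketch {
    // Stage 1: stands in for the server-side pipeline (coarse, cheap to express as a match).
    static final Predicate<Event> SERVER_SIDE = e -> e.operationType().equals("insert");

    // Stage 2: stands in for the application-side filter (arbitrary JVM logic).
    static final Predicate<Event> APP_SIDE = e -> e.amount() > 100;

    /** Events that would be sent over the wire after the server-side stage. */
    static List<Event> overTheWire(List<Event> all) {
        return all.stream().filter(SERVER_SIDE).toList();
    }

    /** Events that finally reach a handler after both stages. */
    static List<Event> delivered(List<Event> all) {
        return overTheWire(all).stream().filter(APP_SIDE).toList();
    }
}
```

For three events (insert of 50, insert of 500, update of 900), two cross the wire and one reaches the handler. The narrower the first stage, the less work is left for the second.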
Dual Execution Mode
FlowWarden supports two execution modes, configured via flowwarden.default-mode.
| Mode | Infrastructure | Handler return type | Use with |
|---|---|---|---|
| IMPERATIVE | MongoTemplate + MessageListenerContainer | void | Spring MVC, blocking I/O |
| REACTIVE | ReactiveMongoTemplate + Project Reactor | Mono<Void> | Spring WebFlux, non-blocking I/O |
A void handler in REACTIVE mode or a Mono<Void> handler in IMPERATIVE mode causes a BeanCreationException.
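The return-type rule can be sketched as a startup check. This is only an illustration: Mode, MonoStandIn and ModeCheckSketch are hypothetical names, MonoStandIn is a placeholder for Reactor's Mono so the example compiles without extra dependencies, every declared method is treated as a handler, and IllegalStateException stands in for BeanCreationException.

```java
import java.lang.reflect.Method;

enum Mode { IMPERATIVE, REACTIVE }

// Placeholder for reactor.core.publisher.Mono (assumption made to stay dependency-free).
interface MonoStandIn<T> {}

final class ModeCheckSketch {
    /** Throws if any declared method's return type does not fit the configured mode. */
    static void validate(Class<?> handlerClass, Mode mode) {
        for (Method m : handlerClass.getDeclaredMethods()) {
            if (m.isSynthetic()) {
                continue; // ignore compiler-generated methods
            }
            Class<?> returnType = m.getReturnType();
            boolean ok = (mode == Mode.IMPERATIVE)
                ? returnType == void.class
                : MonoStandIn.class.isAssignableFrom(returnType);
            if (!ok) {
                throw new IllegalStateException(
                    m.getName() + " returns " + returnType.getSimpleName()
                        + ", which is not valid in " + mode + " mode");
            }
        }
    }
}

class BlockingHandlers {
    void handle(Object ctx) {}
}

class ReactiveHandlers {
    MonoStandIn<Void> handle(Object ctx) { return null; }
}
```

Validating BlockingHandlers against IMPERATIVE and ReactiveHandlers against REACTIVE passes; swapping the modes throws, so the mismatch surfaces at boot rather than on the first event.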
Internal MongoDB Collections
FlowWarden uses three internal collections in your database. You do not need to create them; they are managed automatically.
| Collection | Purpose |
|---|---|
| _fw_checkpoints | Stores resume tokens for each stream. Loaded on startup to resume from the last processed event. |
| _fw_dlq | Dead Letter Queue. Failed events that exhausted all retry attempts are written here. |
| _fw_locks | Distributed locks for SINGLE_LEADER deployment mode. Used for leader election and heartbeat coordination between instances. |
All internal collections are prefixed with _fw_ to avoid collisions with your application data. They are created with appropriate indexes on first use.
Handler Signatures
FlowWarden resolves the correct invocation strategy at startup based on the method signature.
Typed handler signatures (@OnInsert, @OnUpdate, @OnDelete, @OnReplace)
| Signature | Description |
|---|---|
| void handle(ChangeStreamContext<T> ctx) | Full event context: operation type, document key, update description, resume token |
| void handle(T doc) | Document directly; requires documentType on @ChangeStream |
| void handle(T doc, ChangeStreamContext<T> ctx) | Both document and context |
| Mono<Void> handle(ChangeStreamContext<T> ctx) | Reactive variant (REACTIVE mode only) |
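One way to picture "resolving the invocation strategy at startup" is to classify each handler by its parameter list once, then reuse that decision for every event. The sketch below assumes nothing about FlowWarden's internals: Ctx is a stand-in for ChangeStreamContext<T>, and Strategy, SignatureSketch, Order and OrderHandlers are hypothetical names.

```java
import java.lang.reflect.Method;

// Stand-in for ChangeStreamContext<T>, reduced to what the sketch needs.
record Ctx<T>(String operationType, T document) {}

enum Strategy { CONTEXT_ONLY, DOCUMENT_ONLY, DOCUMENT_AND_CONTEXT }

final class SignatureSketch {
    /** Classifies a handler by its parameter types; rejects anything else. */
    static Strategy resolve(Method m) {
        Class<?>[] p = m.getParameterTypes(); // erased types, so Ctx<Order> appears as Ctx
        if (p.length == 1) {
            return p[0] == Ctx.class ? Strategy.CONTEXT_ONLY : Strategy.DOCUMENT_ONLY;
        }
        if (p.length == 2 && p[0] != Ctx.class && p[1] == Ctx.class) {
            return Strategy.DOCUMENT_AND_CONTEXT;
        }
        throw new IllegalStateException("Unsupported handler signature: " + m.getName());
    }

    /** Convenience lookup by method name, avoiding checked reflection exceptions. */
    static Strategy resolve(Class<?> type, String methodName) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return resolve(m);
            }
        }
        throw new IllegalArgumentException("No method named " + methodName);
    }
}

class Order {}

class OrderHandlers {
    void byContext(Ctx<Order> ctx) {}
    void byDocument(Order doc) {}
    void byBoth(Order doc, Ctx<Order> ctx) {}
    void reversed(Ctx<Order> ctx, Order doc) {} // wrong parameter order: rejected
}
```

Because the classification happens once per method, the per-event path only needs to switch on the stored Strategy instead of re-inspecting the signature.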
@OnChange — generic handler
@OnChange handles all operation types not covered by a specific typed handler (or all types if no typed handlers are declared). It always receives a ChangeStreamContext<?>. Use operationTypes to restrict which operations this handler covers.
See Also
- @ChangeStream: Full annotation reference with all attributes
- Checkpoint & Resume: How resume tokens are stored and replayed on restart
- Filtering Events: When to use @Pipeline vs @Filter
- Imperative vs Reactive: Choosing the right execution mode

