Skip to main content
The @ChangeStream annotation is the foundation of the FlowWarden programming model. It turns any class into a Spring-managed Change Stream handler that watches a MongoDB collection for real-time data changes.

Basic Usage

Annotate a class with @ChangeStream and add at least one event handler method (@OnChange, @OnInsert, @OnUpdate, @OnDelete, @OnReplace):
@ChangeStream(collection = "orders", documentType = Order.class)
public class OrderHandler {

    @OnChange
    void handleOrderChange(ChangeStreamContext<Order> ctx) {
        System.out.println(ctx.summary());
    }
}
@ChangeStream is meta-annotated with @Component — there is no need to add @Component or @Service separately. The class is automatically registered as a Spring bean.

Attributes Reference

Identification

AttributeTypeDefaultDescription
valueString""Alias for name.
nameString"" (kebab-case of class name)Unique stream name. Used in logs, checkpoints, and metrics.

MongoDB Target

AttributeTypeDefaultDescription
collectionString""Collection to watch. Required unless documentType has a @Document annotation or watchDatabase/watchDeployment is true.
databaseString""Target database. Defaults to the Spring-configured database.
documentTypeClass<?>Document.classJava type for deserialization. Also used to infer collection via @Document if not specified.

Document Options

AttributeTypeDefaultDescription
fullDocumentFullDocumentModeDEFAULTControls inclusion of the full document in change events. See values below.
fullDocumentBeforeChangeFullDocumentBeforeChangeModeOFFControls pre-image inclusion (MongoDB 6.0+). See values below.

FullDocumentMode values

ValueDescription
DEFAULTServer default. INSERT and REPLACE include the document; UPDATE only includes the change delta.
UPDATE_LOOKUPMongoDB looks up the current document for UPDATE events. Required when using @Filter with @OnUpdate.
WHEN_AVAILABLEInclude full document when available, null otherwise (MongoDB 6.0+).
REQUIREDRequire full document — error if not available (MongoDB 6.0+).

FullDocumentBeforeChangeMode values

ValueDescription
OFFDisabled (default). No pre-image is requested.
WHEN_AVAILABLEInclude the document before the change when available. Returns Optional.empty() if not enabled on the collection.
REQUIREDRequire the pre-image — throws an error if the collection does not have pre-images enabled.
Pre-images (fullDocumentBeforeChange) require MongoDB 6.0+ and that pre-images are explicitly enabled on the target collection:
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
})
See the MongoDB documentation for details. Without this, WHEN_AVAILABLE returns empty and REQUIRED throws an error.

Behaviour

AttributeTypeDefaultDescription
enabledbooleantrueEnable or disable this stream.
autoStartbooleantrueAutomatically start the stream on application ready.
mongoTemplateRefString""Name of the MongoTemplate or ReactiveMongoTemplate bean to use. Validated at startup.

Scope

AttributeTypeDefaultDescription
watchDatabasebooleanfalseWatch the entire database instead of a single collection. Makes collection optional.
watchDeploymentbooleanfalseWatch the entire cluster (all databases). Makes collection optional.

Collection Resolution

If collection is not explicitly set, FlowWarden resolves it from documentType:
  1. If documentType has a Spring Data @Document(collection = "...") annotation, that value is used.
  2. Otherwise, the decapitalized simple class name is used (e.g. Orderorder).
  3. If neither works and watchDatabase/watchDeployment is false, startup fails with an error.
// Explicit collection
@ChangeStream(collection = "orders")
public class OrderHandler { ... }

// Inferred from @Document annotation on Order class
@ChangeStream(documentType = Order.class)
public class OrderHandler { ... }

Name Resolution

The stream name is resolved in this order:
  1. name() attribute if non-empty
  2. value() attribute if non-empty
  3. Kebab-case of the class simple name (e.g. OrderStreamHandlerorder-stream-handler)
The stream name must be unique across all @ChangeStream classes in the application.
Always set a meaningful name — it appears in logs, metrics, and checkpoint keys. If omitted, FlowWarden generates one from the class name, which may break checkpoints if you rename the class.

Handler Methods

A @ChangeStream class must declare at least one handler method. FlowWarden supports two styles:

Generic: @OnChange

Catches all operation types (or a subset via its operationTypes filter). Only one @OnChange method is allowed per class.
@ChangeStream(documentType = Order.class)
public class OrderHandler {

    @OnChange
    void handle(ChangeStreamContext<Order> ctx) {
        System.out.println(ctx.getOperationType() + " on " + ctx.getCollectionName());
    }
}

Typed: @OnInsert, @OnUpdate, @OnDelete, @OnReplace

Route events to specific methods by operation type. At most one method per typed annotation.
@ChangeStream(name = "typed-order-capture", documentType = Order.class)
public class TypedOrderHandler {

    @OnInsert
    void onInsert(Order doc, ChangeStreamContext<Order> ctx) {
        log.info("New order: {}", doc.getId());
    }

    @OnUpdate
    void onUpdate(ChangeStreamContext<Order> ctx) {
        log.info("Order updated: {}", ctx.getDocumentKey());
    }

    @OnDelete
    void onDelete(ChangeStreamContext<Order> ctx) {
        log.info("Order deleted: {}", ctx.getDocumentKey());
    }
}

Combining Both

When both typed handlers and @OnChange are present, typed handlers take priority. @OnChange acts as a fallback for operation types without a dedicated handler.
@ChangeStream(name = "typed-order-capture", documentType = Order.class)
public class OrderHandler {

    @OnInsert
    void onInsert(Order doc, ChangeStreamContext<Order> ctx) { ... }

    @OnUpdate
    void onUpdate(ChangeStreamContext<Order> ctx) { ... }

    @OnDelete
    void onDelete(ChangeStreamContext<Order> ctx) { ... }

    @OnChange
    void onFallback(ChangeStreamContext<Order> ctx) {
        // Catches REPLACE and any other unhandled operation
    }
}

Supported Signatures

Typed handler methods (@OnInsert, @OnUpdate, @OnDelete, @OnReplace) support three signature styles:
SignatureStyleDescription
void handle(ChangeStreamContext<T> ctx)CONTEXT_ONLYAccess context; get the document from ctx.
void handle(T doc)DOCUMENT_ONLYReceive the deserialized document directly.
void handle(T doc, ChangeStreamContext<T> ctx)DOCUMENT_AND_CONTEXTBoth the document and the context.
@OnChange only supports the CONTEXT_ONLY style: void handle(ChangeStreamContext ctx).
If you use DOCUMENT_ONLY or DOCUMENT_AND_CONTEXT signatures, you must set a concrete documentType on @ChangeStream (not Document.class). Otherwise, startup fails with a validation error.

Examples

Minimal

@ChangeStream(documentType = Order.class)
public class OrderStream {

    @OnChange
    void handleOrderChange(ChangeStreamContext<Order> ctx) {
        System.out.println(ctx.summary());
    }
}
The handler class code is identical in both modes. The execution mode is determined globally by the flowwarden.default-mode property (IMPERATIVE or REACTIVE), which selects the appropriate stream manager at auto-configuration time.

With Explicit Name and Collection

@ChangeStream(name = "order-watcher", collection = "orders")
public class OrderHandler {

    @OnChange
    void handle(ChangeStreamContext<?> ctx) {
        System.out.printf("Event %s on %s%n",
            ctx.getOperationType(), ctx.getCollectionName());
    }
}

With Pre-Image (Before/After Comparison)

@ChangeStream(
    documentType = Order.class,
    fullDocument = FullDocumentMode.UPDATE_LOOKUP,
    fullDocumentBeforeChange = FullDocumentBeforeChangeMode.WHEN_AVAILABLE
)
public class OrderAuditHandler {

    @OnUpdate
    void onStatusChange(ChangeStreamContext<Order> ctx) {
        Optional<Order> before = ctx.getFullDocumentBeforeChange(Order.class);
        Optional<Order> after = ctx.getFullDocument(Order.class);

        if (before.isPresent() && after.isPresent()) {
            log.info("Order {} status: {} → {}",
                after.get().getId(),
                before.get().getStatus(),
                after.get().getStatus());
        }
    }
}

Disabled Stream

@ChangeStream(
    collection = "orders",
    enabled    = false   // Will not start automatically
)
public class OrderHandler { ... }

Custom MongoTemplate

When your application uses multiple MongoDB connections, specify which template to use:
@ChangeStream(
    collection       = "audit_events",
    mongoTemplateRef = "auditMongoTemplate"
)
public class AuditStreamHandler {

    @OnChange
    void handle(ChangeStreamContext<?> ctx) {
        // Uses the "auditMongoTemplate" bean
    }
}

Programmatic Start/Stop

Streams can be controlled at runtime via the FlowWardenStreamManager interface:
@Autowired
FlowWardenStreamManager streamManager;

// Stop a running stream
streamManager.stopStream("order-watcher");

// Restart it later
streamManager.startStream("order-watcher");

// Check status
boolean running = streamManager.isRunning("order-watcher");

Best Practices

  • One handler class per collection — keep stream handlers focused and cohesive.
  • Set documentType when you need typed access — otherwise documents arrive as org.bson.Document.
  • Use enabled = false to temporarily disable a stream without removing the code.
  • Set autoStart = false if you need to start the stream programmatically after some initialization logic.
MongoDB must be configured as a Replica Set for Change Streams to work. This applies to production and development environments. Testcontainers automatically provisions a single-node Replica Set for tests.

See Also

Event Handlers

@OnInsert, @OnUpdate, @OnDelete, @OnChange — handler method signatures in detail.

@Filter

Push server-side predicates to MongoDB’s aggregation pipeline.

@Checkpoint

Configure resume token persistence for crash-resilient streams.

Configuration

flowwarden.default-mode and other application properties.