Use MongoDB transactions with Change Stream handlers for exactly-once processing semantics
By default, FlowWarden provides at-least-once delivery: if the application crashes between processing an event and saving the checkpoint, the event is replayed on restart. For most use cases (cache invalidation, notifications, projections), this is sufficient as long as your handler is idempotent.
For critical streams where duplicate processing is unacceptable (e.g., billing, payments, inventory), you can achieve exactly-once semantics by wrapping your business writes and checkpoint save in a single MongoDB transaction.
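The replay window described above can be illustrated with a small in-memory model. This is only a sketch: `AtLeastOnceDemo`, `run`, and the `crashAt` parameter are hypothetical names invented for the illustration and are not part of FlowWarden.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of at-least-once delivery; not FlowWarden code. */
class AtLeastOnceDemo {

    /**
     * Handles events starting at the saved checkpoint. If crashAt >= 0, the
     * simulated process dies right after handling that index, before the
     * checkpoint for it is saved. Returns the last saved checkpoint
     * (the index of the next event to read).
     */
    static int run(List<String> events, int checkpoint, int crashAt, List<String> handled) {
        for (int i = checkpoint; i < events.size(); i++) {
            handled.add(events.get(i));   // business side effect
            if (i == crashAt) {
                return checkpoint;        // crash: the checkpoint for i was never saved
            }
            checkpoint = i + 1;           // checkpoint saved after the handler returns
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        List<String> events = List.of("e1", "e2", "e3");
        List<String> handled = new ArrayList<>();
        int checkpoint = run(events, 0, 1, handled); // crash after handling e2
        run(events, checkpoint, -1, handled);        // restart from the saved checkpoint
        System.out.println(handled);                 // prints [e1, e2, e2, e3]
    }
}
```

In this model `e2` is handled twice because the crash falls between the handler and the checkpoint save, which is exactly the duplicate an idempotent handler has to tolerate.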
On success: both the business write and the checkpoint are committed atomically.
On exception: Spring rolls back the transaction, so neither the business write nor the checkpoint is persisted. By default, FlowWarden logs the error and moves on to the next event. The failed event is lost unless you configure retry and DLQ:
@ChangeStream(documentType = Order.class)
@Checkpoint(saveEveryN = 1)
@RetryPolicy(maxAttempts = 3)
@DeadLetterQueue
public class OrderProcessor {

    @Autowired
    private MongoTemplate mongoTemplate;

    @OnInsert
    @Transactional
    void handle(Order order, ChangeStreamContext<Order> ctx) {
        mongoTemplate.save(new ProcessedOrder(order), "processed_orders");
        ctx.saveCheckpointNow();
        // On failure: @RetryPolicy retries in a new transaction,
        // then @DeadLetterQueue captures the event if all retries fail
    }
}
For critical streams using transactions, always combine @RetryPolicy and @DeadLetterQueue to avoid losing events on failure. See the Retry & DLQ guide for configuration details.
Spring’s @Transactional opens a MongoDB session and binds it to the current thread
All MongoTemplate operations on the same thread automatically use the bound session
ctx.saveCheckpointNow() saves the checkpoint through MongoTemplate — it joins the same transaction
On success, Spring commits both the business write and the checkpoint atomically
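On failure, Spring rolls back both the business write and the checkpoint. If @RetryPolicy is configured, the event is redelivered on the next attempt; otherwise FlowWarden logs the error and moves on to the next event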
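The session binding in the steps above depends on a transaction manager being registered. In Spring Data MongoDB, @Transactional only starts a MongoDB transaction when a `MongoTransactionManager` bean exists, and Spring Boot does not create one on its own. Whether FlowWarden's auto-configuration registers one is not stated here, so the following is a sketch of what the application may need to provide; the class name `MongoTransactionConfig` is hypothetical.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// Hypothetical configuration class; only needed if no MongoTransactionManager
// bean is already present in the application context.
@Configuration
@EnableTransactionManagement // Spring Boot usually enables this already
public class MongoTransactionConfig {

    // Binds @Transactional methods to the default MongoDatabaseFactory.
    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory databaseFactory) {
        return new MongoTransactionManager(databaseFactory);
    }
}
```

MongoDB multi-document transactions require a replica set or sharded cluster, which a Change Stream deployment already has.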
ctx.saveCheckpointNow() is required inside the handler. Without it, FlowWarden saves the checkpoint after the handler returns — outside the transaction boundary.
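To make the all-or-nothing behavior concrete, here is a toy in-memory model in which the business write and the checkpoint are staged together and committed in one step. It is an illustration only: `AtomicCheckpointDemo` and `handleInTransaction` are hypothetical names, and no real MongoDB session is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of an all-or-nothing commit; not FlowWarden or Spring code. */
class AtomicCheckpointDemo {
    final List<String> processed = new ArrayList<>(); // committed business writes
    int checkpoint = 0;                               // committed checkpoint position

    /**
     * Stages the business write and the checkpoint, then commits both together.
     * If failBeforeCommit is true, an exception is thrown and nothing is committed.
     */
    void handleInTransaction(String event, int position, boolean failBeforeCommit) {
        String stagedWrite = event;          // stands in for mongoTemplate.save(...)
        int stagedCheckpoint = position + 1; // stands in for ctx.saveCheckpointNow()
        if (failBeforeCommit) {
            throw new IllegalStateException("simulated handler failure");
        }
        processed.add(stagedWrite);          // commit: both become visible together
        checkpoint = stagedCheckpoint;
    }

    public static void main(String[] args) {
        AtomicCheckpointDemo store = new AtomicCheckpointDemo();
        store.handleInTransaction("e1", 0, false);
        try {
            store.handleInTransaction("e2", 1, true);  // rolled back
        } catch (IllegalStateException expected) {
            // neither the write nor the checkpoint for e2 was committed
        }
        store.handleInTransaction("e2", 1, false);     // retry succeeds
        System.out.println(store.processed + " checkpoint=" + store.checkpoint);
        // prints [e1, e2] checkpoint=2
    }
}
```

Because the failed attempt leaves both the write list and the checkpoint untouched, the retry of `e2` produces exactly one record.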
Use transactions
Duplicate processing has real consequences (double charges, wrong inventory)
You need exactly-once guarantees
You use SINGLE_LEADER deployment mode
Skip transactions
Handler calls external APIs (HTTP, Kafka, email)
Handler is naturally idempotent
At-least-once is acceptable
High-throughput streams where transaction overhead matters
MongoDB transactions cannot roll back external side-effects (HTTP calls, message queue publishes, emails). If your handler performs external calls, transactions only protect the MongoDB writes — the side-effects will not be undone on rollback. Design these handlers to be idempotent instead.
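One common way to make such a handler idempotent is to remember which event IDs have already triggered the side effect and skip replays. The sketch below uses an in-memory set purely for illustration; `IdempotentNotifier` and its members are hypothetical names, and a real handler would need a durable store (for example a collection with a unique index on the event ID).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy idempotent handler: performs the external side effect once per event ID. */
class IdempotentNotifier {
    // In production this would be a durable store, not process memory.
    private final Set<String> seenEventIds = new HashSet<>();
    // Stands in for the external side effect (HTTP call, email, queue publish).
    final List<String> sentEmails = new ArrayList<>();

    /** Returns true if the side effect was performed, false if the event was a replay. */
    boolean handle(String eventId, String recipient) {
        if (!seenEventIds.add(eventId)) {
            return false;             // duplicate delivery: skip the external call
        }
        sentEmails.add(recipient);    // the external call happens at most once per ID
        return true;
    }

    public static void main(String[] args) {
        IdempotentNotifier notifier = new IdempotentNotifier();
        notifier.handle("evt-1", "a@example.com");
        notifier.handle("evt-1", "a@example.com");     // replay after a crash
        System.out.println(notifier.sentEmails.size()); // prints 1
    }
}
```

Recording the ID before the call, as here, means a crash between the two steps can drop the side effect; recording it afterwards can duplicate it. Where the external API accepts an idempotency key, passing the event ID as that key avoids the trade-off.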
If your @ChangeStream uses mongoTemplateRef to point to a different MongoTemplate, the exactly-once guarantee does not apply.
Spring binds the transaction session to a specific MongoDatabaseFactory. The checkpoint store uses the default MongoTemplate from auto-configuration. If your handler uses a different template via mongoTemplateRef, the checkpoint save and the handler’s writes go through different database factories, so they cannot participate in the same transaction.
// This does NOT provide exactly-once:
@ChangeStream(collection = "orders", mongoTemplateRef = "secondaryMongoTemplate")
@Checkpoint(saveEveryN = 1)
public class OrderProcessor {

    @Autowired
    @Qualifier("secondaryMongoTemplate")
    private MongoTemplate secondaryMongoTemplate;

    @OnInsert
    @Transactional // transaction on secondary factory
    void handle(Order order, ChangeStreamContext<Order> ctx) {
        secondaryMongoTemplate.save(...); // secondary factory
        ctx.saveCheckpointNow();          // default factory: NOT in the same transaction
    }
}
For transactional exactly-once, use @Checkpoint(saveEveryN = 1). With saveEveryN > 1, FlowWarden skips checkpoint saves between batches — events processed between checkpoints would not have transactional guarantees on crash recovery.
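A short worked example shows why saveEveryN > 1 weakens the guarantee. It assumes, as a simplification not confirmed by this guide, that a checkpoint is written exactly after every N-th event and that the crash happens after any checkpoint that was due; `CheckpointLag` and `replayedAfterCrash` are hypothetical names.

```java
/** Worked arithmetic for the number of events replayed after a crash. */
class CheckpointLag {

    /**
     * Events handled since the last checkpoint, i.e. events that are replayed
     * on restart, under the assumption of one checkpoint every saveEveryN events.
     */
    static long replayedAfterCrash(long processed, int saveEveryN) {
        if (processed < 0 || saveEveryN < 1) {
            throw new IllegalArgumentException("processed >= 0 and saveEveryN >= 1 required");
        }
        return processed % saveEveryN;
    }

    public static void main(String[] args) {
        System.out.println(replayedAfterCrash(23, 10)); // prints 3 (last checkpoint at event 20)
        System.out.println(replayedAfterCrash(23, 1));  // prints 0
    }
}
```

Under these assumptions, only saveEveryN = 1 keeps the count at zero for every crash point, which is why the transactional setup pairs each event with its own checkpoint.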
FlowWarden’s automatic checkpoint save happens after the handler returns, outside the @Transactional boundary. To include the checkpoint in the transaction, you must call ctx.saveCheckpointNow() explicitly inside the handler. If you forget, the checkpoint is saved outside the transaction and processing falls back to at-least-once.