Process Managers
What is a Process Manager?
While Aggregate Roots ensure consistency within a single boundary, business processes often span multiple aggregates or require interaction with external systems.
A Process Manager (sometimes called a Saga) is responsible for coordinating these workflows. It listens to events from one or more aggregates and, in response, dispatches commands to other aggregates or triggers side effects.
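As a rough mental model (not the library's API), a process manager can be pictured as a function from an incoming event to the commands it wants dispatched. Everything in this sketch is hypothetical and invented for illustration: the DomainEvent and Command types, the OrderPlaced event, and the reserveStockOnOrder and react helpers.

```typescript
// Toy model only: none of these names come from @requence/event-sourcing.
type DomainEvent = { type: string; streamId: string }
type Command = { target: string; streamId: string; name: string }

// A process manager maps an incoming event to zero or more commands.
type ProcessManager = (event: DomainEvent) => Command[]

// Hypothetical rule: when an order is placed, ask the inventory aggregate to reserve stock.
const reserveStockOnOrder: ProcessManager = (event) =>
  event.type === 'OrderPlaced'
    ? [{ target: 'inventory', streamId: event.streamId, name: 'reserve' }]
    : []

// Run every manager against an event and collect the commands they produce.
function react(managers: ProcessManager[], event: DomainEvent): Command[] {
  const commands: Command[] = []
  for (const manager of managers) {
    for (const command of manager(event)) commands.push(command)
  }
  return commands
}

const dispatched = react([reserveStockOnOrder], { type: 'OrderPlaced', streamId: 'order-1' })
```

The point of the model is the direction of flow: events come in from one aggregate, commands go out to another, and the manager itself owns no business invariants.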
Defining a Process Manager
In @requence/event-sourcing, you define a Process Manager using eventStore.createProcessManager().
Stateless Process Managers
The most common pattern is a stateless process manager that reacts to events by dispatching commands to other aggregates. This is the primary mechanism for cascading operations, for example archiving all projects when a group is deleted.
import eventStore from './eventStore'
import projectAggregate from './projectAggregate'
import db from './db'
import { projects } from './tables'
import { eq } from 'drizzle-orm'

eventStore.createProcessManager('project').withEventHandlers({
  async onGroupDeleted({ streamId: groupId }) {
    // Find all projects belonging to this group
    const groupProjects = await db.query.projects.findMany({
      where: eq(projects.groupId, groupId),
    })

    // Dispatch a 'delete' command to each project's aggregate root
    groupProjects.forEach((project) => {
      projectAggregate.loadStream(project.id).delete()
    })
  },
})

Note: Commands dispatched inside a process manager are queued and processed asynchronously. You do not need to (and cannot) call .settled() inside a process manager; the library handles the lifecycle automatically.
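One way to picture the queueing described in the note is a list that handlers push into and that the runtime drains only after the handler has returned. This is a toy model under stated assumptions, not the library's implementation: CommandQueue, enqueue and drain are hypothetical names.

```typescript
// Toy model only: CommandQueue, enqueue and drain are hypothetical names.
type QueuedCommand = { streamId: string; name: string }

class CommandQueue {
  private pending: QueuedCommand[] = []
  readonly processed: string[] = []

  // Called from inside a handler: records the command, does not run it yet.
  enqueue(command: QueuedCommand): void {
    this.pending.push(command)
  }

  // Called by the runtime once the handler has returned.
  drain(): void {
    while (this.pending.length > 0) {
      const command = this.pending.shift()
      if (command) this.processed.push(`${command.name}:${command.streamId}`)
    }
  }
}

const queue = new CommandQueue()

// Stand-in for an onGroupDeleted handler that fans out to several projects.
function onGroupDeleted(projectIds: string[]): void {
  for (const id of projectIds) {
    queue.enqueue({ streamId: id, name: 'delete' })
  }
}

onGroupDeleted(['p1', 'p2'])
const processedBeforeDrain = queue.processed.length // nothing has run inline yet
queue.drain()
```

Because the handler only records intent, there is nothing for it to wait on, which is consistent with the rule that .settled() is not called inside a process manager.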
Process Managers Listening to Multiple Aggregates
A single process manager can listen to events from multiple aggregate types. Each handler name (on<EventName>) must correspond to an event defined by one of the registered aggregate roots:
eventStore.createProcessManager('notifications').withEventHandlers({
  async onProjectCreated({ payload, actorId }) {
    await sendEmail(actorId, `Your project "${payload.name}" was created.`)
  },
  async onProjectDeleted({ streamId, actorId }) {
    await sendEmail(actorId, `Project ${streamId} was deleted.`)
  },
})

withEventHandlers vs withAfterEffects
Process managers support two types of handlers, each running at a different phase of event processing:
withEventHandlers: Runs during event processing. Use this for updating internal state or dispatching commands to aggregates. Handlers follow an on<EventName> naming convention.
withAfterEffects: Runs after all event handlers (across all listeners) have completed. Use this for triggering external side effects or commands that depend on the full state being settled. Handlers follow an after<EventName> naming convention.
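The ordering guarantee between the two phases can be sketched with a toy dispatcher. This is an illustration under assumptions, not the library's scheduler: the Listener type and runPhases function are hypothetical, and the listener names are only labels.

```typescript
// Toy model only: Listener and runPhases are hypothetical, not the library's API.
type Listener = {
  label: string
  onEvent?: (log: string[]) => void    // stands in for an on<EventName> handler
  afterEvent?: (log: string[]) => void // stands in for an after<EventName> effect
}

function runPhases(listeners: Listener[]): string[] {
  const log: string[] = []
  // Phase 1: every listener's event handler runs first.
  for (const listener of listeners) {
    if (listener.onEvent) listener.onEvent(log)
  }
  // Phase 2: after-effects start only once phase 1 has finished for all listeners.
  for (const listener of listeners) {
    if (listener.afterEvent) listener.afterEvent(log)
  }
  return log
}

// The after-effect is registered first, yet it still runs last.
const callOrder = runPhases([
  { label: 'search-index', afterEvent: (log) => { log.push('after:search-index') } },
  { label: 'projection', onEvent: (log) => { log.push('on:projection') } },
  { label: 'notifications', onEvent: (log) => { log.push('on:notifications') } },
])
```

Registration order does not matter across phases in this model, which is why an after-effect can safely read state that event handlers have just written.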
eventStore
  .createProcessManager('search-index')
  .withAfterEffects({
    async afterProjectUpdated({ streamId }) {
      // Runs after all projections and event handlers have processed
      // Safe to query updated read models here
      await rebuildSearchIndex('project', streamId)
    },
  })

Stateful Process Managers
For workflows that need to track progress across multiple events, a process manager can maintain its own state using .withState(). When the event store is rebuilt, stateful process managers are automatically “refreshed”: their state is reconstructed by replaying all relevant events.
eventStore
  .createProcessManager('orderFulfillment')
  .withState({
    paymentReceived: false,
    inventoryReserved: false,
  })
  .withEventHandlers((state) => ({
    onPaymentSucceeded() {
      state.paymentReceived = true
    },
    onInventoryReserved() {
      state.inventoryReserved = true
    },
  }))
  .withAfterEffects((state) => ({
    async afterPaymentSucceeded({ streamId }) {
      if (state.paymentReceived && state.inventoryReserved) {
        shippingAggregate.loadStream(streamId).shipOrder()
      }
    },
    async afterInventoryReserved({ streamId }) {
      if (state.paymentReceived && state.inventoryReserved) {
        shippingAggregate.loadStream(streamId).shipOrder()
      }
    },
  }))

Stateful Process Manager Methods
When .withState() is used, additional methods become available:
.state(): Returns the current state of the process manager (after hydration).
.refreshState(): Triggers a full state refresh by replaying all relevant events from the beginning.
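Conceptually, a refresh amounts to starting from the initial state and folding every relevant event over it in order. The sketch below illustrates that idea only and is not how the library implements .refreshState(). The state shape mirrors the orderFulfillment example, while applyEvent, rebuildState and the OrderShipped event are hypothetical.

```typescript
// Toy model only: rebuildState is a hypothetical helper, not the library's refreshState().
type FulfillmentState = { paymentReceived: boolean; inventoryReserved: boolean }
type FulfillmentEvent = {
  // 'OrderShipped' is a made-up event this manager does not handle.
  type: 'PaymentSucceeded' | 'InventoryReserved' | 'OrderShipped'
}

const initialState: FulfillmentState = { paymentReceived: false, inventoryReserved: false }

// Apply one event to the state, returning a new state object.
function applyEvent(state: FulfillmentState, event: FulfillmentEvent): FulfillmentState {
  switch (event.type) {
    case 'PaymentSucceeded':
      return { ...state, paymentReceived: true }
    case 'InventoryReserved':
      return { ...state, inventoryReserved: true }
    default:
      return state // unhandled events leave the state untouched
  }
}

// Refreshing: start from the initial state and replay every event in order.
function rebuildState(pastEvents: FulfillmentEvent[]): FulfillmentState {
  return pastEvents.reduce(applyEvent, initialState)
}

const rebuilt = rebuildState([
  { type: 'PaymentSucceeded' },
  { type: 'OrderShipped' },
  { type: 'InventoryReserved' },
])
```

Because the result depends only on the event history, replaying the same events always yields the same state, which is what makes an automatic refresh safe.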
The refreshing Flag
Both withEventHandlers and withAfterEffects handlers receive a second argument with a refreshing boolean. Handlers can check it to skip external calls while state is being refreshed:
.withEventHandlers({
  async onPaymentSucceeded(event, { refreshing }) {
    if (refreshing) {
      // Skip external API calls during refresh
      return
    }
    await notifyPaymentGateway(event.streamId)
  },
})

Infinite Loop Protection
The library includes built-in infinite loop detection. If a process manager listens to an event and then emits the exact same event (same type, same stream, same payload), the library will throw an InfiniteLoopError to prevent runaway event cycles.
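The check described above can be approximated as a comparison between the triggering event and the emitted one. This is a sketch under assumptions rather than the library's detection logic: the InfiniteLoopError class here is a local stand-in, guardEmit and isSameEvent are hypothetical, and comparing payloads with JSON.stringify is only one possible reading of "same payload".

```typescript
// Toy model only: this InfiniteLoopError is a local stand-in for illustration.
type EmittedEvent = { type: string; streamId: string; payload: unknown }

class InfiniteLoopError extends Error {}

// Assumption: payloads count as equal when their JSON serialisations match.
// Note that this comparison is sensitive to object key order.
function isSameEvent(a: EmittedEvent, b: EmittedEvent): boolean {
  return (
    a.type === b.type &&
    a.streamId === b.streamId &&
    JSON.stringify(a.payload) === JSON.stringify(b.payload)
  )
}

// Reject an emitted event that is identical to the one that triggered it.
function guardEmit(trigger: EmittedEvent, emitted: EmittedEvent): EmittedEvent {
  if (isSameEvent(trigger, emitted)) {
    throw new InfiniteLoopError(
      `"${emitted.type}" on ${emitted.streamId} would re-trigger itself`,
    )
  }
  return emitted
}
```

In this model, an event that differs in type, stream, or payload passes the guard, so a handler can still emit a follow-up event of the same type as long as it is not an exact copy of its trigger.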