Process Managers

While Aggregate Roots ensure consistency within a single boundary, business processes often span multiple aggregates or require interaction with external systems.

A Process Manager (sometimes called a Saga) is responsible for coordinating these workflows. It listens to events from one or more aggregates and, in response, dispatches commands to other aggregates or triggers side effects.

In @requence/event-sourcing, you define a Process Manager using eventStore.createProcessManager().

The most common pattern is a stateless process manager that reacts to events by dispatching commands to other aggregates. This is the primary mechanism for cascading operations — for example, archiving all projects when a group is deleted.

import eventStore from './eventStore'
import projectAggregate from './projectAggregate'
import db from './db'
import { projects } from './tables'
import { eq } from 'drizzle-orm'

eventStore.createProcessManager('project').withEventHandlers({
  async onGroupDeleted({ streamId: groupId }) {
    // Find all projects belonging to this group
    const groupProjects = await db.query.projects.findMany({
      where: eq(projects.groupId, groupId),
    })
    // Dispatch a 'delete' command to each project's aggregate root
    groupProjects.forEach((project) => {
      projectAggregate.loadStream(project.id).delete()
    })
  },
})

Note: Commands dispatched inside a process manager are queued and processed asynchronously. You do not need to (and cannot) call .settled() inside a process manager — the library automatically handles the lifecycle.
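The queueing behaviour described in the note can be pictured with a small, self-contained sketch. This is not the library's implementation: `CommandQueue`, `enqueue`, and `drain` are hypothetical names used only to show the idea that a handler merely records commands and a surrounding runtime processes them after the handler has returned.

```typescript
// Hypothetical sketch of deferred command processing.
// None of these names come from @requence/event-sourcing.
type Command = { target: string; name: string }

class CommandQueue {
  private pending: Command[] = []
  readonly processed: Command[] = []

  // Called from inside a handler: only records the command.
  enqueue(command: Command): void {
    this.pending.push(command)
  }

  // Called by the (imaginary) runtime once the handler has returned.
  drain(): void {
    while (this.pending.length > 0) {
      const command = this.pending.shift()!
      this.processed.push(command)
    }
  }
}

const queue = new CommandQueue()

// Stand-in for an onGroupDeleted handler: it queues one command per project.
function onGroupDeleted(projectIds: string[]): void {
  for (const id of projectIds) {
    queue.enqueue({ target: id, name: 'delete' })
  }
}

onGroupDeleted(['p1', 'p2'])
const processedBeforeDrain = queue.processed.length // nothing has run yet
queue.drain()
const processedAfterDrain = queue.processed.length // both commands processed
```

In this model the handler never waits on its own commands, which is one way to read why calling .settled() inside a handler is unnecessary.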

Process Managers Listening to Multiple Aggregates

A single process manager can listen to events from multiple aggregate types. Each handler name must correspond to an event defined by one of the registered aggregate roots, regardless of which aggregate emits it:

eventStore.createProcessManager('notifications').withEventHandlers({
  async onProjectCreated({ payload, actorId }) {
    await sendEmail(actorId, `Your project "${payload.name}" was created.`)
  },
  async onProjectDeleted({ streamId, actorId }) {
    await sendEmail(actorId, `Project ${streamId} was deleted.`)
  },
})

Process managers support two types of handlers, each running at a different phase of event processing:

  • withEventHandlers: Runs during event processing. Use this for updating internal state or dispatching commands to aggregates. The handlers use an on<EventName> naming convention.
  • withAfterEffects: Runs after all event handlers (across all listeners) have completed. Use this for triggering external side effects or commands that depend on the full state being settled. The handlers use an after<EventName> naming convention.

eventStore
  .createProcessManager('search-index')
  .withAfterEffects({
    async afterProjectUpdated({ streamId }) {
      // Runs after all projections and event handlers have processed
      // Safe to query updated read models here
      await rebuildSearchIndex('project', streamId)
    },
  })
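The ordering guarantee between the two phases can be illustrated with a toy dispatcher. This is a minimal sketch under stated assumptions, not the library's internals: `Listener`, `processEvent`, and `callOrder` are hypothetical, and only the on<EventName> / after<EventName> naming convention is taken from the text above.

```typescript
// Hypothetical two-phase dispatcher; not part of @requence/event-sourcing.
type DomainEvent = { type: string; streamId: string }
type Handler = (event: DomainEvent) => void

interface Listener {
  eventHandlers?: Record<string, Handler>
  afterEffects?: Record<string, Handler>
}

function processEvent(event: DomainEvent, all: Listener[]): void {
  // Phase 1: run every listener's on<EventName> handler.
  for (const listener of all) {
    listener.eventHandlers?.[`on${event.type}`]?.(event)
  }
  // Phase 2: only then run every listener's after<EventName> effect.
  for (const listener of all) {
    listener.afterEffects?.[`after${event.type}`]?.(event)
  }
}

const callOrder: string[] = []

const searchIndex: Listener = {
  afterEffects: {
    afterProjectUpdated: ({ streamId }) => {
      callOrder.push(`after:${streamId}`)
    },
  },
}

const readModel: Listener = {
  eventHandlers: {
    onProjectUpdated: ({ streamId }) => {
      callOrder.push(`on:${streamId}`)
    },
  },
}

// searchIndex is registered first, yet its after effect still runs last.
processEvent({ type: 'ProjectUpdated', streamId: 'p1' }, [searchIndex, readModel])
```

Because the second loop starts only after the first has finished for all listeners, an after effect can rely on every event handler having already seen the event, whatever the registration order.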

For workflows that need to track progress across multiple events, a process manager can maintain its own state using .withState(). When the event store is rebuilt, stateful process managers are automatically “refreshed” — their state is reconstructed by replaying all relevant events.

// Assumes eventStore and shippingAggregate are imported from their own modules
eventStore
  .createProcessManager('orderFulfillment')
  .withState({
    paymentReceived: false,
    inventoryReserved: false,
  })
  .withEventHandlers((state) => ({
    onPaymentSucceeded() {
      state.paymentReceived = true
    },
    onInventoryReserved() {
      state.inventoryReserved = true
    },
  }))
  .withAfterEffects((state) => ({
    // Either event can arrive last, so both effects check the full condition
    async afterPaymentSucceeded({ streamId }) {
      if (state.paymentReceived && state.inventoryReserved) {
        shippingAggregate.loadStream(streamId).shipOrder()
      }
    },
    async afterInventoryReserved({ streamId }) {
      if (state.paymentReceived && state.inventoryReserved) {
        shippingAggregate.loadStream(streamId).shipOrder()
      }
    },
  }))

When .withState() is used, additional methods become available:

  • .state(): Returns the current state of the process manager (after hydration).
  • .refreshState(): Triggers a full state refresh by replaying all relevant events from the beginning.
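Conceptually, a refresh rebuilds state by folding the recorded events over a fresh initial state. The sketch below illustrates that idea only; `applyEvent`, `rebuildState`, and `pastEvents` are hypothetical names, and the library's real .refreshState() may work differently.

```typescript
// Hypothetical replay sketch; not the library's .refreshState() implementation.
type FulfillmentEvent = {
  type: 'PaymentSucceeded' | 'InventoryReserved'
  streamId: string
}

type FulfillmentState = {
  paymentReceived: boolean
  inventoryReserved: boolean
}

// A fresh copy of the initial state, mirroring the object passed to .withState().
function initialState(): FulfillmentState {
  return { paymentReceived: false, inventoryReserved: false }
}

// Same state transitions as the on<EventName> handlers in the example above.
function applyEvent(state: FulfillmentState, event: FulfillmentEvent): void {
  if (event.type === 'PaymentSucceeded') state.paymentReceived = true
  if (event.type === 'InventoryReserved') state.inventoryReserved = true
}

// Replays events from the beginning to reconstruct the state.
function rebuildState(events: FulfillmentEvent[]): FulfillmentState {
  const state = initialState()
  for (const event of events) {
    applyEvent(state, event)
  }
  return state
}

const pastEvents: FulfillmentEvent[] = [
  { type: 'PaymentSucceeded', streamId: 'order-1' },
  { type: 'InventoryReserved', streamId: 'order-1' },
]

const rebuilt = rebuildState(pastEvents)
const partial = rebuildState(pastEvents.slice(0, 1))
```

Since the state is derived purely from events, replaying the same events yields the same state, which is what makes an automatic refresh after a rebuild possible.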

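Handlers registered with withEventHandlers and withAfterEffects receive a second argument containing a refreshing boolean. It lets a handler skip work, such as external API calls, that should not be repeated while state is being refreshed: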

.withEventHandlers({
  async onPaymentSucceeded(event, { refreshing }) {
    if (refreshing) {
      // Skip external API calls during refresh
      return
    }
    await notifyPaymentGateway(event.streamId)
  },
})
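To make the effect of the flag concrete, the following sketch calls one handler twice: once as if the event were live and once as if it were being replayed. Everything here is a hypothetical stand-in (`HandlerContext`, `notified`, `paymentsSeen`); it assumes only that the second argument carries a refreshing boolean, as described above.

```typescript
// Hypothetical illustration of guarding side effects with `refreshing`.
type PaymentEvent = { type: 'PaymentSucceeded'; streamId: string }
type HandlerContext = { refreshing: boolean }

const notified: string[] = [] // stand-in for calls to an external gateway
let paymentsSeen = 0 // stand-in for process manager state

function onPaymentSucceeded(
  payment: PaymentEvent,
  { refreshing }: HandlerContext,
): void {
  // State updates happen in both modes so a refresh rebuilds the same state.
  paymentsSeen += 1
  if (refreshing) {
    return // the external call is skipped during a refresh
  }
  notified.push(payment.streamId)
}

const payment: PaymentEvent = { type: 'PaymentSucceeded', streamId: 'order-1' }

// Live processing: state is updated and the gateway is notified.
onPaymentSucceeded(payment, { refreshing: false })

// Simulated refresh: state is reset, then rebuilt without re-notifying.
paymentsSeen = 0
onPaymentSucceeded(payment, { refreshing: true })
```

After both calls the state counter is back at one while the gateway has been notified only once, which is the behaviour the guard is meant to protect.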

The library includes built-in infinite loop detection. If a process manager listens to an event and then emits the exact same event (same type, same stream, same payload), the library will throw an InfiniteLoopError to prevent runaway event cycles.
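The rule stated above (same type, same stream, same payload) can be expressed as a simple comparison. The sketch below is an assumption-laden illustration, not the library's detection logic: the local `InfiniteLoopError` class is a stand-in that only shares its name with the library's error, `assertNoLoop` is hypothetical, and payloads are compared with `JSON.stringify`, which is sensitive to property order.

```typescript
// Hypothetical loop check; the real detection in the library may differ.
type EmittedEvent = { type: string; streamId: string; payload: unknown }

// Local stand-in for illustration, not the library's exported class.
class InfiniteLoopError extends Error {
  constructor(emitted: EmittedEvent) {
    super(`Loop detected: ${emitted.type} on stream ${emitted.streamId}`)
    this.name = 'InfiniteLoopError'
    // Keeps instanceof working when compiling to older targets.
    Object.setPrototypeOf(this, InfiniteLoopError.prototype)
  }
}

function isSameEvent(a: EmittedEvent, b: EmittedEvent): boolean {
  return (
    a.type === b.type &&
    a.streamId === b.streamId &&
    // Simplification: structural equality via JSON, property order matters.
    JSON.stringify(a.payload) === JSON.stringify(b.payload)
  )
}

// Throws if handling `trigger` resulted in emitting an identical event.
function assertNoLoop(trigger: EmittedEvent, emitted: EmittedEvent): void {
  if (isSameEvent(trigger, emitted)) {
    throw new InfiniteLoopError(emitted)
  }
}

function detectsLoop(trigger: EmittedEvent, emitted: EmittedEvent): boolean {
  try {
    assertNoLoop(trigger, emitted)
    return false
  } catch (error) {
    return error instanceof InfiniteLoopError
  }
}

const renamed: EmittedEvent = {
  type: 'ProjectRenamed',
  streamId: 'p1',
  payload: { name: 'Alpha' },
}

const identicalLoop = detectsLoop(renamed, { ...renamed, payload: { name: 'Alpha' } })
const differentPayload = detectsLoop(renamed, { ...renamed, payload: { name: 'Beta' } })
const differentStream = detectsLoop(renamed, { ...renamed, streamId: 'p2' })
```

Under this reading, a handler that reacts to an event by emitting a changed payload or targeting a different stream is not flagged; only an exact repeat is.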