Aggregate Roots

An Aggregate Root is a core concept from Domain-Driven Design (DDD). It acts as a consistency boundary around a set of entities and value objects. In the context of Event Sourcing, the Aggregate Root is responsible for:

  1. Accepting Commands: It receives requests to change state (e.g., update).
  2. Validating Business Rules: It checks if the command is valid given its current state.
  3. Emitting Events: If the command is valid, it produces one or more events that represent the outcome (e.g., ProjectRenamed).
  4. Applying Events: It updates its own internal state based on the events it has just emitted or loaded from history.

The Aggregate Root is the only part of your system that can write new events to the event stream for its domain.
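
The four responsibilities above can be illustrated without any library. The following is a minimal sketch in plain TypeScript; the class `ProjectAggregate`, its methods, and the event shapes are hypothetical names chosen for illustration and are not part of @requence/event-sourcing.

```typescript
// Hypothetical, library-free sketch of the command -> validate -> emit -> apply cycle.
type ProjectEvent =
  | { type: 'ProjectRenamed'; name: string }
  | { type: 'ProjectDeleted' }

interface ProjectState {
  name: string
  deleted: boolean
}

class ProjectAggregate {
  private state: ProjectState = { name: 'Untitled', deleted: false }
  // Events produced by commands, in the order they were emitted
  readonly uncommitted: ProjectEvent[] = []

  get name(): string {
    return this.state.name
  }

  // 1. Accept a command
  rename(name: string): void {
    // 2. Validate business rules against the current state
    if (this.state.deleted) throw new Error('Cannot rename a deleted project')
    if (name.length === 0) throw new Error('Name must not be empty')
    if (name === this.state.name) return // nothing changed, so no event
    // 3. Emit an event that describes the outcome
    this.emit({ type: 'ProjectRenamed', name })
  }

  delete(): void {
    if (this.state.deleted) throw new Error('Project already deleted')
    this.emit({ type: 'ProjectDeleted' })
  }

  private emit(event: ProjectEvent): void {
    this.uncommitted.push(event)
    this.apply(event)
  }

  // 4. Apply the event to the internal state
  private apply(event: ProjectEvent): void {
    switch (event.type) {
      case 'ProjectRenamed':
        this.state.name = event.name
        break
      case 'ProjectDeleted':
        this.state.deleted = true
        break
    }
  }
}

const project = new ProjectAggregate()
project.rename('Beta')
project.rename('Beta') // same name again: validated, but no event is emitted
```

Note that state is only ever changed inside `apply`, never directly in a command. That separation is what allows the same state to be rebuilt later from stored events alone.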

In @requence/event-sourcing, you define an Aggregate Root by using createAggregateRoot(). A definition consists of four main parts, chained in a fluent API:

  1. Initial State (.withInitialState()): The default state of a new aggregate.
  2. Events (.withEvents()): Zod schemas defining the shape of each event the aggregate can emit.
  3. Event Handlers (.withEventHandlers()): Functions that mutate the aggregate’s internal state when an event occurs.
  4. Commands (.withCommands()): Methods that validate input and emit events.

Here is a Project aggregate root, modeled after a real-world implementation:

import { createAggregateRoot } from '@requence/event-sourcing'
import { z } from 'zod/v4'

export default createAggregateRoot('project')
  // 1. Define the initial state
  .withInitialState({
    id: null as string | null,
    name: null as string | null,
    description: null as string | null,
    groupId: null as string | null,
    deleted: false,
  })
  // 2. Define event schemas using Zod
  .withEvents({
    ProjectCreated: z.object({
      name: z.string().min(1),
      description: z.string().nullable().default(null),
      groupId: z.uuid(),
    }),
    ProjectUpdated: z.object({
      name: z.string().min(1),
      description: z.string().nullable(),
    }).partial(),
    ProjectDeleted: z.null(), // Event with no payload
  })
  // 3. Define how events mutate the state
  .withEventHandlers((state) => ({
    onProjectCreated({ streamId, payload }) {
      state.id = streamId
      state.name = payload.name
      state.description = payload.description
      state.groupId = payload.groupId
    },
    onProjectUpdated({ payload }) {
      // Only overwrite fields that are present; a null description clears it
      if (payload.name !== undefined) state.name = payload.name
      if (payload.description !== undefined) state.description = payload.description
    },
    onProjectDeleted() {
      state.deleted = true
    },
  }))
  // 4. Define commands that emit events
  .withCommands((state, event) => ({
    create(groupId: string, name: string, description?: string) {
      if (state.id) throw new Error('Project already created')
      return event('ProjectCreated', { groupId, name, description })
    },
    update(name?: string, description?: string) {
      if (state.deleted) throw new Error('Cannot update a deleted project')
      // Only emit if something actually changed (undefined means "leave as is")
      const nameChanged = name !== undefined && name !== state.name
      const descriptionChanged =
        description !== undefined && description !== state.description
      if (!nameChanged && !descriptionChanged) return
      return event('ProjectUpdated', { name, description })
    },
    delete() {
      if (state.deleted) throw new Error('Project already deleted')
      return event('ProjectDeleted')
    },
  }))

A few conventions to keep in mind:

  • Event names must start with an uppercase letter (e.g., ProjectCreated, not projectCreated). The library enforces this at the type level.
  • Event handlers follow the naming convention on<EventName> (e.g., onProjectCreated).
  • Commands return the result of calling event(EventName, payload). If no state change is needed, return undefined (or nothing) and no event is emitted.
  • For events with no payload, define the schema as z.null() and call event('EventName') without a second argument.
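
To give an intuition for how naming conventions like these can be checked by the compiler, here is a small sketch using TypeScript template literal types. The types `CapitalizedName` and `HandlersFor` and the function `handlerNameFor` are hypothetical and are not the library's actual definitions; they only demonstrate the general technique.

```typescript
// Hypothetical types for illustration; NOT the definitions used by @requence/event-sourcing.

// Resolves to the name itself if it starts with an uppercase letter, otherwise to never
type CapitalizedName<Name extends string> = Name extends Capitalize<Name> ? Name : never

// Derives a handler map from an event payload map:
// { ProjectCreated: P } becomes { onProjectCreated(payload: P): void }
type HandlersFor<Events extends Record<string, unknown>> = {
  [Name in keyof Events & string as `on${CapitalizedName<Name>}`]: (
    payload: Events[Name],
  ) => void
}

type ProjectEvents = {
  ProjectCreated: { name: string }
  ProjectDeleted: null
}

const log: string[] = []

// Misspelling a handler name (e.g. onprojectCreated) would be a compile-time error here
const handlers: HandlersFor<ProjectEvents> = {
  onProjectCreated(payload) {
    log.push(`created ${payload.name}`)
  },
  onProjectDeleted() {
    log.push('deleted')
  },
}

// Runtime counterpart of the naming convention: event name -> handler name
function handlerNameFor<Name extends string>(eventName: Name): `on${Name}` {
  return `on${eventName}` as `on${Name}`
}

handlers[handlerNameFor('ProjectCreated')]({ name: 'Demo' })
```

In this sketch an event key that starts with a lowercase letter produces no handler slot at all, so any handler written for it is rejected as an unknown property.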

Once an Aggregate Root is defined and registered with your Event Store, you interact with it through two primary methods: newStream() and loadStream().

Use newStream() to create a brand-new aggregate instance. This generates a random UUID as the stream ID:

import aggregateRoot from './aggregateRoot'

// Create a new project (generates a UUID stream ID)
const project = aggregateRoot
  .newStream()
  .create(groupId, 'My Project', 'A description')

// Wait for all events to be persisted and downstream listeners to finish
await project.settled()

Use loadStream(id) to load an existing aggregate by its stream ID. This replays all past events to reconstruct the current state before executing any commands:

// Load an existing project by its stream ID
aggregateRoot
  .loadStream(projectId)
  .update('New Name', 'New Description')
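
Conceptually, replaying a stream is a fold over its events, starting from the initial state. The sketch below shows that idea in plain TypeScript. The `StoredEvent` shape and the functions `applyEvent` and `replay` are assumptions made for illustration and do not reflect the library's internals.

```typescript
// Hypothetical event and state shapes; not the library's internal representation.
type StoredEvent =
  | { type: 'ProjectCreated'; streamId: string; payload: { name: string; description: string | null } }
  | { type: 'ProjectUpdated'; streamId: string; payload: { name?: string; description?: string | null } }
  | { type: 'ProjectDeleted'; streamId: string; payload: null }

interface ProjectState {
  id: string | null
  name: string | null
  description: string | null
  deleted: boolean
}

const initialState: ProjectState = { id: null, name: null, description: null, deleted: false }

// Pure function: given a state and one event, return the next state
function applyEvent(state: ProjectState, event: StoredEvent): ProjectState {
  switch (event.type) {
    case 'ProjectCreated':
      return {
        ...state,
        id: event.streamId,
        name: event.payload.name,
        description: event.payload.description,
      }
    case 'ProjectUpdated':
      return {
        ...state,
        name: event.payload.name ?? state.name,
        // undefined means "not part of this update"; null clears the description
        description:
          event.payload.description === undefined
            ? state.description
            : event.payload.description,
      }
    case 'ProjectDeleted':
      return { ...state, deleted: true }
  }
}

// Replaying history is a left fold over the events in stream order
function replay(history: StoredEvent[]): ProjectState {
  return history.reduce(applyEvent, initialState)
}

const state = replay([
  { type: 'ProjectCreated', streamId: 'p-1', payload: { name: 'Draft', description: 'First draft' } },
  { type: 'ProjectUpdated', streamId: 'p-1', payload: { name: 'Renamed' } },
])
```

Because the fold always starts from the same initial state and applies events in order, replaying the same history yields the same state every time.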

Commands are designed to be chained. The underlying system queues each command and executes them sequentially against the aggregate’s state:

aggregateRoot
  .loadStream(projectId)
  .update('New Name')
  .delete()

A command chain can be awaited in two ways:

  • .settled(): Waits for all commands to be executed, events to be persisted, and all downstream listeners (projections, process managers, event listeners) to finish processing.
  • .state(): Returns the aggregate’s state once all pending commands have been applied. It does not wait for downstream processing.

// Get the state after commands are applied
const state = await aggregateRoot
  .newStream()
  .create(groupId, 'My Project')
  .state()

console.log(state.id) // the generated UUID
console.log(state.name) // 'My Project'
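
One common way to get this kind of chainable, sequential behavior is to append each command to a promise that acts as the tail of a queue. The following sketch shows that pattern on a simple counter. `CommandChain` and its methods are hypothetical, and the library's actual implementation may differ.

```typescript
// Hypothetical sketch of a fluent command queue built on promise chaining.
interface CounterState {
  value: number
}

class CommandChain {
  // Tail of the queue: every new command is appended to this promise
  private tail: Promise<void> = Promise.resolve()
  private current: CounterState = { value: 0 }
  // Labels of commands in the order they actually finished
  readonly executed: string[] = []

  private enqueue(
    label: string,
    command: (state: CounterState) => void | Promise<void>,
  ): this {
    // The command starts only after everything queued before it has finished
    this.tail = this.tail.then(async () => {
      await command(this.current)
      this.executed.push(label)
    })
    return this // returning this is what makes the API chainable
  }

  add(amount: number): this {
    return this.enqueue(`add ${amount}`, async (state) => {
      // Simulate asynchronous work (e.g. persistence) before the next command runs
      await new Promise((resolve) => setTimeout(resolve, 5))
      state.value += amount
    })
  }

  double(): this {
    return this.enqueue('double', (state) => {
      state.value *= 2
    })
  }

  // Resolves with a copy of the state once every queued command has run
  async state(): Promise<CounterState> {
    await this.tail
    return { ...this.current }
  }
}

const chain = new CommandChain().add(2).double().add(1)
const finalState = chain.state()
```

Although `add` is asynchronous and `double` is synchronous, the commands run strictly in the order they were chained, so `finalState` resolves to a value of 5, that is (0 + 2) * 2 + 1.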

Use .transformError() to map specific error types to more descriptive errors:

aggregateRoot
  .loadStream(projectId)
  .transformError(Error, (commandError) => {
    return new CustomApiError(
      `Failed to update project: ${commandError.message}`
    )
  })
  .update('New Name')
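
The idea of mapping errors by type can be shown in isolation. The sketch below is a standalone helper that catches errors of a given class and rethrows a transformed error, leaving all other errors untouched. The free function `transformError`, the `CustomApiError` class, and `renameProject` are hypothetical stand-ins and do not describe how the library's `.transformError()` is implemented.

```typescript
// Hypothetical helper; it only illustrates instanceof-based error mapping.
class CustomApiError extends Error {
  constructor(message: string) {
    super(message)
    this.name = 'CustomApiError'
  }
}

// Any class whose instances are Errors
type ErrorClass<E extends Error> = new (...args: any[]) => E

function transformError<E extends Error, T>(
  errorClass: ErrorClass<E>,
  transform: (error: E) => Error,
  run: () => T,
): T {
  try {
    return run()
  } catch (error) {
    // Only errors of the requested class are mapped; everything else passes through
    if (error instanceof errorClass) throw transform(error)
    throw error
  }
}

// Stand-in for a command that rejects invalid input
function renameProject(name: string): string {
  if (name.length === 0) throw new RangeError('Name must not be empty')
  return name
}

let caught: unknown
try {
  transformError(
    RangeError,
    (commandError) =>
      new CustomApiError(`Failed to update project: ${commandError.message}`),
    () => renameProject(''),
  )
} catch (error) {
  caught = error
}
```

Matching on a specific class such as `RangeError` rather than the base `Error` keeps unexpected failures intact, which makes them easier to diagnose.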