Aggregate Roots
What is an Aggregate Root?
Section titled “What is an Aggregate Root?”An Aggregate Root is a core concept from Domain-Driven Design (DDD). It acts as a consistency boundary around a set of entities and value objects. In the context of Event Sourcing, the Aggregate Root is responsible for:
- Accepting Commands: It receives requests to change state (e.g.,
update). - Validating Business Rules: It checks if the command is valid given its current state.
- Emitting Events: If the command is valid, it produces one or more events that represent the outcome (e.g.,
ProjectRenamed). - Applying Events: It updates its own internal state based on the events it has just emitted or loaded from history.
The Aggregate Root is the only part of your system that can write new events to the event stream for its domain.
Defining an Aggregate Root
Section titled “Defining an Aggregate Root”In @requence/event-sourcing, you define an Aggregate Root by using createAggregateRoot(). A definition consists of four main parts, chained in a fluent API:
- Initial State (
.withInitialState()): The default state of a new aggregate. - Events (
.withEvents()): Zod schemas defining the shape of each event the aggregate can emit. - Event Handlers (
.withEventHandlers()): Functions that mutate the aggregate’s internal state when an event occurs. - Commands (
.withCommands()): Methods that validate input and emit events.
Example
Section titled “Example”Here is a Project aggregate root, modeled after a real-world implementation:
import { createAggregateRoot } from '@requence/event-sourcing'import { z } from 'zod/v4'
export default createAggregateRoot('project') // 1. Define the initial state .withInitialState({ id: null as string | null, name: null as string | null, description: null as string | null, groupId: null as string | null, deleted: false, }) // 2. Define event schemas using Zod .withEvents({ ProjectCreated: z.object({ name: z.string().min(1), description: z.string().nullable().default(null), groupId: z.uuid(), }), ProjectUpdated: z.object({ name: z.string().min(1), description: z.string().nullable(), }).partial(), ProjectDeleted: z.null(), // Event with no payload }) // 3. Define how events mutate the state .withEventHandlers((state) => ({ onProjectCreated({ streamId, payload }) { state.id = streamId state.name = payload.name state.description = payload.description state.groupId = payload.groupId }, onProjectUpdated({ payload }) { state.name = payload.name || state.name state.description = payload.description || state.description }, onProjectDeleted() { state.deleted = true }, })) // 4. Define commands that emit events .withCommands((state, event) => ({ create(groupId: string, name: string, description?: string) { if (state.id) throw new Error('Project already created') return event('ProjectCreated', { groupId, name, description }) }, update(name?: string, description?: string) { if (state.deleted) throw new Error('Cannot update a deleted project') // Only emit if something actually changed if (state.name === name && state.description === description) return return event('ProjectUpdated', { name, description }) }, delete() { if (state.deleted) throw new Error('Project already deleted') return event('ProjectDeleted') }, }))Key Details
Section titled “Key Details”- Event names must be Capitalized (e.g.
ProjectCreated, notprojectCreated). The library enforces this at the type level. - Event handlers follow the naming convention
on<EventName>(e.g.,onProjectCreated). - Commands return the result of calling
event(EventName, payload). Returnundefined(or nothing) to perform a no-op if no state change is needed. - For events with no payload, define the schema as
z.null()and callevent('EventName')without a second argument.
Using the Aggregate Root
Section titled “Using the Aggregate Root”Once an Aggregate Root is defined and registered with your Event Store, you interact with it through two primary methods:
Creating a New Stream
Section titled “Creating a New Stream”Use newStream() to create a brand-new aggregate instance. This generates a random UUID as the stream ID:
import aggregateRoot from './aggregateRoot'
// Create a new project (generates a UUID stream ID)const project = aggregateRoot .newStream() .create(groupId, 'My Project', 'A description')
// Wait for all events to be persisted and downstream listeners to finishawait project.settled()Loading an Existing Stream
Section titled “Loading an Existing Stream”Use loadStream(id) to load an existing aggregate by its stream ID. This replays all past events to reconstruct the current state before executing any commands:
// Load an existing project by its stream IDaggregateRoot .loadStream(projectId) .update('New Name', 'New Description')Command Chaining
Section titled “Command Chaining”Commands are designed to be chained. The underlying system queues each command and executes them sequentially against the aggregate’s state:
aggregateRoot .loadStream(projectId) .update('New Name') .delete()Awaiting Results
Section titled “Awaiting Results”.settled(): Waits for all commands to be executed, events to be persisted, and all downstream listeners (projections, process managers, event listeners) to finish processing..state(): Returns the aggregate’s current state after all pending commands have been applied (but does not wait for downstream processing).
// Get the state after commands are appliedconst state = await aggregateRoot .newStream() .create(groupId, 'My Project') .state()
console.log(state.id) // the generated UUIDconsole.log(state.name) // 'My Project'Error Handling
Section titled “Error Handling”Use .transformError() to map specific error types to more descriptive errors:
aggregateRoot .loadStream(projectId) .transformError(Error, (commandError) => { return new CustomApiError( `Failed to update project: ${commandError.message}` ) }) .update('New Name')