Aggregate Roots
What is an Aggregate Root?
An Aggregate Root is a core concept from Domain-Driven Design (DDD). It acts as a consistency boundary around a set of entities and value objects. In the context of Event Sourcing, the Aggregate Root is responsible for:
- Accepting Commands: It receives requests to change state (e.g., update).
- Validating Business Rules: It checks if the command is valid given its current state.
- Emitting Events: If the command is valid, it produces one or more events that represent the outcome (e.g., ProjectRenamed).
- Applying Events: It updates its own internal state based on the events it has just emitted or loaded from history.
The Aggregate Root is the only part of your system that can write new events to the event stream for its domain.
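The four responsibilities above can be pictured with a small, library-free sketch. None of the names below (ProjectEvent, apply, rename, replay) come from @requence/event-sourcing; they are assumptions chosen only to illustrate the command, validate, emit, apply cycle and how state is rebuilt from history.

```typescript
// Library-free sketch; all names here are illustrative assumptions.
type ProjectEvent =
  | { type: 'ProjectCreated'; name: string }
  | { type: 'ProjectRenamed'; name: string }

interface ProjectState {
  created: boolean
  name: string | null
}

const initialState: ProjectState = { created: false, name: null }

// Applying events: a pure function from (state, event) to the next state.
function apply(state: ProjectState, event: ProjectEvent): ProjectState {
  switch (event.type) {
    case 'ProjectCreated':
      return { created: true, name: event.name }
    case 'ProjectRenamed':
      return { ...state, name: event.name }
  }
}

// Accepting a command: validate against the current state, then emit events.
function rename(state: ProjectState, name: string): ProjectEvent[] {
  if (!state.created) throw new Error('Project does not exist')
  if (name.trim() === '') throw new Error('Name must not be empty')
  if (state.name === name) return [] // nothing changed, emit nothing
  return [{ type: 'ProjectRenamed', name }]
}

// Loading from history: fold all past events over the initial state.
function replay(history: ProjectEvent[]): ProjectState {
  return history.reduce(apply, initialState)
}

const history: ProjectEvent[] = [{ type: 'ProjectCreated', name: 'Draft' }]
const current = replay(history)
const emitted = rename(current, 'Final')
const next = emitted.reduce(apply, current)
console.log(next.name) // 'Final'
```

Keeping validation in the command and state changes in the apply function means the same apply logic serves both freshly emitted events and events replayed from storage.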
Defining an Aggregate Root
In @requence/event-sourcing, you define an Aggregate Root by using createAggregateRoot(). A definition consists of four main parts, chained in a fluent API:
- Initial State (.withInitialState()): The default state of a new aggregate.
- Events (.withEvents()): Zod schemas defining the shape of each event the aggregate can emit.
- Event Handlers (.withEventHandlers()): Functions that mutate the aggregate’s internal state when an event occurs.
- Commands (.withCommands()): Methods that validate input and emit events.
Example
Here is a Project aggregate root, modeled after a real-world implementation:

import { createAggregateRoot } from '@requence/event-sourcing'
import { z } from 'zod/v4'

export default createAggregateRoot('project')
  // 1. Define the initial state
  .withInitialState({
    id: null as string | null,
    name: null as string | null,
    description: null as string | null,
    groupId: null as string | null,
    deleted: false,
  })
  // 2. Define event schemas using Zod
  .withEvents({
    ProjectCreated: z.object({
      name: z.string().min(1),
      description: z.string().nullable().default(null),
      groupId: z.uuid(),
    }),
    ProjectUpdated: z
      .object({
        name: z.string().min(1),
        description: z.string().nullable(),
      })
      .partial(),
    ProjectDeleted: z.null(), // Event with no payload
  })
  // 3. Define how events mutate the state
  .withEventHandlers((state) => ({
    onProjectCreated({ streamId, payload }) {
      state.id = streamId
      state.name = payload.name
      state.description = payload.description
      state.groupId = payload.groupId
    },
    onProjectUpdated({ payload }) {
      // A missing (undefined) field means "unchanged"
      state.name = payload.name ?? state.name
      // null is a valid value here: it clears the description
      if (payload.description !== undefined) {
        state.description = payload.description
      }
    },
    onProjectDeleted() {
      state.deleted = true
    },
  }))
  // 4. Define commands that emit events
  .withCommands((state, event) => ({
    create(groupId: string, name: string, description?: string) {
      if (state.id) throw new Error('Project already created')
      return event('ProjectCreated', { groupId, name, description })
    },
    update(name?: string, description?: string) {
      if (state.deleted) throw new Error('Cannot update a deleted project')
      // Only emit if something actually changed
      if (state.name === name && state.description === description) return
      return event('ProjectUpdated', { name, description })
    },
    delete() {
      if (state.deleted) throw new Error('Project already deleted')
      return event('ProjectDeleted')
    },
  }))

Key Details
- Event names must be capitalized (e.g. ProjectCreated, not projectCreated). The library enforces this at the type level.
- Event handlers follow the naming convention on<EventName> (e.g., onProjectCreated).
- Commands return the result of calling event(EventName, payload). Return undefined (or nothing) to perform a no-op if no state change is needed.
- For events with no payload, define the schema as z.null() and call event('EventName') without a second argument.
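To give a feel for what "enforced at the type level" can mean, here is a minimal sketch built on TypeScript's built-in Capitalize utility and template literal types. The helper names (CapitalizedName, HandlerName, handlerNameFor) are hypothetical and the library's actual type definitions may look quite different.

```typescript
// Hypothetical helper types; the library's real type definitions may differ.

// Resolves to the name itself when it starts with an uppercase letter, else never.
type CapitalizedName<N extends string> = N extends Capitalize<N> ? N : never

// Maps an event name to its handler name, e.g. 'ProjectCreated' -> 'onProjectCreated'.
type HandlerName<N extends string> = `on${N}`

// Accepts only capitalized event names at compile time.
function handlerNameFor<N extends string>(
  eventName: CapitalizedName<N>,
): HandlerName<N> {
  return `on${eventName}` as HandlerName<N>
}

const handler = handlerNameFor('ProjectCreated')
console.log(handler) // 'onProjectCreated'

// The following line is rejected by the type checker, because
// 'projectCreated' does not start with an uppercase letter:
// handlerNameFor('projectCreated')
```

The check costs nothing at runtime: a misnamed event simply fails to compile.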
Concurrency & Locking
Every time you call newStream() or loadStream(id), the library automatically acquires a lock on that specific stream before any commands are executed. This prevents simultaneous edits to the same aggregate instance and eliminates event stream version collisions (ConcurrencyError).
The lock is held for the duration of the command chain and released after the events have been appended. If no commands are issued, the lock is released immediately.
How It Works
- Lock acquired: Before any command runs, a mutex is acquired for the stream key (<type>:<id>).
- Commands execute: All chained commands run sequentially against the locked stream.
- Events appended: The resulting events are persisted with the expected version.
- Lock released: The lock is released after persistence (or on error).
By default, the library uses an in-memory lock backed by async-mutex with a 5-second TTL. This is safe for single-process deployments but does not protect across multiple instances.
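The acquire, run, release cycle for a single process can be sketched without any dependencies. This is not the library's implementation (which, as noted above, is backed by async-mutex and adds a TTL); withStreamLock and the 'project:42' key are assumptions used only to show how work on the same stream key is serialized.

```typescript
// Library-free sketch of a per-stream lock; names are illustrative assumptions.
// Each key maps to the tail of a promise chain; new work waits for the tail.
const tails = new Map<string, Promise<void>>()

async function withStreamLock<T>(key: string, task: () => Promise<T>): Promise<T> {
  const previous = tails.get(key) ?? Promise.resolve()
  let release!: () => void
  const current = new Promise<void>((resolve) => {
    release = resolve
  })
  const tail = previous.then(() => current)
  tails.set(key, tail)

  await previous // lock acquired once all earlier holders have released
  try {
    return await task() // commands execute, events are appended
  } finally {
    release() // lock released, also when the task throws
    if (tails.get(key) === tail) tails.delete(key)
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Two concurrent tasks on the same stream key run one after the other.
async function demo(): Promise<string[]> {
  const log: string[] = []
  await Promise.all([
    withStreamLock('project:42', async () => {
      log.push('A start')
      await sleep(20)
      log.push('A end')
    }),
    withStreamLock('project:42', async () => {
      log.push('B start')
      log.push('B end')
    }),
  ])
  return log
}

demo().then((log) => console.log(log.join(', ')))
// A start, A end, B start, B end
```

Because the map lives in process memory, a second Node.js process would hold its own independent map, which is why this style of lock cannot coordinate multiple instances.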
Distributed Locking with Redis
For multi-instance deployments (e.g. behind a load balancer), use redisLock to coordinate locks across all instances via Redis:

import { createEventStore } from '@requence/event-sourcing/drizzle'
import { redisLock } from '@requence/event-sourcing/redis'
import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL)

const eventStore = createEventStore({
  database: db,
  aggregateRoots: [project, user],
  lock: redisLock(redis),
})

redisLock uses atomic Redis SET NX PX commands and Lua scripts for safe extend/release, ensuring no two instances can modify the same stream concurrently.
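For readers unfamiliar with the pattern, the following sketch shows the general shape of a SET NX PX lock with a token-checked release. It is the well-known single-instance Redis locking pattern, not redisLock's actual source; RedisLike, tryAcquire and release are hypothetical names, and the RedisLike interface is assumed to match the set and eval call shapes that ioredis exposes. TTL extension is left out for brevity.

```typescript
// Minimal slice of a Redis client (assumed to match ioredis call shapes).
interface RedisLike {
  set(key: string, value: string, px: 'PX', ttlMs: number, nx: 'NX'): Promise<'OK' | null>
  eval(script: string, numKeys: number, ...args: (string | number)[]): Promise<unknown>
}

// Deletes the key only if it still holds our token. Running this as a Lua
// script makes the compare and the delete a single atomic step on the server.
const RELEASE_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
else
  return 0
end`

// SET key token PX ttl NX: succeeds only if the key does not exist yet,
// and the key expires on its own after ttlMs if the holder crashes.
async function tryAcquire(
  redis: RedisLike,
  key: string,
  token: string,
  ttlMs: number,
): Promise<boolean> {
  return (await redis.set(key, token, 'PX', ttlMs, 'NX')) === 'OK'
}

// Returns true only when this token still owned the lock.
async function release(redis: RedisLike, key: string, token: string): Promise<boolean> {
  return (await redis.eval(RELEASE_SCRIPT, 1, key, token)) === 1
}
```

The per-holder token matters because a lock may expire while its holder is still working; without the check, a late release could delete a lock that another instance has since acquired.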
Using the Aggregate Root
Once an Aggregate Root is defined and registered with your Event Store, you interact with it through two primary methods:
Creating a New Stream
Use newStream() to create a brand-new aggregate instance. This generates a random UUID as the stream ID:

import aggregateRoot from './aggregateRoot'

// Create a new project (generates a UUID stream ID)
const project = aggregateRoot
  .newStream()
  .create(groupId, 'My Project', 'A description')

// Wait for all events to be persisted and downstream listeners to finish
await project.settled()

Loading an Existing Stream
Use loadStream(id) to load an existing aggregate by its stream ID. This replays all past events to reconstruct the current state before executing any commands:

// Load an existing project by its stream ID
aggregateRoot
  .loadStream(projectId)
  .update('New Name', 'New Description')

Command Chaining
Commands are designed to be chained. The underlying system queues each command and executes them sequentially against the aggregate’s state:

aggregateRoot
  .loadStream(projectId)
  .update('New Name')
  .delete()

Awaiting Results

- .settled(): Waits for all commands to be executed, events to be persisted, and all downstream listeners (projections, process managers, event listeners) to finish processing.
- .state(): Returns the aggregate’s current state after all pending commands have been applied (but does not wait for downstream processing).
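How queued commands relate to the two awaiting methods can be pictured with a toy stand-in. CounterStream is entirely hypothetical and says nothing about the library's internals; it only assumes that commands are queued on a promise chain and that downstream listeners finish some time after the commands themselves.

```typescript
// Hypothetical stand-in for a stream handle; names and behavior are assumptions.
class CounterStream {
  private value = 0
  private commands: Promise<void> = Promise.resolve()
  private downstream: Promise<void>[] = []
  readonly projected: number[] = []

  // A command only enqueues work and returns `this`, which enables chaining.
  add(amount: number): this {
    this.commands = this.commands.then(() => {
      this.value += amount
      const snapshot = this.value
      // Simulated slow downstream listener (e.g. a projection).
      this.downstream.push(
        new Promise<void>((resolve) =>
          setTimeout(() => {
            this.projected.push(snapshot)
            resolve()
          }, 10),
        ),
      )
    })
    return this
  }

  // Resolves once all queued commands have run; does not wait for listeners.
  async state(): Promise<number> {
    await this.commands
    return this.value
  }

  // Resolves once commands have run and every listener has finished.
  async settled(): Promise<void> {
    await this.commands
    await Promise.all(this.downstream)
  }
}

async function demo() {
  const stream = new CounterStream().add(1).add(2)
  const value = await stream.state() // commands applied, listeners still pending
  const before = stream.projected.length
  await stream.settled() // now the listeners have finished too
  return { value, before, after: stream.projected.length }
}

demo().then((result) => console.log(result))
// { value: 3, before: 0, after: 2 }
```

In this picture, .state() is the right choice when only the aggregate's own data is needed, and .settled() when later code reads from projections that must already be up to date.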
// Get the state after commands are applied
const state = await aggregateRoot
  .newStream()
  .create(groupId, 'My Project')
  .state()

console.log(state.id) // the generated UUID
console.log(state.name) // 'My Project'

Error Handling

Use .transformError() to map specific error types to more descriptive errors:

aggregateRoot
  .loadStream(projectId)
  .transformError(Error, (commandError) => {
    return new CustomApiError(
      `Failed to update project: ${commandError.message}`
    )
  })
  .update('New Name')
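The idea of mapping errors by class can be illustrated in isolation. The mapError helper below is a hypothetical, synchronous simplification and not how .transformError() is implemented; CustomApiError is defined here only so the sketch is self-contained.

```typescript
// Hypothetical helper illustrating class-based error mapping.
class CustomApiError extends Error {}

type ErrorClass<E extends Error> = new (...args: any[]) => E

// Runs `run`; errors that are instances of `errorClass` are replaced by the
// result of `transform`, all other errors pass through untouched.
function mapError<E extends Error, T>(
  errorClass: ErrorClass<E>,
  transform: (error: E) => Error,
  run: () => T,
): T {
  try {
    return run()
  } catch (error) {
    if (error instanceof errorClass) throw transform(error)
    throw error
  }
}

try {
  mapError(
    RangeError,
    (commandError) => new CustomApiError(`Failed to update project: ${commandError.message}`),
    () => {
      throw new RangeError('name too long')
    },
  )
} catch (error) {
  console.log(error instanceof CustomApiError) // true
}
```

Matching on a narrow class such as a dedicated validation error, rather than on Error itself, keeps unexpected failures from being relabeled as domain errors.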