createAggregateRoot
createAggregateRoot defines a domain entity — its events, state, business rules, and commands — using a type-safe fluent builder API. Aggregate roots are the primary write-side primitive: they enforce invariants and emit events.
Import
Section titled “Import”import { createAggregateRoot } from '@requence/event-sourcing'Signature
Section titled “Signature”function createAggregateRoot<Name extends string>( type: Name): AggregateRootApi<Name>type — A unique name that identifies this aggregate root. This becomes the streamType on all events emitted by this root.
Builder Methods
Section titled “Builder Methods”The builder enforces a specific order through its type system. Each method returns a narrowed type that only exposes the valid next steps.
.withInitialState(state)
Section titled “.withInitialState(state)”Defines the mutable state object that event handlers will modify. The state is deep-cloned for each new stream.
createAggregateRoot('user') .withInitialState({ name: '', email: '', active: true, })| Parameter | Type | Description |
|---|---|---|
state | Record<string, unknown> | The initial state shape. |
.withEvents(events)
Section titled “.withEvents(events)”Registers the events this aggregate root can emit, using Zod schemas for runtime payload validation. Event names must be capitalized.
Accepts either a plain object or a callback with utilities:
// Plain object form.withEvents({ UserCreated: z.object({ name: z.string(), email: z.string().email(), }), UserRenamed: z.object({ name: z.string(), }),})
// Callback form (provides z and version helper).withEvents(({ z, version }) => ({ UserCreated: z.object({ name: z.string(), email: z.string().email(), }), // Versioned event UserRenamed: [ version(0, z.object({ name: z.string() })), version(1, z.object({ name: z.string(), reason: z.string().optional() })), ],})).withEventHandlers(handler)
Section titled “.withEventHandlers(handler)”Defines how the aggregate’s internal state changes when events are applied. These handlers are called both when events are emitted and when they are replayed during stream loading.
.withEventHandlers((state) => ({ onUserCreated({ payload }) { state.name = payload.name state.email = payload.email }, onUserRenamed({ payload }) { state.name = payload.name },}))The callback receives:
state— The mutable state object.utils— An object withreset()andreplayUntil(streamVersion)helpers.
The event handlers themselves receive an event object with: type, payload, version, streamId, and streamType.
.withCommands(handler)
Section titled “.withCommands(handler)”Defines the commands that external code can invoke. Commands validate business rules and emit events.
.withCommands((state, event, { streamId, extendLock }) => ({ create(data: { name: string; email: string }) { return event('UserCreated', { name: data.name, email: data.email, }) }, rename(name: string) { if (!state.active) { throw new Error('Cannot rename an inactive user') } return event('UserRenamed', { name }) },}))The callback receives:
state— A read-only view of the current state (or omitted if no state was defined).event— The typed event builder function. Callevent('EventName', payload)to emit.options.streamId— The ID of the current stream.options.extendLock— Extend the lock duration for long-running commands.
Commands can return:
- A single
WrappedEvent - An array of
WrappedEvent - An
IterableorAsyncIterableofWrappedEvent void(no events emitted)
.withSnapshots(handler)
Section titled “.withSnapshots(handler)”Enables snapshot support for this aggregate root. Snapshots periodically capture state to avoid replaying the entire event history on every load.
.withSnapshots((state, generate, { z }) => generate .every(100) .schema(z.object({ name: z.string(), email: z.string(), active: z.boolean(), })) .apply((snapshot) => { Object.assign(state, snapshot.state) }) .capture(() => ({ name: state.name, email: state.email, active: state.active, })))See the Snapshot Generator section below for the full snapshot API.
Stream API
Section titled “Stream API”Once an aggregate root is linked to an event store, you interact with it through streams.
root.newStream(id?)
Section titled “root.newStream(id?)”Creates a new stream. Optionally accepts a custom ID; otherwise a UUID is generated.
const user = userRoot.newStream()user.create({ name: 'Alice', email: 'alice@example.com' })await user.settled()root.loadStream(id, filter?)
Section titled “root.loadStream(id, filter?)”Loads an existing stream by ID. The state is rebuilt by replaying all past events (or from the latest snapshot). An optional filter function can exclude specific events during replay.
const user = userRoot.loadStream('user-123')user.rename('Bob')await user.settled()Stream Methods
Section titled “Stream Methods”All streams expose the following methods alongside the commands defined via withCommands:
| Method | Return Type | Description |
|---|---|---|
.settled() | Promise<Stream> | Waits for all queued commands to be persisted and all downstream listeners to finish processing. Returns the stream for chaining. |
.state() | Promise<State> | Returns the current state after all pending commands have been applied. |
.transformError(ErrorClass, handler) | Stream | Registers an error transformer for a specific error type thrown inside commands. |
.streamId | string | The unique ID of this stream instance. |
Command Chaining
Section titled “Command Chaining”Commands are chainable and execute in order:
const user = userRoot.newStream()user .create({ name: 'Alice', email: 'alice@example.com' }) .rename('Bob') .deactivate()
await user.settled()Error Handling
Section titled “Error Handling”Use .transformError() to map domain errors to application-level errors:
const user = userRoot.loadStream('user-123') .transformError(DomainError, (err) => new AppError(err.originalError.message))
user.rename('Bob')await user.settled() // throws AppError instead of CommandErrorSnapshot Generator
Section titled “Snapshot Generator”The snapshot builder (available via withSnapshots) uses a fluent API:
| Method | Description |
|---|---|
.every(n) | Capture a snapshot every n events applied. |
.schema(zodSchema, version?) | Register a Zod schema for snapshot validation. The version defaults to 0. |
.apply(fn) | Define how a loaded snapshot restores state. Receives the snapshot object. |
.capture(fn) | Define how to serialize the current state into a snapshot. Receives the triggering event. |
Type Inference Helpers
Section titled “Type Inference Helpers”Aggregate roots expose inference helpers for use in generic code:
| Property | Description |
|---|---|
root.$inferState | The state type managed by this aggregate root. |
root.$inferEvents | Union type of all output events. |
root.$inferEvent | Mapped type: { EventName: EventType }. |
root.$inferCommandParameters | Mapped type: { commandName: Parameters<Command> }. |
root.type | The string literal type name. |