
createAggregateRoot

createAggregateRoot defines a domain entity — its events, state, business rules, and commands — using a type-safe fluent builder API. Aggregate roots are the primary write-side primitive: they enforce invariants and emit events.

import { createAggregateRoot } from '@requence/event-sourcing'

function createAggregateRoot<Name extends string>(
  type: Name
): AggregateRootApi<Name>

type — A unique name that identifies this aggregate root. This becomes the streamType on all events emitted by this root.

The builder enforces a specific order through its type system. Each method returns a narrowed type that only exposes the valid next steps.
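
The ordering guarantee can be pictured as a staged builder, where each step returns an interface that exposes only the next method. The sketch below is a simplified illustration of that general pattern, not the library's implementation; `defineThing`, `StateStage`, `CommandStage`, and `Built` are hypothetical names.

```typescript
// Hypothetical staged builder: each step returns a narrower interface,
// so calling steps out of order fails at compile time.
interface Built<S> {
  readonly name: string
  readonly state: S
  readonly commands: readonly string[]
}

interface CommandStage<S> {
  // Only reachable after withState has been called.
  withCommands(names: string[]): Built<S>
}

interface StateStage {
  withState<S extends Record<string, unknown>>(state: S): CommandStage<S>
}

function defineThing(name: string): StateStage {
  return {
    withState<S extends Record<string, unknown>>(state: S): CommandStage<S> {
      return {
        withCommands(names: string[]): Built<S> {
          return { name, state, commands: names }
        },
      }
    },
  }
}

const built = defineThing('user')
  .withState({ active: true })
  .withCommands(['create', 'rename'])

// defineThing('user').withCommands([]) would not compile,
// because StateStage has no withCommands method.
```

Because the narrowing happens purely in the type system, an out-of-order call is rejected by the compiler rather than at runtime.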

withInitialState defines the mutable state object that event handlers will modify. The initial state is deep-cloned for each new stream, so streams never share it.

createAggregateRoot('user')
  .withInitialState({
    name: '',
    email: '',
    active: true,
  })
| Parameter | Type | Description |
| --- | --- | --- |
| state | Record<string, unknown> | The initial state shape. |
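
To see why a per-stream deep clone matters, consider an initial state with a nested array. The sketch below uses a JSON round trip as a stand-in for the cloning step; how the library actually clones is not documented here, and `freshState` is a hypothetical helper.

```typescript
// Stand-in for per-stream cloning; a JSON round trip is enough for plain data.
const initialState = { name: '', tags: [] as string[] }

function freshState<S>(template: S): S {
  return JSON.parse(JSON.stringify(template)) as S
}

const streamA = freshState(initialState)
const streamB = freshState(initialState)

// Mutating one stream's state must not leak into another stream or the template.
streamA.tags.push('admin')

console.log(streamB.tags.length, initialState.tags.length) // prints "0 0"
```

With a shallow copy instead, `tags` would be the same array in every stream and the push above would be visible everywhere.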

withEvents registers the events this aggregate root can emit, using Zod schemas for runtime payload validation. Event names must be capitalized (for example, UserCreated).

Accepts either a plain object or a callback with utilities:

// Plain object form
.withEvents({
  UserCreated: z.object({
    name: z.string(),
    email: z.string().email(),
  }),
  UserRenamed: z.object({
    name: z.string(),
  }),
})

// Callback form (provides z and version helper)
.withEvents(({ z, version }) => ({
  UserCreated: z.object({
    name: z.string(),
    email: z.string().email(),
  }),
  // Versioned event
  UserRenamed: [
    version(0, z.object({ name: z.string() })),
    version(1, z.object({ name: z.string(), reason: z.string().optional() })),
  ],
}))

withEventHandlers defines how the aggregate's internal state changes when events are applied. Each handler is named on followed by the event name. Handlers run both when events are emitted and when they are replayed during stream loading.

.withEventHandlers((state) => ({
  onUserCreated({ payload }) {
    state.name = payload.name
    state.email = payload.email
  },
  onUserRenamed({ payload }) {
    state.name = payload.name
  },
}))

The callback receives:

  • state — The mutable state object.
  • utils — An object with reset() and replayUntil(streamVersion) helpers.

The event handlers themselves receive an event object with: type, payload, version, streamId, and streamType.
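
Since the same handlers run on emit and on replay, rebuilding state amounts to applying stored events in order. The following is a minimal, self-contained sketch of that idea; the `UserEvent`, `Handlers`, `applyEvent`, and `replay` names are hypothetical and the event objects are reduced to `type` and `payload`.

```typescript
// Hypothetical, simplified types; not the library's own definitions.
type UserEvent =
  | { type: 'UserCreated'; payload: { name: string; email: string } }
  | { type: 'UserRenamed'; payload: { name: string } }

interface UserState {
  name: string
  email: string
}

// Handlers are keyed as `on` + event name, mirroring onUserCreated / onUserRenamed above.
type Handlers = {
  [E in UserEvent as `on${E['type']}`]: (event: E) => void
}

function makeHandlers(state: UserState): Handlers {
  return {
    onUserCreated({ payload }) {
      state.name = payload.name
      state.email = payload.email
    },
    onUserRenamed({ payload }) {
      state.name = payload.name
    },
  }
}

// Applying one event: look up the handler by name and call it.
function applyEvent(handlers: Handlers, event: UserEvent): void {
  const key = `on${event.type}` as keyof Handlers
  const handler = handlers[key] as (e: UserEvent) => void
  handler(event)
}

// Replaying a stream is applying its events, in order, to a fresh state.
function replay(events: UserEvent[]): UserState {
  const state: UserState = { name: '', email: '' }
  const handlers = makeHandlers(state)
  for (const event of events) applyEvent(handlers, event)
  return state
}

const rebuilt = replay([
  { type: 'UserCreated', payload: { name: 'Alice', email: 'alice@example.com' } },
  { type: 'UserRenamed', payload: { name: 'Bob' } },
])
```

Here `rebuilt` ends up with the name from the last rename and the email from creation, which is why handlers should stay free of side effects other than state mutation.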

withCommands defines the commands that external code can invoke. Commands validate business rules and emit events.

.withCommands((state, event, { streamId, extendLock }) => ({
  create(data: { name: string; email: string }) {
    return event('UserCreated', {
      name: data.name,
      email: data.email,
    })
  },
  rename(name: string) {
    if (!state.active) {
      throw new Error('Cannot rename an inactive user')
    }
    return event('UserRenamed', { name })
  },
}))

The callback receives:

  • state — A read-only view of the current state (or omitted if no state was defined).
  • event — The typed event builder function. Call event('EventName', payload) to emit.
  • options.streamId — The ID of the current stream.
  • options.extendLock — Extend the lock duration for long-running commands.

Commands can return:

  • A single WrappedEvent
  • An array of WrappedEvent
  • An Iterable or AsyncIterable of WrappedEvent
  • void (no events emitted)
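
One way to picture how these return shapes can be treated uniformly is a helper that flattens them into an array. The sketch below covers only the synchronous shapes (an async variant would iterate with `for await`); the `WrappedEvent` interface here is a hypothetical stand-in and `collectEvents` is not a library function.

```typescript
// Hypothetical event wrapper; the library's WrappedEvent type may differ.
interface WrappedEvent {
  type: string
  payload: unknown
}

function isIterable(value: unknown): value is Iterable<WrappedEvent> {
  return typeof value === 'object' && value !== null && Symbol.iterator in value
}

// Flatten any synchronous command result into a plain array.
function collectEvents(
  result: WrappedEvent | Iterable<WrappedEvent> | undefined,
): WrappedEvent[] {
  if (result === undefined) return [] // void: nothing emitted
  if (isIterable(result)) return Array.from(result) // array or other iterable
  return [result] // single event
}

// A generator lets a command emit several events step by step.
function* renameTwice(): Iterable<WrappedEvent> {
  yield { type: 'UserRenamed', payload: { name: 'Bob' } }
  yield { type: 'UserRenamed', payload: { name: 'Robert' } }
}

const single = collectEvents({ type: 'UserRenamed', payload: { name: 'Bob' } })
const none = collectEvents(undefined)
const several = collectEvents(renameTwice())
```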

withSnapshots enables snapshot support for this aggregate root. Snapshots periodically capture state to avoid replaying the entire event history on every load.

.withSnapshots((state, generate, { z }) =>
  generate
    .every(100)
    .schema(z.object({
      name: z.string(),
      email: z.string(),
      active: z.boolean(),
    }))
    .apply((snapshot) => {
      Object.assign(state, snapshot.state)
    })
    .capture(() => ({
      name: state.name,
      email: state.email,
      active: state.active,
    }))
)

See the Snapshot Generator section below for the full snapshot API.
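
The effect of snapshots on loading can be shown with a small in-memory model: capture state every few events, then load from the latest snapshot and replay only the events after it. This is a sketch under simplifying assumptions (a counter aggregate, arrays as storage); `emit`, `load`, and the `Snapshot` shape are hypothetical.

```typescript
// Hypothetical in-memory model; names and storage layout are illustrative only.
interface CounterEvent {
  amount: number
}

interface Snapshot {
  streamVersion: number // number of events already folded into `state`
  state: { total: number }
}

const EVERY = 3 // capture a snapshot every 3 events

const events: CounterEvent[] = []
const snapshots: Snapshot[] = []
const live = { total: 0 }

function emit(event: CounterEvent): void {
  events.push(event)
  live.total += event.amount
  // Capture: serialize the current state once the cadence is reached.
  if (events.length % EVERY === 0) {
    snapshots.push({ streamVersion: events.length, state: { ...live } })
  }
}

// Load: restore from the latest snapshot, then replay only newer events.
function load(): { total: number; replayed: number } {
  const latest = snapshots[snapshots.length - 1]
  const state = latest ? { ...latest.state } : { total: 0 }
  const tail = events.slice(latest ? latest.streamVersion : 0)
  for (const event of tail) state.total += event.amount
  return { total: state.total, replayed: tail.length }
}

for (let i = 1; i <= 7; i++) emit({ amount: i })

const loaded = load()
console.log(loaded) // total is 28, with only 1 event replayed
```

With seven events and a cadence of three, snapshots exist at versions 3 and 6, so loading replays a single event instead of all seven.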

Once an aggregate root is linked to an event store, you interact with it through streams.

newStream creates a new stream. It optionally accepts a custom ID; otherwise a UUID is generated.

const user = userRoot.newStream()
user.create({ name: 'Alice', email: 'alice@example.com' })
await user.settled()

loadStream loads an existing stream by ID. The state is rebuilt by replaying all past events (or by starting from the latest snapshot). An optional filter function can exclude specific events during replay.

const user = userRoot.loadStream('user-123')
user.rename('Bob')
await user.settled()

All streams expose the following methods alongside the commands defined via withCommands:

| Method | Return Type | Description |
| --- | --- | --- |
| .settled() | Promise<Stream> | Waits for all queued commands to be persisted and all downstream listeners to finish processing. Returns the stream for chaining. |
| .state() | Promise<State> | Returns the current state after all pending commands have been applied. |
| .transformError(ErrorClass, handler) | Stream | Registers an error transformer for a specific error type thrown inside commands. |
| .streamId | string | The unique ID of this stream instance. |

Commands are chainable and execute in order:

const user = userRoot.newStream()
user
  .create({ name: 'Alice', email: 'alice@example.com' })
  .rename('Bob')
  .deactivate() // assumes a deactivate command is also defined in withCommands
await user.settled()
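
Chaining with in-order execution is commonly built on a promise queue: each command appends work to the queue and returns the stream itself. The sketch below illustrates that general technique and is not taken from the library; `QueuedStream` and its methods are hypothetical.

```typescript
// Hypothetical stream with a promise-based command queue.
class QueuedStream {
  readonly log: string[] = []
  private queue: Promise<void> = Promise.resolve()

  // Append work to the queue and return `this` so calls can be chained.
  private enqueue(work: () => Promise<void>): this {
    this.queue = this.queue.then(work)
    return this
  }

  create(name: string): this {
    return this.enqueue(async () => {
      this.log.push(`created ${name}`)
    })
  }

  rename(name: string): this {
    return this.enqueue(async () => {
      this.log.push(`renamed to ${name}`)
    })
  }

  // Resolves once every queued command has run.
  async settled(): Promise<this> {
    await this.queue
    return this
  }
}

async function demo(): Promise<string[]> {
  const stream = new QueuedStream()
  stream.create('Alice').rename('Bob')
  await stream.settled()
  return stream.log
}
```

Because each command only schedules work, nothing is guaranteed to have run until `settled()` resolves, which matches the need to await it in the examples above.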

Use .transformError() to map domain errors to application-level errors:

const user = userRoot.loadStream('user-123')
  .transformError(DomainError, (err) => new AppError(err.originalError.message))
user.rename('Bob')
await user.settled() // throws AppError instead of CommandError
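
The idea of mapping one error class to another can be sketched as a small registry that matches with `instanceof`. This is a simplified, hypothetical model: the handler here receives the matched error directly, whereas the library's handler argument may be shaped differently (the example above reads `err.originalError`).

```typescript
// Hypothetical error classes and registry, for illustration only.
class DomainError extends Error {}
class AppError extends Error {}

type ErrorClass<E extends Error> = new (...args: any[]) => E

interface Transformer {
  matches: (err: unknown) => boolean
  handle: (err: Error) => Error
}

class ErrorTransformers {
  private readonly transformers: Transformer[] = []

  register<E extends Error>(cls: ErrorClass<E>, handler: (err: E) => Error): this {
    this.transformers.push({
      matches: (err) => err instanceof cls,
      handle: (err) => handler(err as E),
    })
    return this
  }

  // Return the first matching transformation, or the error unchanged.
  transform(err: unknown): unknown {
    const found = this.transformers.find((t) => t.matches(err))
    return found ? found.handle(err as Error) : err
  }
}

const transformers = new ErrorTransformers().register(
  DomainError,
  (err) => new AppError(err.message),
)

const mapped = transformers.transform(new DomainError('Cannot rename an inactive user'))
const untouched = transformers.transform(new TypeError('unrelated'))
```

Errors that match no registered class pass through unchanged, so unrelated failures are not masked.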

The snapshot generator (the generate argument passed to the withSnapshots callback) uses a fluent API:

| Method | Description |
| --- | --- |
| .every(n) | Capture a snapshot every n events applied. |
| .schema(zodSchema, version?) | Register a Zod schema for snapshot validation. The version defaults to 0. |
| .apply(fn) | Define how a loaded snapshot restores state. Receives the snapshot object. |
| .capture(fn) | Define how to serialize the current state into a snapshot. Receives the triggering event. |

Aggregate roots expose inference helpers for use in generic code:

| Property | Description |
| --- | --- |
| root.$inferState | The state type managed by this aggregate root. |
| root.$inferEvents | Union type of all output events. |
| root.$inferEvent | Mapped type: { EventName: EventType }. |
| root.$inferCommandParameters | Mapped type: { commandName: Parameters<Command> }. |
| root.type | The string literal type name. |
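
Properties like these are typically read with `typeof` rather than used as runtime values. The sketch below shows that pattern on a hypothetical `RootLike` shape built by a made-up `defineRoot` function; it assumes the library's helpers behave the same way, which the text above does not confirm.

```typescript
// Hypothetical root shape: `$inferState` exists mainly to carry a type.
interface RootLike<Name extends string, State> {
  readonly type: Name
  readonly $inferState: State
}

function defineRoot<Name extends string, State>(
  type: Name,
  initial: State,
): RootLike<Name, State> {
  // Here the property simply holds the initial value; only its type matters.
  return { type, $inferState: initial }
}

const userRoot = defineRoot('user', { name: '', active: true })

// Pull the state type out of the root with `typeof`.
type UserState = typeof userRoot.$inferState

// Generic code can accept any root and stay typed against its state.
function describeState<R extends RootLike<string, unknown>>(
  root: R,
  state: R['$inferState'],
): string {
  return `${root.type}: ${JSON.stringify(state)}`
}

const example: UserState = { name: 'Alice', active: true }
const description = describeState(userRoot, example)
console.log(description) // prints: user: {"name":"Alice","active":true}
```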