createEventStore

createEventStore is the central orchestrator of the library. It connects your aggregate roots to the underlying storage, manages event distribution to projections, process managers, and event listeners, and handles initialization and replay.

import { createEventStore } from '@requence/event-sourcing'

function createEventStore<Root extends AnyAggregateRoot>(
  params: EventStoreParams<Root>
): EventStore<Event, Root>

The function accepts a single configuration object:

| Parameter | Type | Description |
| --- | --- | --- |
| loadEvents | LoadEvents | Async generator that loads events from the underlying storage. |
| appendEvents | AppendEvents | Function that persists new events and returns them with assigned positions. |
| aggregateRoots | Root[] | Array of aggregate root instances to register. |
| checkpoint | CheckpointMethods | Methods for persisting projection/process manager checkpoints. |
| lock | LockCreator | Custom locking strategy. Defaults to an in-memory lock with a 5-second timeout. |
| aggregateRootSnapshot | AggregateRootSnapshotMethods | Enables snapshot support for aggregate roots. |
| projectionSnapshot | ProjectionSnapshotMethods | Enables snapshot support for projections. |
| postProcessEvent | (event, aggregateRoot) => WrappedEvent | Hook to mutate or decorate events before they are appended. |
| onProjectionReplay | OnProgress | Callbacks for tracking projection replay progress. |
| onProcessManagerRefresh | OnProgress | Callbacks for tracking process manager refresh progress. |
| autoInit | boolean | When set to false, the event store does not initialize automatically and you must call .init() manually. Defaults to true. |
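
To make the two storage parameters more concrete, here is a minimal in-memory sketch of what a loadEvents / appendEvents pair might look like. The NewEvent and StoredEvent shapes, the fromPosition argument, and the createInMemoryStorage helper are assumptions made for illustration. The library's actual LoadEvents and AppendEvents types may differ, so check them before adapting this.

```typescript
// Hypothetical event shapes; the library's real types may differ.
type NewEvent = { streamId: string; type: string; payload: unknown }
type StoredEvent = NewEvent & { position: number }

// Hypothetical helper: keeps events in an array instead of a database.
function createInMemoryStorage() {
  const log: StoredEvent[] = []

  // Async generator that yields stored events in position order,
  // skipping everything up to and including `fromPosition` (assumed parameter).
  async function* loadEvents(fromPosition = 0): AsyncGenerator<StoredEvent> {
    for (const event of log) {
      if (event.position > fromPosition) yield event
    }
  }

  // Persists new events and returns them with assigned positions.
  async function appendEvents(events: NewEvent[]): Promise<StoredEvent[]> {
    const stored = events.map((event, i) => ({
      ...event,
      position: log.length + i + 1,
    }))
    log.push(...stored)
    return stored
  }

  return { loadEvents, appendEvents }
}
```

A real adapter would do the same two jobs against a database: stream events back in order, and assign a monotonically increasing position to each appended event.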

Returns an EventStore object with the following methods:

createProjection creates and registers a new Projection with the given name. Each name must be unique within the event store.

const userList = eventStore.createProjection('user-list')
  .withEventHandlers({
    async onUserCreated({ streamId, payload }) { /* ... */ }
  })

createProcessManager creates and registers a new Process Manager with the given name.

const cascadeDelete = eventStore.createProcessManager('cascade-delete')
  .withEventHandlers({
    async onOrganizationDeleted({ payload }) { /* ... */ }
  })

createEventListener creates and registers a new Event Listener with the given name.

const auditLog = eventStore.createEventListener('audit-log')
  .withEventHandlers({
    async onUserCreated({ payload }) { /* ... */ }
  })

getAggregateRoot returns a registered aggregate root by its type name. The call is fully type-safe: the return type narrows based on the provided type string.

const userRoot = eventStore.getAggregateRoot('user')

transaction executes the given async handler inside a transaction. Events from all aggregate roots within the handler are batched and committed atomically.

await eventStore.transaction(async () => {
  const org = orgRoot.newStream()
  org.create({ name: 'Acme' })
  const user = userRoot.newStream()
  user.create({ name: 'Alice', orgId: org.streamId })
})
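
The idea of atomic batching can be pictured with a small stand-alone model. This is a conceptual sketch, not the library's implementation: the PendingEvent type, the record callback, and the runInTransaction helper are all hypothetical, and it assumes that a handler that throws causes the whole batch to be discarded.

```typescript
// Hypothetical event shape used only in this sketch.
type PendingEvent = { streamId: string; type: string }

// Hypothetical model: events recorded by the handler are buffered and
// handed to `commit` in a single call, or dropped if the handler throws.
async function runInTransaction(
  commit: (batch: PendingEvent[]) => Promise<void>,
  handler: (record: (event: PendingEvent) => void) => Promise<void>,
): Promise<void> {
  const batch: PendingEvent[] = []
  // If the handler throws, the error propagates and the commit below is skipped.
  await handler((event) => {
    batch.push(event)
  })
  if (batch.length > 0) await commit(batch)
}
```

Because nothing reaches storage until the handler has finished, either every event from the handler is persisted or none is.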

init manually initializes the event store. It is required when autoInit is set to false. Returns the event store instance for chaining.

const store = createEventStore({ autoInit: false, /* ... */ })
await store.init()

isReady returns a promise that resolves when all projections and process managers have finished their initial catch-up hydration.

await eventStore.isReady()
// All projections and process managers are now up to date

rebuild performs a full rebuild of all replayable projections and stateful process managers. It re-reads every event from the store and re-applies each one from scratch.

await eventStore.rebuild()
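
Conceptually, rebuilding a projection means discarding its current state and folding over the full event history again. The sketch below illustrates that idea only. The UserEvent union and the rebuildUserList function are hypothetical and do not use the library's projection API.

```typescript
// Hypothetical events for a user-list projection.
type UserEvent =
  | { type: 'UserCreated'; streamId: string; name: string }
  | { type: 'UserDeleted'; streamId: string }

// Rebuilds the projection state (user names keyed by stream id)
// by replaying every event in order, starting from empty state.
function rebuildUserList(events: Iterable<UserEvent>): Map<string, string> {
  const users = new Map<string, string>()
  for (const event of events) {
    if (event.type === 'UserCreated') {
      users.set(event.streamId, event.name)
    } else {
      users.delete(event.streamId)
    }
  }
  return users
}
```

Since the result depends only on the event history, running the rebuild twice over the same events yields the same state.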

A complete setup using the Drizzle adapter:

import { createEventStore } from '@requence/event-sourcing'
import { createDrizzleAdapter } from '@requence/event-sourcing/drizzle'
import userRoot from './aggregates/user.ts'
import orgRoot from './aggregates/organization.ts'

// `db` is assumed to be an existing Drizzle database instance created elsewhere
const adapter = createDrizzleAdapter(db)

const eventStore = createEventStore({
  loadEvents: adapter.loadEvents,
  appendEvents: adapter.appendEvents,
  checkpoint: adapter.checkpoint,
  aggregateRoots: [userRoot, orgRoot],
})

await eventStore.isReady()

The onProjectionReplay and onProcessManagerRefresh parameters accept an object with the following optional callbacks:

type OnProgress = {
  begin?: (name: string) => void
  progress?: (name: string, index: number, event: BaseOutputEvent) => void
  end?: (name: string) => void
}
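
As an illustration of how these callbacks might be used, the sketch below builds an OnProgress object that reports progress at a fixed interval. The createProgressLogger helper and its every parameter are hypothetical, and BaseOutputEvent is replaced by a local stand-in type so the example is self-contained.

```typescript
// Stand-in for the library's BaseOutputEvent; only `type` is assumed here.
type BaseOutputEvent = { type: string }

type OnProgress = {
  begin?: (name: string) => void
  progress?: (name: string, index: number, event: BaseOutputEvent) => void
  end?: (name: string) => void
}

// Hypothetical helper: reports the start, every `every`-th event, and the end
// of a replay through the supplied `log` function.
function createProgressLogger(
  log: (line: string) => void,
  every = 1000,
): OnProgress {
  return {
    begin: (name) => log(`[${name}] replay started`),
    progress: (name, index) => {
      if (index % every === 0) log(`[${name}] processed event #${index}`)
    },
    end: (name) => log(`[${name}] replay finished`),
  }
}
```

An object like this could then be passed as onProjectionReplay or onProcessManagerRefresh, for example createProgressLogger(console.log).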