createEventStore
createEventStore is the central orchestrator of the library. It connects your aggregate roots to the underlying storage, manages event distribution to projections, process managers, and event listeners, and handles initialization and replay.
Import

    import { createEventStore } from '@requence/event-sourcing'

Signature

    function createEventStore<Root extends AnyAggregateRoot>(
      params: EventStoreParams<Root>
    ): EventStore<Event, Root>

Parameters
The function accepts a single configuration object:

| Parameter | Type | Required | Description |
|---|---|---|---|
| loadEvents | LoadEvents | ✓ | Async generator that loads events from the underlying storage. |
| appendEvents | AppendEvents | ✓ | Function that persists new events and returns them with assigned positions. |
| aggregateRoots | Root[] | ✓ | Array of aggregate root instances to register. |
| checkpoint | CheckpointMethods | ✓ | Methods for persisting projection/process manager checkpoints. |
| lock | LockCreator | | Custom locking strategy. Defaults to an in-memory lock with a 5-second timeout. |
| aggregateRootSnapshot | AggregateRootSnapshotMethods | | Enables snapshot support for aggregate roots. |
| projectionSnapshot | ProjectionSnapshotMethods | | Enables snapshot support for projections. |
| postProcessEvent | (event, aggregateRoot) => WrappedEvent | | Hook to mutate or decorate events before they are appended. |
| onProjectionReplay | OnProgress | | Callbacks for tracking projection replay progress. |
| onProcessManagerRefresh | OnProgress | | Callbacks for tracking process manager refresh progress. |
| autoInit | boolean | | When set to false, the event store will not initialize automatically. You must call .init() manually. Defaults to true. |
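
As a rough illustration of the loadEvents and appendEvents contracts described in the table, the sketch below keeps events in an in-memory array. The event shapes (NewEvent, StoredEvent), the fromPosition argument, and the createInMemoryStorage helper are assumptions made for this example; the library's actual LoadEvents and AppendEvents signatures may differ.

```typescript
// Hypothetical event shapes for illustration only; not the library's real types.
type NewEvent = { streamId: string; type: string; payload: unknown }
type StoredEvent = NewEvent & { position: number }

// Hypothetical helper: an in-memory stand-in for a real storage adapter.
function createInMemoryStorage() {
  const log: StoredEvent[] = []

  // Async generator: yields stored events in position order,
  // skipping everything at or before `fromPosition` (an assumed parameter).
  async function* loadEvents(fromPosition = 0): AsyncGenerator<StoredEvent> {
    for (const event of log) {
      if (event.position > fromPosition) yield event
    }
  }

  // Persists new events and returns them with 1-based positions assigned.
  async function appendEvents(events: NewEvent[]): Promise<StoredEvent[]> {
    const stored = events.map((event, i) => ({
      ...event,
      position: log.length + i + 1,
    }))
    log.push(...stored)
    return stored
  }

  return { loadEvents, appendEvents }
}
```

A real adapter would read from and write to a database instead of an array, but the division of labor stays the same: appendEvents assigns positions, and loadEvents streams events back in that order.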
Return Value
Returns an EventStore object with the following methods:
createProjection(name)
Creates and registers a new Projection with the given name. Each name must be unique within the event store.

    const userList = eventStore.createProjection('user-list')
      .withEventHandlers({
        async onUserCreated({ streamId, payload }) { /* ... */ }
      })

createProcessManager(name)
Creates and registers a new Process Manager with the given name.

    const cascadeDelete = eventStore.createProcessManager('cascade-delete')
      .withEventHandlers({
        async onOrganizationDeleted({ payload }) { /* ... */ }
      })

createEventListener(name)
Creates and registers a new Event Listener with the given name.

    const auditLog = eventStore.createEventListener('audit-log')
      .withEventHandlers({
        async onUserCreated({ payload }) { /* ... */ }
      })

getAggregateRoot(type)
Returns a registered aggregate root by its type name. The call is fully type-safe: the return type narrows based on the provided type string.

    const userRoot = eventStore.getAggregateRoot('user')

transaction(handler)
Executes the given async handler inside a transaction. Events from all aggregate roots within the handler are batched and committed atomically.

    await eventStore.transaction(async () => {
      const org = orgRoot.newStream()
      org.create({ name: 'Acme' })

      const user = userRoot.newStream()
      user.create({ name: 'Alice', orgId: org.streamId })
    })

init()
Manually initializes the event store. Required when autoInit is set to false. Returns the event store instance for chaining.

    const store = createEventStore({ autoInit: false, /* ... */ })
    await store.init()

isReady()
Returns a promise that resolves when all projections and process managers have finished their initial catch-up hydration.

    await eventStore.isReady()
    // All projections and process managers are now up to date

rebuild()
Performs a full rebuild of all replayable projections and stateful process managers. It re-reads every event from the store and re-applies each one from scratch.

    await eventStore.rebuild()

Usage Example

    import { createEventStore } from '@requence/event-sourcing'
    import { createDrizzleAdapter } from '@requence/event-sourcing/drizzle'
    import userRoot from './aggregates/user.ts'
    import orgRoot from './aggregates/organization.ts'

    // `db` is assumed to be a Drizzle database instance created elsewhere.
    const adapter = createDrizzleAdapter(db)

    const eventStore = createEventStore({
      loadEvents: adapter.loadEvents,
      appendEvents: adapter.appendEvents,
      checkpoint: adapter.checkpoint,
      aggregateRoots: [userRoot, orgRoot],
    })

    // Wait until all projections and process managers have caught up.
    await eventStore.isReady()

OnProgress Callback
The onProjectionReplay and onProcessManagerRefresh parameters accept an object with the following optional callbacks:

    type OnProgress = {
      begin?: (name: string) => void
      progress?: (name: string, index: number, event: BaseOutputEvent) => void
      end?: (name: string) => void
    }
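
To show how these callbacks might be used together, here is a minimal sketch of a replay counter that logs when a replay starts and how many events it processed. The createReplayCounter helper and the reduced BaseOutputEvent shape are hypothetical and exist only to keep the example self-contained. The sketch counts calls to progress rather than relying on index, since whether index is zero-based is not stated here.

```typescript
// Reduced, hypothetical stand-in for the library's BaseOutputEvent type.
type BaseOutputEvent = { type: string }

// Same shape as the OnProgress type documented above.
type OnProgress = {
  begin?: (name: string) => void
  progress?: (name: string, index: number, event: BaseOutputEvent) => void
  end?: (name: string) => void
}

// Hypothetical helper: builds an OnProgress object that counts
// processed events per projection or process manager name.
function createReplayCounter(log: (line: string) => void = console.log) {
  const counts = new Map<string, number>()

  const onProgress: OnProgress = {
    begin: (name) => {
      counts.set(name, 0)
      log(`replay started: ${name}`)
    },
    progress: (name) => {
      // Count each call instead of trusting the meaning of `index`.
      counts.set(name, (counts.get(name) ?? 0) + 1)
    },
    end: (name) => {
      log(`replay finished: ${name} (${counts.get(name) ?? 0} events)`)
    },
  }

  return { onProgress, counts }
}
```

The returned onProgress object could then be passed as onProjectionReplay or onProcessManagerRefresh when calling createEventStore.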