
createEventStore

createEventStore is the central orchestrator of the library. It connects your aggregate roots to the underlying storage, manages event distribution to projections, process managers, and event listeners, and handles initialization and replay.

import { createEventStore } from '@requence/event-sourcing'

function createEventStore<Root extends AnyAggregateRoot>(
  params: EventStoreParams<Root>
): EventStore<Event, Root>

The function accepts a single configuration object:

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| loadEvents | LoadEvents | | Async generator that loads events from the underlying storage. |
| appendEvents | AppendEvents | | Function that persists new events and returns them with assigned positions. |
| aggregateRoots | Root[] | | Array of aggregate root instances to register. |
| checkpoint | CheckpointMethods | | Methods for persisting projection/process manager checkpoints. |
| lock | LockCreator | | Custom locking strategy for aggregate root streams. Defaults to an in-memory lock with a 5-second TTL. Use redisLock from @requence/event-sourcing/redis for distributed deployments. |
| aggregateRootSnapshot | AggregateRootSnapshotMethods | | Enables snapshot support for aggregate roots. |
| projectionSnapshot | ProjectionSnapshotMethods | | Enables snapshot support for projections. |
| postProcessEvent | (event, aggregateRoot) => WrappedEvent | | Hook to mutate or decorate events before they are appended. |
| onProjectionReplay | OnProgress | | Callbacks for tracking projection replay progress. |
| onProcessManagerRefresh | OnProgress | | Callbacks for tracking process manager refresh progress. |
| autoInit | boolean | | When set to false, the event store will not initialize automatically. You must call .init() manually. Defaults to true. |

Aggregate root streams are automatically locked before any command is executed. This prevents concurrent modifications to the same stream and eliminates event stream version collisions.

By default, the library uses an in-memory mutex (via async-mutex) with a 5-second TTL. This is suitable for single-process deployments.
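
To illustrate the idea of per-stream locking with a TTL, here is a minimal sketch. It is not the library's implementation (which, as noted above, builds on async-mutex); the StreamLocks class and its methods are hypothetical names used only for illustration.

```typescript
// Hypothetical sketch: serialize work per stream ID, with a TTL that
// force-releases a lock whose holder takes too long.
type Release = () => void

class StreamLocks {
  // Tail of the wait queue for each stream ID.
  private readonly tails = new Map<string, Promise<void>>()
  private readonly ttlMs: number

  constructor(ttlMs = 5000) {
    this.ttlMs = ttlMs
  }

  async acquire(streamId: string): Promise<Release> {
    const previous = this.tails.get(streamId) ?? Promise.resolve()

    // `held` stays pending for as long as this caller owns the lock.
    let release!: Release
    const held = new Promise<void>((resolve) => {
      release = resolve
    })
    const tail = previous.then(() => held)
    this.tails.set(streamId, tail)

    // Wait until every earlier holder has released (or expired).
    await previous

    // Expire the lock after the TTL so a stuck holder cannot block forever.
    const timer = setTimeout(release, this.ttlMs)
    return () => {
      clearTimeout(timer)
      release()
      // Drop the map entry if nobody queued up behind this holder.
      if (this.tails.get(streamId) === tail) this.tails.delete(streamId)
    }
  }

  // Convenience wrapper: run `fn` while holding the lock for `streamId`.
  async withLock<T>(streamId: string, fn: () => Promise<T>): Promise<T> {
    const release = await this.acquire(streamId)
    try {
      return await fn()
    } finally {
      release()
    }
  }
}
```

Two commands against the same stream ID would run one after the other, while commands against different stream IDs would not block each other. A lock like this lives in one process's memory, which is why it cannot coordinate multiple instances.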

For multi-instance deployments, pass redisLock as the lock parameter to coordinate across all instances via Redis:

import { createEventStore } from '@requence/event-sourcing/drizzle'
import { redisLock } from '@requence/event-sourcing/redis'
import Redis from 'ioredis'
const redis = new Redis(process.env.REDIS_URL)
const eventStore = createEventStore({
  database: db,
  aggregateRoots: [user, project],
  lock: redisLock(redis), // distributed lock via Redis
})
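
redisLock accepts the following parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| client | Redis (ioredis) | | An ioredis client instance. |
| defaultTtl | number | 5000 | Lock time-to-live in milliseconds. |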

The implementation uses atomic SET NX PX for acquisition and Lua scripts for safe extend/release, preventing split-brain issues.
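
The acquisition and release pattern described above can be sketched as follows. This is an illustrative sketch, not the source of redisLock: RedisLike, acquireLock, and releaseLock are hypothetical names, and the interface only mirrors the shape of the ioredis set and eval commands so that the example stays self-contained.

```typescript
// Minimal subset of a Redis client, shaped like ioredis's `set` and `eval`.
interface RedisLike {
  set(key: string, value: string, px: 'PX', ttlMs: number, nx: 'NX'): Promise<'OK' | null>
  eval(script: string, numKeys: number, ...args: (string | number)[]): Promise<unknown>
}

// Delete the key only if it still holds our token, so we never release
// a lock that expired and was re-acquired by another instance.
const RELEASE_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
else
  return 0
end`

// Hypothetical helper: returns an ownership token on success,
// or null if another holder currently owns the lock.
async function acquireLock(
  redis: RedisLike,
  key: string,
  ttlMs = 5000
): Promise<string | null> {
  // Illustration only: a real implementation would use a stronger random token.
  const token = `${Date.now()}-${Math.random().toString(36).slice(2)}`
  // SET key token PX ttl NX: atomic "set if absent" with an expiry.
  const result = await redis.set(key, token, 'PX', ttlMs, 'NX')
  return result === 'OK' ? token : null
}

// Hypothetical helper: returns true if this call released the lock.
async function releaseLock(
  redis: RedisLike,
  key: string,
  token: string
): Promise<boolean> {
  const deleted = await redis.eval(RELEASE_SCRIPT, 1, key, token)
  return deleted === 1
}
```

The token is what makes release safe: a holder whose lock already expired cannot delete a lock that now belongs to someone else, because the stored value no longer matches its token.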

createEventStore returns an EventStore object with the following methods:

createProjection creates and registers a new Projection with the given name. Each name must be unique within the event store.

const userList = eventStore.createProjection('user-list')
  .withEventHandlers({
    async onUserCreated({ streamId, payload }) { /* ... */ }
  })

createProcessManager creates and registers a new Process Manager with the given name.

const cascadeDelete = eventStore.createProcessManager('cascade-delete')
  .withEventHandlers({
    async onOrganizationDeleted({ payload }) { /* ... */ }
  })

createEventListener creates and registers a new Event Listener with the given name.

const auditLog = eventStore.createEventListener('audit-log')
  .withEventHandlers({
    async onUserCreated({ payload }) { /* ... */ }
  })

getAggregateRoot returns a registered aggregate root by its type name. It is fully type-safe: the return type narrows based on the provided type string.

const userRoot = eventStore.getAggregateRoot('user')

transaction executes the given async handler inside a transaction. Events from all aggregate roots within the handler are batched and committed atomically.

await eventStore.transaction(async () => {
  const org = orgRoot.newStream()
  org.create({ name: 'Acme' })
  const user = userRoot.newStream()
  user.create({ name: 'Alice', orgId: org.streamId })
})
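
The "batched and committed atomically" behavior can be pictured with a small in-memory sketch. Everything here is hypothetical (PendingEvent, InMemoryStore, and the explicit append argument); the library's transaction handler takes no arguments, and its internals are not shown in this document.

```typescript
// Hypothetical event shape, for illustration only.
type PendingEvent = { streamId: string; type: string }

class InMemoryStore {
  private readonly committed: PendingEvent[] = []

  // Runs the handler with an `append` function that only buffers events.
  // The buffer is written in one step, and only if the handler resolves.
  async transaction(
    handler: (append: (event: PendingEvent) => void) => Promise<void>
  ): Promise<void> {
    const batch: PendingEvent[] = []
    await handler((event) => {
      batch.push(event)
    })
    // Reached only when the handler did not throw: commit everything at once.
    this.committed.push(...batch)
  }

  // Read-only view of the committed events.
  get events(): readonly PendingEvent[] {
    return this.committed
  }
}
```

If the handler throws partway through, the buffered events are discarded and nothing is committed, so readers of the store would see either all events of the batch or none of them.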

init manually initializes the event store. It is required when autoInit is set to false, and it returns the event store instance for chaining.

const store = createEventStore({ autoInit: false, /* ... */ })
await store.init()

isReady returns a promise that resolves when all projections and process managers have finished their initial catch-up hydration.

await eventStore.isReady()
// All projections and process managers are now up to date

rebuild performs a full rebuild of all replayable projections and stateful process managers. It re-reads all events from the store and re-applies them from scratch.

await eventStore.rebuild()
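
Conceptually, a rebuild discards derived state and folds the full event history again. The sketch below shows that idea with plain data; the StoredEvent type and the rebuildUserList function are hypothetical and do not reflect the library's internals.

```typescript
// Hypothetical event shapes, for illustration only.
type StoredEvent =
  | { type: 'UserCreated'; streamId: string; payload: { name: string } }
  | { type: 'UserRenamed'; streamId: string; payload: { name: string } }
  | { type: 'UserDeleted'; streamId: string }

// Rebuild a read model from scratch by re-applying every event in order.
function rebuildUserList(events: Iterable<StoredEvent>): Map<string, string> {
  const users = new Map<string, string>() // always start from empty state
  for (const event of events) {
    switch (event.type) {
      case 'UserCreated':
      case 'UserRenamed':
        users.set(event.streamId, event.payload.name)
        break
      case 'UserDeleted':
        users.delete(event.streamId)
        break
    }
  }
  return users
}
```

Because the result depends only on the event history, running the same fold again after changing a handler yields a read model that reflects the new logic for all past events.
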

A complete setup using the Drizzle adapter:

import { createEventStore } from '@requence/event-sourcing'
import { createDrizzleAdapter } from '@requence/event-sourcing/drizzle'
import userRoot from './aggregates/user.ts'
import orgRoot from './aggregates/organization.ts'

const adapter = createDrizzleAdapter(db)
const eventStore = createEventStore({
  loadEvents: adapter.loadEvents,
  appendEvents: adapter.appendEvents,
  checkpoint: adapter.checkpoint,
  aggregateRoots: [userRoot, orgRoot],
})
await eventStore.isReady()

The onProjectionReplay and onProcessManagerRefresh parameters accept an object with the following optional callbacks:

type OnProgress = {
  begin?: (name: string) => void
  progress?: (name: string, index: number, event: BaseOutputEvent) => void
  end?: (name: string) => void
}
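
As an example, these callbacks could be used to log replay progress. The sketch below redeclares OnProgress locally so that it is self-contained, and it uses a placeholder BaseOutputEvent type because the real shape is not shown here. createProgressLogger is a hypothetical helper, not part of the library.

```typescript
// Placeholder: the real BaseOutputEvent type comes from the library.
type BaseOutputEvent = { type: string }

type OnProgress = {
  begin?: (name: string) => void
  progress?: (name: string, index: number, event: BaseOutputEvent) => void
  end?: (name: string) => void
}

// Hypothetical helper that reports progress through a `log` function.
function createProgressLogger(
  log: (line: string) => void,
  every = 1000
): OnProgress {
  return {
    begin: (name) => log(`[${name}] replay started`),
    progress: (name, index, event) => {
      // Log only every Nth event to avoid flooding the output.
      if (index % every === 0) log(`[${name}] event #${index} (${event.type})`)
    },
    end: (name) => log(`[${name}] replay finished`),
  }
}
```

An object like this could then be supplied as onProjectionReplay or onProcessManagerRefresh when creating the event store, for example with console.log as the log function, assuming the real event type exposes the fields the logger reads.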