Skip to content

createProjection

Projections are created via eventStore.createProjection(name). They transform the append-only event stream into query-optimized read models — typically database tables. Projections support full replay from scratch and per-stream replay.

const projection = eventStore.createProjection('user-list')

Registers event handlers for this projection. Each handler is named on{EventName} and receives the event object and a helpers object.

projection.withEventHandlers({
async onUserCreated(event, { replaying, replayUntil }) {
await db.insert(usersTable).values({
id: event.streamId,
name: event.payload.name,
email: event.payload.email,
})
},
async onUserRenamed(event) {
await db
.update(usersTable)
.set({ name: event.payload.name })
.where(eq(usersTable.id, event.streamId))
},
})
PropertyTypeDescription
replayingbooleantrue when the handler is called during a replay operation.
replayUntil(position, filter?)Promise<void>Replays events for the current stream up to the given position. Useful for complex projections that need historical context.

Registers a catch-all handler that is called for every event, regardless of type. Receives the event and a ReplayProjection methods object.

projection.withGlobalEventHandler(async (event, methods) => {
await db.insert(eventLogTable).values({
position: event.position,
type: event.type,
streamId: event.streamId,
})
})

Enables replay support. This allows the projection to be fully rebuilt from the event history.

projection.withReplay({
// Called before replaying all events — clear the entire table
deleteAll: async () => {
await db.delete(usersTable)
},
// Called before replaying a single stream — clear one record
deleteOne: async (id) => {
await db.delete(usersTable).where(eq(usersTable.id, id))
},
})
PropertyTypeRequiredDescription
deleteAll() => MaybePromise<void>Enables full replay via .replay().
deleteOne(id: string) => MaybePromise<void>Enables per-stream replay via .replayOne(id).

Enables snapshot support for this projection. Snapshots periodically capture derived state to speed up hydration.

projection.withSnapshots((generate) =>
generate
.every(50)
.schema(z.object({ count: z.number() }))
.apply((snapshot) => { /* restore state */ })
.capture((event) => ({ count: currentCount }))
)

Disables the default exclusive (per-stream) locking. When called, event handlers may be invoked concurrently for different events on the same stream.

projection.concurrent()

These methods are available on the projection after calling .withReplay():

Performs a full replay: deletes all projected data, then re-applies every event from the store. Requires deleteAll in the replay config.

await projection.replay()

Replays events for a single stream: deletes the projected data for that stream, then re-applies all events for it. Requires deleteOne in the replay config.

await projection.replayOne('user-123')

Replays events for a single stream up to a specific event position. Optionally accepts a filter function.

await projection.replayUntil('user-123', 500)

Returns a promise that resolves when the projection has finished its initial catch-up hydration.

await projection.isReady()

These functions can be called from anywhere in your code to detect whether the current execution context is inside a projection handler.

import { isInsideProjection, getProjectionInfo } from '@requence/event-sourcing'
FunctionReturn TypeDescription
isInsideProjection()booleanReturns true if called within a projection event handler.
getProjectionInfo(){ name, event } | undefinedReturns the projection name and triggering event, or undefined.
const userList = eventStore.createProjection('user-list')
.withEventHandlers({
async onUserCreated({ streamId, payload }) {
await db.insert(usersTable).values({
id: streamId,
name: payload.name,
email: payload.email,
})
},
async onUserRenamed({ streamId, payload }) {
await db
.update(usersTable)
.set({ name: payload.name })
.where(eq(usersTable.id, streamId))
},
async onUserDeleted({ streamId }) {
await db
.delete(usersTable)
.where(eq(usersTable.id, streamId))
},
})
.withReplay({
deleteAll: () => db.delete(usersTable),
deleteOne: (id) => db.delete(usersTable).where(eq(usersTable.id, id)),
})