createProjection
Projections are created via eventStore.createProjection(name). They transform the append-only event stream into query-optimized read models — typically database tables. Projections support full replay from scratch and per-stream replay.
Creation
```typescript
const projection = eventStore.createProjection('user-list')
```

Builder Methods

.withEventHandlers(handlers)

Registers event handlers for this projection. Each handler is named `on{EventName}` and receives the event object and a helpers object.
```typescript
projection.withEventHandlers({
  async onUserCreated(event, { replaying, replayUntil }) {
    await db.insert(usersTable).values({
      id: event.streamId,
      name: event.payload.name,
      email: event.payload.email,
    })
  },

  async onUserRenamed(event) {
    await db
      .update(usersTable)
      .set({ name: event.payload.name })
      .where(eq(usersTable.id, event.streamId))
  },
})
```

Event Handler Helpers

| Property | Type | Description |
|---|---|---|
| `replaying` | `boolean` | `true` when the handler is called during a replay operation. |
| `replayUntil(position, filter?)` | `Promise<void>` | Replays events for the current stream up to the given position. Useful for complex projections that need historical context. |
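
One common use of `replaying` is to keep side effects from firing again when history is re-applied. The sketch below is self-contained and does not use the library: the event shape, the in-memory `users` map, and the `sentEmails` array are hypothetical stand-ins, and the handler is written synchronously for brevity. It assumes only that the flag behaves as described in the table above.

```typescript
// Hypothetical event and helper shapes, modeled on the handler signature above.
interface UserCreatedEvent {
  streamId: string
  payload: { name: string; email: string }
}
interface HandlerHelpers {
  replaying: boolean
}

// In-memory stand-ins for a read-model table and an outgoing mail queue.
const users = new Map<string, { name: string; email: string }>()
const sentEmails: string[] = []

function onUserCreated(event: UserCreatedEvent, { replaying }: HandlerHelpers): void {
  // Idempotent write: applying the same event twice still leaves one row.
  users.set(event.streamId, { ...event.payload })

  // Side effect: only for live events, never while rebuilding.
  if (!replaying) {
    sentEmails.push(event.payload.email)
  }
}

const created: UserCreatedEvent = {
  streamId: 'user-123',
  payload: { name: 'Ada', email: 'ada@example.com' },
}

onUserCreated(created, { replaying: false }) // live delivery
onUserCreated(created, { replaying: true }) // same event during a replay
```

After both calls the read model holds a single row and only one email has been queued, because the replayed call skips the side effect.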
.withGlobalEventHandler(handler)
Registers a catch-all handler that is called for every event, regardless of type. The handler receives the event and a `ReplayProjection` methods object.

```typescript
projection.withGlobalEventHandler(async (event, methods) => {
  await db.insert(eventLogTable).values({
    position: event.position,
    type: event.type,
    streamId: event.streamId,
  })
})
```

.withReplay(config)
Enables replay support, which allows the projection to be fully rebuilt from the event history.

```typescript
projection.withReplay({
  // Called before replaying all events: clear the entire table
  deleteAll: async () => {
    await db.delete(usersTable)
  },
  // Called before replaying a single stream: clear one record
  deleteOne: async (id) => {
    await db.delete(usersTable).where(eq(usersTable.id, id))
  },
})
```

| Property | Type | Required | Description |
|---|---|---|---|
| `deleteAll` | `() => MaybePromise<void>` | | Enables full replay via `.replay()`. |
| `deleteOne` | `(id: string) => MaybePromise<void>` | | Enables per-stream replay via `.replayOne(id)`. |
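
To make the role of the two callbacks concrete, here is a minimal in-memory model of the order of operations described above: clear first, then re-apply. It is a sketch under stated assumptions, not the library's implementation. The event log, the `table` map, `applyEvent`, and the local `replay` and `replayOne` functions are all hypothetical stand-ins.

```typescript
// Hypothetical stored events; the real store's event shape may differ.
interface StoredEvent {
  streamId: string
  type: 'UserCreated' | 'UserRenamed'
  payload: { name: string }
}

const log: StoredEvent[] = [
  { streamId: 'user-1', type: 'UserCreated', payload: { name: 'Ada' } },
  { streamId: 'user-2', type: 'UserCreated', payload: { name: 'Bob' } },
  { streamId: 'user-1', type: 'UserRenamed', payload: { name: 'Ada L.' } },
]

// In-memory stand-in for the projected table: id -> name.
const table = new Map<string, string>()

// Both event types set the current name for their stream.
function applyEvent(event: StoredEvent): void {
  table.set(event.streamId, event.payload.name)
}

// Same shape as the config passed to .withReplay().
const replayConfig = {
  deleteAll: (): void => table.clear(),
  deleteOne: (id: string): void => {
    table.delete(id)
  },
}

// Model of a full replay: wipe everything, then re-apply the whole log.
function replay(): void {
  replayConfig.deleteAll()
  for (const event of log) applyEvent(event)
}

// Model of a per-stream replay: wipe one record, then re-apply only its events.
function replayOne(streamId: string): void {
  replayConfig.deleteOne(streamId)
  for (const event of log) {
    if (event.streamId === streamId) applyEvent(event)
  }
}

replay()
table.set('user-1', 'corrupted') // simulate a bad row
replayOne('user-1') // repairs user-1 without touching user-2
```

In this model, skipping the delete step would leave stale rows behind whenever the handlers change, which is presumably why each replay mode depends on its matching callback.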
.withSnapshots(handler)
Enables snapshot support for this projection. Snapshots periodically capture derived state to speed up hydration.

```typescript
projection.withSnapshots((generate) =>
  generate
    .every(50)
    .schema(z.object({ count: z.number() }))
    .apply((snapshot) => {
      /* restore state */
    })
    .capture((event) => ({ count: currentCount })),
)
```

.concurrent()
Disables the default exclusive (per-stream) locking. When called, event handlers may be invoked concurrently for different events on the same stream.

```typescript
projection.concurrent()
```

Replay Methods
These methods are available on the projection after calling `.withReplay()`:
.replay()
Performs a full replay: deletes all projected data, then re-applies every event from the store. Requires `deleteAll` in the replay config.

```typescript
await projection.replay()
```

.replayOne(streamId)
Replays events for a single stream: deletes the projected data for that stream, then re-applies all of its events. Requires `deleteOne` in the replay config.

```typescript
await projection.replayOne('user-123')
```

.replayUntil(streamId, position, filter?)
Replays events for a single stream up to a specific event position. Optionally accepts a filter function.

```typescript
await projection.replayUntil('user-123', 500)
```

Other Methods
.isReady()

Returns a promise that resolves when the projection has finished its initial catch-up hydration.

```typescript
await projection.isReady()
```

Context Helpers
These functions can be called from anywhere in your code to detect whether the current execution context is inside a projection handler.

```typescript
import { isInsideProjection, getProjectionInfo } from '@requence/event-sourcing'
```

| Function | Return Type | Description |
|---|---|---|
| `isInsideProjection()` | `boolean` | Returns `true` if called within a projection event handler. |
| `getProjectionInfo()` | `{ name, event } \| undefined` | Returns the projection name and triggering event, or `undefined`. |
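
One possible use is guarding shared code that should not run from inside a projection handler. The sketch below does not import the library: `isInsideProjection` and `getProjectionInfo` are local stand-ins with the return types listed in the table, the `event` shape is assumed, and `runAsProjection` and `appendEvent` are hypothetical helpers that exist only to make the example runnable.

```typescript
// Local stand-ins for the real helpers, using the documented return types.
interface ProjectionInfo {
  name: string
  event: { type: string; streamId: string } // assumed event shape
}

let current: ProjectionInfo | undefined

function isInsideProjection(): boolean {
  return current !== undefined
}

function getProjectionInfo(): ProjectionInfo | undefined {
  return current
}

// Hypothetical: simulates the library invoking a projection handler.
function runAsProjection<T>(info: ProjectionInfo, handler: () => T): T {
  const previous = current
  current = info
  try {
    return handler()
  } finally {
    current = previous // restore the outer context even if the handler throws
  }
}

// Hypothetical shared function that refuses to run inside a projection handler.
function appendEvent(type: string): string {
  if (isInsideProjection()) {
    const info = getProjectionInfo()
    throw new Error(`Cannot append ${type} from projection "${info?.name}"`)
  }
  return `appended ${type}`
}

// Outside any handler the call goes through.
const outside = appendEvent('UserCreated')

// Inside a simulated handler the guard rejects the call.
let insideError = ''
try {
  runAsProjection(
    { name: 'user-list', event: { type: 'UserCreated', streamId: 'user-123' } },
    () => appendEvent('UserRenamed'),
  )
} catch (error) {
  insideError = (error as Error).message
}
```

The guard lives in the shared function rather than in each handler, so callers do not need to know whether they were reached from a projection.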
Full Example
```typescript
const userList = eventStore
  .createProjection('user-list')
  .withEventHandlers({
    async onUserCreated({ streamId, payload }) {
      await db.insert(usersTable).values({
        id: streamId,
        name: payload.name,
        email: payload.email,
      })
    },
    async onUserRenamed({ streamId, payload }) {
      await db
        .update(usersTable)
        .set({ name: payload.name })
        .where(eq(usersTable.id, streamId))
    },
    async onUserDeleted({ streamId }) {
      await db
        .delete(usersTable)
        .where(eq(usersTable.id, streamId))
    },
  })
  .withReplay({
    deleteAll: () => db.delete(usersTable),
    deleteOne: (id) => db.delete(usersTable).where(eq(usersTable.id, id)),
  })
```