Projections
What is a Projection?
A Projection (also known as a Read Model) is a way to transform the append-only stream of events into a format that is optimized for querying.
While Event Sourcing is great for writing and maintaining a perfect history, querying an event store directly is inefficient. For example, to find all projects created by a specific user, you would have to read every event in the entire system.
Projections solve this by listening to events as they happen and updating a separate data store (such as a PostgreSQL database, an in-memory object, or a search index) whose structure is designed specifically for your application’s read operations.
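To make the contrast concrete, here is a minimal in-memory sketch that does not use @requence/event-sourcing at all. The ProjectEvent type, the sample events, and the function names are hypothetical. The first function answers the query by scanning the whole log; the second part keeps a read model up to date as events arrive, so the query becomes a single lookup.

```typescript
// Hypothetical, simplified event shape used only for this illustration.
type ProjectEvent =
  | { type: 'ProjectCreated'; streamId: string; actorId: string; name: string }
  | { type: 'ProjectDeleted'; streamId: string }

const eventLog: ProjectEvent[] = [
  { type: 'ProjectCreated', streamId: 'p1', actorId: 'alice', name: 'Website' },
  { type: 'ProjectCreated', streamId: 'p2', actorId: 'bob', name: 'API' },
  { type: 'ProjectCreated', streamId: 'p3', actorId: 'alice', name: 'Docs' },
  { type: 'ProjectDeleted', streamId: 'p1' },
]

// Without a projection: every query re-reads the entire event log.
function projectsByUserFromLog(events: ProjectEvent[], actorId: string): string[] {
  const live = new Map<string, string>() // streamId -> creator
  for (const event of events) {
    if (event.type === 'ProjectCreated') live.set(event.streamId, event.actorId)
    else live.delete(event.streamId)
  }
  return Array.from(live.entries())
    .filter(([, creator]) => creator === actorId)
    .map(([id]) => id)
}

// With a projection: the read model is updated once per event...
const projectsByUser = new Map<string, Set<string>>() // actorId -> project ids
const creatorOf = new Map<string, string>() // streamId -> actorId

function applyEvent(event: ProjectEvent): void {
  if (event.type === 'ProjectCreated') {
    creatorOf.set(event.streamId, event.actorId)
    const ids = projectsByUser.get(event.actorId) ?? new Set<string>()
    ids.add(event.streamId)
    projectsByUser.set(event.actorId, ids)
  } else {
    const creator = creatorOf.get(event.streamId)
    if (creator !== undefined) projectsByUser.get(creator)?.delete(event.streamId)
    creatorOf.delete(event.streamId)
  }
}

eventLog.forEach(applyEvent)

// ...so the query is a direct lookup instead of a scan.
const aliceProjects = Array.from(projectsByUser.get('alice') ?? [])
```

Both approaches return the same answer for this log. The difference is that the scan costs one pass over all events per query, while the read model pays a small cost per event and answers queries immediately.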
Defining a Projection
In @requence/event-sourcing, you define a Projection using eventStore.createProjection(). You provide a set of event handlers that specify how the projection’s database should be updated when an event is received.
Example
Here is a projection that updates a SQL database (using Drizzle ORM) whenever project-related events occur:

    import eventStore from '~/eventSourcing/eventStore'
    import db from '~/db/index'
    import { projects } from './tables'
    import { eq } from 'drizzle-orm'

    eventStore
      .createProjection('projects')
      .withEventHandlers({
        async onProjectCreated({ streamId, payload, createdAt, actorId }) {
          await db.insert(projects).values({
            id: streamId,
            name: payload.name,
            createdAt,
            createdBy: actorId,
            ...payload,
          })
        },

        async onProjectUpdated({ streamId, payload, createdAt }) {
          await db
            .update(projects)
            .set({
              ...payload,
              updatedAt: createdAt,
            })
            .where(eq(projects.id, streamId))
        },

        async onProjectDeleted({ streamId }) {
          await db
            .delete(projects)
            .where(eq(projects.id, streamId))
        },
      })

Event Handler Parameters
Each event handler receives an event object with these properties:
| Property | Type | Description |
|---|---|---|
| streamId | string | The ID of the aggregate stream that emitted the event |
| streamType | string | The type of the aggregate (e.g., 'project') |
| type | string | The event type name (e.g., 'ProjectCreated') |
| payload | object | The event data, typed according to the Zod schema |
| createdAt | Date | When the event was created |
| actorId | string? | The ID of the user who triggered the event |
| position | number | The global position of the event in the event store |
| correlationId | string? | ID linking related events across aggregates |
| causationId | string? | ID of the command that caused this event |
| metadata | object? | Arbitrary metadata attached to the event |
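
The table can also be read as a TypeScript shape. In the sketch below, the interface name ProjectionEvent, its generic parameter, and the describeEvent helper are hypothetical; the library's own exported types may be named and structured differently. Only the property names and types come from the table, and the `?` marker is read here as "optional".

```typescript
// Hypothetical type mirroring the table; not the library's exported type.
interface ProjectionEvent<TPayload extends object = Record<string, unknown>> {
  streamId: string
  streamType: string
  type: string
  payload: TPayload
  createdAt: Date
  actorId?: string // assumption: "string?" means the field may be absent
  position: number
  correlationId?: string
  causationId?: string
  metadata?: Record<string, unknown>
}

// Example value with made-up data.
const example: ProjectionEvent<{ name: string }> = {
  streamId: 'project-123',
  streamType: 'project',
  type: 'ProjectCreated',
  payload: { name: 'Website relaunch' },
  createdAt: new Date('2024-01-01T00:00:00Z'),
  position: 1,
}

// Optional fields may be missing, so give them an explicit fallback.
function describeEvent(event: ProjectionEvent): string {
  const actor = event.actorId ?? 'unknown actor'
  return `#${event.position} ${event.type} on ${event.streamType}/${event.streamId} by ${actor}`
}
```

Because actorId, correlationId, causationId, and metadata are optional in this reading, handlers that write them to non-nullable columns would need a fallback like the one in describeEvent.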
Global Event Handler
If you need to react to every event (regardless of type), use .withGlobalEventHandler():

    eventStore
      .createProjection('audit-log')
      .withGlobalEventHandler(async (event) => {
        // Runs for every event that passes through the projection
        await db.insert(auditLog).values({
          streamId: event.streamId,
          eventType: event.type,
          actorId: event.actorId,
          createdAt: event.createdAt,
        })
      })

Replayability
One of the most powerful features of Event Sourcing is the ability to recreate projections from scratch. If you add a new field to your read model, or if you create a completely new projection, you can “replay” all past events through it to populate it with historical data.
Enable replay by chaining .withReplay() after .withEventHandlers():

    eventStore
      .createProjection('projects')
      .withEventHandlers({
        async onProjectCreated({ streamId, payload, createdAt, actorId }) {
          await db.insert(projects).values({
            id: streamId,
            createdAt,
            createdBy: actorId,
            ...payload,
          })
        },
        async onProjectUpdated({ streamId, payload, createdAt }) {
          await db
            .update(projects)
            .set({ ...payload, updatedAt: createdAt })
            .where(eq(projects.id, streamId))
        },
        async onProjectDeleted({ streamId }) {
          await db.delete(projects).where(eq(projects.id, streamId))
        },
      })
      .withReplay({
        // Required for full replays via eventStore.rebuild()
        async deleteAll() {
          await db.delete(projects)
        },
        // Required for single-stream replays via projection.replayOne(id)
        async deleteOne(id) {
          await db.delete(projects).where(eq(projects.id, id))
        },
      })

Replay Methods
Each callback you pass to .withReplay() enables one replay method:
- deleteAll → enables eventStore.rebuild() to fully rebuild all projections from scratch.
- deleteOne → enables projection.replayOne(streamId) to rebuild a single stream’s data.
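
Conceptually, a replay clears the read model and then feeds stored events back through the same handlers. The sketch below imitates that pattern with an in-memory Map. It is an assumption about the general technique, not the library's implementation, and StoredEvent, history, rows, apply, rebuildAll, and replayStream are all hypothetical names.

```typescript
// Hypothetical stored event; position gives the global order.
interface StoredEvent {
  position: number
  streamId: string
  type: 'ProjectCreated' | 'ProjectUpdated' | 'ProjectDeleted'
  payload: { name?: string }
}

const history: StoredEvent[] = [
  { position: 1, streamId: 'p1', type: 'ProjectCreated', payload: { name: 'Website' } },
  { position: 2, streamId: 'p2', type: 'ProjectCreated', payload: { name: 'API' } },
  { position: 3, streamId: 'p1', type: 'ProjectUpdated', payload: { name: 'Website v2' } },
  { position: 4, streamId: 'p2', type: 'ProjectDeleted', payload: {} },
]

// The read model: project id -> row.
const rows = new Map<string, { name?: string }>()

// The same handler logic serves live events and replayed events.
function apply(event: StoredEvent): void {
  if (event.type === 'ProjectDeleted') rows.delete(event.streamId)
  else rows.set(event.streamId, { ...rows.get(event.streamId), ...event.payload })
}

// Full rebuild: the "deleteAll" step, then every event in global order.
function rebuildAll(): void {
  rows.clear()
  history
    .slice()
    .sort((a, b) => a.position - b.position)
    .forEach(apply)
}

// Single-stream replay: the "deleteOne" step, then only that stream's events.
function replayStream(streamId: string): void {
  rows.delete(streamId)
  history
    .filter((event) => event.streamId === streamId)
    .sort((a, b) => a.position - b.position)
    .forEach(apply)
}
```

In this sketch, calling rebuildAll() leaves a single row for p1 named 'Website v2', because p2 was deleted at position 4. Calling replayStream('p1') after that row has been corrupted restores it without touching any other stream, which is the reason a per-stream delete callback is useful.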
The replaying Flag
Event handlers receive a second argument with a replaying boolean, which you can use to skip side effects during replay:

    async onProjectCreated(event, { replaying }) {
      await db.insert(projects).values({ /* ... */ })

      // Don't send notifications during replay
      if (!replaying) {
        await sendSlackNotification(event.payload.name)
      }
    }

Concurrency
By default, projections process events exclusively per stream: events for the same streamId are handled one at a time, in order. Use .concurrent() to opt out of this locking if your projection can safely handle concurrent events:

    eventStore
      .createProjection('analytics')
      .concurrent()
      .withEventHandlers({ /* ... */ })
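
To illustrate what "exclusive per stream" means in practice, here is a small sketch of one common way to serialize async work per key: a promise chain per streamId. This is not taken from the library and may differ from how it actually locks streams; KeyedQueue, sleep, and demo are hypothetical names.

```typescript
// Hypothetical helper: tasks with the same key run one after another,
// while tasks with different keys run independently.
class KeyedQueue {
  private tails = new Map<string, Promise<void>>()

  run(key: string, task: () => Promise<void>): Promise<void> {
    const previous = this.tails.get(key) ?? Promise.resolve()
    // Chain after the previous task for this key, even if that task failed.
    const next = previous.catch(() => undefined).then(task)
    this.tails.set(key, next) // note: entries are never pruned in this sketch
    return next
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function demo(): Promise<string[]> {
  const queue = new KeyedQueue()
  const log: string[] = []
  await Promise.all([
    // The first handler for project-1 is slow...
    queue.run('project-1', async () => {
      await sleep(20)
      log.push('project-1:created')
    }),
    // ...but the second handler for project-1 still waits for it.
    queue.run('project-1', async () => {
      log.push('project-1:updated')
    }),
    // A different stream is not blocked by project-1.
    queue.run('project-2', async () => {
      log.push('project-2:created')
    }),
  ])
  return log
}

demo().then((log) => console.log(log))
```

In this sketch the project-2 handler finishes first, and the two project-1 handlers complete in submission order even though the first one is slower. Opting out of per-stream ordering trades that guarantee for throughput, so it fits projections whose handlers are order-independent, such as simple counters.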