Projections

A Projection (also known as a Read Model) is a way to transform the append-only stream of events into a format that is optimized for querying.

While Event Sourcing is great for writing and maintaining a perfect history, querying an event store directly is inefficient. For example, to find all projects created by a specific user, you would have to read every event in the entire system.

Projections solve this by listening to events as they happen and updating a separate data store (such as a PostgreSQL database, an in-memory object, or a search index) whose structure is designed specifically for your application’s read operations.
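To make the difference concrete, here is a minimal in-memory sketch that does not use the library at all. The event shapes, `projectsByUserFromLog`, and the `ProjectsByUser` class are hypothetical names invented for this illustration. It contrasts answering the "projects by user" question by scanning the log with answering it from a read model that is kept up to date event by event.

```typescript
// Hypothetical event shapes for illustration; not the library's own types.
type ProjectEvent =
  | { type: 'ProjectCreated'; streamId: string; actorId: string }
  | { type: 'ProjectDeleted'; streamId: string }

const eventLog: ProjectEvent[] = [
  { type: 'ProjectCreated', streamId: 'p1', actorId: 'alice' },
  { type: 'ProjectCreated', streamId: 'p2', actorId: 'bob' },
  { type: 'ProjectCreated', streamId: 'p3', actorId: 'alice' },
  { type: 'ProjectDeleted', streamId: 'p1' },
]

// Without a projection: every query folds over the whole log.
function projectsByUserFromLog(log: ProjectEvent[], userId: string): string[] {
  const creatorByProject = new Map<string, string>()
  for (const event of log) {
    if (event.type === 'ProjectCreated') creatorByProject.set(event.streamId, event.actorId)
    else creatorByProject.delete(event.streamId)
  }
  const result: string[] = []
  creatorByProject.forEach((creator, projectId) => {
    if (creator === userId) result.push(projectId)
  })
  return result
}

// With a projection: each event updates an index shaped for the query.
class ProjectsByUser {
  private creatorByProject = new Map<string, string>()
  private projectsByCreator = new Map<string, Set<string>>()

  apply(event: ProjectEvent): void {
    if (event.type === 'ProjectCreated') {
      this.creatorByProject.set(event.streamId, event.actorId)
      const owned = this.projectsByCreator.get(event.actorId) ?? new Set<string>()
      owned.add(event.streamId)
      this.projectsByCreator.set(event.actorId, owned)
    } else {
      const creator = this.creatorByProject.get(event.streamId)
      if (creator !== undefined) this.projectsByCreator.get(creator)?.delete(event.streamId)
      this.creatorByProject.delete(event.streamId)
    }
  }

  // Answering the query is a single lookup, independent of log length.
  byUser(userId: string): string[] {
    const owned = this.projectsByCreator.get(userId)
    return owned ? Array.from(owned) : []
  }
}

const readModel = new ProjectsByUser()
eventLog.forEach((event) => readModel.apply(event))
```

Both approaches return the same answer; the read model pays a small cost per event so that each query avoids rescanning the log.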

In @requence/event-sourcing, you define a Projection using eventStore.createProjection(). You provide a set of event handlers that specify how the projection’s database should be updated when an event is received.

Here is a projection that updates a SQL database (using Drizzle ORM) whenever project-related events occur:

import eventStore from '~/eventSourcing/eventStore'
import db from '~/db/index'
import { projects } from './tables'
import { eq } from 'drizzle-orm'

eventStore
  .createProjection('projects')
  .withEventHandlers({
    // Insert a new row when a project is created
    async onProjectCreated({ streamId, payload, createdAt, actorId }) {
      await db.insert(projects).values({
        // Spread the payload first (it includes name) so the explicit
        // columns below cannot be overwritten by payload fields
        ...payload,
        id: streamId,
        createdAt,
        createdBy: actorId,
      })
    },
    // Apply the changed fields and record when the update happened
    async onProjectUpdated({ streamId, payload, createdAt }) {
      await db
        .update(projects)
        .set({
          ...payload,
          updatedAt: createdAt,
        })
        .where(eq(projects.id, streamId))
    },
    // Remove the row when the project is deleted
    async onProjectDeleted({ streamId }) {
      await db
        .delete(projects)
        .where(eq(projects.id, streamId))
    },
  })

Each event handler receives an event object with these properties:

| Property | Type | Description |
| --- | --- | --- |
| streamId | string | The ID of the aggregate stream that emitted the event |
| streamType | string | The type of the aggregate (e.g., 'project') |
| type | string | The event type name (e.g., 'ProjectCreated') |
| payload | object | The event data, typed according to the Zod schema |
| createdAt | Date | When the event was created |
| actorId | string? | The ID of the user who triggered the event |
| position | number | The global position of the event in the event store |
| correlationId | string? | ID linking related events across aggregates |
| causationId | string? | ID of the command that caused this event |
| metadata | object? | Arbitrary metadata attached to the event |
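As a rough guide, the properties listed above could be written down as a TypeScript interface. The name `ProjectionEvent` and the helper `describeEvent` are hypothetical and chosen for this sketch; the library's real exported types may be named and parameterized differently.

```typescript
// Hypothetical type name; the fields mirror the property list above.
interface ProjectionEvent<TPayload extends object = object> {
  streamId: string
  streamType: string
  type: string
  payload: TPayload
  createdAt: Date
  actorId?: string
  position: number
  correlationId?: string
  causationId?: string
  metadata?: Record<string, unknown>
}

// Optional properties (marked with ?) need a fallback before use.
function describeEvent<T extends object>(event: ProjectionEvent<T>): string {
  const actor = event.actorId ?? 'unknown actor'
  return `#${event.position} ${event.type} on ${event.streamType}/${event.streamId} by ${actor}`
}

const exampleEvent: ProjectionEvent<{ name: string }> = {
  streamId: 'p1',
  streamType: 'project',
  type: 'ProjectCreated',
  payload: { name: 'Website' },
  createdAt: new Date('2024-01-01T00:00:00Z'),
  position: 42,
}
```

Because `actorId`, `correlationId`, `causationId`, and `metadata` are optional, handlers that write them to non-nullable columns should decide on a default first.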

If you need to react to every event (regardless of type), use .withGlobalEventHandler():

eventStore
  .createProjection('audit-log')
  .withGlobalEventHandler(async (event) => {
    // Runs for every event that passes through the projection
    await db.insert(auditLog).values({
      streamId: event.streamId,
      eventType: event.type,
      actorId: event.actorId,
      createdAt: event.createdAt,
    })
  })
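The difference between typed handlers and a global handler can be pictured as two routing strategies. The sketch below is a simplified model, not the library's dispatcher: `routeToTypedHandler`, `routeToGlobalHandler`, and the `TaskAssigned` event type are hypothetical, and the `on<EventType>` lookup is an assumption inferred from handler names such as onProjectCreated.

```typescript
// Hypothetical, simplified event and handler types for illustration.
type AnyEvent = { type: string; streamId: string }
type Handler = (event: AnyEvent) => void

// Assumption: a handler named `on<EventType>` handles events of that type,
// as onProjectCreated does for 'ProjectCreated'.
function routeToTypedHandler(event: AnyEvent, handlers: Record<string, Handler>): boolean {
  const handler = handlers[`on${event.type}`]
  if (!handler) return false // in this sketch, unmatched events are skipped
  handler(event)
  return true
}

// A global handler needs no lookup: it is called for every event.
function routeToGlobalHandler(event: AnyEvent, handler: Handler): void {
  handler(event)
}

const seenByTyped: string[] = []
const seenByGlobal: string[] = []
const incoming: AnyEvent[] = [
  { type: 'ProjectCreated', streamId: 'p1' },
  { type: 'TaskAssigned', streamId: 't1' }, // hypothetical event with no typed handler
]

for (const event of incoming) {
  routeToTypedHandler(event, { onProjectCreated: (e) => seenByTyped.push(e.type) })
  routeToGlobalHandler(event, (e) => seenByGlobal.push(e.type))
}
```

Here the typed route records only the project event, while the global route records both, which is the behavior an audit log needs.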

One of the most powerful features of Event Sourcing is the ability to recreate projections from scratch. If you add a new field to your read model, or if you create a completely new projection, you can “replay” all past events through it to populate it with historical data.

Enable replay by chaining .withReplay() after .withEventHandlers():

eventStore
  .createProjection('projects')
  .withEventHandlers({
    async onProjectCreated({ streamId, payload, createdAt, actorId }) {
      await db.insert(projects).values({
        // Spread the payload first so the explicit columns below
        // cannot be overwritten by payload fields
        ...payload,
        id: streamId,
        createdAt,
        createdBy: actorId,
      })
    },
    async onProjectUpdated({ streamId, payload, createdAt }) {
      await db
        .update(projects)
        .set({ ...payload, updatedAt: createdAt })
        .where(eq(projects.id, streamId))
    },
    async onProjectDeleted({ streamId }) {
      await db.delete(projects).where(eq(projects.id, streamId))
    },
  })
  .withReplay({
    // Required for full replays via eventStore.rebuild()
    async deleteAll() {
      await db.delete(projects)
    },
    // Required for single-stream replays via projection.replayOne(id)
    async deleteOne(id) {
      await db.delete(projects).where(eq(projects.id, id))
    },
  })

Each callback passed to .withReplay() enables one replay operation:

  • deleteAll → enables eventStore.rebuild(), which rebuilds all projections from scratch.
  • deleteOne → enables projection.replayOne(streamId), which rebuilds a single stream’s data.
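Conceptually, both operations follow the same pattern: clear the affected read-model data, then run the stored events through the handlers again. The following in-memory sketch illustrates that pattern only. `rebuildAll`, `replayStream`, `storedEvents`, and `projectRows` are hypothetical stand-ins, not the library's implementation of eventStore.rebuild() or projection.replayOne().

```typescript
// Hypothetical in-memory stand-ins for the event store and the projects table.
type StoredEvent =
  | { type: 'ProjectCreated'; streamId: string; payload: { name: string } }
  | { type: 'ProjectUpdated'; streamId: string; payload: { name: string } }
  | { type: 'ProjectDeleted'; streamId: string }

const storedEvents: StoredEvent[] = [
  { type: 'ProjectCreated', streamId: 'p1', payload: { name: 'Website' } },
  { type: 'ProjectCreated', streamId: 'p2', payload: { name: 'API' } },
  { type: 'ProjectUpdated', streamId: 'p1', payload: { name: 'Website v2' } },
]

const projectRows = new Map<string, { name: string }>()

// The same handler logic serves live events and replayed events.
function applyEvent(event: StoredEvent): void {
  if (event.type === 'ProjectDeleted') {
    projectRows.delete(event.streamId)
  } else {
    projectRows.set(event.streamId, { ...projectRows.get(event.streamId), ...event.payload })
  }
}

// Full rebuild: clear everything (the deleteAll step), then replay the whole log in order.
function rebuildAll(): void {
  projectRows.clear()
  storedEvents.forEach(applyEvent)
}

// Single-stream replay: clear one row (the deleteOne step), then replay only that stream.
function replayStream(streamId: string): void {
  projectRows.delete(streamId)
  storedEvents.filter((event) => event.streamId === streamId).forEach(applyEvent)
}

rebuildAll()
```

Clearing first matters: replaying into a table that still holds old rows would either duplicate data or fail on primary-key conflicts.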

Event handlers receive a second argument with a replaying boolean, which you can use to skip side effects during replay:

async onProjectCreated(event, { replaying }) {
  await db.insert(projects).values({ /* ... */ })
  // Don't send notifications during replay
  if (!replaying) {
    await sendSlackNotification(event.payload.name)
  }
}
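A small simulation shows why the flag matters. Everything here is a hypothetical stand-in (an in-memory map instead of a table, an array instead of Slack); only the `{ replaying }` second argument follows the handler signature described above.

```typescript
// Hypothetical stand-ins: an in-memory table and a list of "sent" notifications.
type HandlerContext = { replaying: boolean }
type ProjectCreated = { streamId: string; payload: { name: string } }

const projectNames = new Map<string, string>()
const sentNotifications: string[] = []

function onProjectCreated(event: ProjectCreated, { replaying }: HandlerContext): void {
  // State change: applied on every delivery, live or replayed
  projectNames.set(event.streamId, event.payload.name)
  // Side effect: performed only on live delivery
  if (!replaying) sentNotifications.push(`Created ${event.payload.name}`)
}

const created: ProjectCreated = { streamId: 'p1', payload: { name: 'Website' } }

onProjectCreated(created, { replaying: false }) // live delivery: row + notification
projectNames.clear()                            // what a deleteAll step would do
onProjectCreated(created, { replaying: true })  // replay: row restored, no notification
```

After the replay the row is back, but the notification list still holds a single entry, so users are not notified again about something that happened in the past.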

By default, projections process events exclusively per stream: events for the same streamId are serialized in order. Use .concurrent() to opt out of this locking if your projection can safely handle concurrent events:

eventStore
  .createProjection('analytics')
  .concurrent()
  .withEventHandlers({ /* ... */ })
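The library's locking mechanism is not described here, but one common way to get per-stream ordering is a promise chain keyed by stream ID. The sketch below is written under that assumption; `enqueue`, `tails`, and `demo` are hypothetical names.

```typescript
// One promise chain ("tail") per stream ID; hypothetical helper, not library code.
const tails = new Map<string, Promise<void>>()

function enqueue(streamId: string, task: () => Promise<void>): Promise<void> {
  const previous = tails.get(streamId) ?? Promise.resolve()
  // Wait for the previous task on this stream, even if it failed, then run the next one.
  const next = previous.catch(() => undefined).then(task)
  tails.set(streamId, next)
  return next
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Two events on stream 'a' (the first one slow) and one event on stream 'b'.
async function demo(): Promise<string[]> {
  const order: string[] = []
  await Promise.all([
    enqueue('a', async () => {
      await sleep(20)
      order.push('a1')
    }),
    enqueue('a', async () => {
      order.push('a2')
    }),
    enqueue('b', async () => {
      order.push('b1')
    }),
  ])
  return order
}
```

In this model, `a2` always waits for `a1` even though `a1` is slower, while `b1` does not wait for either. Opting into concurrency corresponds to dropping the chain and starting every task immediately, which is only safe when handlers do not depend on the order of events within a stream.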