Storage Adapters

@requence/event-sourcing ships with pluggable storage adapters that implement the low-level persistence layer — event storage, checkpoints, and snapshots. The core library defines the interfaces; each adapter provides a concrete createEventStore function that wires everything together.

| Adapter | Import | Use case |
| --- | --- | --- |
| In-Memory | @requence/event-sourcing/memory | Testing, prototyping, single-process apps |
| Drizzle (PostgreSQL) | @requence/event-sourcing/drizzle | Production workloads |

Both adapters expose the same createEventStore and createAggregateRoot API, so you can swap between them by changing the import. The Drizzle adapter additionally requires a database option.

In-Memory Adapter

The in-memory adapter stores events, checkpoints, and snapshots in plain JavaScript arrays and maps. It requires no external dependencies and works out of the box.

import { createEventStore, createAggregateRoot } from '@requence/event-sourcing/memory'

// userAggregateRoot is an aggregate root defined elsewhere in your application
const eventStore = createEventStore({
  aggregateRoots: [userAggregateRoot],
})

The in-memory adapter is a good fit for:

  • Unit and integration tests: no database setup is needed, so tests start without external services.
  • Prototyping: focus on your domain logic before committing to a database.
  • Single-process applications: lightweight apps that don’t need durable persistence.

createEventStore accepts the following options:

import { createEventStore, createAggregateRoot } from '@requence/event-sourcing/memory'

const eventStore = createEventStore({
  // Required: one or more aggregate roots
  aggregateRoots: [user, project],

  // Optional: disable automatic initialization (default: true)
  autoInit: false,

  // Optional: callback invoked after events are appended
  onEventsAppended(events) {
    console.log(`${events.length} event(s) appended`)
  },

  // Optional: progress callbacks for projection replays
  onProjectionReplay: {
    begin(name) { console.log(`Replaying ${name}…`) },
    end(name) { console.log(`${name} replay complete`) },
  },

  // Optional: progress callbacks for process manager refreshes
  onProcessManagerRefresh: {
    begin(name) { console.log(`Refreshing ${name}…`) },
    end(name) { console.log(`${name} refresh complete`) },
  },

  // Optional: post-process events before they are appended
  postProcessEvent(event, aggregateRoot) {
    return event
  },
})

The in-memory adapter provides full support for all event store features:

  • Event storage with position tracking and concurrency checks
  • Checkpoints for projection and process manager catch-up
  • Aggregate root snapshots for fast stream rehydration
  • Projection snapshots with applied-count tracking
  • ConcurrencyError thrown when the expected stream version doesn’t match
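
To make the position tracking and concurrency check in the list above concrete, here is a minimal, self-contained sketch of an array-and-map event log. It is not the adapter's actual implementation: the InMemoryLog class, the StoredEvent shape, the append and readFrom signatures, and this local ConcurrencyError class are all hypothetical names chosen for illustration.

```typescript
// Illustrative sketch only; every name here is hypothetical, not the library's API.
interface StoredEvent {
  position: number // global, monotonically increasing log position
  streamId: string
  version: number // 1-based version within the stream
  type: string
  payload: unknown
}

class ConcurrencyError extends Error {
  constructor(streamId: string, expected: number, actual: number) {
    super(`Stream ${streamId}: expected version ${expected}, found ${actual}`)
    this.name = 'ConcurrencyError'
  }
}

class InMemoryLog {
  private readonly events: StoredEvent[] = []
  private readonly versions = new Map<string, number>()

  append(
    streamId: string,
    expectedVersion: number,
    newEvents: { type: string; payload: unknown }[],
  ): StoredEvent[] {
    // Optimistic concurrency: reject the write if the stream has moved on.
    const actual = this.versions.get(streamId) ?? 0
    if (actual !== expectedVersion) {
      throw new ConcurrencyError(streamId, expectedVersion, actual)
    }
    const stored = newEvents.map((e, i) => ({
      position: this.events.length + i + 1,
      streamId,
      version: actual + i + 1,
      type: e.type,
      payload: e.payload,
    }))
    this.events.push(...stored)
    this.versions.set(streamId, actual + stored.length)
    return stored
  }

  // Returns every event strictly after the given global position.
  readFrom(position: number): StoredEvent[] {
    return this.events.filter((e) => e.position > position)
  }
}

const log = new InMemoryLog()
log.append('user-1', 0, [{ type: 'UserRegistered', payload: { name: 'Ada' } }])

let conflict: unknown
try {
  // A stale writer still believes the stream is empty.
  log.append('user-1', 0, [{ type: 'UserRenamed', payload: { name: 'Grace' } }])
} catch (error) {
  conflict = error
}
```

In this sketch the stale write is rejected and the log keeps a single event; a caller would typically reload the stream and retry with the current version.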

Drizzle Adapter

The Drizzle adapter persists events to a PostgreSQL database using Drizzle ORM. It is the recommended choice for production.

import { createEventStore, createAggregateRoot } from '@requence/event-sourcing/drizzle'
import { drizzle } from 'drizzle-orm/node-postgres'

const db = drizzle(process.env.DATABASE_URL!)

const eventStore = createEventStore({
  database: db,
  aggregateRoots: [userAggregateRoot],
})

The adapter uses a dedicated event_sourcing PostgreSQL schema with the following tables:

| Table | Purpose |
| --- | --- |
| events | Append-only event log with position, stream, and metadata |
| checkpoints | Last-processed position per projection/process manager |
| aggregate_root_snapshots | Cached aggregate state for fast rehydration |
| projection_snapshots | Cached projection state for snapshot-based projections |
| projection_applied_events | Counter for snapshot frequency tracking |

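As a rough illustration of how a checkpoint (the last-processed position per consumer) relates to the append-only event log, the sketch below replays only events past the stored position. It runs entirely in memory and does not touch the tables above; the LoggedEvent shape, the checkpointStore map, and the catchUp helper are hypothetical, and the events are assumed to arrive ordered by position.

```typescript
// Illustrative sketch only; these shapes and helpers are hypothetical.
interface LoggedEvent {
  position: number
  type: string
}

const eventLog: LoggedEvent[] = [
  { position: 1, type: 'UserRegistered' },
  { position: 2, type: 'UserRenamed' },
  { position: 3, type: 'UserRegistered' },
]

// One checkpoint per consumer name, holding the last processed position.
const checkpointStore = new Map<string, number>()

function catchUp(
  consumer: string,
  events: LoggedEvent[],
  handle: (event: LoggedEvent) => void,
): number {
  let checkpoint = checkpointStore.get(consumer) ?? 0
  for (const event of events) {
    if (event.position <= checkpoint) continue // already processed
    handle(event)
    checkpoint = event.position
    checkpointStore.set(consumer, checkpoint)
  }
  return checkpoint
}

let registeredUsers = 0
const countRegistrations = (event: LoggedEvent): void => {
  if (event.type === 'UserRegistered') registeredUsers += 1
}

catchUp('user-count', eventLog, countRegistrations)
// A second run finds nothing past the checkpoint, so no event is applied twice.
catchUp('user-count', eventLog, countRegistrations)
```

A durable adapter would presumably persist the checkpoint alongside the consumer's own state so that a restart resumes from the same position, but that detail is an assumption here.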
The table definitions are exported from @requence/event-sourcing/drizzle/postgres for use in Drizzle migrations:

import {
  events,
  checkpoints,
  aggregateRootSnapshots,
  projectionSnapshots,
  projectionAppliedEvents,
} from '@requence/event-sourcing/drizzle/postgres'

The Drizzle adapter takes the same options as the in-memory adapter, plus a required database option:

const eventStore = createEventStore({
  // Required: Drizzle database instance or factory
  database: db,

  // Required: one or more aggregate roots
  aggregateRoots: [user, project],

  // Optional: disable automatic initialization
  autoInit: false,

  // Optional: callback invoked after events are appended
  onEventsAppended(events) {
    // e.g. broadcast via WebSocket
  },

  // Optional: replay and refresh progress callbacks
  onProjectionReplay: { /* ... */ },
  onProcessManagerRefresh: { /* ... */ },

  // Optional: post-process events before persistence
  postProcessEvent(event, aggregateRoot) {
    return event
  },
})

Because both adapters conform to the same EventStore interface, switching between them comes down to changing the import and dropping the database option, which is listed only for the Drizzle adapter:

// Before: import { createEventStore, createAggregateRoot } from '@requence/event-sourcing/drizzle'
import { createEventStore, createAggregateRoot } from '@requence/event-sourcing/memory'

const eventStore = createEventStore({
  // database: db, // removed: only the Drizzle adapter needs it
  aggregateRoots: [user],
})

A common pattern is to select the adapter based on the environment:

const { createEventStore } = process.env.NODE_ENV === 'test'
  ? await import('@requence/event-sourcing/memory')
  : await import('@requence/event-sourcing/drizzle')
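
Since the Drizzle adapter requires a database option, the options object usually has to vary with the environment too. The sketch below shows one way to keep both decisions in small helpers. The module specifiers come from the adapter table, but adapterModuleFor and adapterOptions are hypothetical helper names, and the sketch assumes the in-memory adapter takes no database option, as its configuration listing suggests.

```typescript
// Illustrative sketch only; the helper names are hypothetical.
type AdapterModule =
  | '@requence/event-sourcing/memory'
  | '@requence/event-sourcing/drizzle'

// Picks the adapter entry point for the given NODE_ENV value.
function adapterModuleFor(nodeEnv: string | undefined): AdapterModule {
  return nodeEnv === 'test'
    ? '@requence/event-sourcing/memory'
    : '@requence/event-sourcing/drizzle'
}

// Assumption: only the Drizzle adapter takes `database`, so omit it in tests.
function adapterOptions<Db>(nodeEnv: string | undefined, db: Db): { database?: Db } {
  return nodeEnv === 'test' ? {} : { database: db }
}

// Possible usage (left commented out because it needs the library and a database):
// const { createEventStore } = await import(adapterModuleFor(process.env.NODE_ENV))
// const eventStore = createEventStore({
//   ...adapterOptions(process.env.NODE_ENV, db),
//   aggregateRoots: [user],
// })
```

Keeping the literal import() calls from the snippet above may be preferable when you want TypeScript to infer the adapter's types, because a computed specifier is generally not resolved statically.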