Use this detailed reference when you need the full evtstore toolbox for TypeScript/Node.js backends.
Use this skill when building or extending backend features with evtstore in a TypeScript/Node.js project and you need deeper implementation guidance.
Choose this skill when you need:
Use evtstore-quickstart instead when you want a compact first-feature walkthrough.
By the end, you should be able to:
- Define aggregates with createAggregate(...)
- Define command handlers with createCommands(...)
- Register aggregates in a domain with createDomain(...)
evtstore is typically used in a CQRS + event sourcing setup.
Typical flow:
This structure is flexible, but a clean baseline is:
src/
  backend/
    es/
      aggregates/
      command/
      types/
      domain.ts
      projection.ts
      projection-shared.ts
      queries.ts
      shared.ts
For a new aggregate department, you will often add:
- types/department.ts
- aggregates/department.ts
- command/department.ts
- domain.ts
- projection.ts
- projection-shared.ts
- queries.ts
npm install evtstore mongodb
If your commands need hashing, validation, or IDs, add those separately:
npm install bcrypt uuid zod
Each aggregate usually has three core types.
Use discriminated unions with past-tense event names.
export type DepartmentEvt =
  | {
      type: 'departmentCreated';
      tenantId: string;
      name: string;
      description?: string;
      dataContext: 'live' | 'test';
      performedBy?: string;
    }
  | {
      type: 'departmentUpdated';
      name?: string;
      description?: string;
      performedBy?: string;
    }
  | {
      type: 'departmentManagerAdded';
      userPhone: string;
      performedBy?: string;
    }
  | {
      type: 'departmentManagerRemoved';
      userPhone: string;
      performedBy?: string;
    }
  | {
      type: 'departmentDeleted';
      deletedBy?: string;
    };
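Discriminated unions also let the compiler check that every event type is handled. A minimal sketch, using a trimmed-down copy of the event union and a hypothetical describeEvent helper that is not part of evtstore:

```typescript
// Trimmed-down copy of the department events, for illustration only.
type DepartmentEvt =
  | { type: 'departmentCreated'; name: string }
  | { type: 'departmentManagerAdded'; userPhone: string }
  | { type: 'departmentDeleted'; deletedBy?: string };

// Hypothetical helper: turns an event into a human-readable line.
function describeEvent(evt: DepartmentEvt): string {
  switch (evt.type) {
    case 'departmentCreated':
      return `created ${evt.name}`;
    case 'departmentManagerAdded':
      return `manager ${evt.userPhone} added`;
    case 'departmentDeleted':
      return `deleted by ${evt.deletedBy ?? 'unknown'}`;
    default: {
      // If a new event type is added to the union and not handled above,
      // this assignment stops compiling.
      const unhandled: never = evt;
      return unhandled;
    }
  }
}
```

The same never check can be placed in the default branch of a fold if you prefer a compile error over silently ignoring a new event type.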
This is the reconstructed current state.
export type DepartmentAgg = {
  tenantId?: string;
  name: string;
  managers: string[];
  description?: string;
  dataContext: 'live' | 'test';
  deleted?: boolean;
  deletedBy?: string;
};
Keep command names intention-based.
export type DepartmentCmd =
  | {
      type: 'create';
      tenantId: string;
      name: string;
      description?: string;
      dataContext: 'live' | 'test';
      performedBy?: string;
    }
  | {
      type: 'update';
      name?: string;
      description?: string;
      performedBy?: string;
    }
  | {
      type: 'addManager';
      userPhone: string;
      performedBy?: string;
    }
  | {
      type: 'removeManager';
      userPhone: string;
      performedBy?: string;
    }
  | {
      type: 'delete';
      deletedBy?: string;
    };
Define aggregates with createAggregate(...).
import { createAggregate } from 'evtstore';
import type { DepartmentEvt, DepartmentAgg } from '../types/department';
export const departmentAgg = createAggregate<DepartmentEvt, DepartmentAgg, 'departments'>({
  stream: 'departments',
  create: (): DepartmentAgg => ({
    tenantId: '',
    name: '',
    managers: [],
    description: '',
    dataContext: 'test',
    deleted: false
  }),
  fold: (evt, prev) => {
    switch (evt.type) {
      case 'departmentCreated':
        return {
          ...prev,
          tenantId: evt.tenantId,
          name: evt.name,
          description: evt.description || '',
          managers: [],
          dataContext: evt.dataContext,
          deleted: false,
          deletedBy: undefined
        };
      case 'departmentUpdated':
        return {
          ...prev,
          name: evt.name ?? prev.name,
          description: evt.description ?? prev.description
        };
      case 'departmentManagerAdded':
        return {
          ...prev,
          managers: [...prev.managers, evt.userPhone]
        };
      case 'departmentManagerRemoved':
        return {
          ...prev,
          managers: prev.managers.filter((phone) => phone !== evt.userPhone)
        };
      case 'departmentDeleted':
        return {
          ...prev,
          deleted: true,
          deletedBy: evt.deletedBy
        };
      default:
        return prev;
    }
  }
});
- Spread ...prev in every fold branch unless you are certain your evtstore setup merges partial results as expected.
Pass the stream name as a string literal generic when creating an aggregate:
createAggregate<DepartmentEvt, DepartmentAgg, 'departments'>(...)
This improves TypeScript inference for createHandler(...) and helps evtstore narrow valid streams and event types.
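Folding is easiest to reason about as a plain reduce over the event history. The sketch below replays a short history in plain TypeScript, without evtstore, using simplified Evt and Agg types and a hypothetical replay helper. It also shows why each branch spreads prev: fields the event does not mention must survive.

```typescript
// Simplified event and aggregate types, for illustration only.
type Evt =
  | { type: 'departmentCreated'; name: string }
  | { type: 'departmentManagerAdded'; userPhone: string }
  | { type: 'departmentManagerRemoved'; userPhone: string };

type Agg = { name: string; managers: string[] };

const initial: Agg = { name: '', managers: [] };

function fold(evt: Evt, prev: Agg): Agg {
  switch (evt.type) {
    case 'departmentCreated':
      return { ...prev, name: evt.name };
    case 'departmentManagerAdded':
      return { ...prev, managers: [...prev.managers, evt.userPhone] };
    case 'departmentManagerRemoved':
      return { ...prev, managers: prev.managers.filter((p) => p !== evt.userPhone) };
  }
}

// Hypothetical helper: rebuild state by applying events in order.
function replay(events: Evt[]): Agg {
  return events.reduce((state, evt) => fold(evt, state), initial);
}

const state = replay([
  { type: 'departmentCreated', name: 'Engineering' },
  { type: 'departmentManagerAdded', userPhone: '08012345678' },
  { type: 'departmentManagerAdded', userPhone: '08099999999' },
  { type: 'departmentManagerRemoved', userPhone: '08012345678' }
]);
// state is { name: 'Engineering', managers: ['08099999999'] }
```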
evtstore supports persisted aggregate snapshots via createAggregate({ version, persistAggregate }).
Example:
export const departmentAgg = createAggregate<DepartmentEvt, DepartmentAgg, 'departments'>({
  stream: 'departments',
  create: () => ({ tenantId: '', name: '', managers: [], description: '', dataContext: 'test' }),
  // The fold function from the aggregate definition, extracted into a variable named fold.
  fold,
  version: '1',
  persistAggregate: true
});
Use this only when aggregate hydration cost is high and you need the optimization.
Important warning:
- If you change fold, bump version.
- If you do not bump version, evtstore may reuse a persisted aggregate built with old fold logic.
Safe default: do not enable aggregate persistence until you actually need it.
Official references:
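Why the version bump matters can be seen with a toy snapshot store. The sketch below is not evtstore's implementation; Snapshot, loadState and the in-memory snapshots map are hypothetical names. It illustrates that a persisted aggregate is only trustworthy when it was built by the current fold logic.

```typescript
type State = { count: number };
type Snapshot = { version: string; position: number; state: State };

// Toy in-memory snapshot storage keyed by aggregate id.
const snapshots = new Map<string, Snapshot>();

// Hypothetical loader: reuse the snapshot only when its version matches,
// otherwise rebuild from the first event.
function loadState(
  id: string,
  events: number[],
  version: string,
  fold: (evt: number, prev: State) => State
): State {
  let state: State = { count: 0 };
  let start = 0;
  const snap = snapshots.get(id);
  if (snap && snap.version === version) {
    state = snap.state;
    start = snap.position;
  }
  for (const evt of events.slice(start)) {
    state = fold(evt, state);
  }
  snapshots.set(id, { version, position: events.length, state });
  return state;
}

const history = [1, 2, 3];
const sumFold = (evt: number, prev: State): State => ({ count: prev.count + evt });
const doubleFold = (evt: number, prev: State): State => ({ count: prev.count + evt * 2 });

const v1 = loadState('a', history, '1', sumFold);       // count 6, snapshot saved
const stale = loadState('a', history, '1', doubleFold); // count 6, old snapshot reused
const fresh = loadState('a', history, '2', doubleFold); // count 12, rebuilt after the bump
```

The middle call is the failure mode: the fold changed, the version did not, and the result silently reflects the old logic.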
Set up the provider and register all aggregates in one domain.
import { createDomain } from 'evtstore';
import { type Bookmark, createProvider, migrate } from 'evtstore/provider/mongo';
import { createEventsMapper } from 'evtstore/provider/util';
import { MongoClient, Timestamp } from 'mongodb';
import type { Provider, StoreEvent } from 'evtstore';
import { departmentAgg } from './aggregates/department';
import { userAccountAgg } from './aggregates/user-account';
// Event types for the union below. The user-account path is assumed to mirror the department layout.
import type { DepartmentEvt } from './types/department';
import type { UserAccountEvt } from './types/user-account';
export type Event = DepartmentEvt | UserAccountEvt;
const LIMIT = 1000;
let providerPromise: Promise<Provider<Event>> | undefined;
// Connects lazily on first use and reuses the same provider afterwards.
async function getMongoProvider(): Promise<Provider<Event>> {
  if (!providerPromise) {
    providerPromise = (async () => {
      const mongoUri = process.env.MONGO_URI;
      if (!mongoUri) throw new Error('MongoDB connection not configured');
      const client = await MongoClient.connect(mongoUri);
      const events = client.db().collection<StoreEvent<Event>>('events');
      const bookmarks = client.db().collection<Bookmark>('bookmarks');
      const provider = createProvider({
        limit: LIMIT,
        events,
        bookmarks
      });
      await migrate(events, bookmarks);
      return provider;
    })();
  }
  return providerPromise;
}
const createEvents = createEventsMapper<Event>(new Timestamp({ t: 0, i: 0 }));
// Thin wrapper that defers every call to the lazily created Mongo provider.
const providerInstance: Provider<Event> = {
  limit: LIMIT,
  driver: 'mongo',
  // No-op here; log or report handler errors in production.
  onError: () => {},
  getPosition: async (bookmark) => (await getMongoProvider()).getPosition(bookmark),
  setPosition: async (bookmark, position) =>
    (await getMongoProvider()).setPosition(bookmark, position),
  getEventsFrom: async (stream, position, limit) =>
    (await getMongoProvider()).getEventsFrom(stream, position, limit),
  getEventsFor: async (stream, aggregateId, fromPosition) =>
    (await getMongoProvider()).getEventsFor(stream, aggregateId, fromPosition),
  getLastEventFor: async (stream, aggregateId) =>
    (await getMongoProvider()).getLastEventFor(stream, aggregateId),
  createEvents,
  append: async (stream, aggregateId, version, event) =>
    (await getMongoProvider()).append(stream, aggregateId, version, event)
};
export const { domain, createHandler } = createDomain(
  { provider: providerInstance },
  {
    departments: departmentAgg,
    userAccounts: userAccountAgg
  }
);
- Export a shared Event union for registered aggregate events.
- Fail fast when MONGO_URI is missing.
createDomain(...) supports useCache:
export const { domain, createHandler } = createDomain(
  { provider: providerInstance, useCache: true },
  { departments: departmentAgg, userAccounts: userAccountAgg }
);
Use it as a performance optimization when aggregate reload cost matters. Treat it as an optimization, not a replacement for good modeling.
Define command handlers with createCommands(...).
import { createCommands } from 'evtstore';
import type { DepartmentEvt, DepartmentAgg, DepartmentCmd } from '../types/department';
import { domain } from '../domain';
function normalizePhone(input: string) {
  return input.trim();
}
function getChangedFields<T extends object>(
  input: T,
  current: Record<string, unknown>,
  fields: Array<keyof T>
) {
  const changes: Record<string, unknown> = {};
  for (const field of fields) {
    const next = input[field];
    if (next !== undefined && next !== current[field as string]) {
      changes[field as string] = next;
    }
  }
  return changes;
}
export const departmentCmd = createCommands<DepartmentEvt, DepartmentAgg, DepartmentCmd>(
  domain.departments,
  {
    async create(cmd, agg) {
      if (agg.version) throw new Error('Department already exists');
      return {
        type: 'departmentCreated',
        tenantId: cmd.tenantId,
        name: cmd.name,
        description: cmd.description,
        dataContext: cmd.dataContext,
        performedBy: cmd.performedBy
      };
    },
    async update(cmd, agg) {
      if (!agg.version) throw new Error('Department not found');
      if (agg.deleted) throw new Error('Department is deleted');
      const changes = getChangedFields(cmd, agg, ['name', 'description']);
      if (Object.keys(changes).length === 0) return [];
      return {
        ...changes,
        type: 'departmentUpdated',
        performedBy: cmd.performedBy
      } as DepartmentEvt;
    },
    async addManager(cmd, agg) {
      if (!agg.version) throw new Error('Department not found');
      if (agg.deleted) throw new Error('Department is deleted');
      const normalizedPhone = normalizePhone(cmd.userPhone);
      if (agg.managers.includes(normalizedPhone)) {
        throw new Error('User is already a manager');
      }
      return {
        type: 'departmentManagerAdded',
        userPhone: normalizedPhone,
        performedBy: cmd.performedBy
      };
    },
    async removeManager(cmd, agg) {
      if (!agg.version) throw new Error('Department not found');
      if (agg.deleted) throw new Error('Department is deleted');
      const normalizedPhone = normalizePhone(cmd.userPhone);
      if (!agg.managers.includes(normalizedPhone)) {
        throw new Error('User is not a manager');
      }
      return {
        type: 'departmentManagerRemoved',
        userPhone: normalizedPhone,
        performedBy: cmd.performedBy
      };
    },
    async delete(cmd, agg) {
      if (!agg.version) throw new Error('Department not found');
      if (agg.deleted) throw new Error('Department already deleted');
      return {
        type: 'departmentDeleted',
        deletedBy: cmd.deletedBy
      };
    }
  }
);
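The no-op check in update relies on getChangedFields returning an empty object when nothing differs. The sketch below repeats the helper so it runs on its own and feeds it sample data; the department values are made up for illustration.

```typescript
// Same helper as in the command file, repeated so the example is self-contained.
function getChangedFields<T extends object>(
  input: T,
  current: Record<string, unknown>,
  fields: Array<keyof T>
): Record<string, unknown> {
  const changes: Record<string, unknown> = {};
  for (const field of fields) {
    const next = input[field];
    if (next !== undefined && next !== current[field as string]) {
      changes[field as string] = next;
    }
  }
  return changes;
}

// Stand-in for the current aggregate state.
const current = { name: 'Engineering', description: 'Builds things' };

// Same name, new description: only description is reported.
const changed = getChangedFields(
  { name: 'Engineering', description: 'Builds and runs things' },
  current,
  ['name', 'description']
);

// Nothing differs: the update handler would return [] and store no event.
const unchanged = getChangedFields({ name: 'Engineering' }, current, ['name']);
```

The comparison uses strict equality, so array or object fields would always count as changed unless you compare them structurally.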
The method name selects the command. Do not pass type in the payload.
Correct:
await departmentCmd.create('department-123', {
  tenantId: 'tenant-1',
  name: 'Engineering',
  description: 'Engineering team',
  dataContext: 'live',
  performedBy: '08012345678'
});
await departmentCmd.addManager('department-123', {
  userPhone: '08012345678',
  performedBy: '08099999999'
});
Wrong:
await departmentCmd.create('department-123', {
  type: 'create',
  tenantId: 'tenant-1',
  name: 'Engineering',
  dataContext: 'live'
});
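One way to picture why type is omitted: the payload for each method is the matching command variant minus its discriminant. The PayloadOf type below is a hypothetical illustration built on a shortened command union, not a type exported by evtstore.

```typescript
// Shortened command union, for illustration only.
type DepartmentCmd =
  | { type: 'create'; tenantId: string; name: string; dataContext: 'live' | 'test' }
  | { type: 'addManager'; userPhone: string }
  | { type: 'delete'; deletedBy?: string };

// Hypothetical utility: pick the variant for one command name and drop `type`.
type PayloadOf<K extends DepartmentCmd['type']> = Omit<
  Extract<DepartmentCmd, { type: K }>,
  'type'
>;

const createPayload: PayloadOf<'create'> = {
  tenantId: 'tenant-1',
  name: 'Engineering',
  dataContext: 'live'
};

const addManagerPayload: PayloadOf<'addManager'> = { userPhone: '08012345678' };

// Adding `type: 'create'` to createPayload above would be rejected by the
// compiler as an excess property, which mirrors the "Wrong" call.
const hasType = 'type' in createPayload; // false
```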
- Use agg.version to check whether the aggregate already exists.
Example multi-event return:
return [
  { type: 'requisitionReviewed', reviewedBy: cmd.reviewedBy, reviewType: cmd.reviewType },
  { type: 'requisitionStatusChanged', status: 'approved', performedBy: cmd.reviewedBy }
];
Projections build read models from the event stream.
import { createHandler } from './domain';
import { getMongoClient } from './projection-shared';
export interface DepartmentProjection {
  departmentId: string;
  tenantId: string;
  name: string;
  description: string;
  managers: string[];
  dataContext: 'live' | 'test';
  deleted: boolean;
  deletedBy?: string;
  createdAt: string;
  updatedAt: string;
}
const departmentsModel = createHandler('departments-model', ['departments']);
departmentsModel.handle('departments', 'departmentCreated', async (id, event, meta) => {
  const client = await getMongoClient();
  const collection = client.db().collection<DepartmentProjection>('departments');
  await collection.replaceOne(
    { departmentId: id },
    {
      departmentId: id,
      tenantId: event.tenantId,
      name: event.name,
      description: event.description || '',
      managers: [],
      dataContext: event.dataContext,
      deleted: false,
      createdAt: meta.timestamp.toISOString(),
      updatedAt: meta.timestamp.toISOString()
    },
    { upsert: true }
  );
});
departmentsModel.handle('departments', 'departmentUpdated', async (id, event, meta) => {
  const client = await getMongoClient();
  const collection = client.db().collection<DepartmentProjection>('departments');
  const updateFields: Partial<DepartmentProjection> = {
    updatedAt: meta.timestamp.toISOString()
  };
  if (event.name !== undefined) updateFields.name = event.name;
  if (event.description !== undefined) updateFields.description = event.description;
  await collection.updateOne({ departmentId: id }, { $set: updateFields });
});
departmentsModel.handle('departments', 'departmentManagerAdded', async (id, event, meta) => {
  const client = await getMongoClient();
  const collection = client.db().collection<DepartmentProjection>('departments');
  await collection.updateOne(
    { departmentId: id },
    {
      $addToSet: { managers: event.userPhone },
      $set: { updatedAt: meta.timestamp.toISOString() }
    }
  );
});
departmentsModel.handle('departments', 'departmentManagerRemoved', async (id, event, meta) => {
  const client = await getMongoClient();
  const collection = client.db().collection<DepartmentProjection>('departments');
  await collection.updateOne(
    { departmentId: id },
    {
      $pull: { managers: event.userPhone },
      $set: { updatedAt: meta.timestamp.toISOString() }
    }
  );
});
departmentsModel.handle('departments', 'departmentDeleted', async (id, event, meta) => {
  const client = await getMongoClient();
  const collection = client.db().collection<DepartmentProjection>('departments');
  await collection.updateOne(
    { departmentId: id },
    {
      $set: {
        deleted: true,
        deletedBy: event.deletedBy,
        updatedAt: meta.timestamp.toISOString()
      }
    }
  );
});
- Register handlers with handle(stream, eventType, handler).
- Use meta.timestamp for projection timestamps.
- Use replaceOne(..., { upsert: true }) for create events.
- Use $set, $addToSet, $pull, $unset for updates.
createHandler(...) accepts options that are useful for projections and process managers:
const departmentsModel = createHandler('departments-model', ['departments'], {
  tailStream: false,
  alwaysTailStream: false,
  continueOnError: false
});
- tailStream: when first started, begin at the end of stream history
- alwaysTailStream: every start begins at the end of history
- continueOnError: continue processing after a handler throws, while delegating the error to the provider
Use tailStream for new subscribers that should ignore history. Use continueOnError only when you have acceptable error handling and observability.
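The difference between the two tailing options can be modeled as a small decision function. This is a sketch derived only from the descriptions above, not from evtstore's source; startPosition, savedBookmark and endOfStream are hypothetical names, and positions are simplified to numbers.

```typescript
type HandlerOptions = { tailStream?: boolean; alwaysTailStream?: boolean };

// Hypothetical model of where a handler starts reading.
// savedBookmark is undefined when the handler has never run before.
function startPosition(
  opts: HandlerOptions,
  savedBookmark: number | undefined,
  endOfStream: number
): number {
  // alwaysTailStream: every start skips existing history.
  if (opts.alwaysTailStream) return endOfStream;
  // A handler that has run before resumes where it left off.
  if (savedBookmark !== undefined) return savedBookmark;
  // First start only: tailStream decides between the end and the beginning.
  return opts.tailStream ? endOfStream : 0;
}

const replayAll = startPosition({}, undefined, 10);                    // 0
const firstTail = startPosition({ tailStream: true }, undefined, 10);  // 10
const resumed = startPosition({ tailStream: true }, 4, 10);            // 4
const alwaysTail = startPosition({ alwaysTailStream: true }, 4, 10);   // 10
```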
Useful handler methods:
- start() to begin the processing loop
- stop() to stop processing
- runOnce() to process all currently unhandled events once
- reset() to clear local bookmark state and force reloading bookmark position
Example startup:
export function startProjection() {
  departmentsModel.start();
  userAccountsModel.start();
}
runOnce() is especially useful in tests and one-shot processing:
await departmentCmd.create('department-123', {
  tenantId: 'tenant-1',
  name: 'Engineering',
  dataContext: 'live'
});
await departmentsModel.runOnce();
const department = await queryDepartments.getById('department-123');
expect(department?.name).toBe('Engineering');
If you need the handler to re-read its bookmark state, call reset() before the next run.
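Conceptually, a handler keeps a bookmark (the position of the last processed event) and runOnce() processes everything after it. The toy class below models that idea in plain, synchronous TypeScript. It is not evtstore's implementation; MiniHandler, StoredEvent and their members are hypothetical.

```typescript
type StoredEvent = { position: number; type: string; aggregateId: string };

class MiniHandler {
  private bookmark = 0; // position of the last processed event
  private readonly handlers = new Map<string, (evt: StoredEvent) => void>();
  private readonly log: StoredEvent[];

  constructor(log: StoredEvent[]) {
    this.log = log;
  }

  handle(type: string, fn: (evt: StoredEvent) => void): void {
    this.handlers.set(type, fn);
  }

  // Process every event after the bookmark once; returns how many were seen.
  runOnce(): number {
    const pending = this.log.filter((evt) => evt.position > this.bookmark);
    for (const evt of pending) {
      this.handlers.get(evt.type)?.(evt);
      this.bookmark = evt.position;
    }
    return pending.length;
  }
}

const log: StoredEvent[] = [
  { position: 1, type: 'departmentCreated', aggregateId: 'department-123' }
];
const seen: string[] = [];
const model = new MiniHandler(log);
model.handle('departmentCreated', (evt) => {
  seen.push(evt.aggregateId);
});

const first = model.runOnce();  // 1: the created event is processed
const second = model.runOnce(); // 0: nothing new since the bookmark
log.push({ position: 2, type: 'departmentDeleted', aggregateId: 'department-123' });
const third = model.runOnce();  // 1: the bookmark advances even without a matching handler
```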
One handler can consume events from multiple streams as long as they come from the same provider.
const auditModel = createHandler('audit-model', ['users', 'profiles', 'posts']);
auditModel.handle('users', 'userCreated', async (id, event, meta) => {
  // ...
});
auditModel.handle('posts', 'postCreated', async (id, event, meta) => {
  // ...
});
Use this for composite read models or process managers that coordinate across aggregates.
Read from projection collections, not from event streams.
import type { WithId } from 'mongodb';
import { getMongoClient } from './projection-shared';
import type { DepartmentProjection } from './projection';
function cleanDoc<T>(doc: WithId<T> | null): T | null {
  if (!doc) return null;
  const { _id: _, ...rest } = doc;
  return rest as T;
}
function cleanDocs<T>(docs: WithId<T>[]): T[] {
  return docs.map((doc) => {
    const { _id: _, ...rest } = doc;
    return rest as T;
  });
}
export const queryDepartments = {
  getById: async (id: string): Promise<DepartmentProjection | null> => {
    const client = await getMongoClient();
    const doc = await client
      .db()
      .collection<DepartmentProjection>('departments')
      .findOne({ departmentId: id });
    return cleanDoc(doc as WithId<DepartmentProjection> | null);
  },
  getByTenant: async (tenantId: string): Promise<DepartmentProjection[]> => {
    const client = await getMongoClient();
    const docs = await client
      .db()
      .collection<DepartmentProjection>('departments')
      .find({ tenantId, deleted: { $ne: true } })
      .toArray();
    return cleanDocs(docs as WithId<DepartmentProjection>[]);
  }
};
- Strip _id before returning app-level types.
A shared singleton client is a good default.
import { MongoClient } from 'mongodb';
// One cached connection promise per URI, shared by every caller.
const singletonClientPromise: Record<string, Promise<MongoClient> | null> = {};
export async function getMongoClient(mongoUrl?: string): Promise<MongoClient> {
  const uri = mongoUrl ?? process.env.MONGO_URI;
  if (!uri) {
    throw new Error('MONGO_URI environment variable is not set.');
  }
  const cached = singletonClientPromise[uri];
  if (cached) {
    return cached;
  }
  const client = new MongoClient(uri, {
    maxPoolSize: 50,
    minPoolSize: 5,
    maxIdleTimeMS: 30000,
    connectTimeoutMS: 10000,
    socketTimeoutMS: 60000,
    serverSelectionTimeoutMS: 5000,
    retryWrites: true,
    retryReads: true
  });
  // Cache the promise (not the client) so concurrent callers share one attempt,
  // and clear it on failure so a later call can retry instead of reusing a rejection.
  const connecting = client.connect().catch((err: unknown) => {
    singletonClientPromise[uri] = null;
    throw err;
  });
  singletonClientPromise[uri] = connecting;
  return connecting;
}
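Caching the promise rather than the connected client is what lets concurrent callers share a single connection attempt. The sketch below shows the same idea in a generic, driver-free form; memoizeByKey and the fake connect function are hypothetical stand-ins, not MongoDB or evtstore APIs.

```typescript
// Hypothetical generic version of the singleton pattern: cache one promise per key.
function memoizeByKey<T>(create: (key: string) => Promise<T>): (key: string) => Promise<T> {
  const cache = new Map<string, Promise<T>>();
  return (key) => {
    const cached = cache.get(key);
    if (cached) return cached;
    const pending = create(key).catch((err: unknown) => {
      cache.delete(key); // allow a retry after a failed attempt
      throw err;
    });
    cache.set(key, pending);
    return pending;
  };
}

let connectCalls = 0;
// Stand-in for a real connect call; resolves to a fake client object.
const getClient = memoizeByKey(async (uri) => {
  connectCalls += 1;
  return { uri };
});

const a = getClient('mongodb://localhost/app');
const b = getClient('mongodb://localhost/app');
// a and b are the same promise, and the fake connect ran only once.
```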
MongoDB is a strong default and a good fit for many Node.js backends, but evtstore is not limited to MongoDB.
Common provider options include:
- postgres
- pg
- knex
The modeling approach stays mostly the same across providers.
If your app supports sandbox/test/live separation, include a dataContext field in create events and projection documents.
Example:
type DataContext = 'live' | 'test';
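When dataContext arrives from outside the type system (an HTTP body or a queue message, for example), it can be checked before it reaches a command. A small sketch; isDataContext and parseDataContext are hypothetical helpers, not evtstore APIs.

```typescript
type DataContext = 'live' | 'test';

// Hypothetical type guard for untrusted input.
function isDataContext(value: unknown): value is DataContext {
  return value === 'live' || value === 'test';
}

// Hypothetical parser: throws instead of silently defaulting to one context.
function parseDataContext(value: unknown): DataContext {
  if (!isDataContext(value)) {
    throw new Error(`Invalid dataContext: ${String(value)}`);
  }
  return value;
}
```

Rejecting unknown values outright avoids test traffic being recorded as live data by accident.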
Guidelines:
- Set dataContext only on create.
Projection reads may lag behind writes.
If you need immediate feedback after a write, choose one of these patterns:
Example:
await departmentCmd.create(id, payload);
await new Promise((resolve) => setTimeout(resolve, 300));
const department = await queryDepartments.getById(id);
Use waiting sparingly. Prefer designing flows that tolerate projection lag.
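If you do wait, polling the read model with a timeout is usually more robust than a fixed sleep, because it returns as soon as the projection catches up and fails loudly when it does not. A sketch with a hypothetical waitFor helper that is not part of evtstore; the default timings are arbitrary.

```typescript
// Hypothetical helper: poll `probe` until it returns a non-null value or time runs out.
async function waitFor<T>(
  probe: () => Promise<T | null>,
  { timeoutMs = 2000, intervalMs = 50 }: { timeoutMs?: number; intervalMs?: number } = {}
): Promise<T> {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const value = await probe();
    if (value !== null) return value;
    if (Date.now() >= deadline) {
      throw new Error('Timed out waiting for projection');
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Intended usage with the query module from this guide:
//   await departmentCmd.create(id, payload);
//   const department = await waitFor(() => queryDepartments.getById(id));
```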
Best candidates for unit tests:
Example fold test:
describe('foldDepartmentEvent', () => {
  it('applies departmentCreated', () => {
    const prev = {
      tenantId: '',
      name: '',
      managers: [],
      description: '',
      dataContext: 'test' as const,
      deleted: false
    };
    const evt: DepartmentEvt = {
      type: 'departmentCreated',
      tenantId: 'tenant-1',
      name: 'Engineering',
      dataContext: 'live'
    };
    const next = departmentAgg.fold(evt, prev);
    expect(next.name).toBe('Engineering');
    expect(next.dataContext).toBe('live');
  });
});
For command tests, prefer integration-style tests that call command methods the same way production code does:
await departmentCmd.create('department-123', {
  tenantId: 'tenant-1',
  name: 'Engineering',
  dataContext: 'live'
});
- Record performedBy, timestamps, and deletion metadata where useful for auditability.
- Do not pass type in command invocation payloads.
When adding a new evtstore-backed feature:
- Define the Evt, Agg, and Cmd types.
- Define the aggregate with createAggregate(...).
- Define the command handlers with createCommands(...).
- Register the aggregate in createDomain(...).
- Add projection handlers with handle(...).

await userAccountCmd.create(phone, {
  phone,
  dataContext: 'live',
  performedBy: phone
});

const changes = getChangedFields(cmd, agg, ['name', 'description']);
if (Object.keys(changes).length === 0) return [];

await collection.updateOne(
  { departmentId: id },
  { $set: { updatedAt: meta.timestamp.toISOString(), name: event.name } }
);

const { _id: _, ...rest } = doc;
return rest as DepartmentProjection;
If you are unsure how to implement a new feature, start with one aggregate, one command file, one projection collection, and one query module. Keep the first version small, then extend it.