Commands and Events
The command plane as the only write path for aggregates, the transactional outbox, optimistic concurrency, and the versioned event envelope.
The command plane is the only write path for aggregate entities. A command runs spec-driven validation, then — inside one tenant-stamped transaction — writes the aggregate row and appends exactly one versioned event to a transactional outbox. A leader-elected relay publishes that event to Redis Streams on commit. This page covers the command shape, the executor flow, optimistic concurrency, batches, and the event envelope.
The command
type Command struct {
Entity string // registry name, e.g. "asset"
Op Op // OpCreate | OpUpdate | OpDelete
AggID string // required for update/delete; minted (ULID) on empty create
Payload any // the entity's grove model for create/update; nil for delete
ExpectedVersion *int64 // optional optimistic-concurrency guard
}Op is the operation, and Op.Verb() maps it to the event verb used in the event type:
const (
OpCreate Op = iota // inserts a new aggregate (version 1)
OpUpdate // replaces an existing aggregate's row (version+1)
OpDelete // removes the row; the deletion is still a versioned event
)
func (o Op) Verb() string // OpCreate -> "created", OpUpdate -> "updated", OpDelete -> "deleted"The result reports the outcome of one command:
type Result struct {
AggID string
Version int64
EventID string
}res, err := f.Exec(tenantCtx, command.Command{
Entity: "asset",
Op: command.OpCreate,
Payload: &domain.Asset{Name: "Pump 7", Kind: "pump", SiteID: siteID},
})
// res.AggID is the minted ULID, res.Version == 1, res.EventID is the outbox event id.The structural columns — id, tenant_id, version — are stamped by the executor. Caller-provided id and version are ignored, and a payload carrying a foreign tenant_id is rejected. Document entities (KindDocument) cannot be written here at all: the executor rejects them, because document writes go through the document plane.
Executor flow
A single Exec runs one command in its own transaction. The flow is: validate everything possible before opening a transaction, then inside one tenant-stamped transaction do the version check, the row write, and exactly one outbox append.
Prepare (pre-transaction). Assert the context tenant, look up the entity in the registry, reject non-aggregate kinds, bind the payload to column values, check the tenant-forgery rule, and enforce required (NOT NULL, no default) columns. For an empty-AggID create, a ULID is minted here.
Version check (in transaction). Read the aggregate's CurrentVersion (0 if absent) and apply the create/update/delete existence rules plus any ExpectedVersion guard. The next version is current + 1.
Apply the row. ApplyChange writes the row with the structurally stamped column values (or deletes it for OpDelete).
Append exactly one event. AppendOutbox appends one versioned event.Envelope to the transactional outbox in the same transaction.
Commit. The row write and the outbox append commit atomically. There is no window where one exists without the other.
// the core of Executor.apply, inside InTenantTx
current, err := tx.CurrentVersion(ctx, p.entity, p.aggID)
// ... checkVersion ...
next := current + 1
vals := p.stampedValues(next)
tx.ApplyChange(ctx, p.entity, p.cmd.Op, p.aggID, next, vals)
env, _ := newEnvelope(p, next, vals, x.now(), x.traceparent(ctx))
tx.AppendOutbox(ctx, env)
return Result{AggID: p.aggID, Version: next, EventID: env.ID}Optimistic concurrency
Set ExpectedVersion to make a write conditional on the stored version. If it does not match, the command fails with a VersionConflictError (which matches errors.Is(err, fabriq.ErrVersionConflict)):
v := int64(3)
_, err := f.Exec(tenantCtx, command.Command{
Entity: "asset",
Op: command.OpUpdate,
AggID: assetID,
Payload: &updated,
ExpectedVersion: &v,
})
if errors.Is(err, fabriq.ErrVersionConflict) {
// someone else wrote since you read; reload and retry
}The same check also enforces existence: an OpCreate against an existing aggregate conflicts (expected 0, found N), and an OpUpdate/OpDelete against a missing one returns NotFoundError.
Batches: ExecBatch
ExecBatch runs N commands in one transaction: ordered, all-or-nothing. Every command is prepared and validated before the transaction opens; then each is applied in order, and any failure rolls back the whole batch.
results, err := f.ExecBatch(tenantCtx, []command.Command{
{Entity: "site", Op: command.OpCreate, Payload: &site},
{Entity: "asset", Op: command.OpCreate, Payload: &asset}, // references the new site
})Each command still produces exactly one versioned event, so a batch of N commands appends N envelopes — but they commit together, and the relay publishes them in order.
The transactional outbox and the event envelope
Each applied command appends one event.Envelope — the wire shape of one domain event — to the outbox in the same transaction as the row write:
type Envelope struct {
ID string // ULID: lexically sortable, minted at command time
TenantID string // consumers never re-derive it from the payload
Aggregate string // registry entity name, e.g. "asset"
AggID string // aggregate instance id
Version int64 // the aggregate version after this event
Type string // derived type, e.g. "asset.updated"
At time.Time // commit-side timestamp
PayloadSchemaVersion int // payload shape version; upcasters migrate forward
Payload json.RawMessage // column-keyed JSON of the row after the change ({} for deletes)
Traceparent string // W3C trace context across the async hop
}The payload is the column-keyed row after the change, so projections and subscribers always see the table shape. The ID is a monotonic ULID minted at command time, which the relay relies on for ordered publishing. The Traceparent carries the active W3C trace context across the async hop, so a single trace spans command → outbox → relay → projection apply (the executor stamps it by default).
Delivery on commit
The leader-elected relay publishes envelopes to Redis Streams after the transaction commits. It polls the outbox with FOR UPDATE SKIP LOCKED and is woken by a pg_notify issued inside the command transaction — so the wake-up can never outrun the data, and the interval poll is the correctness mechanism while the notify is only a latency optimization. Exactly one relay is active at a time, elected via a Postgres advisory lock on a dedicated connection (ADR 0004); the LISTEN/NOTIFY wake-up uses grove's Listener (ADR 0005).
This is at-least-once delivery. The single source stream is consumed by the projection engine and the subscription hub through their own consumer groups. Idempotency is restored downstream by version gating: a consumer skips an event whose version it has already applied. See Projections for the consumer side and Subscriptions for the delta side.
Lifecycle hooks
For cross-cutting concerns that must run on every write — auditing being the canonical one — register a LifecycleHook. It fires inside the write transaction, once per change, after the row and its outbox event are staged and before commit:
type LifecycleHook interface {
OnChange(ctx context.Context, tx command.Tx, change command.Change) error
}
type Change struct {
Entity *registry.Entity
Op command.Op // resolved op: OpCreate / OpUpdate / OpDelete
Envelope event.Envelope // the same event written to the outbox
}A hook can do two things a post-commit consumer cannot. It can participate — write its own rows atomically through tx.Exec(ctx, sql, args...), committed or rolled back with the change — and it can veto: returning an error aborts the whole command (and any ExecBatch it belongs to), rolling back the aggregate write, the outbox event, and the hook's own writes together. Hooks run in registration order; the first error short-circuits. The transaction is already tenant-stamped, so a hook's writes into an RLS-guarded side table are scoped to the calling tenant.
Register globally with fabriq.WithLifecycleHook(...). An auditing/chronicle extension hooks in here to record every change atomically with the data, so the audit trail can never diverge from the source of truth:
audit := command.HookFunc(func(ctx context.Context, tx command.Tx, ch command.Change) error {
return tx.Exec(ctx,
`INSERT INTO chronicle_audit (id, tenant_id, entity, agg_id, version, op, payload)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
ch.Envelope.ID, ch.Envelope.TenantID, ch.Envelope.Aggregate, ch.Envelope.AggID,
ch.Envelope.Version, ch.Op.Verb(), []byte(ch.Envelope.Payload))
})
f, _, _ := fabriq.Open(ctx, reg, cfg, fabriq.WithLifecycleHook(audit))This is the in-transaction, write-side seam — distinct from the per-entity Validate input check (which runs before the transaction) and the post-commit projection appliers (which run asynchronously off the outbox). Use Validate to reject bad input, a lifecycle hook to record or gate every committed change, and an applier to derive read models.
Payload schema evolution: upcasters
When a payload shape changes, register an upcaster instead of rewriting history. An Upcaster is a pure function that migrates one event type's payload from FromVersion to FromVersion+1:
type Upcaster struct {
Type string
FromVersion int
Fn func(json.RawMessage) (json.RawMessage, error)
}Register a chain and install it with fabriq.WithUpcasters(chain). The chain is applied at decode time, walking from the envelope's PayloadSchemaVersion upward until no upcaster matches — so appliers and consumers only ever see the latest shape. Register one vN -> vN+1 step per evolved event type.
chain := event.NewUpcasterChain()
chain.MustRegister(event.Upcaster{
Type: "asset.updated", FromVersion: 1,
Fn: func(p json.RawMessage) (json.RawMessage, error) {
// e.g. rename a field, add a default
return migrated, nil
},
})
f, _, _ := fabriq.Open(ctx, reg, cfg, fabriq.WithUpcasters(chain))