Fabriq

Projections

How the graph and search planes are derived from the event stream as engine-neutral mutations, kept idempotent by version gating, and always rebuildable from Postgres.

The graph and search planes are projections: derived, rebuildable views of the source of truth, never written directly. The projection engine consumes the committed event stream, translates each event into engine-neutral mutations through a pure applier, and lets the adapters render them into their own dialect. Because every projection is reproducible from Postgres, a corrupt or stale projection is always recoverable. This page is conceptual; the operator workflow lives in Rebuild and Reconcile.

Engine-neutral mutations

Appliers are pure: apply(Event) -> []Mutation. A Mutation carries no engine dialect — no Cypher, no Elasticsearch DSL. The adapters translate the closed set of mutations (FalkorDB MERGE, Elasticsearch bulk ops) and gate each one on its Version for idempotency.

type Mutation interface{ isMutation() }

// graph mutations
type NodeUpsert struct { Label string; ID string; Props map[string]any; Version int64 }
type EdgeUpsert struct { Rel, FromLabel, FromID, ToLabel, ToID string; Version int64 }
type NodeDelete struct { Label, ID string }
type EdgeDelete struct { Rel, FromLabel, FromID string; Version int64 }

// search mutations
type DocIndex   struct { Index, ID string; Doc map[string]any; Version int64 }
type DocDeindex struct { Index, ID string; Version int64 }
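
To make the applier contract concrete, here is a minimal sketch of a pure graph applier. The Event shape, the event type names ("asset.updated", "asset.deleted"), and the function name applyAssetGraph are assumptions made for illustration; only the mutation types come from this page, and only the two the sketch needs are repeated.

```go
package main

// Mutation and the two graph mutations below mirror the definitions on this page.
type Mutation interface{ isMutation() }

type NodeUpsert struct {
	Label, ID string
	Props     map[string]any
	Version   int64
}

type NodeDelete struct{ Label, ID string }

func (NodeUpsert) isMutation() {}
func (NodeDelete) isMutation() {}

// Event is a hypothetical, simplified event shape used only for this sketch.
type Event struct {
	Type    string // assumed names: "asset.created", "asset.updated", "asset.deleted"
	AggID   string
	Version int64 // aggregate version, copied onto versioned mutations
	Payload map[string]any
}

// applyAssetGraph is pure: it performs no I/O and returns the same mutations
// for the same event, so replaying an event is always safe to compute.
func applyAssetGraph(ev Event) []Mutation {
	switch ev.Type {
	case "asset.created", "asset.updated":
		return []Mutation{NodeUpsert{Label: "Asset", ID: ev.AggID, Props: ev.Payload, Version: ev.Version}}
	case "asset.deleted":
		return []Mutation{NodeDelete{Label: "Asset", ID: ev.AggID}}
	default:
		// Events this projection does not care about produce no mutations.
		return nil
	}
}
```

Because the applier returns data rather than executing engine calls, it can be unit-tested without FalkorDB or Elasticsearch.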

The Version on each mutation is the originating event's aggregate version. Adapters gate on it: a mutation is skipped when the stored target version is already greater than or equal to it. This is what makes at-least-once delivery safe — a redelivered or out-of-order event cannot regress the projection. In Elasticsearch this is external_gte versioning; in the graph it is a version comparison on the node.
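
The gating rule can be sketched with an in-memory stand-in for an engine target. The versionedTarget type and its Upsert method are hypothetical; the real adapters enforce the comparison inside FalkorDB or Elasticsearch rather than in Go.

```go
package main

// versionedTarget is a hypothetical in-memory stand-in for a projection target.
type versionedTarget struct {
	versions map[string]int64
	props    map[string]map[string]any
}

func newVersionedTarget() *versionedTarget {
	return &versionedTarget{
		versions: map[string]int64{},
		props:    map[string]map[string]any{},
	}
}

// Upsert applies the write unless the stored version is already greater than
// or equal to the incoming one. It reports whether the write was applied.
func (t *versionedTarget) Upsert(id string, props map[string]any, version int64) bool {
	if stored, ok := t.versions[id]; ok && stored >= version {
		return false // redelivered or out-of-order: skip, never regress
	}
	t.versions[id] = version
	t.props[id] = props
	return true
}
```

Applying version 2, then a redelivered version 2, then a late version 1 leaves the target at version 2 in every ordering, which is the property the at-least-once pipeline relies on.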

The projection engine

The engine consumes one projection's events through a Redis consumer group and applies them: upcast → pure applier → sink (per target) → applied bookkeeping.

type Engine struct {
	Projection string // "graph" | "search"
	Group      string // "proj:graph" | "proj:search"
	Source     Source
	Sink       Sink
	Applier    Applier
	Upcasters  *event.UpcasterChain // optional; appliers see the latest shape
	State      AppliedRecorder
	TargetsFor func(ctx context.Context, tenantID string) ([]string, error)
}

Consumer groups give the engine two properties for free. Scale-out without a leader: run replicas with distinct consumer names and the group distributes entries — no leader election needed, unlike the relay. Recovery: stuck entries are reclaimed via XAUTOCLAIM, so a crashed consumer's in-flight work is picked up by another.

Delivery is at-least-once end to end. On the happy path an applied event is acked; a transient sink failure leaves the entry pending for redelivery, and version gating makes the replay safe. A poison event (one whose payload cannot be upcast or applied, or whose tenant cannot be derived) can never apply, so it is skipped rather than left to wedge the group; the reconciler heals the affected aggregate from Postgres later.

Projection state bookkeeping

Each (tenant, projection) has one bookkeeping row tracking the live pointer and stream position:

type State struct {
	TenantID     string
	Projection   string // "graph" | "search"
	ModelVersion int    // bumped by rebuilds (the _v{N} suffix)
	EventVersion string // last applied event ULID / stream position
	Status       string // "live" | "building" | "soaking" | "abandoned"
	TargetName   string // engine target currently receiving applies
}

TargetName is the blue-green pointer: the tenant_{id}_v{N} graph or the versioned index behind a tenant's alias. ModelVersion is bumped by each rebuild. Status tracks the rebuild lifecycle. Readers resolve their target through this row, so flipping the pointer is atomic for them.

Separately, the engine records per-aggregate applied versions through an AppliedRecorder (SetApplied), and WaitForProjection reads them back through the StateReader.AppliedVersion port.

Read-your-writes: WaitForProjection

A command commits and its event propagates to the projections asynchronously, so a projection-backed read immediately after a write may not yet reflect it. WaitForProjection is the read-your-writes helper: it blocks until the named projection has applied the aggregate at or beyond a version, or the context deadline expires.

res, err := f.Exec(tenantCtx, command.Command{Entity: "asset", Op: command.OpUpdate, AggID: id, Payload: &a})
if err != nil {
	return err // the command did not commit, so there is nothing to wait for
}
// ensure the graph reflects this write before querying it
if err := f.WaitForProjection(tenantCtx, "graph", "asset", res.AggID, res.Version); err != nil {
	if !errors.Is(err, fabriq.ErrProjectionLag) {
		return err
	}
	// deadline hit before the projection caught up; the read below may be stale
}
var related []domain.Asset
if err := f.Graph().TraverseAndHydrate(tenantCtx, cypher, params, &related); err != nil {
	return err
}

It polls the projection-state port on an interval (WithWaitPollInterval, default 25ms) and returns fabriq.ErrProjectionLag when the deadline arrives before the version is reached.

Rebuild and reconcile (at a glance)

Two operations restore a projection from the source of truth. Both are covered in depth in Rebuild and Reconcile; the conceptual shape:

A blue-green rebuild replaces a tenant's projection without downtime. The state is first marked building, and from then on the live engine dual-applies every new event to both the live and the building targets (TargetsFor returns both). The Postgres snapshot is replayed into the building target (rebuilds always replay from Postgres, never from another projection), and version gating makes the overlap with live applies safe in either order. The pointer then flips atomically (ModelVersion++, status soaking); for Elasticsearch, the alias swap happens in the flip's OnFlip cutover. After a soak window, Finalize drops the old target and marks the projection live. An integration test verifies that the graph rebuild produces an identical graph.
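
The dual-apply and flip steps can be sketched against a trimmed copy of the State row. The helper names (targetName, targetsFor, flip) are hypothetical, and two details are assumptions rather than documented behavior: that the building target is named with ModelVersion+1, and that after the flip only the new target receives applies.

```go
package main

import "fmt"

// State is a trimmed copy of the bookkeeping row shown earlier on this page.
type State struct {
	TenantID     string
	ModelVersion int
	Status       string // "live" | "building" | "soaking" | "abandoned"
	TargetName   string
}

// targetName renders the tenant_{id}_v{N} naming used for graph targets.
func targetName(tenantID string, modelVersion int) string {
	return fmt.Sprintf("tenant_%s_v%d", tenantID, modelVersion)
}

// targetsFor returns every target the live engine applies to. While a rebuild
// is building, that is the live target plus the one being built (assumed to be
// the next model version).
func targetsFor(s State) []string {
	if s.Status == "building" {
		return []string{s.TargetName, targetName(s.TenantID, s.ModelVersion+1)}
	}
	return []string{s.TargetName}
}

// flip moves the pointer to the rebuilt target and enters the soak window.
// In the real system this is a single atomic row update; here it is a value copy.
func flip(s State) State {
	s.ModelVersion++
	s.TargetName = targetName(s.TenantID, s.ModelVersion)
	s.Status = "soaking"
	return s
}
```

Since readers resolve their target through TargetName, they see either the old target or the new one, never a half-built one.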

Reconciliation detects per-aggregate drift between Postgres and a projection and repairs it through the ordinary pipeline. It compares id→version maps and classifies three kinds of drift:

type Drift struct {
	Entity           string
	AggID            string
	TruthVersion     int64 // 0 = the row no longer exists (zombie in projection)
	ProjectedVersion int64 // 0 = the projection never saw it (missing)
}

  • missing: Postgres has it, the projection does not (ProjectedVersion == 0).
  • stale: the projection's version trails Postgres.
  • zombie: the projection holds an aggregate Postgres no longer has (TruthVersion == 0).

Repair never writes an engine directly: it republishes the aggregate's latest event (missing/stale) or emits a synthetic deleted event (zombie) through the outbox, so the normal version-gated pipeline heals the drift. A projection legitimately ahead of the truth scan is not drift — events land between the two reads and the pipeline converges on its own.
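
The comparison of id→version maps can be sketched as below. The detectDrift function and the Kind method are hypothetical names; the Drift struct is the one shown above. The sketch assumes aggregate versions start at 1, so 0 is free to act as the "absent" sentinel.

```go
package main

import "sort"

// Drift mirrors the struct shown above.
type Drift struct {
	Entity           string
	AggID            string
	TruthVersion     int64 // 0 = the row no longer exists (zombie in projection)
	ProjectedVersion int64 // 0 = the projection never saw it (missing)
}

// Kind classifies a drift record into one of the three kinds.
func (d Drift) Kind() string {
	switch {
	case d.TruthVersion == 0:
		return "zombie"
	case d.ProjectedVersion == 0:
		return "missing"
	default:
		return "stale"
	}
}

// detectDrift compares the Postgres (truth) and projection id→version maps.
// A projection at or ahead of the truth scan is not reported.
func detectDrift(entity string, truth, projected map[string]int64) []Drift {
	var out []Drift
	for id, tv := range truth {
		pv := projected[id] // 0 when the projection has no entry
		if pv < tv {
			out = append(out, Drift{Entity: entity, AggID: id, TruthVersion: tv, ProjectedVersion: pv})
		}
	}
	for id, pv := range projected {
		if _, ok := truth[id]; !ok {
			out = append(out, Drift{Entity: entity, AggID: id, ProjectedVersion: pv})
		}
	}
	// Map iteration order is random; sort for a stable report.
	sort.Slice(out, func(i, j int) bool { return out[i].AggID < out[j].AggID })
	return out
}
```

Each record then selects its repair: missing and stale republish the aggregate's latest event, and zombie emits the synthetic deleted event, both through the outbox as described above.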
