Fabriq

Live Queries

Maintained result sets — filter + sort + limit subscriptions that emit exact enter/leave/move/update deltas, with Postgres as the ordering oracle.

A subscription (previous page) tells you that some row in a declared scope changed. A live query goes further: you hand fabriq a filter, a sort, and a window size, and it maintains the result set for you, emitting enter / leave / move / update deltas as the world changes — RethinkDB-changefeed / Convex / Meteor / Firestore live-query semantics. The visible window is always the true first-N of the total order: exact top-N at all times.

Live queries reuse the structured filter from the relational plane (query.Where) and keep Postgres authoritative for ordering and the window boundary, so the in-engine fast path never has to reproduce SQL collation semantics to stay correct.

Live queries are additive. Scope subscriptions (f.Subscribe) remain the right tool for "tell me when anything in this scope changes"; reach for a live query when the client wants a maintained, ordered, filtered window it can bind directly to a list UI.

The request

type LiveQuery struct {
	Entity string        // registry entity name
	Where  query.Where   // the same filter AST as relational List (Eq/In/Gt/Like/Or/IsNull…)
	Sort   []SortKey     // ordered; fabriq appends id ASC as the final unique tiebreak
	Limit  int           // window size N
	Cursor *Cursor       // keyset anchor (sort-key tuple); nil = from the head
	Mode   Mode          // ModeMaintained (default)
}

type SortKey struct { Column string; Desc bool }

The filter columns are validated against the entity (the injection guard, via query.ValidateConds), and the sort columns against the entity's declared sortable set (see LiveSpec). Fabriq always appends id ASC as the final tiebreak, so (Sort…, id) is a total order — required for stable keyset pagination. There is deliberately no offset: offset pagination is unstable under churn, so live windows use keyset cursors only.
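
As a rough illustration of the sort handling described above, the sketch below checks each requested sort column against a sortable set and then appends id ASC. The helper name normalizeSort and its error text are assumptions made for this example; they are not part of fabriq's API.

```go
package main

import "fmt"

// SortKey mirrors the shape shown above; redeclared so the sketch compiles alone.
type SortKey struct {
	Column string
	Desc   bool
}

// normalizeSort is a hypothetical helper. It rejects columns outside the
// sortable set (an empty set allows every column) and always appends id ASC,
// so (Sort..., id) forms a total order.
func normalizeSort(sort []SortKey, sortable []string) ([]SortKey, error) {
	allowed := make(map[string]bool, len(sortable))
	for _, c := range sortable {
		allowed[c] = true
	}
	out := make([]SortKey, 0, len(sort)+1)
	for _, k := range sort {
		if len(sortable) > 0 && !allowed[k.Column] {
			return nil, fmt.Errorf("column %q is not sortable", k.Column)
		}
		out = append(out, k)
	}
	// Final unique tiebreak: id ascending.
	return append(out, SortKey{Column: "id"}), nil
}

func main() {
	keys, err := normalizeSort([]SortKey{{Column: "name"}}, []string{"name", "kind"})
	fmt.Println(keys, err)

	_, err = normalizeSort([]SortKey{{Column: "secret"}}, []string{"name", "kind"})
	fmt.Println(err)
}
```

Without the trailing id key, two rows that share a name would have no defined relative order, and a keyset cursor pointing between them would be ambiguous.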

Two phases over one stream

LiveQuery returns an initial snapshot plus a channel of live deltas:

func (f *Fabriq) LiveQuery(ctx context.Context, q livequery.LiveQuery) (
	livequery.Snapshot, <-chan livequery.LiveDelta, func(), error)

snap, deltas, cancel, err := f.LiveQuery(tenantCtx, livequery.LiveQuery{
	Entity: "asset",
	Where:  query.Where{query.Eq("kind", "pump"), query.Eq("site_id", siteID)},
	Sort:   []livequery.SortKey{{Column: "name"}},
	Limit:  50,
})
if err != nil {
	// unknown entity, no LiveSpec, bad column, denied authz, not configured
	return err
}
defer cancel()

render(snap.Rows)            // the initial ordered window, straight from Postgres
for d := range deltas {       // then splice each delta into the rendered list
	apply(d)
}

The snapshot is read through the relational path (RLS-enforced) and carries the first N rows in total order. The handoff is gapless: the engine attaches to the live change feed before taking the snapshot and discards any change whose version the snapshot already reflects, so no row is missed or double-applied across the seam.
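
The seam rule can be sketched as a filter over changes buffered while the snapshot was being read. The Change type, the afterSnapshot helper, and the idea of tracking one version per snapshot row are assumptions made for illustration; the engine's actual bookkeeping is not shown on this page.

```go
package main

import "fmt"

// Change is a hypothetical change-feed event: an aggregate ID plus a
// per-aggregate version that only increases.
type Change struct {
	AggID   string
	Version int64
}

// afterSnapshot keeps only the buffered changes the snapshot does not already
// reflect. snapVersions maps each snapshot row's AggID to the version it was
// read at; rows absent from the snapshot have no entry, so their changes pass.
func afterSnapshot(buffered []Change, snapVersions map[string]int64) []Change {
	var out []Change
	for _, c := range buffered {
		if v, ok := snapVersions[c.AggID]; ok && c.Version <= v {
			continue // the snapshot already includes this change
		}
		out = append(out, c)
	}
	return out
}

func main() {
	// The feed was attached first, so it may hold changes the snapshot also saw.
	buffered := []Change{{"a", 3}, {"a", 4}, {"b", 1}}
	snap := map[string]int64{"a": 3}
	fmt.Println(afterSnapshot(buffered, snap))
}
```

Attaching before the snapshot is what prevents missed rows; the version comparison is what prevents double-applied ones.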

The delta

type LiveDelta struct {
	Op       DeltaOp         // OpEnter | OpLeave | OpMove | OpUpdate | OpReset
	AggID    string
	Version  int64           // idempotency key with AggID; stale versions are discarded
	Row      json.RawMessage // column-keyed payload (enter/update/move)
	OldIndex int             // window position before (leave/move/update); -1 if N/A
	NewIndex int             // window position after  (enter/move/update); -1 on leave
	Cursor   Cursor          // stable sort-key tuple — for array splicing AND resume
	StreamID string          // → SSE id:, drives Last-Event-ID resume
	At       time.Time
}

Each delta carries both a numeric window index (for array-splicing list UIs) and the stable sort-key Cursor (for resume and correctness):

| Op     | Meaning                                 | Client action                                  |
|--------|-----------------------------------------|------------------------------------------------|
| enter  | row entered the visible window          | insert at NewIndex (may push the last row out) |
| leave  | row left the visible window             | remove at OldIndex                             |
| move   | row stayed visible, position changed    | move OldIndex → NewIndex                       |
| update | row stayed at its position, payload changed | replace at NewIndex                        |
| reset  | discard the window and re-snapshot      | refetch (emitted on failover / overflow)       |
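
A client-side apply step for these ops might look like the sketch below. It uses a trimmed stand-in for LiveDelta with string rows, and it assumes that an enter which overflows the window is resolved by the client truncating to the limit rather than by a separate leave event; treat both as assumptions of the example.

```go
package main

import "fmt"

// Delta is a trimmed, hypothetical stand-in for LiveDelta: rows are plain
// strings here instead of JSON payloads.
type Delta struct {
	Op       string // "enter", "leave", "move", "update", "reset"
	Row      string
	OldIndex int
	NewIndex int
}

// insertAt places row at index i, shifting later rows right.
func insertAt(rows []string, i int, row string) []string {
	if i < 0 || i > len(rows) {
		return rows // out-of-range index: leave the list alone
	}
	rows = append(rows, "")
	copy(rows[i+1:], rows[i:])
	rows[i] = row
	return rows
}

// removeAt drops the row at index i, shifting later rows left.
func removeAt(rows []string, i int) []string {
	if i < 0 || i >= len(rows) {
		return rows
	}
	return append(rows[:i], rows[i+1:]...)
}

// apply splices one delta into the rendered list of at most limit rows.
func apply(rows []string, d Delta, limit int) []string {
	switch d.Op {
	case "enter":
		rows = insertAt(rows, d.NewIndex, d.Row)
		if len(rows) > limit {
			rows = rows[:limit] // the last row is pushed out of the window
		}
	case "leave":
		rows = removeAt(rows, d.OldIndex)
	case "move":
		rows = insertAt(removeAt(rows, d.OldIndex), d.NewIndex, d.Row)
	case "update":
		if d.NewIndex >= 0 && d.NewIndex < len(rows) {
			rows[d.NewIndex] = d.Row
		}
	case "reset":
		rows = rows[:0] // caller should re-snapshot
	}
	return rows
}

func main() {
	rows := []string{"a", "b", "c"}
	rows = apply(rows, Delta{Op: "enter", Row: "x", NewIndex: 1}, 3)
	rows = apply(rows, Delta{Op: "leave", OldIndex: 0}, 3)
	rows = apply(rows, Delta{Op: "move", Row: "b2", OldIndex: 1, NewIndex: 0}, 3)
	fmt.Println(rows)
}
```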

Exact top-N: the cushion + Postgres oracle

The maintenance contract is exact top-N at all times. The engine holds an ordered prefix of the result — the visible window [0, N) plus a cushion [N, N+C) — maintained as a true prefix of the Postgres-ordered result:

Invariant: the buffered rows are exactly the first len(rows) of the Postgres-ordered result from the anchor. Postgres owns ordering; the in-memory fast path only splices.
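
One way to picture the prefix layout is a single ordered slice whose first N entries are the visible window and whose remainder is the cushion. The buffer type and its methods below are hypothetical and use string rows; they only illustrate how removing a visible row lets the first cushion row slide into the window, and how a low cushion can be detected.

```go
package main

import "fmt"

// buffer is a hypothetical model of the ordered prefix: rows[:limit] is the
// visible window, rows[limit:] is the cushion.
type buffer struct {
	rows  []string
	limit int // window size N
}

// visible returns the rows currently shown to the client.
func (b *buffer) visible() []string {
	if len(b.rows) < b.limit {
		return b.rows
	}
	return b.rows[:b.limit]
}

// remove deletes the row at index i. If that row was visible and the cushion
// had a row to give, it reports the row promoted into the window's last slot.
func (b *buffer) remove(i int) (promoted string, ok bool) {
	b.rows = append(b.rows[:i], b.rows[i+1:]...)
	if i < b.limit && len(b.rows) >= b.limit {
		return b.rows[b.limit-1], true
	}
	return "", false
}

// needsRefill reports whether the cushion has dropped below low rows.
func (b *buffer) needsRefill(low int) bool {
	return len(b.rows)-b.limit < low
}

func main() {
	b := &buffer{rows: []string{"a", "b", "c", "d", "e"}, limit: 3}
	promoted, ok := b.remove(1) // "b" leaves the visible window
	fmt.Println(b.visible(), promoted, ok, b.needsRefill(2))
}
```

Because the slice is only ever spliced, never re-sorted, it stays a prefix of whatever order Postgres produced when the rows were loaded.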

On each change the engine evaluates the filter in Go (fast path) and decides where the row sorts relative to the buffer. When a row leaves the visible window, the first cushion row is promoted to take its slot; when the cushion runs low, a single bounded keyset refill (WHERE (sort…, id) > cursor ORDER BY … LIMIT k) tops it back up from Postgres. Because Postgres — not hand-rolled Go comparison — fills and orders the buffer, text collation and type-coercion subtleties can never corrupt the window. This is the hybrid design: an in-engine incremental matcher on the hot path, Postgres authoritative at exactly the two moments correctness is hard (snapshot and boundary).
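
The bounded refill can be sketched as a small query builder. The row-value form (sort…, id) > cursor only works when every key sorts in the same direction, so this example emits the expanded OR form, which also covers mixed ASC/DESC keys. The function name, the table name, and the omission of the Where filter and of NULL handling are simplifications assumed for the sketch; column names are expected to have been validated already.

```go
package main

import (
	"fmt"
	"strings"
)

// SortKey mirrors the shape shown above; redeclared so the sketch compiles alone.
type SortKey struct {
	Column string
	Desc   bool
}

// keysetQuery is a hypothetical builder for a refill statement that fetches
// the next k rows strictly after a cursor. Placeholder $i binds the cursor
// value for keys[i-1]. The entity's filter and NULL ordering are left out.
func keysetQuery(table string, keys []SortKey, k int) string {
	var ors, order []string
	for i, key := range keys {
		// All earlier keys tie with the cursor...
		var ands []string
		for j := 0; j < i; j++ {
			ands = append(ands, fmt.Sprintf("%s = $%d", keys[j].Column, j+1))
		}
		// ...and this key is strictly past it in its sort direction.
		op, dir := ">", "ASC"
		if key.Desc {
			op, dir = "<", "DESC"
		}
		ands = append(ands, fmt.Sprintf("%s %s $%d", key.Column, op, i+1))
		ors = append(ors, "("+strings.Join(ands, " AND ")+")")
		order = append(order, key.Column+" "+dir)
	}
	return fmt.Sprintf("SELECT * FROM %s WHERE %s ORDER BY %s LIMIT %d",
		table, strings.Join(ors, " OR "), strings.Join(order, ", "), k)
}

func main() {
	keys := []SortKey{{Column: "name"}, {Column: "id"}}
	fmt.Println(keysetQuery("assets", keys, 10))
}
```

Since the statement carries its own ORDER BY, the rows come back already ordered by Postgres and can be appended to the cushion without any comparison in Go.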

Opting in: LiveSpec

An entity opts into live queries by declaring a LiveSpec (nil = disabled), mirroring how SearchSpec opts into the search plane:

r.MustRegister(registry.EntitySpec{
	Name:  "asset",
	Model: (*domain.Asset)(nil),
	Live: &registry.LiveSpec{
		Filterable: []string{"name", "kind", "site_id"}, // columns allowed in Where (empty = all)
		Sortable:   []string{"name", "kind"},            // columns allowed in Sort  (empty = all)
		MaxWindow:  500,                                  // cap on Limit
	},
})

Columns are validated against the model at registration — an unknown filterable/sortable column fails fast.
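
A fail-fast check of this kind might look like the following. It takes the model's column names as a plain slice, whereas the registry presumably derives them from the model type; the local LiveSpec copy, the validateSpec name, and the error wording are all assumptions made for the example.

```go
package main

import "fmt"

// LiveSpec mirrors the fields shown above; redeclared so the sketch compiles alone.
type LiveSpec struct {
	Filterable []string
	Sortable   []string
	MaxWindow  int
}

// validateSpec is a hypothetical registration-time check: every filterable and
// sortable column named in the spec must exist on the model.
func validateSpec(modelColumns []string, spec LiveSpec) error {
	known := make(map[string]bool, len(modelColumns))
	for _, c := range modelColumns {
		known[c] = true
	}
	for _, c := range spec.Filterable {
		if !known[c] {
			return fmt.Errorf("filterable column %q is not on the model", c)
		}
	}
	for _, c := range spec.Sortable {
		if !known[c] {
			return fmt.Errorf("sortable column %q is not on the model", c)
		}
	}
	return nil
}

func main() {
	cols := []string{"id", "name", "kind", "site_id"}
	fmt.Println(validateSpec(cols, LiveSpec{Sortable: []string{"name", "kind"}}))
	fmt.Println(validateSpec(cols, LiveSpec{Sortable: []string{"nmae"}}))
}
```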

The SSE bridge

Because the query body (filter + sort + limit) does not fit in a query string, the subscribe request is a POST whose response is a Server-Sent Events stream. The example service exposes it at POST /api/v1/live: it writes the snapshot as a snapshot event, then each delta as an event named for its op (enter/leave/move/update) with id = StreamID for Last-Event-ID resume, over the same proxy-safe SSEWriter the scope-subscription plane uses.

snap, deltas, cancel, err := s.fabric.LiveQuery(tctx, q)
// ...
_ = sse.WriteEvent("", "snapshot", snap)
for d := range deltas {
	_ = sse.WriteEvent(d.StreamID, d.Op.String(), d)
}
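
For readers unfamiliar with the wire format, the sketch below shows what one such event could look like on the wire. writeEvent here is a hypothetical stand-in written against io.Writer, not the SSEWriter used by the example service, and the payload and stream ID are made up.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// writeEvent writes one Server-Sent Events frame: an optional id line, an
// event name, a single JSON data line, and the blank line that ends the frame.
func writeEvent(w io.Writer, id, event string, payload any) error {
	data, err := json.Marshal(payload) // compact JSON, so it fits on one data line
	if err != nil {
		return err
	}
	if id != "" {
		if _, err := fmt.Fprintf(w, "id: %s\n", id); err != nil {
			return err
		}
	}
	_, err = fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, data)
	return err
}

// frame renders a single event to a string, for inspection.
func frame(id, event string, payload any) string {
	var buf bytes.Buffer
	if err := writeEvent(&buf, id, event, payload); err != nil {
		return ""
	}
	return buf.String()
}

func main() {
	fmt.Print(frame("", "snapshot", map[string]int{"rows": 50}))
	fmt.Print(frame("17-0", "enter", map[string]int{"NewIndex": 2}))
}
```

A browser client that reconnects sends the last id it saw in the Last-Event-ID request header, which is why each delta carries its StreamID as the event id.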

Authorization

A WithLiveAuthz hook runs before the snapshot, receiving the tenant-stamped context and the query:

fabriq.Open(ctx, reg, cfg, fabriq.WithLiveAuthz(func(ctx context.Context, q livequery.LiveQuery) error {
	// allow/deny; later phases may also inject mandatory row-visibility predicates into q.Where
	return nil
}))

Tenancy is always structural — the tenant comes from the authenticated context and is never client-supplied, and snapshots/refills run under the RLS-scoped app role.

Performance

The package ships microbenchmarks for the hot path (go test ./core/livequery/ -bench=. -benchmem). Representative numbers (Apple M-series, illustrative — run them on your own hardware):

| Benchmark                | What it measures                           | Cost              |
|--------------------------|--------------------------------------------|-------------------|
| PredicateEval            | one filter match against a row             | ~80 ns, 0 allocs  |
| WindowApplyUpdate/N=10   | per-event maintenance, 10-row window       | ~120 ns           |
| WindowApplyUpdate/N=1000 | per-event maintenance, 1000-row window     | ~940 ns           |
| WindowApplyChurn         | full enter/evict ↔ leave/promote cycle     | ~1.2 µs           |
| WindowFanout/subs=1000   | routing one change to 1000 subscriptions   | ~140 µs           |
| WindowFanout/subs=10000  | routing one change to 10000 subscriptions  | ~1.5 ms           |

Two things stand out. Per-subscription maintenance is cheap and grows with the window size (the membership lookup), not the result-set size, so deep windows stay affordable. Fan-out, however, is linear in subscriber count because P1, the current single-node engine, evaluates every subscription per event; that linear curve is exactly what the roadmap's content-based predicate index removes.

Roadmap

P1 is the single-node Maintained engine described above. Planned phases:

  • Streamed mode — a ModeStreamed policy that forwards matched change events and lets the client order them, with shared per-query state; the variant that scales to enormous/unbounded result sets and very high subscriber counts.
  • Predicate index — content-based candidate selection so a change is matched against only the subscriptions it could affect, replacing the linear fan-out above.
  • Templated sharing — identical saved-view queries share one compiled matcher and buffer.
  • Sharding & gateway tier — data-partitioned matcher shards fed by partitioned Redis streams, a durable subscription registry, and failover via re-snapshot.
  • Virtualized viewport — a Reanchor control frame that slides a maintained window for deep/infinite scroll at O(window) server cost.
  • Reconcile backstop — a low-cadence loop that repairs any drift against Postgres truth.
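
To make the predicate-index item concrete, here is a minimal sketch of content-based candidate selection using an equality index. It is not fabriq's planned design, only one common approach: each subscription is filed under one of its equality predicates, and a changed row looks up only the subscriptions filed under its own column values. All names are hypothetical, values are strings for simplicity, and candidates would still need the full filter evaluated against them.

```go
package main

import (
	"fmt"
	"sort"
)

// eqIndex maps column -> value -> IDs of subscriptions whose filter requires
// that column to equal that value.
type eqIndex struct {
	byValue map[string]map[string][]int
}

func newEqIndex() *eqIndex {
	return &eqIndex{byValue: map[string]map[string][]int{}}
}

// add files a subscription under one equality predicate from its filter.
func (ix *eqIndex) add(subID int, column, value string) {
	if ix.byValue[column] == nil {
		ix.byValue[column] = map[string][]int{}
	}
	ix.byValue[column][value] = append(ix.byValue[column][value], subID)
}

// candidates returns the sorted, de-duplicated subscriptions that a changed
// row could match. Subscriptions filed under other values are never visited.
func (ix *eqIndex) candidates(row map[string]string) []int {
	seen := map[int]bool{}
	var out []int
	for column, value := range row {
		for _, id := range ix.byValue[column][value] {
			if !seen[id] {
				seen[id] = true
				out = append(out, id)
			}
		}
	}
	sort.Ints(out)
	return out
}

func main() {
	ix := newEqIndex()
	ix.add(1, "kind", "pump")
	ix.add(2, "kind", "valve")
	ix.add(3, "site_id", "s1")
	fmt.Println(ix.candidates(map[string]string{"kind": "pump", "site_id": "s1"}))
}
```

The cost per change then tracks the number of plausible matches instead of the total subscriber count; subscriptions with no equality predicate would need a separate always-checked list.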
