Live Queries
Maintained result sets — filter + sort + limit subscriptions that emit exact enter/leave/move/update deltas, with Postgres as the ordering oracle.
A subscription (previous page) tells you that some row in a declared scope changed. A live query goes further: you hand fabriq a filter, a sort, and a window size, and it maintains the result set for you, emitting enter / leave / move / update deltas as the world changes — RethinkDB-changefeed / Convex / Meteor / Firestore live-query semantics. The visible window is always the true first-N of the total order: exact top-N at all times.
Live queries reuse the structured filter from the relational plane (query.Where) and keep Postgres authoritative for ordering and the window boundary, so the in-engine fast path never has to reproduce SQL collation semantics to stay correct.
Live queries are additive. Scope subscriptions (f.Subscribe) remain the right tool for "tell me when anything in this scope changes"; reach for a live query when the client wants a maintained, ordered, filtered window it can bind directly to a list UI.
The request
type LiveQuery struct {
Entity string // registry entity name
Where query.Where // the same filter AST as relational List (Eq/In/Gt/Like/Or/IsNull…)
Sort []SortKey // ordered; fabriq appends id ASC as the final unique tiebreak
Limit int // window size N
Cursor *Cursor // keyset anchor (sort-key tuple); nil = from the head
Mode Mode // ModeMaintained (default)
}
type SortKey struct { Column string; Desc bool }The filter columns are validated against the entity (the injection guard, via query.ValidateConds), and the sort columns against the entity's declared sortable set (see LiveSpec). Fabriq always appends id ASC as the final tiebreak, so (Sort…, id) is a total order — required for stable keyset pagination. There is deliberately no offset: offset pagination is unstable under churn, so live windows use keyset cursors only.
Two phases over one stream
LiveQuery returns an initial snapshot plus a channel of live deltas:
func (f *Fabriq) LiveQuery(ctx context.Context, q livequery.LiveQuery) (
livequery.Snapshot, <-chan livequery.LiveDelta, func(), error)snap, deltas, cancel, err := f.LiveQuery(tenantCtx, livequery.LiveQuery{
Entity: "asset",
Where: query.Where{query.Eq("kind", "pump"), query.Eq("site_id", siteID)},
Sort: []livequery.SortKey{{Column: "name"}},
Limit: 50,
})
if err != nil { /* unknown entity, no LiveSpec, bad column, denied authz, not configured */ }
defer cancel()
render(snap.Rows) // the initial ordered window, straight from Postgres
for d := range deltas { // then splice each delta into the rendered list
apply(d)
}The snapshot is read through the relational path (RLS-enforced) and carries the first N rows in total order. The handoff is gapless: the engine attaches to the live change feed before taking the snapshot and discards any change whose version the snapshot already reflects, so no row is missed or double-applied across the seam.
The delta
type LiveDelta struct {
Op DeltaOp // OpEnter | OpLeave | OpMove | OpUpdate | OpReset
AggID string
Version int64 // idempotency key with AggID; stale versions are discarded
Row json.RawMessage // column-keyed payload (enter/update/move)
OldIndex int // window position before (leave/move/update); -1 if N/A
NewIndex int // window position after (enter/move/update); -1 on leave
Cursor Cursor // stable sort-key tuple — for array splicing AND resume
StreamID string // → SSE id:, drives Last-Event-ID resume
At time.Time
}Each delta carries both a numeric window index (for array-splicing list UIs) and the stable sort-key Cursor (for resume and correctness):
| Op | Meaning | Client action |
|---|---|---|
enter | row entered the visible window | insert at NewIndex (may push the last row out) |
leave | row left the visible window | remove at OldIndex |
move | row stayed visible, position changed | move OldIndex → NewIndex |
update | row stayed at its position, payload changed | replace at NewIndex |
reset | discard the window and re-snapshot | refetch (emitted on failover / overflow) |
Exact top-N: the cushion + Postgres oracle
The maintenance contract is exact top-N at all times. The engine holds an ordered prefix of the result — the visible window [0, N) plus a cushion [N, N+C) — maintained as a true prefix of the Postgres-ordered result:
Invariant: the buffered rows are exactly the first
len(rows)of the Postgres-ordered result from the anchor. Postgres owns ordering; the in-memory fast path only splices.
On each change the engine evaluates the filter in Go (fast path) and decides where the row sorts relative to the buffer. When a row leaves the visible window, the first cushion row is promoted to take its slot; when the cushion runs low, a single bounded keyset refill (WHERE (sort…, id) > cursor ORDER BY … LIMIT k) tops it back up from Postgres. Because Postgres — not hand-rolled Go comparison — fills and orders the buffer, text collation and type-coercion subtleties can never corrupt the window. This is the hybrid design: an in-engine incremental matcher on the hot path, Postgres authoritative at exactly the two moments correctness is hard (snapshot and boundary).
Opting in: LiveSpec
An entity opts into live queries by declaring a LiveSpec (nil = disabled), mirroring how SearchSpec opts into the search plane:
r.MustRegister(registry.EntitySpec{
Name: "asset",
Model: (*domain.Asset)(nil),
Live: ®istry.LiveSpec{
Filterable: []string{"name", "kind", "site_id"}, // columns allowed in Where (empty = all)
Sortable: []string{"name", "kind"}, // columns allowed in Sort (empty = all)
MaxWindow: 500, // cap on Limit
},
})Columns are validated against the model at registration — an unknown filterable/sortable column fails fast.
The SSE bridge
Because the query body (filter + sort + limit) does not fit a query string, the subscribe is a POST that upgrades to a Server-Sent Events stream. The example service exposes it at POST /api/v1/live: it writes the snapshot as a snapshot event, then each delta as an event named for its op (enter/leave/move/update) with id = StreamID for Last-Event-ID resume, over the same proxy-safe SSEWriter the scope-subscription plane uses.
snap, deltas, cancel, err := s.fabric.LiveQuery(tctx, q)
// ...
_ = sse.WriteEvent("", "snapshot", snap)
for d := range deltas {
_ = sse.WriteEvent(d.StreamID, d.Op.String(), d)
}Authorization
A WithLiveAuthz hook runs before the snapshot, receiving the tenant-stamped context and the query:
fabriq.Open(ctx, reg, cfg, fabriq.WithLiveAuthz(func(ctx context.Context, q livequery.LiveQuery) error {
// allow/deny; later phases may also inject mandatory row-visibility predicates into q.Where
return nil
}))Tenancy is always structural — the tenant comes from the authenticated context and is never client-supplied, and snapshots/refills run under the RLS-scoped app role.
Performance
The package ships microbenchmarks for the hot path (go test ./core/livequery/ -bench=. -benchmem). Representative numbers (Apple M-series, illustrative — run them on your own hardware):
| Benchmark | What it measures | Cost |
|---|---|---|
PredicateEval | one filter match against a row | ~80 ns, 0 allocs |
WindowApplyUpdate/N=10 | per-event maintenance, 10-row window | ~120 ns |
WindowApplyUpdate/N=1000 | per-event maintenance, 1000-row window | ~940 ns |
WindowApplyChurn | full enter/evict ↔ leave/promote cycle | ~1.2 µs |
WindowFanout/subs=1000 | routing one change to 1000 subscriptions | ~140 µs |
WindowFanout/subs=10000 | routing one change to 10000 subscriptions | ~1.5 ms |
Two things to read from these. Per-subscription maintenance is cheap and grows with the window size (the membership lookup), not the result-set size — deep windows stay affordable. Fan-out, however, is linear in subscriber count because P1 evaluates every subscription per event; that linear curve is exactly what the roadmap's content-based predicate index removes.
Roadmap
P1 is the single-node Maintained engine described above. Planned phases:
- Streamed mode — a
ModeStreamedpolicy that forwards matched change events and lets the client order them, with shared per-query state; the variant that scales to enormous/unbounded result sets and very high subscriber counts. - Predicate index — content-based candidate selection so a change is matched against only the subscriptions it could affect, replacing the linear fan-out above.
- Templated sharing — identical saved-view queries share one compiled matcher and buffer.
- Sharding & gateway tier — data-partitioned matcher shards fed by partitioned Redis streams, a durable subscription registry, and failover via re-snapshot.
- Virtualized viewport — a
Reanchorcontrol frame that slides a maintained window for deep/infinite scroll atO(window)server cost. - Reconcile backstop — a low-cadence loop that repairs any drift against Postgres truth.
Where to go next
Subscriptions
The delta plane — server-resolved channels, conflated live delivery, Last-Event-ID resume, and the SSE bridge.
Relational
The relational read port over Postgres — point reads, batched hydration, structured filters, load-one-by-condition, and a raw-SQL escape hatch — plus the typed Repo[T] layer. Every method is tenant-scoped.