Subscriptions
The delta plane — server-resolved channels, conflated live delivery, Last-Event-ID resume, and the SSE bridge.
The delta plane turns committed events into live change notifications. A client requests a scope; the channel is always resolved server-side from the validated scope plus the context tenant, so clients never name a channel or a tenant. Deltas are conflated within a short window, delivered over a buffered Go channel, and resumable across reconnects via Last-Event-ID. This page covers the subscribe request, the delta shape, conflation, catch-up, and the SSE bridge.
Scope subscriptions answer "tell me when anything in this scope changed." When a client instead wants a maintained, ordered, filtered result set — a window it can bind to a list UI, with enter/leave/move/update deltas — reach for a live query, which builds on this plane.
Subscribe scope, never a channel
A subscription request names an entity, a declared scope, and an id:
type SubscribeScope struct {
Entity string `json:"entity"` // registry entity name
Scope string `json:"scope"` // a scope declared in the entity's spec: "id", "site", "tenant", ...
ID string `json:"id,omitempty"` // scope id; ignored for the tenant scope
}The channel is resolved from this plus the authenticated context tenant — client input never names a channel or a tenant directly. Channels always have the shape changes:{tenant}:{scope}:{id}. For a tenant-scoped subscription the id is forced to the context tenant; any client-supplied id is ignored by construction. The scope must be one the entity actually declares in its registry spec, or resolution fails.
deltas, err := f.Subscribe(tenantCtx, query.SubscribeScope{
Entity: "asset", Scope: "site", ID: siteID,
})
if err != nil {
// unknown entity, undeclared scope, missing id, denied authz, or no tenant
}
for d := range deltas {
// apply d to local state, or refetch on demand
}The delta
type Delta struct {
StreamID string `json:"stream_id"` // transport position; maps 1:1 to the SSE "id:" field
Channel string `json:"channel"`
TenantID string `json:"tenant_id"`
Aggregate string `json:"aggregate"`
AggID string `json:"agg_id"`
Version int64 `json:"version"`
Type string `json:"type"`
At time.Time `json:"at"`
Payload json.RawMessage `json:"payload"`
}A delta is small enough to conflate but complete enough that a simple UI can patch its state without a refetch — and a rich UI can refetch on demand. StreamID is the Redis stream entry id; it maps 1:1 onto the SSE id: field, which is what makes Last-Event-ID resume work. The Payload is the same column-keyed row shape the event envelope carries.
Subscribe: authz gate, then conflated delivery
f.Subscribe runs the request through an authz gate, resolves the channel server-side, and returns a buffered, conflated delta stream:
func (f *Fabriq) Subscribe(ctx context.Context, scope query.SubscribeScope) (<-chan query.Delta, error)Two things happen before any data flows. First, the subscribe.Gate calls the authorization hook (AuthzFunc, installed with fabriq.WithAuthz; the default is AllowAll) with the validated scope and the already-tenant-stamped context — returning an error denies the subscription. Only then does the gate resolve the channel. The returned channel is <-chan query.Delta; closing the context detaches the subscriber and closes the channel.
The conflation window
Live deltas are coalesced per aggregate within a flush window (the subscribe.Hub's last-write-wins conflation). Within the window, repeated changes to the same (aggregate, aggID) collapse to the latest version; on flush the survivors are delivered in stream order. The window defaults to 150ms (spec range 100–250ms) and is tuned with fabriq.WithConflationWindow. This bounds how fast a hot aggregate can spam a subscriber without losing the most recent state.
Subscriber channels are buffered (default 64, fabriq.WithSubscribeBuffer). The delivery policy is deliberate: when a conflated subscriber's buffer is full, the delta is dropped for that subscriber. That is safe because of the fetch-then-subscribe contract — a client that falls behind refetches and resumes from its Last-Event-ID rather than relying on every frame arriving.
Catch-up on reconnect
When a client reconnects, it replays what it missed with CatchUp, passing its last SSE id as afterID:
func (f *Fabriq) CatchUp(ctx context.Context, scope query.SubscribeScope, afterID string, limit int) ([]query.Delta, error)CatchUp goes through the same authz gate as Subscribe, resolves the same channel, and reads the deltas after afterID from the channel's Redis stream.
The channels are MAXLEN-bounded (fabriq.WithStreamMaxLen, default 500), which makes catch-up depth finite. The contract for the caller is explicit: a full page means "refetch instead." If CatchUp returns limit deltas, the client may have missed older ones that aged out of the bounded stream, so it should refetch the current state rather than trust the partial replay. An empty slice means the client is current. Because a live Subscribe and a CatchUp can overlap, consumers dedupe by StreamID.
missed, err := f.CatchUp(tenantCtx, scope, lastEventID, 500)
if err != nil { /* ... */ }
if len(missed) == 500 {
// possible gap: refetch current state, then resume the live stream
} else {
for _, d := range missed { apply(d) }
}The SSE bridge
The subscription plane ships a stdlib-only, proxy-safe Server-Sent Events bridge (subscribe.SSEWriter). It requires an http.Flusher and refuses a ResponseWriter that cannot flush — a buffering proxy would otherwise hold events indefinitely. On setup it sets Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive, and crucially X-Accel-Buffering: no to stop nginx-class proxies from buffering the stream, then flushes after every event.
Each delta is written as an SSE event with id = StreamID, event = the delta type, and data = the JSON delta, so a reconnecting client's Last-Event-ID header (read with subscribe.LastEventID) feeds straight back into CatchUp. A Heartbeat writes an SSE comment (: ping) to keep intermediaries from idling the connection out.
sse, err := subscribe.NewSSEWriter(w) // fails if w is not an http.Flusher
if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError); return }
last := subscribe.LastEventID(r)
if last != "" {
missed, _ := f.CatchUp(ctx, scope, last, 500)
for _, d := range missed { _ = sse.WriteDelta(d) }
}
deltas, _ := f.Subscribe(ctx, scope)
for d := range deltas { _ = sse.WriteDelta(d) }The full fetch-then-subscribe pattern, heartbeat cadence, and proxy configuration are covered in the runbooks.
Where to go next
Projections
How the graph and search planes are derived from the event stream as engine-neutral mutations, kept idempotent by version gating, and always rebuildable from Postgres.
Live Queries
Maintained result sets — filter + sort + limit subscriptions that emit exact enter/leave/move/update deltas, with Postgres as the ordering oracle.