Quickstart
Build a registry, open the facade, run create and update commands under a tenant context, read back, batch, and subscribe to deltas.
This is the shortest end-to-end path through Fabriq: register a domain, open the facade against Postgres and Redis, then exercise the write path (Exec, ExecBatch), the relational read path (Relational().Get), and the live delta stream (Subscribe). Every call is tenant-scoped — the tenant lives on the context.Context, and the fabric refuses unstamped contexts.
Build the registry and open the facade
package main

import (
    "context"
    "errors"
    "log"

    "github.com/xraph/fabriq"
    "github.com/xraph/fabriq/core/registry"
    "github.com/xraph/fabriq/domain"
)

func main() {
    reg := registry.New()
    if err := domain.RegisterAll(reg); err != nil {
        log.Fatalf("register domain: %v", err)
    }

    f, _, err := fabriq.Open(context.Background(), reg, fabriq.Config{
        Postgres: fabriq.PostgresConfig{DSN: "postgres://user:pass@localhost:5432/fabriq?sslmode=disable"},
        Redis:    fabriq.RedisConfig{Addr: "localhost:6379"},
    })
    if err != nil {
        log.Fatalf("open fabriq: %v", err)
    }
    defer f.Close()

    // ... commands and queries below run against f
}

domain.RegisterAll registers the example domain pack. You can register your own registry.EntitySpec set instead; the rest of this page assumes the site and asset entities from that pack.
Stamp the tenant context
Every fabric entry point calls tenant.Require and fails with ErrNoTenant on an unstamped context. Stamp the tenant with tenant.WithTenant, which validates the id before it ends up in any derived name (graph names, index names, stream keys).
import "github.com/xraph/fabriq/core/tenant"

ctx, err := tenant.WithTenant(context.Background(), "acme")
if err != nil {
    log.Fatalf("invalid tenant id: %v", err)
}

The tenant id MUST come from validated auth claims (a verified JWT, a session, an mTLS identity), never from user input or a forwarded header. WithTenant validates the id's shape (^[a-zA-Z0-9_-]{1,64}$), not its authenticity. Stamping a tenant from a client-controlled value lets a caller read and write another tenant's data. Only auth middleware should call WithTenant.
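To make the shape check and the "refuse unstamped contexts" rule concrete, here is a minimal standalone sketch using only the standard library. It is not Fabriq's implementation: withTenant, requireTenant, stampAndRead, and both error values are hypothetical names chosen for illustration, and only the regular expression is taken from the text above.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "regexp"
)

// tenantKey is an unexported key type, so other packages cannot collide with it.
type tenantKey struct{}

// Hypothetical sentinel errors for this sketch; they are not Fabriq's.
var (
    errInvalidTenant = errors.New("invalid tenant id")
    errNoTenant      = errors.New("no tenant on context")
)

// tenantIDPattern is the shape quoted in the text above.
var tenantIDPattern = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,64}$`)

// withTenant checks the id's shape, then stamps it on a derived context.
// It says nothing about whether the caller is entitled to that tenant.
func withTenant(ctx context.Context, id string) (context.Context, error) {
    if !tenantIDPattern.MatchString(id) {
        return nil, fmt.Errorf("%w: %q", errInvalidTenant, id)
    }
    return context.WithValue(ctx, tenantKey{}, id), nil
}

// requireTenant returns the stamped id, or errNoTenant for an unstamped context.
func requireTenant(ctx context.Context) (string, error) {
    id, ok := ctx.Value(tenantKey{}).(string)
    if !ok {
        return "", errNoTenant
    }
    return id, nil
}

// stampAndRead runs both steps so the round trip is easy to exercise.
func stampAndRead(id string) (string, error) {
    ctx, err := withTenant(context.Background(), id)
    if err != nil {
        return "", err
    }
    return requireTenant(ctx)
}

func main() {
    for _, id := range []string{"acme", "acme/../other", ""} {
        got, err := stampAndRead(id)
        fmt.Printf("%q -> %q, err=%v\n", id, got, err)
    }
    _, err := requireTenant(context.Background())
    fmt.Println("unstamped context:", err)
}
```

Rejecting ids with path separators or dots before they reach a derived name is what keeps a value like "acme/../other" out of graph names, index names, and stream keys. The check is purely syntactic, which is why the source of the id still has to be verified auth claims.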
Create an aggregate
A write is one command.Command. OpCreate mints a ULID when AggID is empty and starts the aggregate at version 1. The executor stamps id, tenant_id, and version structurally — values you set on the payload for those columns are ignored.
import (
    "github.com/xraph/fabriq/core/command"
    "github.com/xraph/fabriq/domain"
)

res, err := f.Exec(ctx, command.Command{
    Entity:  "site",
    Op:      command.OpCreate,
    Payload: &domain.Site{Name: "North Plant", Code: "NP-1", Region: "eu-west"},
})
if err != nil {
    log.Fatalf("create site: %v", err)
}
// res is command.Result{AggID, Version, EventID}:
//   res.AggID:   the minted (or supplied) aggregate id
//   res.Version: 1 for a create
//   res.EventID: the outbox event appended in the same transaction

Update with optimistic concurrency
Set ExpectedVersion to gate the write on the stored version. If another writer advanced it first, the command fails with ErrVersionConflict instead of clobbering. OpUpdate requires AggID and bumps the version.
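The "reload, re-derive, retry" response to a conflict can be sketched in isolation with an in-memory store. Everything here is an assumption made for illustration (the store type, updateWithRetry, and the error values are hypothetical and unrelated to Fabriq's internals); it only shows the compare-then-bump gate and a bounded retry loop around it.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// Hypothetical sentinels for this sketch; they are not Fabriq's.
var (
    errVersionConflict = errors.New("version conflict")
    errNotFound        = errors.New("not found")
)

type row struct {
    Name    string
    Version int64
}

// store is an in-memory stand-in for a versioned table.
type store struct {
    mu   sync.Mutex
    rows map[string]row
}

func newStore() *store {
    return &store{rows: map[string]row{"site-1": {Name: "North Plant", Version: 1}}}
}

func (s *store) get(id string) (row, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    r, ok := s.rows[id]
    if !ok {
        return row{}, errNotFound
    }
    return r, nil
}

// update writes name only if the stored version still equals expected,
// then bumps the version by one and returns it.
func (s *store) update(id, name string, expected int64) (int64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    r, ok := s.rows[id]
    if !ok {
        return 0, errNotFound
    }
    if r.Version != expected {
        return 0, fmt.Errorf("%w: stored v%d, expected v%d", errVersionConflict, r.Version, expected)
    }
    r.Name, r.Version = name, r.Version+1
    s.rows[id] = r
    return r.Version, nil
}

// updateWithRetry reloads, re-derives the new name from the fresh row,
// and retries on conflict, giving up after a fixed number of attempts.
func updateWithRetry(s *store, id string, derive func(row) string, attempts int) (int64, error) {
    for i := 0; i < attempts; i++ {
        cur, err := s.get(id)
        if err != nil {
            return 0, err
        }
        v, err := s.update(id, derive(cur), cur.Version)
        if errors.Is(err, errVersionConflict) {
            continue // another writer won the race; start over from a fresh read
        }
        return v, err
    }
    return 0, fmt.Errorf("gave up after %d attempts: %w", attempts, errVersionConflict)
}

func main() {
    s := newStore()
    v, _ := s.update("site-1", "North Plant A", 1)
    fmt.Println("first writer reached version", v)

    _, err := s.update("site-1", "North Plant B", 1) // stale expectation
    fmt.Println("stale writer conflicted:", errors.Is(err, errVersionConflict))

    v, err = updateWithRetry(s, "site-1", func(r row) string { return r.Name + " (renamed)" }, 3)
    fmt.Println("retry reached version", v, "err:", err)
}
```

Bounding the attempts keeps a hot aggregate from spinning a caller forever. With Fabriq, the same gate is expressed through ExpectedVersion: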
expected := res.Version // the version we just read/created
upd, err := f.Exec(ctx, command.Command{
    Entity:          "site",
    Op:              command.OpUpdate,
    AggID:           res.AggID,
    Payload:         &domain.Site{Name: "North Plant A", Code: "NP-1", Region: "eu-west"},
    ExpectedVersion: &expected,
})
switch {
case errors.Is(err, fabriq.ErrVersionConflict):
    // Another writer advanced the version first: reload, re-derive, retry.
    log.Printf("update site: version conflict on %s", res.AggID)
case err != nil:
    log.Fatalf("update site: %v", err)
default:
    log.Printf("site now at v%d", upd.Version) // v2
}

Read it back
Relational().Get loads one row by id into a model pointer. It is structurally tenant-scoped, so the same ctx carries the tenant.
var site domain.Site
if err := f.Relational().Get(ctx, "site", res.AggID, &site); err != nil {
    if errors.Is(err, fabriq.ErrNotFound) {
        // no such aggregate in this tenant
    }
    log.Fatalf("get site: %v", err)
}

Batch writes atomically
ExecBatch runs N commands in one transaction, ordered, all-or-nothing: if any command fails, none are applied. The results slice maps 1:1 to the input commands.
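The all-or-nothing contract can be illustrated without a database. The sketch below is an assumption-laden stand-in, not how Fabriq does it (a real executor would rely on a database transaction): execBatch, createAsset, and errDuplicate are hypothetical names, and "commit" is just swapping in a staged copy once every command has succeeded.

```go
package main

import (
    "errors"
    "fmt"
)

// errDuplicate is a hypothetical failure used to abort a batch.
var errDuplicate = errors.New("duplicate serial")

type createAsset struct {
    Serial string
    Name   string
}

type result struct {
    Serial  string
    Version int64
}

// assets maps serial -> version; an in-memory stand-in for a table.
type assets struct {
    rows map[string]int64
}

func newAssets() *assets { return &assets{rows: map[string]int64{}} }

// execBatch applies cmds in order to a staged copy and swaps the copy in
// only if every command succeeded, so a failure leaves the live rows untouched.
// Results are appended in input order, so results[i] belongs to cmds[i].
func execBatch(a *assets, cmds []createAsset) ([]result, error) {
    staged := make(map[string]int64, len(a.rows)+len(cmds))
    for k, v := range a.rows {
        staged[k] = v
    }
    results := make([]result, 0, len(cmds))
    for i, c := range cmds {
        if _, exists := staged[c.Serial]; exists {
            return nil, fmt.Errorf("command %d (%s): %w", i, c.Serial, errDuplicate)
        }
        staged[c.Serial] = 1 // a create starts at version 1
        results = append(results, result{Serial: c.Serial, Version: 1})
    }
    a.rows = staged // commit
    return results, nil
}

func main() {
    a := newAssets()
    res, err := execBatch(a, []createAsset{{"P-0012", "Pump 12"}, {"V-0003", "Valve 3"}})
    fmt.Println("first batch:", len(res), "results, err:", err)

    // The second command fails, so the first one must not be written either.
    _, err = execBatch(a, []createAsset{{"F-0001", "Fan 1"}, {"P-0012", "Pump 12 again"}})
    fmt.Println("second batch err:", err)
    fmt.Println("rows stored:", len(a.rows))
}
```

Against Fabriq itself, the batch looks like this: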
results, err := f.ExecBatch(ctx, []command.Command{
    {
        Entity:  "asset",
        Op:      command.OpCreate,
        Payload: &domain.Asset{Name: "Pump 12", Kind: "pump", Serial: "P-0012", SiteID: site.ID},
    },
    {
        Entity:  "asset",
        Op:      command.OpCreate,
        Payload: &domain.Asset{Name: "Valve 3", Kind: "valve", Serial: "V-0003", SiteID: site.ID},
    },
})
if err != nil {
    log.Fatalf("batch: %v", err) // nothing was written
}
for _, r := range results {
    log.Printf("created %s at v%d (event %s)", r.AggID, r.Version, r.EventID)
}

Subscribe to deltas
Subscribe resolves the scope to a channel server-side — client input never names a channel or tenant directly — and returns a conflated <-chan query.Delta. The scope below follows one aggregate by id; the Scope value ("id", "site", "tenant") must be one the entity declared. Cancel the context to unsubscribe.
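"Conflated" means a slow consumer may skip intermediate deltas and see only the most recent one. This page does not describe how Fabriq conflates, so the following is only one possible sketch: a latest-wins send on a one-slot channel, with a hypothetical delta type and helper names (offer, latestAfter). It is safe only with a single producer per channel.

```go
package main

import "fmt"

// delta is a stand-in with just enough fields for the sketch.
type delta struct {
    AggID   string
    Version int64
}

// offer publishes d on a 1-slot channel, replacing any delta the consumer
// has not read yet. It never blocks on a slow consumer.
func offer(ch chan delta, d delta) {
    for {
        select {
        case ch <- d:
            return // the slot was free
        default:
            // Slot is full: discard the stale delta, then retry the send.
            select {
            case <-ch:
            default: // the consumer took it in the meantime
            }
        }
    }
}

// latestAfter offers versions 1..n with no reader attached, then reads once.
func latestAfter(n int64) delta {
    ch := make(chan delta, 1)
    for v := int64(1); v <= n; v++ {
        offer(ch, delta{AggID: "asset-1", Version: v})
    }
    return <-ch
}

func main() {
    d := latestAfter(3)
    fmt.Printf("%s v%d\n", d.AggID, d.Version) // prints "asset-1 v3"
}
```

Dropping intermediate versions is acceptable here because each delta carries its own Version, so a consumer can detect a gap and refetch if it needs the full history. The Fabriq subscription itself: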
import "github.com/xraph/fabriq/core/query"

stream, err := f.Subscribe(ctx, query.SubscribeScope{
    Entity: "asset",
    Scope:  "id",
    ID:     results[0].AggID,
})
if err != nil {
    log.Fatalf("subscribe: %v", err)
}
for delta := range stream {
    // delta.Version, delta.Type, delta.Payload (json.RawMessage), delta.StreamID
    log.Printf("%s %s -> v%d", delta.Aggregate, delta.AggID, delta.Version)
}

Each query.Delta carries StreamID (maps 1:1 onto an SSE id: for Last-Event-ID resume), Version, Type, and the event Payload as raw JSON: small enough to conflate, complete enough that a simple UI can patch without a refetch. Live delivery requires Redis; without it Subscribe has no transport.
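As an illustration of the StreamID-to-SSE mapping, the sketch below writes one delta as a Server-Sent Events frame. The delta struct mirrors only the fields named above and may not match the real query.Delta; writeSSE and frame are hypothetical helpers, and the StreamID and Type values in main are placeholders. It assumes StreamID and Type contain no newlines.

```go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "strings"
)

// delta mirrors only the fields named in the text; the real type may differ.
type delta struct {
    StreamID string
    Version  int64
    Type     string
    Payload  json.RawMessage
}

// writeSSE writes d as one SSE frame: id, event, and data fields followed by
// a blank line. The payload is compacted so it fits on a single data: line.
func writeSSE(w io.Writer, d delta) error {
    var data bytes.Buffer
    if err := json.Compact(&data, d.Payload); err != nil {
        return fmt.Errorf("compact payload: %w", err)
    }
    _, err := fmt.Fprintf(w, "id: %s\nevent: %s\ndata: %s\n\n", d.StreamID, d.Type, data.Bytes())
    return err
}

// frame renders d to a string, or "" if the payload is not valid JSON.
func frame(d delta) string {
    var sb strings.Builder
    if err := writeSSE(&sb, d); err != nil {
        return ""
    }
    return sb.String()
}

func main() {
    d := delta{
        StreamID: "1700000000000-0", // placeholder id
        Version:  2,
        Type:     "asset.updated", // placeholder event type
        Payload:  json.RawMessage(`{"name": "Pump 12"}`),
    }
    fmt.Print(frame(d))
}
```

In an HTTP handler the same frame would be written to the response with Content-Type text/event-stream and flushed after each delta. On reconnect, a browser EventSource sends the last id it saw in the Last-Event-ID request header, which is what makes resume possible.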