Fabriq

Quickstart

Build a registry, open the facade, run create and update commands under a tenant context, read back, batch, and subscribe to deltas.

This is the shortest end-to-end path through Fabriq: register a domain, open the facade against Postgres and Redis, then exercise the write path (Exec, ExecBatch), the relational read path (Relational().Get), and the live delta stream (Subscribe). Every call is tenant-scoped — the tenant lives on the context.Context, and the fabric refuses unstamped contexts.

Build the registry and open the facade

package main

import (
	"context"
	"errors"
	"log"

	"github.com/xraph/fabriq"
	"github.com/xraph/fabriq/core/registry"
	"github.com/xraph/fabriq/domain"
)

func main() {
	reg := registry.New()
	if err := domain.RegisterAll(reg); err != nil {
		log.Fatalf("register domain: %v", err)
	}

	f, _, err := fabriq.Open(context.Background(), reg, fabriq.Config{
		Postgres: fabriq.PostgresConfig{DSN: "postgres://user:pass@localhost:5432/fabriq?sslmode=disable"},
		Redis:    fabriq.RedisConfig{Addr: "localhost:6379"},
	})
	if err != nil {
		log.Fatalf("open fabriq: %v", err)
	}
	defer f.Close()

	// ... commands and queries below run against f
}

domain.RegisterAll registers the example domain pack. You can register your own registry.EntitySpec set instead — the rest of this page assumes the site and asset entities from that pack.

Stamp the tenant context

Every fabric entry point calls tenant.Require and fails with ErrNoTenant on an unstamped context. Stamp the tenant with tenant.WithTenant, which validates the id before it ends up in any derived name (graph names, index names, stream keys).

import "github.com/xraph/fabriq/core/tenant"

ctx, err := tenant.WithTenant(context.Background(), "acme")
if err != nil {
	log.Fatalf("invalid tenant id: %v", err)
}

The tenant id MUST come from validated auth claims — a verified JWT, a session, an mTLS identity — never from user input or a forwarded header. WithTenant validates the id's shape (^[a-zA-Z0-9_-]{1,64}$), not its authenticity. Stamping a tenant from a client-controlled value lets a caller read and write another tenant's data. Only auth middleware should call WithTenant.

Create an aggregate

A write is one command.Command. OpCreate mints a ULID when AggID is empty and starts the aggregate at version 1. The executor stamps id, tenant_id, and version structurally — values you set on the payload for those columns are ignored.

import (
	"github.com/xraph/fabriq/core/command"
	"github.com/xraph/fabriq/domain"
)

res, err := f.Exec(ctx, command.Command{
	Entity:  "site",
	Op:      command.OpCreate,
	Payload: &domain.Site{Name: "North Plant", Code: "NP-1", Region: "eu-west"},
})
if err != nil {
	log.Fatalf("create site: %v", err)
}
// res is command.Result{AggID, Version, EventID}:
// res.AggID    — the minted (or supplied) aggregate id
// res.Version  — 1 for a create
// res.EventID  — the outbox event appended in the same transaction

Update with optimistic concurrency

Set ExpectedVersion to gate the write on the stored version. If another writer advanced it first, the command fails with ErrVersionConflict instead of clobbering. OpUpdate requires AggID and bumps the version.

expected := res.Version // the version we just read/created

upd, err := f.Exec(ctx, command.Command{
	Entity:          "site",
	Op:              command.OpUpdate,
	AggID:           res.AggID,
	Payload:         &domain.Site{Name: "North Plant A", Code: "NP-1", Region: "eu-west"},
	ExpectedVersion: &expected,
})
if err != nil {
	if errors.Is(err, fabriq.ErrVersionConflict) {
		// reload, re-derive, retry
	}
	log.Fatalf("update site: %v", err)
}
// upd.Version == 2

Read it back

Relational().Get loads one row by id into a model pointer. It is structurally tenant-scoped, so the same ctx carries the tenant.

var site domain.Site
if err := f.Relational().Get(ctx, "site", res.AggID, &site); err != nil {
	if errors.Is(err, fabriq.ErrNotFound) {
		// no such aggregate in this tenant
	}
	log.Fatalf("get site: %v", err)
}

Batch writes atomically

ExecBatch runs N commands in one transaction, ordered, all-or-nothing: if any command fails, none are applied. The results slice maps 1:1 to the input commands.

results, err := f.ExecBatch(ctx, []command.Command{
	{
		Entity:  "asset",
		Op:      command.OpCreate,
		Payload: &domain.Asset{Name: "Pump 12", Kind: "pump", Serial: "P-0012", SiteID: site.ID},
	},
	{
		Entity:  "asset",
		Op:      command.OpCreate,
		Payload: &domain.Asset{Name: "Valve 3", Kind: "valve", Serial: "V-0003", SiteID: site.ID},
	},
})
if err != nil {
	log.Fatalf("batch: %v", err) // nothing was written
}
for _, r := range results {
	log.Printf("created %s at v%d (event %s)", r.AggID, r.Version, r.EventID)
}

Subscribe to deltas

Subscribe resolves the scope to a channel server-side — client input never names a channel or tenant directly — and returns a conflated <-chan query.Delta. The scope below follows one aggregate by id; the Scope value ("id", "site", "tenant") must be one the entity declared. Cancel the context to unsubscribe.

import "github.com/xraph/fabriq/core/query"

stream, err := f.Subscribe(ctx, query.SubscribeScope{
	Entity: "asset",
	Scope:  "id",
	ID:     results[0].AggID,
})
if err != nil {
	log.Fatalf("subscribe: %v", err)
}

for delta := range stream {
	// delta.Version, delta.Type, delta.Payload (json.RawMessage), delta.StreamID
	log.Printf("%s %s -> v%d", delta.Aggregate, delta.AggID, delta.Version)
}

Each query.Delta carries StreamID (maps 1:1 onto an SSE id: for Last-Event-ID resume), Version, Type, and the event Payload as raw JSON — small enough to conflate, complete enough that a simple UI can patch without a refetch. Live delivery requires Redis; without it Subscribe has no transport.

Next steps

On this page