Fabriq

Timeseries

The telemetry port over TimescaleDB — bulk ingest that bypasses the event path, range reads, and structural tenancy on a hypertable with no RLS.

The timeseries plane is for telemetry: high-volume tag readings, sensor samples, meter values. It is implemented in adapters/postgres against a TimescaleDB hypertable (tag_readings). The port has two methods — bulk ingest and range read.

type TSQuerier interface {
	BulkWrite(ctx context.Context, series string, points []Point) error
	Range(ctx context.Context, q RangeQuery, into any) error
}

Reach it through f.Timeseries().

Why telemetry bypasses the event path

Aggregates are written through the command plane, which appends one versioned outbox event per write. Telemetry cannot afford that: per-row events for industrial tag readings at volume would melt the outbox. So BulkWrite is an event-bypass ingest path — it writes points directly with one multi-row INSERT per call and emits no outbox events. The relay publishes conflated deltas for telemetry instead, so live subscribers still see movement without the outbox carrying every sample.

Point

type Point struct {
	Key     string    // series key within the tenant, e.g. tag id
	At      time.Time
	Value   float64
	Quality int
}

BulkWrite

BulkWrite ingests a batch of points into a series in one statement. The series is the hypertable name; it is validated (no quotes, semicolons, or spaces) before use. An empty points slice is a no-op.

points := []query.Point{
	{Key: "tag-101", At: t0, Value: 72.4, Quality: 192},
	{Key: "tag-101", At: t0.Add(time.Second), Value: 72.6, Quality: 192},
	{Key: "tag-102", At: t0, Value: 0.81, Quality: 192},
}
if err := f.Timeseries().BulkWrite(ctx, "tag_readings", points); err != nil {
	return err
}
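The series-name rule matters because a table name cannot be passed as a bind parameter, so it ends up interpolated into the SQL text. A minimal sketch of such a check follows, assuming a hypothetical validateSeries function; the adapter's real validation may be stricter, and rejecting the empty name is an extra assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateSeries is a hypothetical check mirroring the documented rule:
// a series name must not contain quotes, semicolons, or spaces.
func validateSeries(series string) error {
	if series == "" {
		// Assumption: an empty name is rejected too.
		return errors.New("series name is empty")
	}
	if strings.ContainsAny(series, "\"'; ") {
		return fmt.Errorf("invalid series name %q", series)
	}
	return nil
}
```

With this sketch, "tag_readings" passes, while "tag readings" and "x; DROP TABLE y" are rejected before any SQL is built.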

Range

Range reads a time window of one series key over [From, To). It scans into *[]query.Point, ordered by time ascending.

type RangeQuery struct {
	Series string
	Key    string
	From   time.Time
	To     time.Time
	Bucket time.Duration // 0 = raw points
	Agg    string        // "avg", "min", "max", "last" (when Bucket > 0)
}

var readings []query.Point
err := f.Timeseries().Range(ctx, query.RangeQuery{
	Series: "tag_readings",
	Key:    "tag-101",
	From:   t0,
	To:     t0.Add(time.Hour),
}, &readings)

Bucket and Agg declare time-bucketed aggregation: with Bucket > 0, points fold into fixed-width buckets reduced by Agg ("avg", "min", "max", or "last"); Bucket: 0 returns raw points.

In the current adapter Range serves raw points only — a RangeQuery with Bucket > 0 returns an explicit "not implemented yet" error. Bucketed aggregation (Timescale time_bucket) lands with the projection phase. Read raw points and bucket client-side until then.

Tenancy without RLS

The telemetry hypertable has no RLS: TimescaleDB's columnstore refuses tables with row security, and compression is the whole reason Timescale is in the stack — industrial readings at volume are unaffordable uncompressed (see ADR 0006). Tenancy is therefore enforced structurally instead:

  1. The hypertable is reachable only through TSQuerier (BulkWrite / Range), which stamps tenant_id into every statement it builds and validates the series name.
  2. It is not a registry entity, so the generic relational port cannot name it.
  3. The raw-SQL escape hatch is guarded: SQL referencing this unprotected table without a literal tenant_id reference is rejected with ErrTenantHookTripped (counted, alertable).

Cross-tenant isolation is integration-tested. The table keeps compression / columnstore; if Timescale ever lifts the restriction, adding RLS back is a small migration (enable row security on the table and add one CREATE POLICY). See Tenancy and ADR 0006.
