Timeseries
The telemetry port over TimescaleDB — bulk ingest that bypasses the event path, range reads, and structural tenancy on a hypertable with no RLS.
The timeseries plane is for telemetry: high-volume tag readings, sensor samples, meter values. It is implemented in adapters/postgres against a TimescaleDB hypertable (tag_readings). The port has two methods — bulk ingest and range read.
type TSQuerier interface {
BulkWrite(ctx context.Context, series string, points []Point) error
Range(ctx context.Context, q RangeQuery, into any) error
}Reach it through f.Timeseries().
Why telemetry bypasses the event path
Aggregates are written through the command plane, which appends one versioned outbox event per write. Telemetry cannot afford that: per-row events for industrial tag readings at volume would melt the outbox. So BulkWrite is an event-bypass ingest path — it writes points directly with one multi-row INSERT per call and emits no outbox events. The relay publishes conflated deltas for telemetry instead, so live subscribers still see movement without the outbox carrying every sample.
Point
type Point struct {
Key string // series key within the tenant, e.g. tag id
At time.Time
Value float64
Quality int
}BulkWrite
BulkWrite ingests a batch of points into a series in one statement. The series is the hypertable name; it is validated (no quotes, semicolons, or spaces) before use. An empty points slice is a no-op.
points := []query.Point{
{Key: "tag-101", At: t0, Value: 72.4, Quality: 192},
{Key: "tag-101", At: t0.Add(time.Second), Value: 72.6, Quality: 192},
{Key: "tag-102", At: t0, Value: 0.81, Quality: 192},
}
if err := f.Timeseries().BulkWrite(ctx, "tag_readings", points); err != nil {
return err
}Range
Range reads a time window of one series key over [From, To). It scans into *[]query.Point, ordered by time ascending.
type RangeQuery struct {
Series string
Key string
From time.Time
To time.Time
Bucket time.Duration // 0 = raw points
Agg string // "avg", "min", "max", "last" (when Bucket > 0)
}var readings []query.Point
err := f.Timeseries().Range(ctx, query.RangeQuery{
Series: "tag_readings",
Key: "tag-101",
From: t0,
To: t0.Add(time.Hour),
}, &readings)Bucket and Agg declare time-bucketed aggregation: with Bucket > 0, points fold into fixed-width buckets reduced by Agg ("avg", "min", "max", or "last"); Bucket: 0 returns raw points.
In the current adapter Range serves raw points only — a RangeQuery with Bucket > 0 returns an explicit "not implemented yet" error. Bucketed aggregation (Timescale time_bucket) lands with the projection phase. Read raw points and bucket client-side until then.
Tenancy without RLS
The telemetry hypertable has no RLS: TimescaleDB's columnstore refuses tables with row security, and compression is the whole reason Timescale is in the stack — industrial readings at volume are unaffordable uncompressed (see ADR 0006). Tenancy is therefore enforced structurally instead:
- The hypertable is reachable only through
TSQuerier(BulkWrite/Range), which stampstenant_idinto every statement structurally and validates the series name. - It is not a registry entity, so the generic relational port cannot name it.
- The raw-SQL escape hatch is guarded: SQL referencing this unprotected table without a literal
tenant_idreference is rejected withErrTenantHookTripped(counted, alertable).
Cross-tenant isolation is integration-tested. The table keeps compression / columnstore; revisiting RLS is one CREATE POLICY if Timescale ever lifts the restriction. See Tenancy and ADR 0006.