// Stitch any CI into Tangled
1package main
2
3// Schema definition and migration logic for the SQLite store. Pulled out
4// of store.go so the big SQL block doesn't sit in the middle of the
5// runtime API surface.
6
7import (
8 "context"
9 "fmt"
10 "strings"
11 "time"
12)
13
// schema is the full set of CREATE statements applied at startup. It is
// idempotent and additive only — no `DROP`s — so future changes can be
// layered on as additional statements without needing a separate
// migration tool until the project actually outgrows that.
//
// NOTE: `CREATE TABLE IF NOT EXISTS` will not widen a table that already
// exists; columns added after initial release also need an ALTER TABLE
// in migrate (see store.migrate in this file).
const schema = `
CREATE TABLE IF NOT EXISTS meta (
	key TEXT PRIMARY KEY,
	value TEXT NOT NULL
);

-- Records of sh.tangled.spindle.member. The owner of a spindle publishes
-- one of these per authorized member. (did, rkey) is the natural ATProto
-- key — did identifies the publisher's PDS, rkey identifies the record
-- within that PDS's collection.
CREATE TABLE IF NOT EXISTS spindle_members (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	instance TEXT NOT NULL,
	subject TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo. We keep the full set so that when a
-- pipeline trigger arrives we can look up which knot/spindle/repo_did
-- it corresponds to without another round-trip.
CREATE TABLE IF NOT EXISTS repos (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	knot TEXT NOT NULL,
	name TEXT NOT NULL,
	spindle TEXT,
	repo_did TEXT,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo.collaborator. Used together with repos to
-- decide whether a triggering DID is allowed to push builds to us.
CREATE TABLE IF NOT EXISTS repo_collaborators (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	repo TEXT,
	repo_did TEXT,
	subject TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Outbound event log. Each row is one record we want to fan out to
-- connected /events websocket subscribers (typically the Tangled
-- appview) — today only sh.tangled.pipeline.status.
--
-- We persist instead of pushing through an in-memory channel so that
-- (a) a reconnecting subscriber can resume from a cursor without
-- missing events that happened during the gap, and
-- (b) slow subscribers can't make us drop events for fast ones — they
-- simply lag behind in the rowid space.
--
-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids
-- strictly increase and never get reused if a row is ever deleted, so
-- treating the created column as a monotonic cursor is safe forever.
CREATE TABLE IF NOT EXISTS events (
	created INTEGER PRIMARY KEY AUTOINCREMENT,
	rkey TEXT NOT NULL,
	nsid TEXT NOT NULL,
	event_json TEXT NOT NULL,
	inserted_at TEXT NOT NULL
);

-- Mapping from a Buildkite build back to the Tangled pipeline that
-- spawned it. The Buildkite webhook receiver only knows the build
-- UUID; everything we need to publish a pipeline.status record
-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives
-- on this row.
--
-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the
-- webhook handler doesn't have to recompute the at:// string on
-- every event — it's a constant for the lifetime of the build and
-- the webhook is the hot path for status fan-out.
--
-- The (knot, pipeline_rkey, workflow) index supports the /logs
-- handler, which only knows that tuple at request time.
-- org is the Buildkite organisation slug the build was created
-- against. A workflow's YAML can override the spindle's default
-- org via tack.buildkite.org, so we persist whatever was used at
-- Spawn time and read it back when fetching jobs/logs. The empty
-- string means "use the provider's defaultOrg" — that's both the
-- usual single-org case and what every row written before this
-- column existed will scan as.
-- created_unix_ns is the monotonic recency key. created_at is kept
-- (RFC3339Nano text) for human-readable inspection, but it must NOT
-- be used for ordering: text comparison of nanosecond timestamps is
-- not reliable, which used to make /logs occasionally resolve the
-- wrong run. created_unix_ns is the integer the latest-build lookup
-- sorts on instead.
CREATE TABLE IF NOT EXISTS buildkite_builds (
	build_uuid TEXT PRIMARY KEY,
	build_number INTEGER NOT NULL,
	pipeline_slug TEXT NOT NULL,
	org TEXT NOT NULL DEFAULT '',
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS buildkite_builds_lookup
	ON buildkite_builds (knot, pipeline_rkey, workflow);

-- Mapping from a Tangled workflow tuple to the latest Tekton
-- PipelineRun tack created for it. Unlike Buildkite webhooks, Tekton
-- status observation happens in-process, so the primary read path is
-- /logs resolving (knot, pipeline_rkey, workflow) back to the concrete
-- PipelineRun whose TaskRuns and pods hold output.
CREATE TABLE IF NOT EXISTS tekton_runs (
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	namespace TEXT NOT NULL,
	pipeline_run_name TEXT NOT NULL,
	pipeline_run_uid TEXT NOT NULL,
	pipeline_name TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL,
	PRIMARY KEY (knot, pipeline_rkey, workflow)
);
CREATE INDEX IF NOT EXISTS tekton_runs_uid
	ON tekton_runs (pipeline_run_uid);
`
146
147// migrate applies the schema. Safe to call repeatedly.
148//
149// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it
150// won't widen an already-existing table. Columns added after the
151// initial release therefore need a parallel ALTER TABLE step here;
152// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and
153// swallow the "duplicate column" error that fires on subsequent
154// startups. Anything else is fatal.
155func (s *store) migrate(ctx context.Context) error {
156 if _, err := s.db.ExecContext(ctx, schema); err != nil {
157 return fmt.Errorf("apply schema: %w", err)
158 }
159 for _, alter := range []string{
160 // Persist the Buildkite org each build was created
161 // against so /logs can target the same org the workflow
162 // chose. Pre-existing rows scan as empty string, which
163 // the provider treats as "use defaultOrg".
164 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`,
165
166 // Monotonic integer ordering key for buildkite_builds.
167 // Replaces ORDER BY created_at (RFC3339Nano text), whose
168 // lexical order isn't reliable across nanosecond precision
169 // and could make /logs resolve the wrong run. Default 0
170 // covers pre-existing rows; the backfill below promotes
171 // them to their parsed timestamp so ordering stays stable
172 // across the upgrade.
173 `ALTER TABLE buildkite_builds ADD COLUMN created_unix_ns INTEGER NOT NULL DEFAULT 0`,
174 } {
175 if _, err := s.db.ExecContext(ctx, alter); err != nil {
176 if strings.Contains(err.Error(), "duplicate column name") {
177 continue
178 }
179 return fmt.Errorf("apply alter %q: %w", alter, err)
180 }
181 }
182
183 if err := s.backfillBuildkiteCreatedUnixNS(ctx); err != nil {
184 return fmt.Errorf("backfill buildkite created_unix_ns: %w", err)
185 }
186 return nil
187}
188
189// backfillBuildkiteCreatedUnixNS walks every buildkite_builds row whose
190// created_unix_ns is still the post-ALTER default (0) and sets it from
191// the RFC3339Nano text in created_at. SQLite has no native nanosecond
192// parser, so the conversion has to happen in Go.
193//
194// Rows whose created_at can't be parsed are left at 0; that keeps a
195// single corrupt row from blocking startup, and ordering between two
196// 0-keyed rows still falls through to created_at as a deterministic
197// tiebreaker in the lookup query.
198func (s *store) backfillBuildkiteCreatedUnixNS(ctx context.Context) error {
199 rows, err := s.db.QueryContext(ctx,
200 `SELECT build_uuid, created_at FROM buildkite_builds
201 WHERE created_unix_ns = 0`,
202 )
203 if err != nil {
204 return fmt.Errorf("query rows to backfill: %w", err)
205 }
206 type pending struct {
207 uuid string
208 ns int64
209 }
210 var todo []pending
211 for rows.Next() {
212 var uuid, createdAt string
213 if err := rows.Scan(&uuid, &createdAt); err != nil {
214 rows.Close()
215 return fmt.Errorf("scan row: %w", err)
216 }
217 t, perr := time.Parse(time.RFC3339Nano, createdAt)
218 if perr != nil {
219 // Skip unparseable rows rather than failing the whole
220 // migration. The lookup query still has a fallback
221 // ordering for rows that share the default 0 key.
222 continue
223 }
224 todo = append(todo, pending{uuid: uuid, ns: t.UnixNano()})
225 }
226 if err := rows.Err(); err != nil {
227 rows.Close()
228 return fmt.Errorf("iterate rows: %w", err)
229 }
230 rows.Close()
231
232 for _, p := range todo {
233 if _, err := s.db.ExecContext(ctx,
234 `UPDATE buildkite_builds SET created_unix_ns = ?
235 WHERE build_uuid = ?`,
236 p.ns, p.uuid,
237 ); err != nil {
238 return fmt.Errorf("update row %q: %w", p.uuid, err)
239 }
240 }
241 return nil
242}