Stitch any CI into Tangled
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 242 lines 9.2 kB view raw
package main

// Schema definition and migration logic for the SQLite store. Pulled out
// of store.go so the big SQL block doesn't sit in the middle of the
// runtime API surface.

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// schema is the full set of CREATE statements applied at startup. It is
// idempotent and additive only — no `DROP`s — so future changes can be
// layered on as additional statements without needing a separate
// migration tool until the project actually outgrows that.
const schema = `
CREATE TABLE IF NOT EXISTS meta (
	key TEXT PRIMARY KEY,
	value TEXT NOT NULL
);

-- Records of sh.tangled.spindle.member. The owner of a spindle publishes
-- one of these per authorized member. (did, rkey) is the natural ATProto
-- key — did identifies the publisher's PDS, rkey identifies the record
-- within that PDS's collection.
CREATE TABLE IF NOT EXISTS spindle_members (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	instance TEXT NOT NULL,
	subject TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo. We keep the full set so that when a
-- pipeline trigger arrives we can look up which knot/spindle/repo_did
-- it corresponds to without another round-trip.
CREATE TABLE IF NOT EXISTS repos (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	knot TEXT NOT NULL,
	name TEXT NOT NULL,
	spindle TEXT,
	repo_did TEXT,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Records of sh.tangled.repo.collaborator. Used together with repos to
-- decide whether a triggering DID is allowed to push builds to us.
CREATE TABLE IF NOT EXISTS repo_collaborators (
	did TEXT NOT NULL,
	rkey TEXT NOT NULL,
	repo TEXT,
	repo_did TEXT,
	subject TEXT NOT NULL,
	created_at TEXT NOT NULL,
	PRIMARY KEY (did, rkey)
);

-- Outbound event log. Each row is one record we want to fan out to
-- connected /events websocket subscribers (typically the Tangled
-- appview) — today only sh.tangled.pipeline.status.
--
-- We persist instead of pushing through an in-memory channel so that
-- (a) a reconnecting subscriber can resume from a cursor without
-- missing events that happened during the gap, and
-- (b) slow subscribers can't make us drop events for fast ones — they
-- simply lag behind in the rowid space.
--
-- AUTOINCREMENT (vs plain INTEGER PRIMARY KEY) guarantees rowids
-- strictly increase and never get reused if a row is ever deleted, so
-- treating the created column as a monotonic cursor is safe forever.
CREATE TABLE IF NOT EXISTS events (
	created INTEGER PRIMARY KEY AUTOINCREMENT,
	rkey TEXT NOT NULL,
	nsid TEXT NOT NULL,
	event_json TEXT NOT NULL,
	inserted_at TEXT NOT NULL
);

-- Mapping from a Buildkite build back to the Tangled pipeline that
-- spawned it. The Buildkite webhook receiver only knows the build
-- UUID; everything we need to publish a pipeline.status record
-- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives
-- on this row.
--
-- pipeline_uri is denormalized off (knot, pipeline_rkey) so the
-- webhook handler doesn't have to recompute the at:// string on
-- every event — it's a constant for the lifetime of the build and
-- the webhook is the hot path for status fan-out.
--
-- The (knot, pipeline_rkey, workflow) index supports the /logs
-- handler, which only knows that tuple at request time.
-- org is the Buildkite organisation slug the build was created
-- against. A workflow's YAML can override the spindle's default
-- org via tack.buildkite.org, so we persist whatever was used at
-- Spawn time and read it back when fetching jobs/logs. The empty
-- string means "use the provider's defaultOrg" — that's both the
-- usual single-org case and what every row written before this
-- column existed will scan as.
-- created_unix_ns is the monotonic recency key. created_at is kept
-- (RFC3339Nano text) for human-readable inspection, but it must NOT
-- be used for ordering: text comparison of nanosecond timestamps is
-- not reliable, which used to make /logs occasionally resolve the
-- wrong run. created_unix_ns is the integer the latest-build lookup
-- sorts on instead.
CREATE TABLE IF NOT EXISTS buildkite_builds (
	build_uuid TEXT PRIMARY KEY,
	build_number INTEGER NOT NULL,
	pipeline_slug TEXT NOT NULL,
	org TEXT NOT NULL DEFAULT '',
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS buildkite_builds_lookup
	ON buildkite_builds (knot, pipeline_rkey, workflow);

-- Mapping from a Tangled workflow tuple to the latest Tekton
-- PipelineRun tack created for it. Unlike Buildkite webhooks, Tekton
-- status observation happens in-process, so the primary read path is
-- /logs resolving (knot, pipeline_rkey, workflow) back to the concrete
-- PipelineRun whose TaskRuns and pods hold output.
CREATE TABLE IF NOT EXISTS tekton_runs (
	knot TEXT NOT NULL,
	pipeline_rkey TEXT NOT NULL,
	workflow TEXT NOT NULL,
	namespace TEXT NOT NULL,
	pipeline_run_name TEXT NOT NULL,
	pipeline_run_uid TEXT NOT NULL,
	pipeline_name TEXT NOT NULL,
	pipeline_uri TEXT NOT NULL,
	created_at TEXT NOT NULL,
	created_unix_ns INTEGER NOT NULL,
	PRIMARY KEY (knot, pipeline_rkey, workflow)
);
CREATE INDEX IF NOT EXISTS tekton_runs_uid
	ON tekton_runs (pipeline_run_uid);
`
148// 149// CREATE TABLE IF NOT EXISTS is enough for fresh databases, but it 150// won't widen an already-existing table. Columns added after the 151// initial release therefore need a parallel ALTER TABLE step here; 152// SQLite has no `ADD COLUMN IF NOT EXISTS`, so we run the ALTER and 153// swallow the "duplicate column" error that fires on subsequent 154// startups. Anything else is fatal. 155func (s *store) migrate(ctx context.Context) error { 156 if _, err := s.db.ExecContext(ctx, schema); err != nil { 157 return fmt.Errorf("apply schema: %w", err) 158 } 159 for _, alter := range []string{ 160 // Persist the Buildkite org each build was created 161 // against so /logs can target the same org the workflow 162 // chose. Pre-existing rows scan as empty string, which 163 // the provider treats as "use defaultOrg". 164 `ALTER TABLE buildkite_builds ADD COLUMN org TEXT NOT NULL DEFAULT ''`, 165 166 // Monotonic integer ordering key for buildkite_builds. 167 // Replaces ORDER BY created_at (RFC3339Nano text), whose 168 // lexical order isn't reliable across nanosecond precision 169 // and could make /logs resolve the wrong run. Default 0 170 // covers pre-existing rows; the backfill below promotes 171 // them to their parsed timestamp so ordering stays stable 172 // across the upgrade. 173 `ALTER TABLE buildkite_builds ADD COLUMN created_unix_ns INTEGER NOT NULL DEFAULT 0`, 174 } { 175 if _, err := s.db.ExecContext(ctx, alter); err != nil { 176 if strings.Contains(err.Error(), "duplicate column name") { 177 continue 178 } 179 return fmt.Errorf("apply alter %q: %w", alter, err) 180 } 181 } 182 183 if err := s.backfillBuildkiteCreatedUnixNS(ctx); err != nil { 184 return fmt.Errorf("backfill buildkite created_unix_ns: %w", err) 185 } 186 return nil 187} 188 189// backfillBuildkiteCreatedUnixNS walks every buildkite_builds row whose 190// created_unix_ns is still the post-ALTER default (0) and sets it from 191// the RFC3339Nano text in created_at. 
SQLite has no native nanosecond 192// parser, so the conversion has to happen in Go. 193// 194// Rows whose created_at can't be parsed are left at 0; that keeps a 195// single corrupt row from blocking startup, and ordering between two 196// 0-keyed rows still falls through to created_at as a deterministic 197// tiebreaker in the lookup query. 198func (s *store) backfillBuildkiteCreatedUnixNS(ctx context.Context) error { 199 rows, err := s.db.QueryContext(ctx, 200 `SELECT build_uuid, created_at FROM buildkite_builds 201 WHERE created_unix_ns = 0`, 202 ) 203 if err != nil { 204 return fmt.Errorf("query rows to backfill: %w", err) 205 } 206 type pending struct { 207 uuid string 208 ns int64 209 } 210 var todo []pending 211 for rows.Next() { 212 var uuid, createdAt string 213 if err := rows.Scan(&uuid, &createdAt); err != nil { 214 rows.Close() 215 return fmt.Errorf("scan row: %w", err) 216 } 217 t, perr := time.Parse(time.RFC3339Nano, createdAt) 218 if perr != nil { 219 // Skip unparseable rows rather than failing the whole 220 // migration. The lookup query still has a fallback 221 // ordering for rows that share the default 0 key. 222 continue 223 } 224 todo = append(todo, pending{uuid: uuid, ns: t.UnixNano()}) 225 } 226 if err := rows.Err(); err != nil { 227 rows.Close() 228 return fmt.Errorf("iterate rows: %w", err) 229 } 230 rows.Close() 231 232 for _, p := range todo { 233 if _, err := s.db.ExecContext(ctx, 234 `UPDATE buildkite_builds SET created_unix_ns = ? 235 WHERE build_uuid = ?`, 236 p.ns, p.uuid, 237 ); err != nil { 238 return fmt.Errorf("update row %q: %w", p.uuid, err) 239 } 240 } 241 return nil 242}