// Stitch any CI into Tangled
1package main
2
3// SQLite-backed persistence for tack. Holds:
4//
5// - the jetstream cursor, so restarts resume the firehose where the
6// previous run left off
7// - mirrored Tangled state (spindle members, repos, collaborators)
8// and the authorization helpers built on top of it, used both to
9// gate inbound pipeline triggers and to decide which knots we
10// should be subscribed to
11// - the outbound event log fed to /events websocket subscribers
12// - the Buildkite build → pipeline mapping the webhook and /logs
13// handlers look up
14//
15// Per-method docs go into more detail.
16
17import (
18 "context"
19 "database/sql"
20 "encoding/json"
21 "errors"
22 "fmt"
23 "strconv"
24 "time"
25
26 _ "github.com/mattn/go-sqlite3"
27)
28
// store wraps the SQLite handle and exposes the small set of operations
// the rest of tack needs. Keeping the surface narrow lets the persistence
// layer be swapped or mocked later without rewriting callers.
//
// openStore caps the pool at a single connection (SQLite has one writer),
// so concurrent callers serialize inside database/sql.
type store struct {
	db *sql.DB // sole handle; pragmas and pool sizing are set in openStore
}
35
36// openStore opens (or creates) the SQLite database at path and applies
37// the schema. It also flips on WAL + NORMAL synchronous, which is the
38// usual "this is a long-running server, not a one-shot script" config:
39// concurrent reads don't block the single writer, and we trade a little
40// crash-safety on power loss for a lot of write throughput.
41func openStore(path string) (*store, error) {
42 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode,
43 // _synchronous, _foreign_keys) and applies them on each new connection.
44 // journal_mode=WAL persists in the database file but setting it here
45 // also covers the first-ever open.
46 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path)
47 db, err := sql.Open("sqlite3", dsn)
48 if err != nil {
49 return nil, fmt.Errorf("open sqlite %q: %w", path, err)
50 }
51
52 // SQLite only supports one writer at a time. Capping max open conns
53 // at 1 keeps database/sql from spinning up extra connections that
54 // will only ever serialize behind that writer.
55 db.SetMaxOpenConns(1)
56
57 s := &store{db: db}
58 if err := s.migrate(context.Background()); err != nil {
59 _ = db.Close()
60 return nil, err
61 }
62 return s, nil
63}
64
65// Close releases the underlying database handle.
66func (s *store) Close() error {
67 return s.db.Close()
68}
69
// metaCursorKey is the meta-table key under which we persist the
// jetstream cursor (see LoadCursor and SaveCursor). Pulled out as a
// constant to avoid drift between the load and save sites.
const metaCursorKey = "jetstream_cursor"
74
75// LoadCursor returns the persisted jetstream cursor, or nil if none has
76// been saved yet (signaling "start from now"). The cursor is a unix
77// microsecond timestamp — see jetstream.Event.TimeUS.
78func (s *store) LoadCursor(ctx context.Context) (*int64, error) {
79 var raw string
80 err := s.db.QueryRowContext(ctx,
81 `SELECT value FROM meta WHERE key = ?`, metaCursorKey,
82 ).Scan(&raw)
83 if errors.Is(err, sql.ErrNoRows) {
84 return nil, nil
85 }
86 if err != nil {
87 return nil, fmt.Errorf("load cursor: %w", err)
88 }
89 v, err := strconv.ParseInt(raw, 10, 64)
90 if err != nil {
91 // A malformed cursor shouldn't wedge startup — log-and-ignore
92 // (return nil) so we just resume from "now". The caller has the
93 // logger; we surface the parse error so it can decide.
94 return nil, fmt.Errorf("parse cursor %q: %w", raw, err)
95 }
96 return &v, nil
97}
98
99// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta
100// row is created on first call and updated thereafter.
101func (s *store) SaveCursor(ctx context.Context, cursor int64) error {
102 _, err := s.db.ExecContext(ctx,
103 `INSERT INTO meta (key, value) VALUES (?, ?)
104 ON CONFLICT(key) DO UPDATE SET value = excluded.value`,
105 metaCursorKey, strconv.FormatInt(cursor, 10),
106 )
107 if err != nil {
108 return fmt.Errorf("save cursor: %w", err)
109 }
110 return nil
111}
112
113// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member
114// observation.
115func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error {
116 _, err := s.db.ExecContext(ctx,
117 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at)
118 VALUES (?, ?, ?, ?, ?)
119 ON CONFLICT(did, rkey) DO UPDATE SET
120 instance = excluded.instance,
121 subject = excluded.subject,
122 created_at = excluded.created_at`,
123 did, rkey, instance, subject, createdAt,
124 )
125 if err != nil {
126 return fmt.Errorf("upsert spindle_member: %w", err)
127 }
128 return nil
129}
130
131// DeleteSpindleMember removes a member record by its ATProto identity.
132func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error {
133 _, err := s.db.ExecContext(ctx,
134 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`,
135 did, rkey,
136 )
137 if err != nil {
138 return fmt.Errorf("delete spindle_member: %w", err)
139 }
140 return nil
141}
142
143// UpsertRepo records (or refreshes) a sh.tangled.repo observation.
144// spindle and repoDid may be empty strings — we store them as such rather
145// than as SQL NULLs to keep the upsert path uniform.
146func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error {
147 _, err := s.db.ExecContext(ctx,
148 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at)
149 VALUES (?, ?, ?, ?, ?, ?, ?)
150 ON CONFLICT(did, rkey) DO UPDATE SET
151 knot = excluded.knot,
152 name = excluded.name,
153 spindle = excluded.spindle,
154 repo_did = excluded.repo_did,
155 created_at = excluded.created_at`,
156 did, rkey, knot, name, spindle, repoDid, createdAt,
157 )
158 if err != nil {
159 return fmt.Errorf("upsert repo: %w", err)
160 }
161 return nil
162}
163
164// DeleteRepo removes a repo record by its ATProto identity.
165func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error {
166 _, err := s.db.ExecContext(ctx,
167 `DELETE FROM repos WHERE did = ? AND rkey = ?`,
168 did, rkey,
169 )
170 if err != nil {
171 return fmt.Errorf("delete repo: %w", err)
172 }
173 return nil
174}
175
176// UpsertRepoCollaborator records (or refreshes) a
177// sh.tangled.repo.collaborator observation.
178func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error {
179 _, err := s.db.ExecContext(ctx,
180 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at)
181 VALUES (?, ?, ?, ?, ?, ?)
182 ON CONFLICT(did, rkey) DO UPDATE SET
183 repo = excluded.repo,
184 repo_did = excluded.repo_did,
185 subject = excluded.subject,
186 created_at = excluded.created_at`,
187 did, rkey, repo, repoDid, subject, createdAt,
188 )
189 if err != nil {
190 return fmt.Errorf("upsert repo_collaborator: %w", err)
191 }
192 return nil
193}
194
195// GetRepo returns the (knot, spindle) currently stored for a (did, rkey)
196// pair. Both are returned as empty strings when no row exists; callers
197// that need to distinguish "absent" from "stored but empty" should
198// pre-check existence themselves.
199//
200// This exists so applyRepo can read the *previous* spindle/knot of a
201// record before applying a mutation, which is what makes it possible to
202// detect transitions like "this repo used to be ours, now it isn't" and
203// trigger a knot unsubscribe.
204func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) {
205 err = s.db.QueryRowContext(ctx,
206 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`,
207 did, rkey,
208 ).Scan(&knot, &spindle)
209 if errors.Is(err, sql.ErrNoRows) {
210 return "", "", nil
211 }
212 if err != nil {
213 return "", "", fmt.Errorf("get repo: %w", err)
214 }
215 return knot, spindle, nil
216}
217
218// AuthorizePipelineActor reports whether a pipeline trigger that
219// arrived on knot for repo (repoOwnerDID, repoName) should be allowed
220// to spawn work on this spindle. Both halves of the answer are
221// derived from the persisted Tangled state in spindle_members and
222// repos: the network's own authorization records, mirrored here from
223// the firehose by jetstream.go.
224//
225// The check is two independent gates; both must hold:
226//
227// 1. **Repo claim**: a sh.tangled.repo record exists naming
228// hostname as its CI spindle AND knot as its host knot,
229// published by repoOwnerDID with the matching name. Without this
230// a knot we already happen to be subscribed to (because *some*
231// repo on it points at us) could otherwise smuggle in pipeline
232// triggers for unrelated repos that never opted into us.
233//
234// 2. **Actor membership**: repoOwnerDID is either the spindle owner
235// itself, or has been authorized via a sh.tangled.spindle.member
236// record published by the spindle owner. The publisher (`did`)
237// of that membership grant must equal ownerDID. Anyone can
238// publish a member record naming anyone, so trusting unsigned
239// grants would let any DID grant itself access. We do NOT match
240// against the record's `instance` column because the upstream
241// ecosystem stores it inconsistently (URL vs hostname), and a
242// tack instance only ever speaks for a single hostname anyway.
243//
244// reason is a short, log-friendly description of why authorization
245// failed when ok is false; it is empty when ok is true. Errors
246// reported here are SQL/IO failures, never policy denials; those
247// surface as ok=false with a populated reason.
248func (s *store) AuthorizePipelineActor(
249 ctx context.Context,
250 hostname, knot, ownerDID, repoOwnerDID, repoName string,
251) (ok bool, reason string, err error) {
252 // We can't authorize an unknown actor or an unidentified repo:
253 // every gate below joins on these. Bail before touching SQLite
254 // so a malformed trigger logs cleanly instead of triggering a
255 // "no rows" miss that could be confused with a real denial.
256 if repoOwnerDID == "" {
257 return false, "trigger has no repo did", nil
258 }
259 if repoName == "" {
260 return false, "trigger has no repo name", nil
261 }
262
263 // Gate 1: this specific repo opted into us on this specific knot.
264 var n int
265 if err := s.db.QueryRowContext(ctx,
266 `SELECT COUNT(*) FROM repos
267 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`,
268 repoOwnerDID, repoName, knot, hostname,
269 ).Scan(&n); err != nil {
270 return false, "", fmt.Errorf("count repo claim: %w", err)
271 }
272 if n == 0 {
273 return false, "no repo record claims this spindle on this knot", nil
274 }
275
276 // Gate 2: actor is the spindle owner, or vouched for by them.
277 if repoOwnerDID == ownerDID {
278 return true, "", nil
279 }
280 if err := s.db.QueryRowContext(ctx,
281 `SELECT COUNT(*) FROM spindle_members
282 WHERE did = ? AND subject = ?`,
283 ownerDID, repoOwnerDID,
284 ).Scan(&n); err != nil {
285 return false, "", fmt.Errorf("count membership: %w", err)
286 }
287 if n == 0 {
288 return false, "actor is not a spindle member", nil
289 }
290 return true, "", nil
291}
292
293// IsKnotWanted reports whether any *authorized* repo currently stored
294// still names the given hostname as its spindle and the given knot as
295// its host. After a repo update or delete this is the question we ask
296// to decide whether to keep watching that knot or unsubscribe from it.
297//
298// "Authorized" means the repo's publisher (the row's `did` column) is
299// either the spindle owner or has been vouched for by the spindle owner
300// via a sh.tangled.spindle.member record. Without this filter, a non-
301// member could pin us to an arbitrary attacker-chosen knot just by
302// publishing a sh.tangled.repo record naming us as its spindle. See
303// the matching gate in IsAuthorizedActor and AuthorizePipelineActor.
304func (s *store) IsKnotWanted(ctx context.Context, hostname, ownerDID, knot string) (bool, error) {
305 var n int
306 err := s.db.QueryRowContext(ctx,
307 `SELECT COUNT(*) FROM repos r
308 WHERE r.spindle = ? AND r.knot = ?
309 AND (
310 r.did = ?
311 OR EXISTS (
312 SELECT 1 FROM spindle_members m
313 WHERE m.did = ? AND m.subject = r.did
314 )
315 )`,
316 hostname, knot, ownerDID, ownerDID,
317 ).Scan(&n)
318 if err != nil {
319 return false, fmt.Errorf("count repos for knot: %w", err)
320 }
321 return n > 0, nil
322}
323
324// IsAuthorizedActor reports whether did is the spindle owner or has
325// been authorized by the spindle owner via a sh.tangled.spindle.member
326// record. The membership record's publisher (its `did` column) must
327// equal ownerDID; anyone can publish a membership record naming
328// anyone, so trusting unsigned grants would let any DID grant itself
329// access. This is the same trust rule AuthorizePipelineActor enforces;
330// it's pulled out here so knot-subscription decisions in jetstream.go
331// gate on the same check as pipeline-spawning decisions in knot.go.
332func (s *store) IsAuthorizedActor(ctx context.Context, ownerDID, did string) (bool, error) {
333 if did == "" {
334 return false, nil
335 }
336 if did == ownerDID {
337 return true, nil
338 }
339 var n int
340 err := s.db.QueryRowContext(ctx,
341 `SELECT COUNT(*) FROM spindle_members
342 WHERE did = ? AND subject = ?`,
343 ownerDID, did,
344 ).Scan(&n)
345 if err != nil {
346 return false, fmt.Errorf("count membership: %w", err)
347 }
348 return n > 0, nil
349}
350
351// GetSpindleMember returns the subject DID currently stored for a
352// (did, rkey) spindle.member row, or "" when no such row exists.
353//
354// Used by the jetstream handler to learn whose authorization a
355// delete/update of a membership record affects, so it can reconcile
356// that subject's knot subscriptions in the same step as the mutation.
357func (s *store) GetSpindleMember(ctx context.Context, did, rkey string) (string, error) {
358 var subject string
359 err := s.db.QueryRowContext(ctx,
360 `SELECT subject FROM spindle_members WHERE did = ? AND rkey = ?`,
361 did, rkey,
362 ).Scan(&subject)
363 if errors.Is(err, sql.ErrNoRows) {
364 return "", nil
365 }
366 if err != nil {
367 return "", fmt.Errorf("get spindle_member: %w", err)
368 }
369 return subject, nil
370}
371
372// KnotsForOwner returns the distinct knot hostnames of repos published
373// by `did` whose spindle field equals hostname. Used after a membership
374// change to find which knots that DID's repos want, so we can subscribe
375// (newly granted) or potentially unsubscribe (revoked) in lockstep with
376// the grant.
377//
378// Returns an empty slice (not nil) on no matches so callers can range
379// over the result without a nil check.
380func (s *store) KnotsForOwner(ctx context.Context, hostname, did string) ([]string, error) {
381 rows, err := s.db.QueryContext(ctx,
382 `SELECT DISTINCT knot FROM repos
383 WHERE did = ? AND spindle = ? AND knot <> ''`,
384 did, hostname,
385 )
386 if err != nil {
387 return nil, fmt.Errorf("query knots for owner: %w", err)
388 }
389 defer rows.Close()
390 out := []string{}
391 for rows.Next() {
392 var k string
393 if err := rows.Scan(&k); err != nil {
394 return nil, fmt.Errorf("scan knot: %w", err)
395 }
396 out = append(out, k)
397 }
398 if err := rows.Err(); err != nil {
399 return nil, fmt.Errorf("iterate knots: %w", err)
400 }
401 return out, nil
402}
403
404// KnotsForSpindle returns the distinct knot hostnames of all repos that
405// (a) have declared the given spindle hostname as their CI spindle, and
406// (b) were published by an *authorized* DID: either the spindle owner
407// itself or a member they vouched for via sh.tangled.spindle.member.
408//
409// The membership filter is critical: without it, any DID that publishes
410// a sh.tangled.repo record naming us as its spindle could force us to
411// dial an attacker-chosen knot at startup. See IsAuthorizedActor for
412// the matching trust check applied at firehose-event time.
413//
414// Returns an empty slice (not nil) when nothing matches, so callers can
415// range over the result without a nil check.
416func (s *store) KnotsForSpindle(ctx context.Context, hostname, ownerDID string) ([]string, error) {
417 rows, err := s.db.QueryContext(ctx,
418 `SELECT DISTINCT r.knot FROM repos r
419 WHERE r.spindle = ? AND r.knot <> ''
420 AND (
421 r.did = ?
422 OR EXISTS (
423 SELECT 1 FROM spindle_members m
424 WHERE m.did = ? AND m.subject = r.did
425 )
426 )`,
427 hostname, ownerDID, ownerDID,
428 )
429 if err != nil {
430 return nil, fmt.Errorf("query knots: %w", err)
431 }
432 defer rows.Close()
433
434 out := []string{}
435 for rows.Next() {
436 var k string
437 if err := rows.Scan(&k); err != nil {
438 return nil, fmt.Errorf("scan knot: %w", err)
439 }
440 out = append(out, k)
441 }
442 if err := rows.Err(); err != nil {
443 return nil, fmt.Errorf("iterate knots: %w", err)
444 }
445 return out, nil
446}
447
448// DeleteRepoCollaborator removes a collaborator record by its ATProto
449// identity.
450func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
451 _, err := s.db.ExecContext(ctx,
452 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`,
453 did, rkey,
454 )
455 if err != nil {
456 return fmt.Errorf("delete repo_collaborator: %w", err)
457 }
458 return nil
459}
460
// EventRow is one row of the events table. It represents an outbound
// record we want to deliver to /events websocket subscribers, in the
// shape callers actually need (raw record JSON, not stringly-typed).
type EventRow struct {
	// Created is the assigned monotonic rowid; it doubles as the cursor
	// value subscribers use to resume (see EventsAfter).
	Created int64
	// Rkey is the ATProto record key. For sh.tangled.pipeline.status
	// records this is the rkey we mint when publishing.
	Rkey string
	// Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status).
	Nsid string
	// EventJSON is the record body verbatim — held as RawMessage so
	// the /events handler can splice it into the wire envelope without
	// an unmarshal/remarshal round-trip.
	EventJSON json.RawMessage
}
478
479// InsertEvent appends an event row and returns its assigned `created`
480// (rowid) cursor. Storage is the source of truth for fan-out, so we
481// write here even if zero subscribers are connected — a subscriber that
482// connects later (with an old cursor) will pick the row up via
483// EventsAfter.
484//
485// eventJSON must be a valid JSON object; we store it verbatim. Length
486// validation is intentionally absent — the schema accepts arbitrary
487// TEXT and SQLite handles huge blobs fine for our scale.
488func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) {
489 res, err := s.db.ExecContext(ctx,
490 `INSERT INTO events (rkey, nsid, event_json, inserted_at)
491 VALUES (?, ?, ?, ?)`,
492 rkey, nsid, string(eventJSON),
493 time.Now().UTC().Format(time.RFC3339Nano),
494 )
495 if err != nil {
496 return 0, fmt.Errorf("insert event: %w", err)
497 }
498 id, err := res.LastInsertId()
499 if err != nil {
500 return 0, fmt.Errorf("event last insert id: %w", err)
501 }
502 return id, nil
503}
504
// BuildkiteBuildRef is the persisted mapping from one Buildkite build
// to the Tangled pipeline tuple that spawned it. It's the row written
// by the Buildkite provider at Spawn time and read back from two
// places: the webhook handler (by build UUID) when an event arrives,
// and the /logs handler (by knot+rkey+workflow) when an appview
// client asks for output.
type BuildkiteBuildRef struct {
	// BuildUUID is the Buildkite-assigned build identifier; it is the
	// primary key of the buildkite_builds table (see InsertBuildkiteBuild).
	BuildUUID string
	// BuildNumber is the Buildkite build number; also used as the final
	// ordering tiebreaker in LookupBuildkiteBuildByTuple.
	BuildNumber int64
	// PipelineSlug is the Buildkite pipeline slug the build ran under.
	PipelineSlug string
	// Org is the Buildkite organisation slug the build was created
	// against. Persisted at Spawn time so /logs and any other
	// post-creation API call can target the same org the workflow
	// originally chose — see the workflow YAML `tack.buildkite.org`
	// override. Empty means "use the provider's default org", which
	// is what every row written before the org column existed will
	// scan as.
	Org string
	// Knot, PipelineRkey and Workflow form the Tangled-side tuple the
	// /logs handler resolves by (see LookupBuildkiteBuildByTuple).
	Knot         string
	PipelineRkey string
	Workflow     string
	// PipelineURI is the pipeline's ATProto URI recorded at Spawn time.
	PipelineURI string
}
528
// TektonRunRef is the persisted link from a Tangled workflow tuple
// to the in-cluster PipelineRun tack created for it. The tuple is the
// user-facing identity the appview knows; namespace/name/uid are the
// Kubernetes identity needed for status watching and log lookup.
type TektonRunRef struct {
	// Knot, PipelineRkey and Workflow form the Tangled-side tuple; it is
	// the primary key of tekton_runs (see InsertTektonRun).
	Knot         string
	PipelineRkey string
	Workflow     string
	// Namespace, PipelineRunName and PipelineRunUID identify the
	// PipelineRun object inside the cluster.
	Namespace       string
	PipelineRunName string
	PipelineRunUID  string
	// PipelineName is the Tekton Pipeline the run was created from.
	PipelineName string
	// PipelineURI is the pipeline's ATProto URI recorded at creation time.
	PipelineURI string
}
543
// InsertBuildkiteBuild records that a Buildkite build was created on
// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses an
// UPSERT (ON CONFLICT ... DO UPDATE on build_uuid) so that an unlikely
// build-uuid collision (or a Buildkite-side rebuild that re-fires us)
// just refreshes the row instead of failing.
func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error {
	// Capture wall-clock and monotonic-friendly forms once so the two
	// columns agree on the same instant. created_at is the
	// human-readable RFC3339Nano string; created_unix_ns is the
	// integer the lookup orders on (text comparison of nanosecond
	// timestamps isn't reliable, so we sort on the int instead).
	now := time.Now().UTC()
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO buildkite_builds (
			build_uuid, build_number, pipeline_slug, org,
			knot, pipeline_rkey, workflow,
			pipeline_uri, created_at, created_unix_ns
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(build_uuid) DO UPDATE SET
			build_number = excluded.build_number,
			pipeline_slug = excluded.pipeline_slug,
			org = excluded.org,
			knot = excluded.knot,
			pipeline_rkey = excluded.pipeline_rkey,
			workflow = excluded.workflow,
			pipeline_uri = excluded.pipeline_uri,
			created_at = excluded.created_at,
			created_unix_ns = excluded.created_unix_ns`,
		ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org,
		ref.Knot, ref.PipelineRkey, ref.Workflow,
		ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(),
	)
	if err != nil {
		return fmt.Errorf("insert buildkite_build: %w", err)
	}
	return nil
}
581
582// LookupBuildkiteBuildByUUID returns the saved mapping for the given
583// Buildkite build UUID, or nil when no such build is recorded.
584// Returning a nil pointer rather than a sentinel error keeps the
585// webhook handler's "we don't know about this build" branch a simple
586// nil check.
587func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) {
588 var ref BuildkiteBuildRef
589 err := s.db.QueryRowContext(ctx,
590 `SELECT build_uuid, build_number, pipeline_slug, org,
591 knot, pipeline_rkey, workflow, pipeline_uri
592 FROM buildkite_builds WHERE build_uuid = ?`,
593 buildUUID,
594 ).Scan(
595 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org,
596 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
597 )
598 if errors.Is(err, sql.ErrNoRows) {
599 return nil, nil
600 }
601 if err != nil {
602 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err)
603 }
604 return &ref, nil
605}
606
607// LookupBuildkiteBuildByTuple finds the most recently created build
608// for (knot, pipelineRkey, workflow). Returns nil when no build has
609// been recorded for that tuple — used by /logs to translate the
610// appview's path-based identity back into something Buildkite knows.
611//
612// "Most recent" matters because a workflow may have multiple builds
613// over time (rebuilds, re-triggers). We always serve logs for the
614// latest run; older runs are still queryable by build UUID directly
615// if anyone ever wants that.
616//
617// Ordering is on created_unix_ns (a monotonic int) rather than
618// created_at. Text comparison of RFC3339Nano timestamps is not
619// reliable across nanosecond precision, which used to make this
620// query occasionally pick the wrong run. created_at and build_number
621// are kept as deterministic tiebreakers for legacy rows that pre-date
622// the new column and still scan as 0.
623func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) {
624 var ref BuildkiteBuildRef
625 err := s.db.QueryRowContext(ctx,
626 `SELECT build_uuid, build_number, pipeline_slug, org,
627 knot, pipeline_rkey, workflow, pipeline_uri
628 FROM buildkite_builds
629 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?
630 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC
631 LIMIT 1`,
632 knot, pipelineRkey, workflow,
633 ).Scan(
634 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org,
635 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI,
636 )
637 if errors.Is(err, sql.ErrNoRows) {
638 return nil, nil
639 }
640 if err != nil {
641 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err)
642 }
643 return &ref, nil
644}
645
646// InsertTektonRun records the latest PipelineRun created for a Tangled
647// workflow tuple. Reusing the tuple as the primary key intentionally
648// makes /logs resolve to the newest run for that workflow identity.
649func (s *store) InsertTektonRun(ctx context.Context, ref TektonRunRef) error {
650 now := time.Now().UTC()
651 _, err := s.db.ExecContext(ctx,
652 `INSERT INTO tekton_runs (
653 knot, pipeline_rkey, workflow,
654 namespace, pipeline_run_name, pipeline_run_uid,
655 pipeline_name, pipeline_uri, created_at, created_unix_ns
656 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
657 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET
658 namespace = excluded.namespace,
659 pipeline_run_name = excluded.pipeline_run_name,
660 pipeline_run_uid = excluded.pipeline_run_uid,
661 pipeline_name = excluded.pipeline_name,
662 pipeline_uri = excluded.pipeline_uri,
663 created_at = excluded.created_at,
664 created_unix_ns = excluded.created_unix_ns`,
665 ref.Knot, ref.PipelineRkey, ref.Workflow,
666 ref.Namespace, ref.PipelineRunName, ref.PipelineRunUID,
667 ref.PipelineName, ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(),
668 )
669 if err != nil {
670 return fmt.Errorf("insert tekton_run: %w", err)
671 }
672 return nil
673}
674
675// LookupTektonRunByTuple resolves the appview's path-based identity to
676// the concrete PipelineRun tack created in Kubernetes.
677func (s *store) LookupTektonRunByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*TektonRunRef, error) {
678 var ref TektonRunRef
679 err := s.db.QueryRowContext(ctx,
680 `SELECT knot, pipeline_rkey, workflow,
681 namespace, pipeline_run_name, pipeline_run_uid,
682 pipeline_name, pipeline_uri
683 FROM tekton_runs
684 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`,
685 knot, pipelineRkey, workflow,
686 ).Scan(
687 &ref.Knot, &ref.PipelineRkey, &ref.Workflow,
688 &ref.Namespace, &ref.PipelineRunName, &ref.PipelineRunUID,
689 &ref.PipelineName, &ref.PipelineURI,
690 )
691 if errors.Is(err, sql.ErrNoRows) {
692 return nil, nil
693 }
694 if err != nil {
695 return nil, fmt.Errorf("lookup tekton_run by tuple: %w", err)
696 }
697 return &ref, nil
698}
699
700// EventsAfter returns every event row with `created` strictly greater
701// than cursor, in cursor order. Used by /events to backfill a
702// reconnecting subscriber and to drain newly-published rows on each
703// broker notification.
704//
705// Pass cursor=0 to get the full log from the beginning, which is what
706// happens when a subscriber connects without a ?cursor= query param.
707func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) {
708 rows, err := s.db.QueryContext(ctx,
709 `SELECT created, rkey, nsid, event_json
710 FROM events
711 WHERE created > ?
712 ORDER BY created ASC`,
713 cursor,
714 )
715 if err != nil {
716 return nil, fmt.Errorf("query events: %w", err)
717 }
718 defer rows.Close()
719
720 out := []EventRow{}
721 for rows.Next() {
722 var (
723 ev EventRow
724 raw string
725 )
726 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil {
727 return nil, fmt.Errorf("scan event: %w", err)
728 }
729 ev.EventJSON = json.RawMessage(raw)
730 out = append(out, ev)
731 }
732 if err := rows.Err(); err != nil {
733 return nil, fmt.Errorf("iterate events: %w", err)
734 }
735 return out, nil
736}