Stitch any CI into Tangled
82
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 736 lines 28 kB view raw
1package main 2 3// SQLite-backed persistence for tack. Holds: 4// 5// - the jetstream cursor, so restarts resume the firehose where the 6// previous run left off 7// - mirrored Tangled state (spindle members, repos, collaborators) 8// and the authorization helpers built on top of it, used both to 9// gate inbound pipeline triggers and to decide which knots we 10// should be subscribed to 11// - the outbound event log fed to /events websocket subscribers 12// - the Buildkite build → pipeline mapping the webhook and /logs 13// handlers look up 14// 15// Per-method docs go into more detail. 16 17import ( 18 "context" 19 "database/sql" 20 "encoding/json" 21 "errors" 22 "fmt" 23 "strconv" 24 "time" 25 26 _ "github.com/mattn/go-sqlite3" 27) 28 29// store wraps the SQLite handle and exposes the small set of operations 30// the rest of tack needs. Keeping the surface narrow lets the persistence 31// layer be swapped or mocked later without rewriting callers. 32type store struct { 33 db *sql.DB 34} 35 36// openStore opens (or creates) the SQLite database at path and applies 37// the schema. It also flips on WAL + NORMAL synchronous, which is the 38// usual "this is a long-running server, not a one-shot script" config: 39// concurrent reads don't block the single writer, and we trade a little 40// crash-safety on power loss for a lot of write throughput. 41func openStore(path string) (*store, error) { 42 // mattn/go-sqlite3 reads pragma-style query parameters (_journal_mode, 43 // _synchronous, _foreign_keys) and applies them on each new connection. 44 // journal_mode=WAL persists in the database file but setting it here 45 // also covers the first-ever open. 46 dsn := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL&_foreign_keys=on", path) 47 db, err := sql.Open("sqlite3", dsn) 48 if err != nil { 49 return nil, fmt.Errorf("open sqlite %q: %w", path, err) 50 } 51 52 // SQLite only supports one writer at a time. Capping max open conns 53 // at 1 keeps database/sql from spinning up extra connections that 54 // will only ever serialize behind that writer. 55 db.SetMaxOpenConns(1) 56 57 s := &store{db: db} 58 if err := s.migrate(context.Background()); err != nil { 59 _ = db.Close() 60 return nil, err 61 } 62 return s, nil 63} 64 65// Close releases the underlying database handle. 66func (s *store) Close() error { 67 return s.db.Close() 68} 69 70// metaCursorKey is the meta-table key under which we persist the 71// jetstream cursor. Pulled out as a constant to avoid drift between 72// load/save sites. 73const metaCursorKey = "jetstream_cursor" 74 75// LoadCursor returns the persisted jetstream cursor, or nil if none has 76// been saved yet (signaling "start from now"). The cursor is a unix 77// microsecond timestamp — see jetstream.Event.TimeUS. 78func (s *store) LoadCursor(ctx context.Context) (*int64, error) { 79 var raw string 80 err := s.db.QueryRowContext(ctx, 81 `SELECT value FROM meta WHERE key = ?`, metaCursorKey, 82 ).Scan(&raw) 83 if errors.Is(err, sql.ErrNoRows) { 84 return nil, nil 85 } 86 if err != nil { 87 return nil, fmt.Errorf("load cursor: %w", err) 88 } 89 v, err := strconv.ParseInt(raw, 10, 64) 90 if err != nil { 91 // A malformed cursor shouldn't wedge startup — log-and-ignore 92 // (return nil) so we just resume from "now". The caller has the 93 // logger; we surface the parse error so it can decide. 94 return nil, fmt.Errorf("parse cursor %q: %w", raw, err) 95 } 96 return &v, nil 97} 98 99// SaveCursor writes the jetstream cursor. It uses an UPSERT so the meta 100// row is created on first call and updated thereafter. 101func (s *store) SaveCursor(ctx context.Context, cursor int64) error { 102 _, err := s.db.ExecContext(ctx, 103 `INSERT INTO meta (key, value) VALUES (?, ?) 104 ON CONFLICT(key) DO UPDATE SET value = excluded.value`, 105 metaCursorKey, strconv.FormatInt(cursor, 10), 106 ) 107 if err != nil { 108 return fmt.Errorf("save cursor: %w", err) 109 } 110 return nil 111} 112 113// UpsertSpindleMember records (or refreshes) a sh.tangled.spindle.member 114// observation. 115func (s *store) UpsertSpindleMember(ctx context.Context, did, rkey, instance, subject, createdAt string) error { 116 _, err := s.db.ExecContext(ctx, 117 `INSERT INTO spindle_members (did, rkey, instance, subject, created_at) 118 VALUES (?, ?, ?, ?, ?) 119 ON CONFLICT(did, rkey) DO UPDATE SET 120 instance = excluded.instance, 121 subject = excluded.subject, 122 created_at = excluded.created_at`, 123 did, rkey, instance, subject, createdAt, 124 ) 125 if err != nil { 126 return fmt.Errorf("upsert spindle_member: %w", err) 127 } 128 return nil 129} 130 131// DeleteSpindleMember removes a member record by its ATProto identity. 132func (s *store) DeleteSpindleMember(ctx context.Context, did, rkey string) error { 133 _, err := s.db.ExecContext(ctx, 134 `DELETE FROM spindle_members WHERE did = ? AND rkey = ?`, 135 did, rkey, 136 ) 137 if err != nil { 138 return fmt.Errorf("delete spindle_member: %w", err) 139 } 140 return nil 141} 142 143// UpsertRepo records (or refreshes) a sh.tangled.repo observation. 144// spindle and repoDid may be empty strings — we store them as such rather 145// than as SQL NULLs to keep the upsert path uniform. 146func (s *store) UpsertRepo(ctx context.Context, did, rkey, knot, name, spindle, repoDid, createdAt string) error { 147 _, err := s.db.ExecContext(ctx, 148 `INSERT INTO repos (did, rkey, knot, name, spindle, repo_did, created_at) 149 VALUES (?, ?, ?, ?, ?, ?, ?) 150 ON CONFLICT(did, rkey) DO UPDATE SET 151 knot = excluded.knot, 152 name = excluded.name, 153 spindle = excluded.spindle, 154 repo_did = excluded.repo_did, 155 created_at = excluded.created_at`, 156 did, rkey, knot, name, spindle, repoDid, createdAt, 157 ) 158 if err != nil { 159 return fmt.Errorf("upsert repo: %w", err) 160 } 161 return nil 162} 163 164// DeleteRepo removes a repo record by its ATProto identity. 165func (s *store) DeleteRepo(ctx context.Context, did, rkey string) error { 166 _, err := s.db.ExecContext(ctx, 167 `DELETE FROM repos WHERE did = ? AND rkey = ?`, 168 did, rkey, 169 ) 170 if err != nil { 171 return fmt.Errorf("delete repo: %w", err) 172 } 173 return nil 174} 175 176// UpsertRepoCollaborator records (or refreshes) a 177// sh.tangled.repo.collaborator observation. 178func (s *store) UpsertRepoCollaborator(ctx context.Context, did, rkey, repo, repoDid, subject, createdAt string) error { 179 _, err := s.db.ExecContext(ctx, 180 `INSERT INTO repo_collaborators (did, rkey, repo, repo_did, subject, created_at) 181 VALUES (?, ?, ?, ?, ?, ?) 182 ON CONFLICT(did, rkey) DO UPDATE SET 183 repo = excluded.repo, 184 repo_did = excluded.repo_did, 185 subject = excluded.subject, 186 created_at = excluded.created_at`, 187 did, rkey, repo, repoDid, subject, createdAt, 188 ) 189 if err != nil { 190 return fmt.Errorf("upsert repo_collaborator: %w", err) 191 } 192 return nil 193} 194 195// GetRepo returns the (knot, spindle) currently stored for a (did, rkey) 196// pair. Both are returned as empty strings when no row exists; callers 197// that need to distinguish "absent" from "stored but empty" should 198// pre-check existence themselves. 199// 200// This exists so applyRepo can read the *previous* spindle/knot of a 201// record before applying a mutation, which is what makes it possible to 202// detect transitions like "this repo used to be ours, now it isn't" and 203// trigger a knot unsubscribe. 204func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) { 205 err = s.db.QueryRowContext(ctx, 206 `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`, 207 did, rkey, 208 ).Scan(&knot, &spindle) 209 if errors.Is(err, sql.ErrNoRows) { 210 return "", "", nil 211 } 212 if err != nil { 213 return "", "", fmt.Errorf("get repo: %w", err) 214 } 215 return knot, spindle, nil 216} 217 218// AuthorizePipelineActor reports whether a pipeline trigger that 219// arrived on knot for repo (repoOwnerDID, repoName) should be allowed 220// to spawn work on this spindle. Both halves of the answer are 221// derived from the persisted Tangled state in spindle_members and 222// repos: the network's own authorization records, mirrored here from 223// the firehose by jetstream.go. 224// 225// The check is two independent gates; both must hold: 226// 227// 1. **Repo claim**: a sh.tangled.repo record exists naming 228// hostname as its CI spindle AND knot as its host knot, 229// published by repoOwnerDID with the matching name. Without this 230// a knot we already happen to be subscribed to (because *some* 231// repo on it points at us) could otherwise smuggle in pipeline 232// triggers for unrelated repos that never opted into us. 233// 234// 2. **Actor membership**: repoOwnerDID is either the spindle owner 235// itself, or has been authorized via a sh.tangled.spindle.member 236// record published by the spindle owner. The publisher (`did`) 237// of that membership grant must equal ownerDID. Anyone can 238// publish a member record naming anyone, so trusting unsigned 239// grants would let any DID grant itself access. We do NOT match 240// against the record's `instance` column because the upstream 241// ecosystem stores it inconsistently (URL vs hostname), and a 242// tack instance only ever speaks for a single hostname anyway. 243// 244// reason is a short, log-friendly description of why authorization 245// failed when ok is false; it is empty when ok is true. Errors 246// reported here are SQL/IO failures, never policy denials; those 247// surface as ok=false with a populated reason. 248func (s *store) AuthorizePipelineActor( 249 ctx context.Context, 250 hostname, knot, ownerDID, repoOwnerDID, repoName string, 251) (ok bool, reason string, err error) { 252 // We can't authorize an unknown actor or an unidentified repo: 253 // every gate below joins on these. Bail before touching SQLite 254 // so a malformed trigger logs cleanly instead of triggering a 255 // "no rows" miss that could be confused with a real denial. 256 if repoOwnerDID == "" { 257 return false, "trigger has no repo did", nil 258 } 259 if repoName == "" { 260 return false, "trigger has no repo name", nil 261 } 262 263 // Gate 1: this specific repo opted into us on this specific knot. 264 var n int 265 if err := s.db.QueryRowContext(ctx, 266 `SELECT COUNT(*) FROM repos 267 WHERE did = ? AND name = ? AND knot = ? AND spindle = ?`, 268 repoOwnerDID, repoName, knot, hostname, 269 ).Scan(&n); err != nil { 270 return false, "", fmt.Errorf("count repo claim: %w", err) 271 } 272 if n == 0 { 273 return false, "no repo record claims this spindle on this knot", nil 274 } 275 276 // Gate 2: actor is the spindle owner, or vouched for by them. 277 if repoOwnerDID == ownerDID { 278 return true, "", nil 279 } 280 if err := s.db.QueryRowContext(ctx, 281 `SELECT COUNT(*) FROM spindle_members 282 WHERE did = ? AND subject = ?`, 283 ownerDID, repoOwnerDID, 284 ).Scan(&n); err != nil { 285 return false, "", fmt.Errorf("count membership: %w", err) 286 } 287 if n == 0 { 288 return false, "actor is not a spindle member", nil 289 } 290 return true, "", nil 291} 292 293// IsKnotWanted reports whether any *authorized* repo currently stored 294// still names the given hostname as its spindle and the given knot as 295// its host. After a repo update or delete this is the question we ask 296// to decide whether to keep watching that knot or unsubscribe from it. 297// 298// "Authorized" means the repo's publisher (the row's `did` column) is 299// either the spindle owner or has been vouched for by the spindle owner 300// via a sh.tangled.spindle.member record. Without this filter, a non- 301// member could pin us to an arbitrary attacker-chosen knot just by 302// publishing a sh.tangled.repo record naming us as its spindle. See 303// the matching gate in IsAuthorizedActor and AuthorizePipelineActor. 304func (s *store) IsKnotWanted(ctx context.Context, hostname, ownerDID, knot string) (bool, error) { 305 var n int 306 err := s.db.QueryRowContext(ctx, 307 `SELECT COUNT(*) FROM repos r 308 WHERE r.spindle = ? AND r.knot = ? 309 AND ( 310 r.did = ? 311 OR EXISTS ( 312 SELECT 1 FROM spindle_members m 313 WHERE m.did = ? AND m.subject = r.did 314 ) 315 )`, 316 hostname, knot, ownerDID, ownerDID, 317 ).Scan(&n) 318 if err != nil { 319 return false, fmt.Errorf("count repos for knot: %w", err) 320 } 321 return n > 0, nil 322} 323 324// IsAuthorizedActor reports whether did is the spindle owner or has 325// been authorized by the spindle owner via a sh.tangled.spindle.member 326// record. The membership record's publisher (its `did` column) must 327// equal ownerDID; anyone can publish a membership record naming 328// anyone, so trusting unsigned grants would let any DID grant itself 329// access. This is the same trust rule AuthorizePipelineActor enforces; 330// it's pulled out here so knot-subscription decisions in jetstream.go 331// gate on the same check as pipeline-spawning decisions in knot.go. 332func (s *store) IsAuthorizedActor(ctx context.Context, ownerDID, did string) (bool, error) { 333 if did == "" { 334 return false, nil 335 } 336 if did == ownerDID { 337 return true, nil 338 } 339 var n int 340 err := s.db.QueryRowContext(ctx, 341 `SELECT COUNT(*) FROM spindle_members 342 WHERE did = ? AND subject = ?`, 343 ownerDID, did, 344 ).Scan(&n) 345 if err != nil { 346 return false, fmt.Errorf("count membership: %w", err) 347 } 348 return n > 0, nil 349} 350 351// GetSpindleMember returns the subject DID currently stored for a 352// (did, rkey) spindle.member row, or "" when no such row exists. 353// 354// Used by the jetstream handler to learn whose authorization a 355// delete/update of a membership record affects, so it can reconcile 356// that subject's knot subscriptions in the same step as the mutation. 357func (s *store) GetSpindleMember(ctx context.Context, did, rkey string) (string, error) { 358 var subject string 359 err := s.db.QueryRowContext(ctx, 360 `SELECT subject FROM spindle_members WHERE did = ? AND rkey = ?`, 361 did, rkey, 362 ).Scan(&subject) 363 if errors.Is(err, sql.ErrNoRows) { 364 return "", nil 365 } 366 if err != nil { 367 return "", fmt.Errorf("get spindle_member: %w", err) 368 } 369 return subject, nil 370} 371 372// KnotsForOwner returns the distinct knot hostnames of repos published 373// by `did` whose spindle field equals hostname. Used after a membership 374// change to find which knots that DID's repos want, so we can subscribe 375// (newly granted) or potentially unsubscribe (revoked) in lockstep with 376// the grant. 377// 378// Returns an empty slice (not nil) on no matches so callers can range 379// over the result without a nil check. 380func (s *store) KnotsForOwner(ctx context.Context, hostname, did string) ([]string, error) { 381 rows, err := s.db.QueryContext(ctx, 382 `SELECT DISTINCT knot FROM repos 383 WHERE did = ? AND spindle = ? AND knot <> ''`, 384 did, hostname, 385 ) 386 if err != nil { 387 return nil, fmt.Errorf("query knots for owner: %w", err) 388 } 389 defer rows.Close() 390 out := []string{} 391 for rows.Next() { 392 var k string 393 if err := rows.Scan(&k); err != nil { 394 return nil, fmt.Errorf("scan knot: %w", err) 395 } 396 out = append(out, k) 397 } 398 if err := rows.Err(); err != nil { 399 return nil, fmt.Errorf("iterate knots: %w", err) 400 } 401 return out, nil 402} 403 404// KnotsForSpindle returns the distinct knot hostnames of all repos that 405// (a) have declared the given spindle hostname as their CI spindle, and 406// (b) were published by an *authorized* DID: either the spindle owner 407// itself or a member they vouched for via sh.tangled.spindle.member. 408// 409// The membership filter is critical: without it, any DID that publishes 410// a sh.tangled.repo record naming us as its spindle could force us to 411// dial an attacker-chosen knot at startup. See IsAuthorizedActor for 412// the matching trust check applied at firehose-event time. 413// 414// Returns an empty slice (not nil) when nothing matches, so callers can 415// range over the result without a nil check. 416func (s *store) KnotsForSpindle(ctx context.Context, hostname, ownerDID string) ([]string, error) { 417 rows, err := s.db.QueryContext(ctx, 418 `SELECT DISTINCT r.knot FROM repos r 419 WHERE r.spindle = ? AND r.knot <> '' 420 AND ( 421 r.did = ? 422 OR EXISTS ( 423 SELECT 1 FROM spindle_members m 424 WHERE m.did = ? AND m.subject = r.did 425 ) 426 )`, 427 hostname, ownerDID, ownerDID, 428 ) 429 if err != nil { 430 return nil, fmt.Errorf("query knots: %w", err) 431 } 432 defer rows.Close() 433 434 out := []string{} 435 for rows.Next() { 436 var k string 437 if err := rows.Scan(&k); err != nil { 438 return nil, fmt.Errorf("scan knot: %w", err) 439 } 440 out = append(out, k) 441 } 442 if err := rows.Err(); err != nil { 443 return nil, fmt.Errorf("iterate knots: %w", err) 444 } 445 return out, nil 446} 447 448// DeleteRepoCollaborator removes a collaborator record by its ATProto 449// identity. 450func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error { 451 _, err := s.db.ExecContext(ctx, 452 `DELETE FROM repo_collaborators WHERE did = ? AND rkey = ?`, 453 did, rkey, 454 ) 455 if err != nil { 456 return fmt.Errorf("delete repo_collaborator: %w", err) 457 } 458 return nil 459} 460 461// EventRow is one row of the events table. It represents an outbound 462// record we want to deliver to /events websocket subscribers, in the 463// shape callers actually need (raw record JSON, not stringly-typed). 464type EventRow struct { 465 // Created is the assigned monotonic rowid; doubles as the cursor 466 // value subscribers use to resume. 467 Created int64 468 // Rkey is the ATProto record key. For sh.tangled.pipeline.status 469 // records this is the rkey we mint when publishing. 470 Rkey string 471 // Nsid is the lexicon collection (e.g. sh.tangled.pipeline.status). 472 Nsid string 473 // EventJSON is the record body verbatim — held as RawMessage so 474 // the /events handler can splice it into the wire envelope without 475 // an unmarshal/remarshal round-trip. 476 EventJSON json.RawMessage 477} 478 479// InsertEvent appends an event row and returns its assigned `created` 480// (rowid) cursor. Storage is the source of truth for fan-out, so we 481// write here even if zero subscribers are connected — a subscriber that 482// connects later (with an old cursor) will pick the row up via 483// EventsAfter. 484// 485// eventJSON must be a valid JSON object; we store it verbatim. Length 486// validation is intentionally absent — the schema accepts arbitrary 487// TEXT and SQLite handles huge blobs fine for our scale. 488func (s *store) InsertEvent(ctx context.Context, rkey, nsid string, eventJSON []byte) (int64, error) { 489 res, err := s.db.ExecContext(ctx, 490 `INSERT INTO events (rkey, nsid, event_json, inserted_at) 491 VALUES (?, ?, ?, ?)`, 492 rkey, nsid, string(eventJSON), 493 time.Now().UTC().Format(time.RFC3339Nano), 494 ) 495 if err != nil { 496 return 0, fmt.Errorf("insert event: %w", err) 497 } 498 id, err := res.LastInsertId() 499 if err != nil { 500 return 0, fmt.Errorf("event last insert id: %w", err) 501 } 502 return id, nil 503} 504 505// BuildkiteBuildRef is the persisted mapping from one Buildkite build 506// to the Tangled pipeline tuple that spawned it. It's the row written 507// by the Buildkite provider at Spawn time and read back from two 508// places: the webhook handler (by build UUID) when an event arrives, 509// and the /logs handler (by knot+rkey+workflow) when an appview 510// client asks for output. 511type BuildkiteBuildRef struct { 512 BuildUUID string 513 BuildNumber int64 514 PipelineSlug string 515 // Org is the Buildkite organisation slug the build was created 516 // against. Persisted at Spawn time so /logs and any other 517 // post-creation API call can target the same org the workflow 518 // originally chose — see the workflow YAML `tack.buildkite.org` 519 // override. Empty means "use the provider's default org", which 520 // is what every row written before the org column existed will 521 // scan as. 522 Org string 523 Knot string 524 PipelineRkey string 525 Workflow string 526 PipelineURI string 527} 528 529// TektonRunRef is the persisted link from a Tangled workflow tuple 530// to the in-cluster PipelineRun tack created for it. The tuple is the 531// user-facing identity the appview knows; namespace/name/uid are the 532// Kubernetes identity needed for status watching and log lookup. 533type TektonRunRef struct { 534 Knot string 535 PipelineRkey string 536 Workflow string 537 Namespace string 538 PipelineRunName string 539 PipelineRunUID string 540 PipelineName string 541 PipelineURI string 542} 543 544// InsertBuildkiteBuild records that a Buildkite build was created on 545// behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 546// INSERT OR REPLACE so that an unlikely build-uuid collision (or a 547// Buildkite-side rebuild that re-fires us) just refreshes the row 548// instead of failing. 549func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error { 550 // Capture wall-clock and monotonic-friendly forms once so the two 551 // columns agree on the same instant. created_at is the 552 // human-readable RFC3339Nano string; created_unix_ns is the 553 // integer the lookup orders on (text comparison of nanosecond 554 // timestamps isn't reliable, so we sort on the int instead). 555 now := time.Now().UTC() 556 _, err := s.db.ExecContext(ctx, 557 `INSERT INTO buildkite_builds ( 558 build_uuid, build_number, pipeline_slug, org, 559 knot, pipeline_rkey, workflow, 560 pipeline_uri, created_at, created_unix_ns 561 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 562 ON CONFLICT(build_uuid) DO UPDATE SET 563 build_number = excluded.build_number, 564 pipeline_slug = excluded.pipeline_slug, 565 org = excluded.org, 566 knot = excluded.knot, 567 pipeline_rkey = excluded.pipeline_rkey, 568 workflow = excluded.workflow, 569 pipeline_uri = excluded.pipeline_uri, 570 created_at = excluded.created_at, 571 created_unix_ns = excluded.created_unix_ns`, 572 ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, ref.Org, 573 ref.Knot, ref.PipelineRkey, ref.Workflow, 574 ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 575 ) 576 if err != nil { 577 return fmt.Errorf("insert buildkite_build: %w", err) 578 } 579 return nil 580} 581 582// LookupBuildkiteBuildByUUID returns the saved mapping for the given 583// Buildkite build UUID, or nil when no such build is recorded. 584// Returning a nil pointer rather than a sentinel error keeps the 585// webhook handler's "we don't know about this build" branch a simple 586// nil check. 587func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) { 588 var ref BuildkiteBuildRef 589 err := s.db.QueryRowContext(ctx, 590 `SELECT build_uuid, build_number, pipeline_slug, org, 591 knot, pipeline_rkey, workflow, pipeline_uri 592 FROM buildkite_builds WHERE build_uuid = ?`, 593 buildUUID, 594 ).Scan( 595 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 596 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 597 ) 598 if errors.Is(err, sql.ErrNoRows) { 599 return nil, nil 600 } 601 if err != nil { 602 return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err) 603 } 604 return &ref, nil 605} 606 607// LookupBuildkiteBuildByTuple finds the most recently created build 608// for (knot, pipelineRkey, workflow). Returns nil when no build has 609// been recorded for that tuple — used by /logs to translate the 610// appview's path-based identity back into something Buildkite knows. 611// 612// "Most recent" matters because a workflow may have multiple builds 613// over time (rebuilds, re-triggers). We always serve logs for the 614// latest run; older runs are still queryable by build UUID directly 615// if anyone ever wants that. 616// 617// Ordering is on created_unix_ns (a monotonic int) rather than 618// created_at. Text comparison of RFC3339Nano timestamps is not 619// reliable across nanosecond precision, which used to make this 620// query occasionally pick the wrong run. created_at and build_number 621// are kept as deterministic tiebreakers for legacy rows that pre-date 622// the new column and still scan as 0. 623func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) { 624 var ref BuildkiteBuildRef 625 err := s.db.QueryRowContext(ctx, 626 `SELECT build_uuid, build_number, pipeline_slug, org, 627 knot, pipeline_rkey, workflow, pipeline_uri 628 FROM buildkite_builds 629 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ? 630 ORDER BY created_unix_ns DESC, created_at DESC, build_number DESC 631 LIMIT 1`, 632 knot, pipelineRkey, workflow, 633 ).Scan( 634 &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, &ref.Org, 635 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 636 ) 637 if errors.Is(err, sql.ErrNoRows) { 638 return nil, nil 639 } 640 if err != nil { 641 return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err) 642 } 643 return &ref, nil 644} 645 646// InsertTektonRun records the latest PipelineRun created for a Tangled 647// workflow tuple. Reusing the tuple as the primary key intentionally 648// makes /logs resolve to the newest run for that workflow identity. 649func (s *store) InsertTektonRun(ctx context.Context, ref TektonRunRef) error { 650 now := time.Now().UTC() 651 _, err := s.db.ExecContext(ctx, 652 `INSERT INTO tekton_runs ( 653 knot, pipeline_rkey, workflow, 654 namespace, pipeline_run_name, pipeline_run_uid, 655 pipeline_name, pipeline_uri, created_at, created_unix_ns 656 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 657 ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET 658 namespace = excluded.namespace, 659 pipeline_run_name = excluded.pipeline_run_name, 660 pipeline_run_uid = excluded.pipeline_run_uid, 661 pipeline_name = excluded.pipeline_name, 662 pipeline_uri = excluded.pipeline_uri, 663 created_at = excluded.created_at, 664 created_unix_ns = excluded.created_unix_ns`, 665 ref.Knot, ref.PipelineRkey, ref.Workflow, 666 ref.Namespace, ref.PipelineRunName, ref.PipelineRunUID, 667 ref.PipelineName, ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 668 ) 669 if err != nil { 670 return fmt.Errorf("insert tekton_run: %w", err) 671 } 672 return nil 673} 674 675// LookupTektonRunByTuple resolves the appview's path-based identity to 676// the concrete PipelineRun tack created in Kubernetes. 677func (s *store) LookupTektonRunByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*TektonRunRef, error) { 678 var ref TektonRunRef 679 err := s.db.QueryRowContext(ctx, 680 `SELECT knot, pipeline_rkey, workflow, 681 namespace, pipeline_run_name, pipeline_run_uid, 682 pipeline_name, pipeline_uri 683 FROM tekton_runs 684 WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`, 685 knot, pipelineRkey, workflow, 686 ).Scan( 687 &ref.Knot, &ref.PipelineRkey, &ref.Workflow, 688 &ref.Namespace, &ref.PipelineRunName, &ref.PipelineRunUID, 689 &ref.PipelineName, &ref.PipelineURI, 690 ) 691 if errors.Is(err, sql.ErrNoRows) { 692 return nil, nil 693 } 694 if err != nil { 695 return nil, fmt.Errorf("lookup tekton_run by tuple: %w", err) 696 } 697 return &ref, nil 698} 699 700// EventsAfter returns every event row with `created` strictly greater 701// than cursor, in cursor order. Used by /events to backfill a 702// reconnecting subscriber and to drain newly-published rows on each 703// broker notification. 704// 705// Pass cursor=0 to get the full log from the beginning, which is what 706// happens when a subscriber connects without a ?cursor= query param. 707func (s *store) EventsAfter(ctx context.Context, cursor int64) ([]EventRow, error) { 708 rows, err := s.db.QueryContext(ctx, 709 `SELECT created, rkey, nsid, event_json 710 FROM events 711 WHERE created > ? 712 ORDER BY created ASC`, 713 cursor, 714 ) 715 if err != nil { 716 return nil, fmt.Errorf("query events: %w", err) 717 } 718 defer rows.Close() 719 720 out := []EventRow{} 721 for rows.Next() { 722 var ( 723 ev EventRow 724 raw string 725 ) 726 if err := rows.Scan(&ev.Created, &ev.Rkey, &ev.Nsid, &raw); err != nil { 727 return nil, fmt.Errorf("scan event: %w", err) 728 } 729 ev.EventJSON = json.RawMessage(raw) 730 out = append(out, ev) 731 } 732 if err := rows.Err(); err != nil { 733 return nil, fmt.Errorf("iterate events: %w", err) 734 } 735 return out, nil 736}