Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: gate knot subscriptions on publisher membership

Previously, observing a `sh.tangled.repo` record whose spindle field
matched our hostname was enough to enroll a new knot subscription, no
matter who published the record.

`reconcileKnot` now consults a new `IsAuthorizedActor` helper before
calling `AddKnot`: the repo's publisher DID must be the spindle owner
or have been vouched for by a `sh.tangled.spindle.member` record
whose own publisher is the owner.

authored by

Mitchell Hashimoto and committed by
Tangled
c731f142 c3035867

+491 -81
+130 -27
jetstream.go
··· 78 78 clientCfg.WebsocketURL = cfg.JetstreamURL 79 79 clientCfg.WantedCollections = collections 80 80 81 - // The handler closes over `st`, `knots` and the spindle hostname so 82 - // the scheduler signature stays plain `func(ctx, *Event) error` and 83 - // applyCommit can hand the knot consumer new sources as soon as 84 - // matching repo records arrive. 81 + // The handler closes over `st`, `knots`, the spindle hostname and 82 + // the owner DID so the scheduler signature stays plain 83 + // `func(ctx, *Event) error` and applyCommit can hand the knot 84 + // consumer new sources as soon as matching repo records arrive 85 + // (gated on the publisher being an authorized actor). 85 86 handler := func(ctx context.Context, evt *jsmodels.Event) error { 86 - return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt) 87 + return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, cfg.OwnerDID, evt) 87 88 } 88 89 89 90 // Re-attach the component-scoped logger so handler — which the ··· 177 178 // lose a row that the rest of tack relies on. Leaving the cursor 178 179 // in place gives the next reconnect (which rewinds by 179 180 // jetstreamRewind) a chance to re-deliver and re-apply it. 180 - func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 181 + func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID string, evt *jsmodels.Event) error { 181 182 // We only care about commits, which are the actual record CRUD 182 183 // operations on a user's PDS. Account/identity events are ignored 183 184 // for now; if we ever care about handle changes we can add them. ··· 189 190 // Dispatch on collection. Unknown collections shouldn't happen given 190 191 // our wantedCollections filter, but be defensive — jetstream may 191 192 // send schema changes ahead of us updating the filter. 192 - applyErr := applyCommit(ctx, st, knots, hostname, evt) 193 + applyErr := applyCommit(ctx, st, knots, hostname, ownerDID, evt) 193 194 if applyErr != nil { 194 195 logger.Error("apply commit", 195 196 "err", applyErr, ··· 261 262 262 263 // applyCommit routes a commit to the right store mutation based on its 263 264 // collection NSID and operation. 264 - func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 265 + func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID string, evt *jsmodels.Event) error { 265 266 c := evt.Commit 266 267 switch c.Collection { 267 268 case tangled.SpindleMemberNSID: 268 - return applySpindleMember(ctx, st, evt.Did, c) 269 + return applySpindleMember(ctx, st, knots, hostname, ownerDID, evt.Did, c) 269 270 case tangled.RepoNSID: 270 - return applyRepo(ctx, st, knots, hostname, evt.Did, c) 271 + return applyRepo(ctx, st, knots, hostname, ownerDID, evt.Did, c) 271 272 case tangled.RepoCollaboratorNSID: 272 273 return applyRepoCollaborator(ctx, st, evt.Did, c) 273 274 default: ··· 279 280 } 280 281 } 281 282 282 - func applySpindleMember(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 283 + func applySpindleMember(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID, did string, c *jsmodels.Commit) error { 283 284 switch c.Operation { 284 285 case jsOpCreate, jsOpUpdate: 286 + // Capture the previous subject. Necessary because a same-rkey 287 + // update can move the grant to a different DID, in which case 288 + // the *old* subject's knots may need to be released even as 289 + // the new subject's are picked up. 290 + oldSubject, err := st.GetSpindleMember(ctx, did, c.RKey) 291 + if err != nil { 292 + return err 293 + } 294 + 285 295 var rec tangled.SpindleMember 286 296 if err := json.Unmarshal(c.Record, &rec); err != nil { 287 297 // Decode failures are a permanent property of the record's 288 298 // bytes; mark as bad so the cursor can advance past it. 289 299 return badRecord(fmt.Errorf("decode spindle.member: %w", err)) 290 300 } 291 - return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 301 + if err := st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt); err != nil { 302 + return err 303 + } 304 + 305 + // Only grants published by the spindle owner actually change 306 + // authorization (see IsAuthorizedActor). Forged grants are 307 + // stored but don't move the needle, so don't bother 308 + // reconciling on them: it would be a no-op at best and 309 + // add log noise. 310 + if did != ownerDID { 311 + return nil 312 + } 313 + if oldSubject != "" && oldSubject != rec.Subject { 314 + if err := reconcileMember(ctx, st, knots, hostname, ownerDID, oldSubject); err != nil { 315 + return err 316 + } 317 + } 318 + return reconcileMember(ctx, st, knots, hostname, ownerDID, rec.Subject) 319 + 292 320 case jsOpDelete: 293 - return st.DeleteSpindleMember(ctx, did, c.RKey) 321 + oldSubject, err := st.GetSpindleMember(ctx, did, c.RKey) 322 + if err != nil { 323 + return err 324 + } 325 + if err := st.DeleteSpindleMember(ctx, did, c.RKey); err != nil { 326 + return err 327 + } 328 + // As above: only the owner's grants matter for authorization. 329 + if did != ownerDID || oldSubject == "" { 330 + return nil 331 + } 332 + return reconcileMember(ctx, st, knots, hostname, ownerDID, oldSubject) 294 333 } 295 334 return nil 296 335 } 297 336 298 - func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error { 337 + func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname, ownerDID, did string, c *jsmodels.Commit) error { 299 338 switch c.Operation { 300 339 case jsOpCreate, jsOpUpdate: 301 340 var rec tangled.Repo ··· 322 361 } 323 362 324 363 newSpindle := deref(rec.Spindle) 325 - return reconcileKnot(ctx, st, knots, hostname, 364 + return reconcileKnot(ctx, st, knots, hostname, ownerDID, did, 326 365 oldKnot, oldSpindle, 327 366 rec.Knot, newSpindle, 328 367 ) ··· 339 378 if err := st.DeleteRepo(ctx, did, c.RKey); err != nil { 340 379 return err 341 380 } 342 - return reconcileKnot(ctx, st, knots, hostname, 381 + return reconcileKnot(ctx, st, knots, hostname, ownerDID, did, 343 382 oldKnot, oldSpindle, 344 383 "", "", 345 384 ) ··· 353 392 // post-mutation truth. 354 393 // 355 394 // Logic: 356 - // - If the new record names us as its spindle, ensure we're 357 - // subscribed to its knot (AddKnot is idempotent, so calling it on 358 - // an already-watched knot is cheap). 395 + // - If the new record names us as its spindle AND the publisher is 396 + // an authorized actor (spindle owner or owner-vouched member), 397 + // ensure we're subscribed to its knot. Without the membership 398 + // check, any firehose publisher could pin us to an attacker-chosen 399 + // knot just by publishing a sh.tangled.repo record naming us. 359 400 // - If the old record named us as its spindle, check whether any 360 - // other repo still references that knot through us; if not, 361 - // RemoveKnot it. Skip this when the knot didn't change AND the 362 - // spindle didn't move away from us, because then nothing actually 363 - // released our hold. 401 + // other authorized repo still references that knot through us; 402 + // if not, RemoveKnot it. Skip this when the knot didn't change 403 + // AND the spindle didn't move away from us, because then nothing 404 + // actually released our hold. 364 405 func reconcileKnot( 365 406 ctx context.Context, 366 407 st *store, 367 408 knots KnotConsumer, 368 - hostname string, 409 + hostname, ownerDID, publisherDID string, 369 410 oldKnot, oldSpindle string, 370 411 newKnot, newSpindle string, 371 412 ) error { ··· 377 418 } 378 419 379 420 if newSpindle == hostname && newKnot != "" { 380 - knots.AddKnot(ctx, newKnot) 421 + ok, err := st.IsAuthorizedActor(ctx, ownerDID, publisherDID) 422 + if err != nil { 423 + return err 424 + } 425 + if ok { 426 + knots.AddKnot(ctx, newKnot) 427 + } else { 428 + loggerFrom(ctx).Warn("ignoring repo from unauthorized publisher", 429 + "publisher_did", publisherDID, 430 + "knot", newKnot, 431 + ) 432 + } 381 433 } 382 434 383 435 // Did we just lose our claim on oldKnot? Two ways that can happen: 384 436 // the spindle field moved off of us, or the knot field moved to a 385 437 // different host. Either is a reason to consider unsubscribing 386 - // from oldKnot — but only if no *other* repo still has us on it. 438 + // from oldKnot, but only if no *other* authorized repo still has 439 + // us on it. 387 440 releasedOld := oldSpindle == hostname && oldKnot != "" && 388 441 (newSpindle != hostname || newKnot != oldKnot) 389 442 if releasedOld { 390 - stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot) 443 + stillWanted, err := st.IsKnotWanted(ctx, hostname, ownerDID, oldKnot) 391 444 if err != nil { 392 445 return err 393 446 } 394 447 if !stillWanted { 395 448 knots.RemoveKnot(ctx, oldKnot) 449 + } 450 + } 451 + return nil 452 + } 453 + 454 + // reconcileMember adjusts knot subscriptions after a membership grant 455 + // or revocation may have changed `subject`'s authorization status. 456 + // For each knot named by subject's repos that point at us: 457 + // 458 + // - if subject is now authorized, AddKnot (idempotent: already- 459 + // subscribed knots are no-ops in the consumer); 460 + // - if subject is now unauthorized, ask IsKnotWanted whether any 461 + // *other* authorized repo still holds the knot; if not, 462 + // RemoveKnot. 463 + // 464 + // Without this, a member's repos picked up over the firehose before 465 + // the grant arrived would never get subscribed (the grant doesn't 466 + // re-deliver the older repo events), and a revocation would leave 467 + // the now-unauthorized publisher's knot subscribed until restart. 468 + func reconcileMember( 469 + ctx context.Context, 470 + st *store, 471 + knots KnotConsumer, 472 + hostname, ownerDID, subject string, 473 + ) error { 474 + if knots == nil || subject == "" { 475 + return nil 476 + } 477 + knotsForSubject, err := st.KnotsForOwner(ctx, hostname, subject) 478 + if err != nil { 479 + return err 480 + } 481 + if len(knotsForSubject) == 0 { 482 + return nil 483 + } 484 + authorized, err := st.IsAuthorizedActor(ctx, ownerDID, subject) 485 + if err != nil { 486 + return err 487 + } 488 + for _, k := range knotsForSubject { 489 + if authorized { 490 + knots.AddKnot(ctx, k) 491 + continue 492 + } 493 + stillWanted, err := st.IsKnotWanted(ctx, hostname, ownerDID, k) 494 + if err != nil { 495 + return err 496 + } 497 + if !stillWanted { 498 + knots.RemoveKnot(ctx, k) 396 499 } 397 500 } 398 501 return nil
+196 -16
jetstream_test.go
··· 67 67 TimeUS: 100, 68 68 Kind: jsmodels.EventKindAccount, 69 69 } 70 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 70 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 71 71 t.Fatalf("handle: %v", err) 72 72 } 73 73 got, err := s.LoadCursor(ctx) ··· 91 91 CreatedAt: "2026-01-01T00:00:00Z", 92 92 } 93 93 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 94 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 94 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 95 95 t.Fatalf("handle: %v", err) 96 96 } 97 97 ··· 119 119 t.Fatalf("seed: %v", err) 120 120 } 121 121 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 122 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 122 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 123 123 t.Fatalf("handle: %v", err) 124 124 } 125 125 if n := countRows(t, s, "spindle_members"); n != 0 { ··· 145 145 CreatedAt: "2026-01-01T00:00:00Z", 146 146 } 147 147 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 148 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 148 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 149 149 t.Fatalf("handle: %v", err) 150 150 } 151 151 ··· 169 169 CreatedAt: "2026-01-01T00:00:00Z", 170 170 } 171 171 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 172 - if err := handleJetstreamEvent(ctx, s, nil, "", evt2); err != nil { 172 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt2); err != nil { 173 173 t.Fatalf("handle nil-optionals: %v", err) 174 174 } 175 175 err = s.db.QueryRowContext(ctx, ··· 198 198 CreatedAt: "2026-01-01T00:00:00Z", 199 199 } 200 200 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 201 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 201 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 202 202 t.Fatalf("handle: %v", err) 203 203 } 204 204 ··· 224 224 ctx := context.Background() 225 225 226 226 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 227 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 227 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 228 228 t.Fatalf("handle: %v", err) 229 229 } 230 230 requireCursor(t, s, 42) ··· 249 249 Record: json.RawMessage(`{not valid json`), 250 250 }, 251 251 } 252 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 252 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 253 253 t.Fatalf("handle should swallow decode error, got: %v", err) 254 254 } 255 255 if n := countRows(t, s, "spindle_members"); n != 0 { ··· 295 295 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 296 296 297 297 // Expect a non-nil error: handler propagates transient failures. 298 - if err := handleJetstreamEvent(ctx, s, nil, "", evt); err == nil { 298 + if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err == nil { 299 299 t.Fatalf("handle should return transient error, got nil") 300 300 } 301 301 ··· 334 334 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 335 335 336 336 fake := &fakeKnotConsumer{} 337 - if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 337 + if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 338 338 t.Fatalf("handle: %v", err) 339 339 } 340 340 added := fake.Added() ··· 343 343 } 344 344 } 345 345 346 + // TestRepoEventIgnoresUnauthorizedPublisher pins the high-severity 347 + // security gate: a sh.tangled.repo record naming us as its spindle 348 + // must NOT cause an outbound knot subscription unless its publisher 349 + // is the spindle owner or an owner-vouched member. Without this gate 350 + // any firehose publisher could force tack to dial an attacker-chosen 351 + // websocket simply by minting a matching repo record. 352 + func TestRepoEventIgnoresUnauthorizedPublisher(t *testing.T) { 353 + s := newTestStore(t) 354 + ctx := context.Background() 355 + 356 + const ours = "tack.example" 357 + const owner = "did:plc:owner" 358 + 359 + // did:plc:rando is neither the owner nor a member. 360 + spindle := ours 361 + rec := tangled.Repo{ 362 + Knot: "evil.example", 363 + Name: "myrepo", 364 + Spindle: &spindle, 365 + CreatedAt: "2026-01-01T00:00:00Z", 366 + } 367 + evt := commitEvent(1, "did:plc:rando", tangled.RepoNSID, jsOpCreate, "rk", rec) 368 + 369 + fake := &fakeKnotConsumer{} 370 + if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 371 + t.Fatalf("handle: %v", err) 372 + } 373 + if added := fake.Added(); len(added) != 0 { 374 + t.Fatalf("AddKnot calls = %v, want none (publisher is unauthorized)", added) 375 + } 376 + } 377 + 378 + // TestSpindleMemberGrantSubscribesPendingKnots covers the case where 379 + // a member publishes their sh.tangled.repo *before* the owner's 380 + // sh.tangled.spindle.member grant arrives over the firehose. Until 381 + // the grant lands the publisher is unauthorized, so the repo doesn't 382 + // pull a subscription. Once the grant arrives, the same-rkey reconcile 383 + // must catch up by AddKnot-ing every knot that member's pending repos 384 + // already named. 385 + func TestSpindleMemberGrantSubscribesPendingKnots(t *testing.T) { 386 + s := newTestStore(t) 387 + ctx := context.Background() 388 + 389 + const ours = "tack.example" 390 + const owner = "did:plc:owner" 391 + const alice = "did:plc:alice" 392 + 393 + // Step 1: alice publishes a repo claiming us. She isn't a member 394 + // yet, so no subscription should happen. 395 + spindle := ours 396 + repoRec := tangled.Repo{ 397 + Knot: "knot.example", 398 + Name: "myrepo", 399 + Spindle: &spindle, 400 + CreatedAt: "2026-01-01T00:00:00Z", 401 + } 402 + repoEvt := commitEvent(1, alice, tangled.RepoNSID, jsOpCreate, "rk", repoRec) 403 + 404 + fake := &fakeKnotConsumer{} 405 + if err := handleJetstreamEvent(ctx, s, fake, ours, owner, repoEvt); err != nil { 406 + t.Fatalf("handle repo: %v", err) 407 + } 408 + if added := fake.Added(); len(added) != 0 { 409 + t.Fatalf("AddKnot before grant = %v, want none", added) 410 + } 411 + 412 + // Step 2: owner publishes a membership grant naming alice. The 413 + // reconcile triggered by that grant must subscribe to alice's 414 + // already-stored repo's knot. 415 + grant := tangled.SpindleMember{ 416 + Instance: ours, 417 + Subject: alice, 418 + CreatedAt: "2026-01-02T00:00:00Z", 419 + } 420 + grantEvt := commitEvent(2, owner, tangled.SpindleMemberNSID, jsOpCreate, "mk", grant) 421 + if err := handleJetstreamEvent(ctx, s, fake, ours, owner, grantEvt); err != nil { 422 + t.Fatalf("handle grant: %v", err) 423 + } 424 + if added := fake.Added(); len(added) != 1 || added[0] != "knot.example" { 425 + t.Fatalf("AddKnot after grant = %v, want [knot.example]", added) 426 + } 427 + } 428 + 429 + // TestSpindleMemberRevokeUnsubscribesKnot verifies that revoking a 430 + // previously-granted membership tears down the only-held-by-that-member 431 + // knot subscription. Without this, an attacker who briefly held 432 + // membership could leave us dialing their chosen knot until restart, 433 + // the exact lingering-subscription concern called out in KNOWN_ISSUES. 434 + func TestSpindleMemberRevokeUnsubscribesKnot(t *testing.T) { 435 + s := newTestStore(t) 436 + ctx := context.Background() 437 + 438 + const ours = "tack.example" 439 + const owner = "did:plc:owner" 440 + const alice = "did:plc:alice" 441 + 442 + // Seed: owner has previously vouched for alice, and alice has 443 + // already published a repo on knot.example pointing at us. 444 + if err := s.UpsertSpindleMember(ctx, owner, "mk", ours, alice, "t"); err != nil { 445 + t.Fatal(err) 446 + } 447 + if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 448 + t.Fatal(err) 449 + } 450 + 451 + // Owner publishes a delete of the membership grant. 452 + revoke := commitEvent(10, owner, tangled.SpindleMemberNSID, jsOpDelete, "mk", nil) 453 + 454 + fake := &fakeKnotConsumer{} 455 + if err := handleJetstreamEvent(ctx, s, fake, ours, owner, revoke); err != nil { 456 + t.Fatalf("handle revoke: %v", err) 457 + } 458 + if removed := fake.Removed(); len(removed) != 1 || removed[0] != "knot.example" { 459 + t.Fatalf("RemoveKnot calls = %v, want [knot.example]", removed) 460 + } 461 + } 462 + 463 + // TestSpindleMemberGrantFromNonOwnerIgnored confirms that a forged 464 + // membership record published by anyone other than the spindle owner 465 + // does NOT trigger a knot subscription, even if a matching repo for 466 + // the named subject exists. This mirrors AuthorizePipelineActor's 467 + // rule that the membership grant's publisher must equal the spindle 468 + // owner. 469 + func TestSpindleMemberGrantFromNonOwnerIgnored(t *testing.T) { 470 + s := newTestStore(t) 471 + ctx := context.Background() 472 + 473 + const ours = "tack.example" 474 + const owner = "did:plc:owner" 475 + const alice = "did:plc:alice" 476 + 477 + if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 478 + t.Fatal(err) 479 + } 480 + 481 + // Forged grant: alice "vouches" for herself. Stored, but must 482 + // not flip her into authorized status. 483 + forgery := tangled.SpindleMember{ 484 + Instance: ours, 485 + Subject: alice, 486 + CreatedAt: "2026-01-02T00:00:00Z", 487 + } 488 + evt := commitEvent(2, alice, tangled.SpindleMemberNSID, jsOpCreate, "mk", forgery) 489 + 490 + fake := &fakeKnotConsumer{} 491 + if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 492 + t.Fatalf("handle forged grant: %v", err) 493 + } 494 + if added := fake.Added(); len(added) != 0 { 495 + t.Fatalf("AddKnot calls = %v, want none (grant publisher != owner)", added) 496 + } 497 + } 498 + 346 499 // TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 347 500 // *different* spindle do not pull us into watching their knot. Without 348 501 // this guard, tack would dial every knot named in any sh.tangled.repo ··· 361 514 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 362 515 363 516 fake := &fakeKnotConsumer{} 364 - if err := handleJetstreamEvent(ctx, s, fake, "tack.example", evt); err != nil { 517 + if err := handleJetstreamEvent(ctx, s, fake, "tack.example", "did:plc:owner", evt); err != nil { 365 518 t.Fatalf("handle: %v", err) 366 519 } 367 520 if added := fake.Added(); len(added) != 0 { ··· 396 549 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 397 550 398 551 fake := &fakeKnotConsumer{} 399 - if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 552 + if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 400 553 t.Fatalf("handle: %v", err) 401 554 } 402 555 if added := fake.Added(); len(added) != 0 { ··· 418 571 const ours = "tack.example" 419 572 const knot = "knot.example" 420 573 574 + // Both publishers must be vouched for; otherwise IsKnotWanted's 575 + // membership filter would zero them out and we'd unsubscribe 576 + // regardless. The reconciliation we want to test only matters 577 + // when there is a real authorized hold to keep. 578 + if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 579 + t.Fatal(err) 580 + } 581 + if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 582 + t.Fatal(err) 583 + } 584 + 421 585 // Two repos sharing one knot, both pointed at us. 422 586 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 423 587 t.Fatal(err) ··· 437 601 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec) 438 602 439 603 fake := &fakeKnotConsumer{} 440 - if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 604 + if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 441 605 t.Fatalf("handle: %v", err) 442 606 } 443 607 if removed := fake.Removed(); len(removed) != 0 { ··· 456 620 const oldKnot = "old.example" 457 621 const newKnot = "new.example" 458 622 623 + // Vouch for did:plc:a so reconcileKnot will actually call 624 + // AddKnot for the new host. Without the grant the publisher 625 + // is unauthorized and the AddKnot half is (correctly) skipped. 626 + if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 627 + t.Fatal(err) 628 + } 629 + 459 630 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil { 460 631 t.Fatal(err) 461 632 } ··· 470 641 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 471 642 472 643 fake := &fakeKnotConsumer{} 473 - if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 644 + if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 474 645 t.Fatalf("handle: %v", err) 475 646 } 476 647 if added := fake.Added(); len(added) != 1 || added[0] != newKnot { ··· 496 667 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil) 497 668 498 669 fake := &fakeKnotConsumer{} 499 - if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 670 + if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 500 671 t.Fatalf("handle: %v", err) 501 672 } 502 673 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { ··· 513 684 const ours = "tack.example" 514 685 const knot = "knot.example" 515 686 687 + // Both publishers vouched for; otherwise IsKnotWanted would 688 + // (correctly) report neither holds the knot and we'd unsubscribe. 689 + if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 690 + t.Fatal(err) 691 + } 692 + if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 693 + t.Fatal(err) 694 + } 695 + 516 696 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 517 697 t.Fatal(err) 518 698 } ··· 522 702 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil) 523 703 524 704 fake := &fakeKnotConsumer{} 525 - if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 705 + if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 526 706 t.Fatalf("handle: %v", err) 527 707 } 528 708 if removed := fake.Removed(); len(removed) != 0 {
+1 -1
knot.go
··· 120 120 func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) { 121 121 logger := loggerFrom(ctx).With("component", "knotconsumer") 122 122 123 - knots, err := st.KnotsForSpindle(ctx, cfg.Hostname) 123 + knots, err := st.KnotsForSpindle(ctx, cfg.Hostname, cfg.OwnerDID) 124 124 if err != nil { 125 125 return nil, fmt.Errorf("load known knots: %w", err) 126 126 }
+132 -27
store.go
··· 1 1 package main 2 2 3 - // SQLite-backed persistence for tack. 4 - // 5 - // Two responsibilities live here: 3 + // SQLite-backed persistence for tack. Holds: 6 4 // 7 - // 1. The jetstream cursor — a microsecond unix timestamp the AT Proto 8 - // firehose uses to resume from a specific point. Without persistence 9 - // every restart begins at "now" and we'd silently miss any record 10 - // published while we were down. 11 - // 12 - // 2. Tangled membership state derived from jetstream commits: 13 - // sh.tangled.spindle.member, sh.tangled.repo, and 14 - // sh.tangled.repo.collaborator. We need this to later answer 15 - // "is DID X allowed to trigger a build on this spindle for repo Y?" 5 + // - the jetstream cursor, so restarts resume the firehose where the 6 + // previous run left off 7 + // - mirrored Tangled state (spindle members, repos, collaborators) 8 + // and the authorization helpers built on top of it, used both to 9 + // gate inbound pipeline triggers and to decide which knots we 10 + // should be subscribed to 11 + // - the outbound event log fed to /events websocket subscribers 12 + // - the Buildkite build → pipeline mapping the webhook and /logs 13 + // handlers look up 16 14 // 17 - // We use mattn/go-sqlite3, which is the most battle-tested SQLite driver 18 - // for Go. It requires CGo, which is fine for tack — the project already 19 - // builds under Nix where a C toolchain is readily available. 15 + // Per-method docs go into more detail. 20 16 21 17 import ( 22 18 "context" ··· 294 290 return true, "", nil 295 291 } 296 292 297 - // IsKnotWanted reports whether any repo currently stored still names the 298 - // given hostname as its spindle and the given knot as its host. After a 299 - // repo update or delete this is the question we ask to decide whether 300 - // to keep watching that knot or unsubscribe from it. 301 - func (s *store) IsKnotWanted(ctx context.Context, hostname, knot string) (bool, error) { 293 + // IsKnotWanted reports whether any *authorized* repo currently stored 294 + // still names the given hostname as its spindle and the given knot as 295 + // its host. After a repo update or delete this is the question we ask 296 + // to decide whether to keep watching that knot or unsubscribe from it. 297 + // 298 + // "Authorized" means the repo's publisher (the row's `did` column) is 299 + // either the spindle owner or has been vouched for by the spindle owner 300 + // via a sh.tangled.spindle.member record. Without this filter, a non- 301 + // member could pin us to an arbitrary attacker-chosen knot just by 302 + // publishing a sh.tangled.repo record naming us as its spindle. See 303 + // the matching gate in IsAuthorizedActor and AuthorizePipelineActor. 304 + func (s *store) IsKnotWanted(ctx context.Context, hostname, ownerDID, knot string) (bool, error) { 302 305 var n int 303 306 err := s.db.QueryRowContext(ctx, 304 - `SELECT COUNT(*) FROM repos WHERE spindle = ? AND knot = ?`, 305 - hostname, knot, 307 + `SELECT COUNT(*) FROM repos r 308 + WHERE r.spindle = ? AND r.knot = ? 309 + AND ( 310 + r.did = ? 311 + OR EXISTS ( 312 + SELECT 1 FROM spindle_members m 313 + WHERE m.did = ? AND m.subject = r.did 314 + ) 315 + )`, 316 + hostname, knot, ownerDID, ownerDID, 306 317 ).Scan(&n) 307 318 if err != nil { 308 319 return false, fmt.Errorf("count repos for knot: %w", err) ··· 310 321 return n > 0, nil 311 322 } 312 323 324 + // IsAuthorizedActor reports whether did is the spindle owner or has 325 + // been authorized by the spindle owner via a sh.tangled.spindle.member 326 + // record. The membership record's publisher (its `did` column) must 327 + // equal ownerDID; anyone can publish a membership record naming 328 + // anyone, so trusting unsigned grants would let any DID grant itself 329 + // access. This is the same trust rule AuthorizePipelineActor enforces; 330 + // it's pulled out here so knot-subscription decisions in jetstream.go 331 + // gate on the same check as pipeline-spawning decisions in knot.go. 332 + func (s *store) IsAuthorizedActor(ctx context.Context, ownerDID, did string) (bool, error) { 333 + if did == "" { 334 + return false, nil 335 + } 336 + if did == ownerDID { 337 + return true, nil 338 + } 339 + var n int 340 + err := s.db.QueryRowContext(ctx, 341 + `SELECT COUNT(*) FROM spindle_members 342 + WHERE did = ? AND subject = ?`, 343 + ownerDID, did, 344 + ).Scan(&n) 345 + if err != nil { 346 + return false, fmt.Errorf("count membership: %w", err) 347 + } 348 + return n > 0, nil 349 + } 350 + 351 + // GetSpindleMember returns the subject DID currently stored for a 352 + // (did, rkey) spindle.member row, or "" when no such row exists. 353 + // 354 + // Used by the jetstream handler to learn whose authorization a 355 + // delete/update of a membership record affects, so it can reconcile 356 + // that subject's knot subscriptions in the same step as the mutation. 357 + func (s *store) GetSpindleMember(ctx context.Context, did, rkey string) (string, error) { 358 + var subject string 359 + err := s.db.QueryRowContext(ctx, 360 + `SELECT subject FROM spindle_members WHERE did = ? AND rkey = ?`, 361 + did, rkey, 362 + ).Scan(&subject) 363 + if errors.Is(err, sql.ErrNoRows) { 364 + return "", nil 365 + } 366 + if err != nil { 367 + return "", fmt.Errorf("get spindle_member: %w", err) 368 + } 369 + return subject, nil 370 + } 371 + 372 + // KnotsForOwner returns the distinct knot hostnames of repos published 373 + // by `did` whose spindle field equals hostname. Used after a membership 374 + // change to find which knots that DID's repos want, so we can subscribe 375 + // (newly granted) or potentially unsubscribe (revoked) in lockstep with 376 + // the grant. 377 + // 378 + // Returns an empty slice (not nil) on no matches so callers can range 379 + // over the result without a nil check. 380 + func (s *store) KnotsForOwner(ctx context.Context, hostname, did string) ([]string, error) { 381 + rows, err := s.db.QueryContext(ctx, 382 + `SELECT DISTINCT knot FROM repos 383 + WHERE did = ? AND spindle = ? AND knot <> ''`, 384 + did, hostname, 385 + ) 386 + if err != nil { 387 + return nil, fmt.Errorf("query knots for owner: %w", err) 388 + } 389 + defer rows.Close() 390 + out := []string{} 391 + for rows.Next() { 392 + var k string 393 + if err := rows.Scan(&k); err != nil { 394 + return nil, fmt.Errorf("scan knot: %w", err) 395 + } 396 + out = append(out, k) 397 + } 398 + if err := rows.Err(); err != nil { 399 + return nil, fmt.Errorf("iterate knots: %w", err) 400 + } 401 + return out, nil 402 + } 403 + 313 404 // KnotsForSpindle returns the distinct knot hostnames of all repos that 314 - // have declared the given spindle hostname as their CI spindle. The knot 315 - // event-stream subscriber uses this to decide which knots to dial. 405 + // (a) have declared the given spindle hostname as their CI spindle, and 406 + // (b) were published by an *authorized* DID: either the spindle owner 407 + // itself or a member they vouched for via sh.tangled.spindle.member. 408 + // 409 + // The membership filter is critical: without it, any DID that publishes 410 + // a sh.tangled.repo record naming us as its spindle could force us to 411 + // dial an attacker-chosen knot at startup. See IsAuthorizedActor for 412 + // the matching trust check applied at firehose-event time. 316 413 // 317 414 // Returns an empty slice (not nil) when nothing matches, so callers can 318 415 // range over the result without a nil check. 319 - func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) { 416 + func (s *store) KnotsForSpindle(ctx context.Context, hostname, ownerDID string) ([]string, error) { 320 417 rows, err := s.db.QueryContext(ctx, 321 - `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`, 322 - hostname, 418 + `SELECT DISTINCT r.knot FROM repos r 419 + WHERE r.spindle = ? AND r.knot <> '' 420 + AND ( 421 + r.did = ? 422 + OR EXISTS ( 423 + SELECT 1 FROM spindle_members m 424 + WHERE m.did = ? AND m.subject = r.did 425 + ) 426 + )`, 427 + hostname, ownerDID, ownerDID, 323 428 ) 324 429 if err != nil { 325 430 return nil, fmt.Errorf("query knots: %w", err)
+32 -10
store_test.go
··· 241 241 } 242 242 243 243 // TestKnotsForSpindle verifies the query returns only knots from repos 244 - // whose .spindle field matches the given hostname, and that duplicate 245 - // knots collapse to a single entry. 244 + // whose .spindle field matches the given hostname *and* whose publisher 245 + // is an authorized actor (owner or owner-vouched member). Duplicate 246 + // knots collapse to a single entry. Repos from non-members are excluded: 247 + // that's the gate that prevents an attacker-published repo record from 248 + // forcing us to dial an attacker-chosen knot. 246 249 func TestKnotsForSpindle(t *testing.T) { 247 250 s := newTestStore(t) 248 251 ctx := context.Background() 249 252 250 253 const ours = "tack.example" 251 254 const other = "other.example" 255 + const owner = "did:plc:owner" 252 256 253 - // Two repos on the same knot pointing at us — should collapse to 1. 257 + // Vouch for did:plc:a and did:plc:b; did:plc:c is the owner so 258 + // no membership grant is needed; did:plc:zzz is unvouched. 259 + if err := s.UpsertSpindleMember(ctx, owner, "mk1", ours, "did:plc:a", "t"); err != nil { 260 + t.Fatal(err) 261 + } 262 + if err := s.UpsertSpindleMember(ctx, owner, "mk2", ours, "did:plc:b", "t"); err != nil { 263 + t.Fatal(err) 264 + } 265 + 266 + // Two member-published repos on the same knot pointing at us; 267 + // should collapse to one entry. 254 268 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", "knot1.example", "repo-a", ours, "", "t"); err != nil { 255 269 t.Fatal(err) 256 270 } 257 271 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", "knot1.example", "repo-b", ours, "", "t"); err != nil { 258 272 t.Fatal(err) 259 273 } 260 - // A second knot pointing at us. 261 - if err := s.UpsertRepo(ctx, "did:plc:c", "rk3", "knot2.example", "repo-c", ours, "", "t"); err != nil { 274 + // Owner-published repo on a second knot. Owner is implicitly 275 + // authorized and must be included. 276 + if err := s.UpsertRepo(ctx, owner, "rk3", "knot2.example", "repo-c", ours, "", "t"); err != nil { 277 + t.Fatal(err) 278 + } 279 + // Repo pointing at a different spindle: must be excluded. 280 + if err := s.UpsertRepo(ctx, "did:plc:a", "rk4", "knot3.example", "repo-d", other, "", "t"); err != nil { 262 281 t.Fatal(err) 263 282 } 264 - // A repo pointing at a different spindle — must be excluded. 265 - if err := s.UpsertRepo(ctx, "did:plc:d", "rk4", "knot3.example", "repo-d", other, "", "t"); err != nil { 283 + // Repo with no spindle declared: must be excluded. 284 + if err := s.UpsertRepo(ctx, "did:plc:a", "rk5", "knot4.example", "repo-e", "", "", "t"); err != nil { 266 285 t.Fatal(err) 267 286 } 268 - // A repo with no spindle declared — must be excluded. 269 - if err := s.UpsertRepo(ctx, "did:plc:e", "rk5", "knot4.example", "repo-e", "", "", "t"); err != nil { 287 + // Repo published by an unvouched DID: must be excluded even 288 + // though it points at us. This is the security-relevant case: 289 + // without the membership filter, a stranger could pin us to 290 + // "evil.example" just by publishing a sh.tangled.repo record. 291 + if err := s.UpsertRepo(ctx, "did:plc:zzz", "rk6", "evil.example", "repo-z", ours, "", "t"); err != nil { 270 292 t.Fatal(err) 271 293 } 272 294 273 - got, err := s.KnotsForSpindle(ctx, ours) 295 + got, err := s.KnotsForSpindle(ctx, ours, owner) 274 296 if err != nil { 275 297 t.Fatalf("KnotsForSpindle: %v", err) 276 298 }