Stitch any CI into Tangled
105
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 732 lines 25 kB view raw
1package main 2 3// Tests for handleJetstreamEvent. We exercise the full path — collection 4// dispatch, record decoding, store mutation, and cursor advancement — 5// using an in-memory store from store_test.go's newTestStore helper. 6// 7// Events are constructed by hand rather than recorded from a live 8// jetstream so the tests don't need network access and stay fast. 9 10import ( 11 "context" 12 "encoding/json" 13 "path/filepath" 14 "testing" 15 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 17 js "go.mitchellh.com/tack/internal/jetstream" 18 "tangled.org/core/api/tangled" 19) 20 21// commitEvent builds a jetstream commit event for tests. timeUS is the 22// cursor value the handler should persist after applying. record can be 23// nil for delete operations, which carry no body. 24func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event { 25 var raw json.RawMessage 26 if record != nil { 27 b, err := json.Marshal(record) 28 if err != nil { 29 panic(err) // test helper — callers control the input 30 } 31 raw = b 32 } 33 return &jsmodels.Event{ 34 Did: did, 35 TimeUS: timeUS, 36 Kind: jsmodels.EventKindCommit, 37 Commit: &jsmodels.Commit{ 38 Operation: op, 39 Collection: collection, 40 RKey: rkey, 41 Record: raw, 42 }, 43 } 44} 45 46// requireCursor asserts the persisted cursor equals want. Pulled out 47// because almost every test in this file checks it. 48func requireCursor(t *testing.T, s *store, want int64) { 49 t.Helper() 50 got, err := s.LoadCursor(context.Background()) 51 if err != nil { 52 t.Fatalf("load cursor: %v", err) 53 } 54 if got == nil || *got != want { 55 t.Fatalf("cursor = %v, want %d", got, want) 56 } 57} 58 59// handleJetstreamEvent is the testable per-event entry point. The generic 60// jetstream processor owns cursor advancement; this wrapper only supplies 61// Tack's collection list and commit application callback. 62func handleJetstreamEvent( 63 ctx context.Context, 64 st *store, 65 knots KnotConsumer, 66 hostname, ownerDID string, 67 evt *jsmodels.Event, 68) error { 69 processor := &js.Processor{ 70 Collections: jetstreamCollections, 71 CursorStore: st, 72 Handler: js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error { 73 return applyCommit(ctx, st, knots, hostname, ownerDID, evt) 74 }), 75 Logger: loggerFrom(ctx), 76 } 77 return processor.HandleEvent(ctx, evt) 78} 79 80// TestHandleNonCommitEvent confirms account/identity events are ignored 81// without error and without advancing the cursor — they don't have a 82// TimeUS we want to commit to. 83func TestHandleNonCommitEvent(t *testing.T) { 84 s := newTestStore(t) 85 ctx := context.Background() 86 87 evt := &jsmodels.Event{ 88 Did: "did:plc:foo", 89 TimeUS: 100, 90 Kind: jsmodels.EventKindAccount, 91 } 92 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 93 t.Fatalf("handle: %v", err) 94 } 95 got, err := s.LoadCursor(ctx) 96 if err != nil { 97 t.Fatalf("load cursor: %v", err) 98 } 99 if got != nil { 100 t.Fatalf("expected no cursor for non-commit, got %d", *got) 101 } 102} 103 104// TestHandleSpindleMemberCreate exercises the happy path: a create 105// commit lands a row in spindle_members and advances the cursor. 106func TestHandleSpindleMemberCreate(t *testing.T) { 107 s := newTestStore(t) 108 ctx := context.Background() 109 110 rec := tangled.SpindleMember{ 111 Instance: "https://spindle.example", 112 Subject: "did:plc:alice", 113 CreatedAt: "2026-01-01T00:00:00Z", 114 } 115 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 116 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 117 t.Fatalf("handle: %v", err) 118 } 119 120 var instance, subject string 121 err := s.db.QueryRowContext(ctx, 122 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`, 123 "did:plc:owner", "rk1", 124 ).Scan(&instance, &subject) 125 if err != nil { 126 t.Fatalf("query: %v", err) 127 } 128 if instance != "https://spindle.example" || subject != "did:plc:alice" { 129 t.Fatalf("got (%q,%q)", instance, subject) 130 } 131 requireCursor(t, s, 12345) 132} 133 134// TestHandleSpindleMemberDelete confirms a delete commit removes the 135// previously-persisted row. 136func TestHandleSpindleMemberDelete(t *testing.T) { 137 s := newTestStore(t) 138 ctx := context.Background() 139 140 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil { 141 t.Fatalf("seed: %v", err) 142 } 143 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 144 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 145 t.Fatalf("handle: %v", err) 146 } 147 if n := countRows(t, s, "spindle_members"); n != 0 { 148 t.Fatalf("after delete: %d rows, want 0", n) 149 } 150 requireCursor(t, s, 99) 151} 152 153// TestHandleRepoCreateOptionals verifies pointer-typed optional fields 154// on tangled.Repo are derefed correctly (and nils become empty strings) 155// when written to the store. 156func TestHandleRepoCreateOptionals(t *testing.T) { 157 s := newTestStore(t) 158 ctx := context.Background() 159 160 spindle := "https://spindle.example" 161 repoDid := "did:plc:repo" 162 rec := tangled.Repo{ 163 Knot: "knot.example", 164 Name: "myrepo", 165 Spindle: &spindle, 166 RepoDid: &repoDid, 167 CreatedAt: "2026-01-01T00:00:00Z", 168 } 169 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 170 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 171 t.Fatalf("handle: %v", err) 172 } 173 174 var gotSpindle, gotRepoDid string 175 err := s.db.QueryRowContext(ctx, 176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 177 "did:plc:owner", "repo1", 178 ).Scan(&gotSpindle, &gotRepoDid) 179 if err != nil { 180 t.Fatalf("query: %v", err) 181 } 182 if gotSpindle != spindle || gotRepoDid != repoDid { 183 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid) 184 } 185 186 // Now a record with both optionals nil — should land as empty 187 // strings, not crash on a nil dereference in deref(). 188 rec2 := tangled.Repo{ 189 Knot: "knot.example", 190 Name: "other", 191 CreatedAt: "2026-01-01T00:00:00Z", 192 } 193 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 194 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt2); err != nil { 195 t.Fatalf("handle nil-optionals: %v", err) 196 } 197 err = s.db.QueryRowContext(ctx, 198 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 199 "did:plc:owner", "repo2", 200 ).Scan(&gotSpindle, &gotRepoDid) 201 if err != nil { 202 t.Fatalf("query nil-optionals: %v", err) 203 } 204 if gotSpindle != "" || gotRepoDid != "" { 205 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid) 206 } 207 requireCursor(t, s, 8) 208} 209 210// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each 211// collection has at least one apply test. 212func TestHandleRepoCollaboratorCreate(t *testing.T) { 213 s := newTestStore(t) 214 ctx := context.Background() 215 216 repo := "myrepo" 217 rec := tangled.RepoCollaborator{ 218 Repo: &repo, 219 Subject: "did:plc:carol", 220 CreatedAt: "2026-01-01T00:00:00Z", 221 } 222 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 223 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 224 t.Fatalf("handle: %v", err) 225 } 226 227 var subject string 228 err := s.db.QueryRowContext(ctx, 229 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`, 230 "did:plc:owner", "c1", 231 ).Scan(&subject) 232 if err != nil { 233 t.Fatalf("query: %v", err) 234 } 235 if subject != "did:plc:carol" { 236 t.Fatalf("subject = %q", subject) 237 } 238 requireCursor(t, s, 55) 239} 240 241// TestHandleUnknownCollection makes sure a collection we didn't ask for 242// (jetstream filter changes, schema drift) is silently dropped — no 243// error, no row, but cursor still advances so we don't replay it. 244func TestHandleUnknownCollection(t *testing.T) { 245 s := newTestStore(t) 246 ctx := context.Background() 247 248 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 249 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 250 t.Fatalf("handle: %v", err) 251 } 252 requireCursor(t, s, 42) 253} 254 255// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to 256// the happy paths: a malformed record body must be logged-and-skipped 257// (not returned as an error that pauses the scheduler) and the cursor 258// must still advance so we don't loop on the same bad event forever. 259func TestHandleBadRecordAdvancesCursor(t *testing.T) { 260 s := newTestStore(t) 261 ctx := context.Background() 262 263 evt := &jsmodels.Event{ 264 Did: "did:plc:owner", 265 TimeUS: 1000, 266 Kind: jsmodels.EventKindCommit, 267 Commit: &jsmodels.Commit{ 268 Operation: jsOpCreate, 269 Collection: tangled.SpindleMemberNSID, 270 RKey: "broken", 271 Record: json.RawMessage(`{not valid json`), 272 }, 273 } 274 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil { 275 t.Fatalf("handle should swallow decode error, got: %v", err) 276 } 277 if n := countRows(t, s, "spindle_members"); n != 0 { 278 t.Fatalf("bad record should not have inserted; got %d rows", n) 279 } 280 requireCursor(t, s, 1000) 281} 282 283// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to 284// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous 285// position when applyCommit fails for an *infrastructure* reason 286// (here: store closed mid-flight, simulating SQLite busy / shutdown 287// races). Saving the cursor through such a failure would permanently 288// skip a perfectly good record and silently lose membership state. 289func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) { 290 // Build the store inline (not via newTestStore) so the cleanup 291 // tolerates the explicit Close we do below to provoke errors. 292 path := filepath.Join(t.TempDir(), "tack.db") 293 s, err := openStore(path) 294 if err != nil { 295 t.Fatalf("openStore: %v", err) 296 } 297 ctx := context.Background() 298 299 // Seed a baseline cursor; after the failed apply it must still be 300 // 500, not 1000. 301 if err := s.SaveCursor(ctx, 500); err != nil { 302 t.Fatalf("seed cursor: %v", err) 303 } 304 305 // Close the underlying DB. Subsequent store calls will fail with 306 // "sql: database is closed", which is a transient-style error from 307 // applyCommit's perspective: the record itself is well-formed. 308 if err := s.db.Close(); err != nil { 309 t.Fatalf("close db: %v", err) 310 } 311 312 rec := tangled.SpindleMember{ 313 Instance: "https://spindle.example", 314 Subject: "did:plc:alice", 315 CreatedAt: "2026-01-01T00:00:00Z", 316 } 317 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 318 319 // Expect a non-nil error: handler propagates transient failures. 320 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err == nil { 321 t.Fatalf("handle should return transient error, got nil") 322 } 323 324 // Re-open the same DB file to inspect the persisted cursor; the 325 // closed handle obviously can't answer queries. 326 s2, err := openStore(path) 327 if err != nil { 328 t.Fatalf("re-open: %v", err) 329 } 330 defer s2.Close() 331 got, err := s2.LoadCursor(ctx) 332 if err != nil { 333 t.Fatalf("load cursor: %v", err) 334 } 335 if got == nil || *got != 500 { 336 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got) 337 } 338} 339 340// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 341// sh.tangled.repo whose .spindle field equals our hostname results in a 342// dynamic AddKnot call. This is the hot path for picking up new repos 343// without a tack restart. 344func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) { 345 s := newTestStore(t) 346 ctx := context.Background() 347 348 const ours = "tack.example" 349 spindle := ours 350 rec := tangled.Repo{ 351 Knot: "knot.example", 352 Name: "myrepo", 353 Spindle: &spindle, 354 CreatedAt: "2026-01-01T00:00:00Z", 355 } 356 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 357 358 fake := &fakeKnotConsumer{} 359 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 360 t.Fatalf("handle: %v", err) 361 } 362 added := fake.Added() 363 if len(added) != 1 || added[0] != "knot.example" { 364 t.Fatalf("AddKnot calls = %v, want [knot.example]", added) 365 } 366} 367 368// TestRepoEventIgnoresUnauthorizedPublisher pins the high-severity 369// security gate: a sh.tangled.repo record naming us as its spindle 370// must NOT cause an outbound knot subscription unless its publisher 371// is the spindle owner or an owner-vouched member. Without this gate 372// any firehose publisher could force tack to dial an attacker-chosen 373// websocket simply by minting a matching repo record. 374func TestRepoEventIgnoresUnauthorizedPublisher(t *testing.T) { 375 s := newTestStore(t) 376 ctx := context.Background() 377 378 const ours = "tack.example" 379 const owner = "did:plc:owner" 380 381 // did:plc:rando is neither the owner nor a member. 382 spindle := ours 383 rec := tangled.Repo{ 384 Knot: "evil.example", 385 Name: "myrepo", 386 Spindle: &spindle, 387 CreatedAt: "2026-01-01T00:00:00Z", 388 } 389 evt := commitEvent(1, "did:plc:rando", tangled.RepoNSID, jsOpCreate, "rk", rec) 390 391 fake := &fakeKnotConsumer{} 392 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 393 t.Fatalf("handle: %v", err) 394 } 395 if added := fake.Added(); len(added) != 0 { 396 t.Fatalf("AddKnot calls = %v, want none (publisher is unauthorized)", added) 397 } 398} 399 400// TestSpindleMemberGrantSubscribesPendingKnots covers the case where 401// a member publishes their sh.tangled.repo *before* the owner's 402// sh.tangled.spindle.member grant arrives over the firehose. Until 403// the grant lands the publisher is unauthorized, so the repo doesn't 404// pull a subscription. Once the grant arrives, the same-rkey reconcile 405// must catch up by AddKnot-ing every knot that member's pending repos 406// already named. 407func TestSpindleMemberGrantSubscribesPendingKnots(t *testing.T) { 408 s := newTestStore(t) 409 ctx := context.Background() 410 411 const ours = "tack.example" 412 const owner = "did:plc:owner" 413 const alice = "did:plc:alice" 414 415 // Step 1: alice publishes a repo claiming us. She isn't a member 416 // yet, so no subscription should happen. 417 spindle := ours 418 repoRec := tangled.Repo{ 419 Knot: "knot.example", 420 Name: "myrepo", 421 Spindle: &spindle, 422 CreatedAt: "2026-01-01T00:00:00Z", 423 } 424 repoEvt := commitEvent(1, alice, tangled.RepoNSID, jsOpCreate, "rk", repoRec) 425 426 fake := &fakeKnotConsumer{} 427 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, repoEvt); err != nil { 428 t.Fatalf("handle repo: %v", err) 429 } 430 if added := fake.Added(); len(added) != 0 { 431 t.Fatalf("AddKnot before grant = %v, want none", added) 432 } 433 434 // Step 2: owner publishes a membership grant naming alice. The 435 // reconcile triggered by that grant must subscribe to alice's 436 // already-stored repo's knot. 437 grant := tangled.SpindleMember{ 438 Instance: ours, 439 Subject: alice, 440 CreatedAt: "2026-01-02T00:00:00Z", 441 } 442 grantEvt := commitEvent(2, owner, tangled.SpindleMemberNSID, jsOpCreate, "mk", grant) 443 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, grantEvt); err != nil { 444 t.Fatalf("handle grant: %v", err) 445 } 446 if added := fake.Added(); len(added) != 1 || added[0] != "knot.example" { 447 t.Fatalf("AddKnot after grant = %v, want [knot.example]", added) 448 } 449} 450 451// TestSpindleMemberRevokeUnsubscribesKnot verifies that revoking a 452// previously-granted membership tears down the only-held-by-that-member 453// knot subscription. Without this, an attacker who briefly held 454// membership could leave us dialing their chosen knot until restart. 455func TestSpindleMemberRevokeUnsubscribesKnot(t *testing.T) { 456 s := newTestStore(t) 457 ctx := context.Background() 458 459 const ours = "tack.example" 460 const owner = "did:plc:owner" 461 const alice = "did:plc:alice" 462 463 // Seed: owner has previously vouched for alice, and alice has 464 // already published a repo on knot.example pointing at us. 465 if err := s.UpsertSpindleMember(ctx, owner, "mk", ours, alice, "t"); err != nil { 466 t.Fatal(err) 467 } 468 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 469 t.Fatal(err) 470 } 471 472 // Owner publishes a delete of the membership grant. 473 revoke := commitEvent(10, owner, tangled.SpindleMemberNSID, jsOpDelete, "mk", nil) 474 475 fake := &fakeKnotConsumer{} 476 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, revoke); err != nil { 477 t.Fatalf("handle revoke: %v", err) 478 } 479 if removed := fake.Removed(); len(removed) != 1 || removed[0] != "knot.example" { 480 t.Fatalf("RemoveKnot calls = %v, want [knot.example]", removed) 481 } 482} 483 484// TestSpindleMemberGrantFromNonOwnerIgnored confirms that a forged 485// membership record published by anyone other than the spindle owner 486// does NOT trigger a knot subscription, even if a matching repo for 487// the named subject exists. This mirrors AuthorizePipelineActor's 488// rule that the membership grant's publisher must equal the spindle 489// owner. 490func TestSpindleMemberGrantFromNonOwnerIgnored(t *testing.T) { 491 s := newTestStore(t) 492 ctx := context.Background() 493 494 const ours = "tack.example" 495 const owner = "did:plc:owner" 496 const alice = "did:plc:alice" 497 498 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil { 499 t.Fatal(err) 500 } 501 502 // Forged grant: alice "vouches" for herself. Stored, but must 503 // not flip her into authorized status. 504 forgery := tangled.SpindleMember{ 505 Instance: ours, 506 Subject: alice, 507 CreatedAt: "2026-01-02T00:00:00Z", 508 } 509 evt := commitEvent(2, alice, tangled.SpindleMemberNSID, jsOpCreate, "mk", forgery) 510 511 fake := &fakeKnotConsumer{} 512 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil { 513 t.Fatalf("handle forged grant: %v", err) 514 } 515 if added := fake.Added(); len(added) != 0 { 516 t.Fatalf("AddKnot calls = %v, want none (grant publisher != owner)", added) 517 } 518} 519 520// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 521// *different* spindle do not pull us into watching their knot. Without 522// this guard, tack would dial every knot named in any sh.tangled.repo 523// it sees over the firehose, which is most of them. 524func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) { 525 s := newTestStore(t) 526 ctx := context.Background() 527 528 other := "other-spindle.example" 529 rec := tangled.Repo{ 530 Knot: "knot.example", 531 Name: "myrepo", 532 Spindle: &other, 533 CreatedAt: "2026-01-01T00:00:00Z", 534 } 535 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 536 537 fake := &fakeKnotConsumer{} 538 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", "did:plc:owner", evt); err != nil { 539 t.Fatalf("handle: %v", err) 540 } 541 if added := fake.Added(); len(added) != 0 { 542 t.Fatalf("AddKnot calls = %v, want none", added) 543 } 544} 545 546// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a 547// repo we'd previously been watching gets its .spindle field flipped to 548// some other spindle. Once that's the only repo we had on that knot, 549// the reconciliation must call RemoveKnot. 550func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) { 551 s := newTestStore(t) 552 ctx := context.Background() 553 554 const ours = "tack.example" 555 const knot = "knot.example" 556 557 // Seed: a repo that names us as its spindle on `knot`. 558 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil { 559 t.Fatal(err) 560 } 561 562 // Update: same record, now points at a different spindle. 563 other := "other.example" 564 rec := tangled.Repo{ 565 Knot: knot, 566 Name: "repo-a", 567 Spindle: &other, 568 CreatedAt: "2026-01-01T00:00:00Z", 569 } 570 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 571 572 fake := &fakeKnotConsumer{} 573 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 574 t.Fatalf("handle: %v", err) 575 } 576 if added := fake.Added(); len(added) != 0 { 577 t.Fatalf("AddKnot calls = %v, want none", added) 578 } 579 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 580 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 581 } 582} 583 584// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't 585// over-eagerly unsubscribe from a knot when one of multiple repos on 586// that knot moves away from us. The other repo's subscription must keep 587// the knot in our wanted set. 588func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) { 589 s := newTestStore(t) 590 ctx := context.Background() 591 592 const ours = "tack.example" 593 const knot = "knot.example" 594 595 // Both publishers must be vouched for; otherwise IsKnotWanted's 596 // membership filter would zero them out and we'd unsubscribe 597 // regardless. The reconciliation we want to test only matters 598 // when there is a real authorized hold to keep. 599 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 600 t.Fatal(err) 601 } 602 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 603 t.Fatal(err) 604 } 605 606 // Two repos sharing one knot, both pointed at us. 607 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 608 t.Fatal(err) 609 } 610 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 611 t.Fatal(err) 612 } 613 614 // Repo A flips to a different spindle. B still wants us. 615 other := "other.example" 616 rec := tangled.Repo{ 617 Knot: knot, 618 Name: "a", 619 Spindle: &other, 620 CreatedAt: "2026-01-01T00:00:00Z", 621 } 622 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec) 623 624 fake := &fakeKnotConsumer{} 625 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 626 t.Fatalf("handle: %v", err) 627 } 628 if removed := fake.Removed(); len(removed) != 0 { 629 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot) 630 } 631} 632 633// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo 634// staying with us but changing its .knot field unsubscribes the old 635// knot (if no other repo holds it) and subscribes the new one. 636func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) { 637 s := newTestStore(t) 638 ctx := context.Background() 639 640 const ours = "tack.example" 641 const oldKnot = "old.example" 642 const newKnot = "new.example" 643 644 // Vouch for did:plc:a so reconcileKnot will actually call 645 // AddKnot for the new host. Without the grant the publisher 646 // is unauthorized and the AddKnot half is (correctly) skipped. 647 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 648 t.Fatal(err) 649 } 650 651 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil { 652 t.Fatal(err) 653 } 654 655 spindle := ours 656 rec := tangled.Repo{ 657 Knot: newKnot, 658 Name: "a", 659 Spindle: &spindle, 660 CreatedAt: "2026-01-01T00:00:00Z", 661 } 662 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 663 664 fake := &fakeKnotConsumer{} 665 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 666 t.Fatalf("handle: %v", err) 667 } 668 if added := fake.Added(); len(added) != 1 || added[0] != newKnot { 669 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot) 670 } 671 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot { 672 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot) 673 } 674} 675 676// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on 677// a knot we cared about triggers RemoveKnot. 678func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) { 679 s := newTestStore(t) 680 ctx := context.Background() 681 682 const ours = "tack.example" 683 const knot = "knot.example" 684 685 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil { 686 t.Fatal(err) 687 } 688 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil) 689 690 fake := &fakeKnotConsumer{} 691 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 692 t.Fatalf("handle: %v", err) 693 } 694 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 695 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 696 } 697} 698 699// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple 700// repos on a knot does not unsubscribe — the survivors still want it. 701func TestRepoDeleteKeepsKnotIfShared(t *testing.T) { 702 s := newTestStore(t) 703 ctx := context.Background() 704 705 const ours = "tack.example" 706 const knot = "knot.example" 707 708 // Both publishers vouched for; otherwise IsKnotWanted would 709 // (correctly) report neither holds the knot and we'd unsubscribe. 710 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil { 711 t.Fatal(err) 712 } 713 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil { 714 t.Fatal(err) 715 } 716 717 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 718 t.Fatal(err) 719 } 720 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 721 t.Fatal(err) 722 } 723 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil) 724 725 fake := &fakeKnotConsumer{} 726 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil { 727 t.Fatalf("handle: %v", err) 728 } 729 if removed := fake.Removed(); len(removed) != 0 { 730 t.Fatalf("RemoveKnot calls = %v, want none", removed) 731 } 732}