Stitch any CI into Tangled
1package main
2
3// Tests for handleJetstreamEvent. We exercise the full path — collection
4// dispatch, record decoding, store mutation, and cursor advancement —
5// using an in-memory store from store_test.go's newTestStore helper.
6//
7// Events are constructed by hand rather than recorded from a live
8// jetstream so the tests don't need network access and stay fast.
9
10import (
11 "context"
12 "encoding/json"
13 "path/filepath"
14 "testing"
15
16 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
17 js "go.mitchellh.com/tack/internal/jetstream"
18 "tangled.org/core/api/tangled"
19)
20
21// commitEvent builds a jetstream commit event for tests. timeUS is the
22// cursor value the handler should persist after applying. record can be
23// nil for delete operations, which carry no body.
24func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event {
25 var raw json.RawMessage
26 if record != nil {
27 b, err := json.Marshal(record)
28 if err != nil {
29 panic(err) // test helper — callers control the input
30 }
31 raw = b
32 }
33 return &jsmodels.Event{
34 Did: did,
35 TimeUS: timeUS,
36 Kind: jsmodels.EventKindCommit,
37 Commit: &jsmodels.Commit{
38 Operation: op,
39 Collection: collection,
40 RKey: rkey,
41 Record: raw,
42 },
43 }
44}
45
46// requireCursor asserts the persisted cursor equals want. Pulled out
47// because almost every test in this file checks it.
48func requireCursor(t *testing.T, s *store, want int64) {
49 t.Helper()
50 got, err := s.LoadCursor(context.Background())
51 if err != nil {
52 t.Fatalf("load cursor: %v", err)
53 }
54 if got == nil || *got != want {
55 t.Fatalf("cursor = %v, want %d", got, want)
56 }
57}
58
59// handleJetstreamEvent is the testable per-event entry point. The generic
60// jetstream processor owns cursor advancement; this wrapper only supplies
61// Tack's collection list and commit application callback.
62func handleJetstreamEvent(
63 ctx context.Context,
64 st *store,
65 knots KnotConsumer,
66 hostname, ownerDID string,
67 evt *jsmodels.Event,
68) error {
69 processor := &js.Processor{
70 Collections: jetstreamCollections,
71 CursorStore: st,
72 Handler: js.HandlerFunc(func(ctx context.Context, evt *jsmodels.Event) error {
73 return applyCommit(ctx, st, knots, hostname, ownerDID, evt)
74 }),
75 Logger: loggerFrom(ctx),
76 }
77 return processor.HandleEvent(ctx, evt)
78}
79
80// TestHandleNonCommitEvent confirms account/identity events are ignored
81// without error and without advancing the cursor — they don't have a
82// TimeUS we want to commit to.
83func TestHandleNonCommitEvent(t *testing.T) {
84 s := newTestStore(t)
85 ctx := context.Background()
86
87 evt := &jsmodels.Event{
88 Did: "did:plc:foo",
89 TimeUS: 100,
90 Kind: jsmodels.EventKindAccount,
91 }
92 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
93 t.Fatalf("handle: %v", err)
94 }
95 got, err := s.LoadCursor(ctx)
96 if err != nil {
97 t.Fatalf("load cursor: %v", err)
98 }
99 if got != nil {
100 t.Fatalf("expected no cursor for non-commit, got %d", *got)
101 }
102}
103
104// TestHandleSpindleMemberCreate exercises the happy path: a create
105// commit lands a row in spindle_members and advances the cursor.
106func TestHandleSpindleMemberCreate(t *testing.T) {
107 s := newTestStore(t)
108 ctx := context.Background()
109
110 rec := tangled.SpindleMember{
111 Instance: "https://spindle.example",
112 Subject: "did:plc:alice",
113 CreatedAt: "2026-01-01T00:00:00Z",
114 }
115 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec)
116 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
117 t.Fatalf("handle: %v", err)
118 }
119
120 var instance, subject string
121 err := s.db.QueryRowContext(ctx,
122 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`,
123 "did:plc:owner", "rk1",
124 ).Scan(&instance, &subject)
125 if err != nil {
126 t.Fatalf("query: %v", err)
127 }
128 if instance != "https://spindle.example" || subject != "did:plc:alice" {
129 t.Fatalf("got (%q,%q)", instance, subject)
130 }
131 requireCursor(t, s, 12345)
132}
133
134// TestHandleSpindleMemberDelete confirms a delete commit removes the
135// previously-persisted row.
136func TestHandleSpindleMemberDelete(t *testing.T) {
137 s := newTestStore(t)
138 ctx := context.Background()
139
140 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil {
141 t.Fatalf("seed: %v", err)
142 }
143 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil)
144 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
145 t.Fatalf("handle: %v", err)
146 }
147 if n := countRows(t, s, "spindle_members"); n != 0 {
148 t.Fatalf("after delete: %d rows, want 0", n)
149 }
150 requireCursor(t, s, 99)
151}
152
153// TestHandleRepoCreateOptionals verifies pointer-typed optional fields
154// on tangled.Repo are derefed correctly (and nils become empty strings)
155// when written to the store.
156func TestHandleRepoCreateOptionals(t *testing.T) {
157 s := newTestStore(t)
158 ctx := context.Background()
159
160 spindle := "https://spindle.example"
161 repoDid := "did:plc:repo"
162 rec := tangled.Repo{
163 Knot: "knot.example",
164 Name: "myrepo",
165 Spindle: &spindle,
166 RepoDid: &repoDid,
167 CreatedAt: "2026-01-01T00:00:00Z",
168 }
169 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec)
170 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
171 t.Fatalf("handle: %v", err)
172 }
173
174 var gotSpindle, gotRepoDid string
175 err := s.db.QueryRowContext(ctx,
176 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
177 "did:plc:owner", "repo1",
178 ).Scan(&gotSpindle, &gotRepoDid)
179 if err != nil {
180 t.Fatalf("query: %v", err)
181 }
182 if gotSpindle != spindle || gotRepoDid != repoDid {
183 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid)
184 }
185
186 // Now a record with both optionals nil — should land as empty
187 // strings, not crash on a nil dereference in deref().
188 rec2 := tangled.Repo{
189 Knot: "knot.example",
190 Name: "other",
191 CreatedAt: "2026-01-01T00:00:00Z",
192 }
193 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2)
194 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt2); err != nil {
195 t.Fatalf("handle nil-optionals: %v", err)
196 }
197 err = s.db.QueryRowContext(ctx,
198 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
199 "did:plc:owner", "repo2",
200 ).Scan(&gotSpindle, &gotRepoDid)
201 if err != nil {
202 t.Fatalf("query nil-optionals: %v", err)
203 }
204 if gotSpindle != "" || gotRepoDid != "" {
205 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid)
206 }
207 requireCursor(t, s, 8)
208}
209
210// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each
211// collection has at least one apply test.
212func TestHandleRepoCollaboratorCreate(t *testing.T) {
213 s := newTestStore(t)
214 ctx := context.Background()
215
216 repo := "myrepo"
217 rec := tangled.RepoCollaborator{
218 Repo: &repo,
219 Subject: "did:plc:carol",
220 CreatedAt: "2026-01-01T00:00:00Z",
221 }
222 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec)
223 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
224 t.Fatalf("handle: %v", err)
225 }
226
227 var subject string
228 err := s.db.QueryRowContext(ctx,
229 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`,
230 "did:plc:owner", "c1",
231 ).Scan(&subject)
232 if err != nil {
233 t.Fatalf("query: %v", err)
234 }
235 if subject != "did:plc:carol" {
236 t.Fatalf("subject = %q", subject)
237 }
238 requireCursor(t, s, 55)
239}
240
241// TestHandleUnknownCollection makes sure a collection we didn't ask for
242// (jetstream filter changes, schema drift) is silently dropped — no
243// error, no row, but cursor still advances so we don't replay it.
244func TestHandleUnknownCollection(t *testing.T) {
245 s := newTestStore(t)
246 ctx := context.Background()
247
248 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"})
249 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
250 t.Fatalf("handle: %v", err)
251 }
252 requireCursor(t, s, 42)
253}
254
255// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to
256// the happy paths: a malformed record body must be logged-and-skipped
257// (not returned as an error that pauses the scheduler) and the cursor
258// must still advance so we don't loop on the same bad event forever.
259func TestHandleBadRecordAdvancesCursor(t *testing.T) {
260 s := newTestStore(t)
261 ctx := context.Background()
262
263 evt := &jsmodels.Event{
264 Did: "did:plc:owner",
265 TimeUS: 1000,
266 Kind: jsmodels.EventKindCommit,
267 Commit: &jsmodels.Commit{
268 Operation: jsOpCreate,
269 Collection: tangled.SpindleMemberNSID,
270 RKey: "broken",
271 Record: json.RawMessage(`{not valid json`),
272 },
273 }
274 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err != nil {
275 t.Fatalf("handle should swallow decode error, got: %v", err)
276 }
277 if n := countRows(t, s, "spindle_members"); n != 0 {
278 t.Fatalf("bad record should not have inserted; got %d rows", n)
279 }
280 requireCursor(t, s, 1000)
281}
282
283// TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to
284// TestHandleBadRecordAdvancesCursor: the cursor must hold its previous
285// position when applyCommit fails for an *infrastructure* reason
286// (here: store closed mid-flight, simulating SQLite busy / shutdown
287// races). Saving the cursor through such a failure would permanently
288// skip a perfectly good record and silently lose membership state.
289func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) {
290 // Build the store inline (not via newTestStore) so the cleanup
291 // tolerates the explicit Close we do below to provoke errors.
292 path := filepath.Join(t.TempDir(), "tack.db")
293 s, err := openStore(path)
294 if err != nil {
295 t.Fatalf("openStore: %v", err)
296 }
297 ctx := context.Background()
298
299 // Seed a baseline cursor; after the failed apply it must still be
300 // 500, not 1000.
301 if err := s.SaveCursor(ctx, 500); err != nil {
302 t.Fatalf("seed cursor: %v", err)
303 }
304
305 // Close the underlying DB. Subsequent store calls will fail with
306 // "sql: database is closed", which is a transient-style error from
307 // applyCommit's perspective: the record itself is well-formed.
308 if err := s.db.Close(); err != nil {
309 t.Fatalf("close db: %v", err)
310 }
311
312 rec := tangled.SpindleMember{
313 Instance: "https://spindle.example",
314 Subject: "did:plc:alice",
315 CreatedAt: "2026-01-01T00:00:00Z",
316 }
317 evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec)
318
319 // Expect a non-nil error: handler propagates transient failures.
320 if err := handleJetstreamEvent(ctx, s, nil, "", "", evt); err == nil {
321 t.Fatalf("handle should return transient error, got nil")
322 }
323
324 // Re-open the same DB file to inspect the persisted cursor; the
325 // closed handle obviously can't answer queries.
326 s2, err := openStore(path)
327 if err != nil {
328 t.Fatalf("re-open: %v", err)
329 }
330 defer s2.Close()
331 got, err := s2.LoadCursor(ctx)
332 if err != nil {
333 t.Fatalf("load cursor: %v", err)
334 }
335 if got == nil || *got != 500 {
336 t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got)
337 }
338}
339
340// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a
341// sh.tangled.repo whose .spindle field equals our hostname results in a
342// dynamic AddKnot call. This is the hot path for picking up new repos
343// without a tack restart.
344func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) {
345 s := newTestStore(t)
346 ctx := context.Background()
347
348 const ours = "tack.example"
349 spindle := ours
350 rec := tangled.Repo{
351 Knot: "knot.example",
352 Name: "myrepo",
353 Spindle: &spindle,
354 CreatedAt: "2026-01-01T00:00:00Z",
355 }
356 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
357
358 fake := &fakeKnotConsumer{}
359 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
360 t.Fatalf("handle: %v", err)
361 }
362 added := fake.Added()
363 if len(added) != 1 || added[0] != "knot.example" {
364 t.Fatalf("AddKnot calls = %v, want [knot.example]", added)
365 }
366}
367
368// TestRepoEventIgnoresUnauthorizedPublisher pins the high-severity
369// security gate: a sh.tangled.repo record naming us as its spindle
370// must NOT cause an outbound knot subscription unless its publisher
371// is the spindle owner or an owner-vouched member. Without this gate
372// any firehose publisher could force tack to dial an attacker-chosen
373// websocket simply by minting a matching repo record.
374func TestRepoEventIgnoresUnauthorizedPublisher(t *testing.T) {
375 s := newTestStore(t)
376 ctx := context.Background()
377
378 const ours = "tack.example"
379 const owner = "did:plc:owner"
380
381 // did:plc:rando is neither the owner nor a member.
382 spindle := ours
383 rec := tangled.Repo{
384 Knot: "evil.example",
385 Name: "myrepo",
386 Spindle: &spindle,
387 CreatedAt: "2026-01-01T00:00:00Z",
388 }
389 evt := commitEvent(1, "did:plc:rando", tangled.RepoNSID, jsOpCreate, "rk", rec)
390
391 fake := &fakeKnotConsumer{}
392 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil {
393 t.Fatalf("handle: %v", err)
394 }
395 if added := fake.Added(); len(added) != 0 {
396 t.Fatalf("AddKnot calls = %v, want none (publisher is unauthorized)", added)
397 }
398}
399
400// TestSpindleMemberGrantSubscribesPendingKnots covers the case where
401// a member publishes their sh.tangled.repo *before* the owner's
402// sh.tangled.spindle.member grant arrives over the firehose. Until
403// the grant lands the publisher is unauthorized, so the repo doesn't
404// pull a subscription. Once the grant arrives, the same-rkey reconcile
405// must catch up by AddKnot-ing every knot that member's pending repos
406// already named.
407func TestSpindleMemberGrantSubscribesPendingKnots(t *testing.T) {
408 s := newTestStore(t)
409 ctx := context.Background()
410
411 const ours = "tack.example"
412 const owner = "did:plc:owner"
413 const alice = "did:plc:alice"
414
415 // Step 1: alice publishes a repo claiming us. She isn't a member
416 // yet, so no subscription should happen.
417 spindle := ours
418 repoRec := tangled.Repo{
419 Knot: "knot.example",
420 Name: "myrepo",
421 Spindle: &spindle,
422 CreatedAt: "2026-01-01T00:00:00Z",
423 }
424 repoEvt := commitEvent(1, alice, tangled.RepoNSID, jsOpCreate, "rk", repoRec)
425
426 fake := &fakeKnotConsumer{}
427 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, repoEvt); err != nil {
428 t.Fatalf("handle repo: %v", err)
429 }
430 if added := fake.Added(); len(added) != 0 {
431 t.Fatalf("AddKnot before grant = %v, want none", added)
432 }
433
434 // Step 2: owner publishes a membership grant naming alice. The
435 // reconcile triggered by that grant must subscribe to alice's
436 // already-stored repo's knot.
437 grant := tangled.SpindleMember{
438 Instance: ours,
439 Subject: alice,
440 CreatedAt: "2026-01-02T00:00:00Z",
441 }
442 grantEvt := commitEvent(2, owner, tangled.SpindleMemberNSID, jsOpCreate, "mk", grant)
443 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, grantEvt); err != nil {
444 t.Fatalf("handle grant: %v", err)
445 }
446 if added := fake.Added(); len(added) != 1 || added[0] != "knot.example" {
447 t.Fatalf("AddKnot after grant = %v, want [knot.example]", added)
448 }
449}
450
451// TestSpindleMemberRevokeUnsubscribesKnot verifies that revoking a
452// previously-granted membership tears down the only-held-by-that-member
453// knot subscription. Without this, an attacker who briefly held
454// membership could leave us dialing their chosen knot until restart.
455func TestSpindleMemberRevokeUnsubscribesKnot(t *testing.T) {
456 s := newTestStore(t)
457 ctx := context.Background()
458
459 const ours = "tack.example"
460 const owner = "did:plc:owner"
461 const alice = "did:plc:alice"
462
463 // Seed: owner has previously vouched for alice, and alice has
464 // already published a repo on knot.example pointing at us.
465 if err := s.UpsertSpindleMember(ctx, owner, "mk", ours, alice, "t"); err != nil {
466 t.Fatal(err)
467 }
468 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil {
469 t.Fatal(err)
470 }
471
472 // Owner publishes a delete of the membership grant.
473 revoke := commitEvent(10, owner, tangled.SpindleMemberNSID, jsOpDelete, "mk", nil)
474
475 fake := &fakeKnotConsumer{}
476 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, revoke); err != nil {
477 t.Fatalf("handle revoke: %v", err)
478 }
479 if removed := fake.Removed(); len(removed) != 1 || removed[0] != "knot.example" {
480 t.Fatalf("RemoveKnot calls = %v, want [knot.example]", removed)
481 }
482}
483
484// TestSpindleMemberGrantFromNonOwnerIgnored confirms that a forged
485// membership record published by anyone other than the spindle owner
486// does NOT trigger a knot subscription, even if a matching repo for
487// the named subject exists. This mirrors AuthorizePipelineActor's
488// rule that the membership grant's publisher must equal the spindle
489// owner.
490func TestSpindleMemberGrantFromNonOwnerIgnored(t *testing.T) {
491 s := newTestStore(t)
492 ctx := context.Background()
493
494 const ours = "tack.example"
495 const owner = "did:plc:owner"
496 const alice = "did:plc:alice"
497
498 if err := s.UpsertRepo(ctx, alice, "rk", "knot.example", "myrepo", ours, "", "t"); err != nil {
499 t.Fatal(err)
500 }
501
502 // Forged grant: alice "vouches" for herself. Stored, but must
503 // not flip her into authorized status.
504 forgery := tangled.SpindleMember{
505 Instance: ours,
506 Subject: alice,
507 CreatedAt: "2026-01-02T00:00:00Z",
508 }
509 evt := commitEvent(2, alice, tangled.SpindleMemberNSID, jsOpCreate, "mk", forgery)
510
511 fake := &fakeKnotConsumer{}
512 if err := handleJetstreamEvent(ctx, s, fake, ours, owner, evt); err != nil {
513 t.Fatalf("handle forged grant: %v", err)
514 }
515 if added := fake.Added(); len(added) != 0 {
516 t.Fatalf("AddKnot calls = %v, want none (grant publisher != owner)", added)
517 }
518}
519
520// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a
521// *different* spindle do not pull us into watching their knot. Without
522// this guard, tack would dial every knot named in any sh.tangled.repo
523// it sees over the firehose, which is most of them.
524func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) {
525 s := newTestStore(t)
526 ctx := context.Background()
527
528 other := "other-spindle.example"
529 rec := tangled.Repo{
530 Knot: "knot.example",
531 Name: "myrepo",
532 Spindle: &other,
533 CreatedAt: "2026-01-01T00:00:00Z",
534 }
535 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
536
537 fake := &fakeKnotConsumer{}
538 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", "did:plc:owner", evt); err != nil {
539 t.Fatalf("handle: %v", err)
540 }
541 if added := fake.Added(); len(added) != 0 {
542 t.Fatalf("AddKnot calls = %v, want none", added)
543 }
544}
545
546// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a
547// repo we'd previously been watching gets its .spindle field flipped to
548// some other spindle. Once that's the only repo we had on that knot,
549// the reconciliation must call RemoveKnot.
550func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) {
551 s := newTestStore(t)
552 ctx := context.Background()
553
554 const ours = "tack.example"
555 const knot = "knot.example"
556
557 // Seed: a repo that names us as its spindle on `knot`.
558 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil {
559 t.Fatal(err)
560 }
561
562 // Update: same record, now points at a different spindle.
563 other := "other.example"
564 rec := tangled.Repo{
565 Knot: knot,
566 Name: "repo-a",
567 Spindle: &other,
568 CreatedAt: "2026-01-01T00:00:00Z",
569 }
570 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
571
572 fake := &fakeKnotConsumer{}
573 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
574 t.Fatalf("handle: %v", err)
575 }
576 if added := fake.Added(); len(added) != 0 {
577 t.Fatalf("AddKnot calls = %v, want none", added)
578 }
579 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
580 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
581 }
582}
583
584// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't
585// over-eagerly unsubscribe from a knot when one of multiple repos on
586// that knot moves away from us. The other repo's subscription must keep
587// the knot in our wanted set.
588func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) {
589 s := newTestStore(t)
590 ctx := context.Background()
591
592 const ours = "tack.example"
593 const knot = "knot.example"
594
595 // Both publishers must be vouched for; otherwise IsKnotWanted's
596 // membership filter would zero them out and we'd unsubscribe
597 // regardless. The reconciliation we want to test only matters
598 // when there is a real authorized hold to keep.
599 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil {
600 t.Fatal(err)
601 }
602 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil {
603 t.Fatal(err)
604 }
605
606 // Two repos sharing one knot, both pointed at us.
607 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
608 t.Fatal(err)
609 }
610 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
611 t.Fatal(err)
612 }
613
614 // Repo A flips to a different spindle. B still wants us.
615 other := "other.example"
616 rec := tangled.Repo{
617 Knot: knot,
618 Name: "a",
619 Spindle: &other,
620 CreatedAt: "2026-01-01T00:00:00Z",
621 }
622 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec)
623
624 fake := &fakeKnotConsumer{}
625 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
626 t.Fatalf("handle: %v", err)
627 }
628 if removed := fake.Removed(); len(removed) != 0 {
629 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot)
630 }
631}
632
633// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo
634// staying with us but changing its .knot field unsubscribes the old
635// knot (if no other repo holds it) and subscribes the new one.
636func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) {
637 s := newTestStore(t)
638 ctx := context.Background()
639
640 const ours = "tack.example"
641 const oldKnot = "old.example"
642 const newKnot = "new.example"
643
644 // Vouch for did:plc:a so reconcileKnot will actually call
645 // AddKnot for the new host. Without the grant the publisher
646 // is unauthorized and the AddKnot half is (correctly) skipped.
647 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil {
648 t.Fatal(err)
649 }
650
651 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil {
652 t.Fatal(err)
653 }
654
655 spindle := ours
656 rec := tangled.Repo{
657 Knot: newKnot,
658 Name: "a",
659 Spindle: &spindle,
660 CreatedAt: "2026-01-01T00:00:00Z",
661 }
662 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
663
664 fake := &fakeKnotConsumer{}
665 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
666 t.Fatalf("handle: %v", err)
667 }
668 if added := fake.Added(); len(added) != 1 || added[0] != newKnot {
669 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot)
670 }
671 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot {
672 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot)
673 }
674}
675
676// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on
677// a knot we cared about triggers RemoveKnot.
678func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) {
679 s := newTestStore(t)
680 ctx := context.Background()
681
682 const ours = "tack.example"
683 const knot = "knot.example"
684
685 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil {
686 t.Fatal(err)
687 }
688 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil)
689
690 fake := &fakeKnotConsumer{}
691 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
692 t.Fatalf("handle: %v", err)
693 }
694 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
695 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
696 }
697}
698
699// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple
700// repos on a knot does not unsubscribe — the survivors still want it.
701func TestRepoDeleteKeepsKnotIfShared(t *testing.T) {
702 s := newTestStore(t)
703 ctx := context.Background()
704
705 const ours = "tack.example"
706 const knot = "knot.example"
707
708 // Both publishers vouched for; otherwise IsKnotWanted would
709 // (correctly) report neither holds the knot and we'd unsubscribe.
710 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk1", ours, "did:plc:a", "t"); err != nil {
711 t.Fatal(err)
712 }
713 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "mk2", ours, "did:plc:b", "t"); err != nil {
714 t.Fatal(err)
715 }
716
717 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
718 t.Fatal(err)
719 }
720 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
721 t.Fatal(err)
722 }
723 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil)
724
725 fake := &fakeKnotConsumer{}
726 if err := handleJetstreamEvent(ctx, s, fake, ours, "did:plc:owner", evt); err != nil {
727 t.Fatalf("handle: %v", err)
728 }
729 if removed := fake.Removed(); len(removed) != 0 {
730 t.Fatalf("RemoveKnot calls = %v, want none", removed)
731 }
732}