···215215 if err := json.Unmarshal(c.Record, &rec); err != nil {
216216 return fmt.Errorf("decode repo: %w", err)
217217 }
218218+219219+ // Capture the prior (knot, spindle) before the upsert so the
220220+ // post-mutation reconcile below can detect transitions like
221221+ // "repo used to point at us, no longer does" — which would
222222+ // otherwise leave a knot subscription dangling.
223223+ oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
224224+ if err != nil {
225225+ return err
226226+ }
227227+218228 if err := st.UpsertRepo(ctx, did, c.RKey,
219229 rec.Knot, rec.Name,
220230 deref(rec.Spindle), deref(rec.RepoDid),
···223233 return err
224234 }
225235226226- // If this repo just declared us as its spindle, start (or
227227- // continue) listening to its knot for pipeline triggers. The
228228- // knot consumer dedupes on its own so this is safe to call
229229- // even on update events that don't change the spindle field.
230230- if knots != nil && rec.Spindle != nil && *rec.Spindle == hostname && rec.Knot != "" {
231231- knots.AddKnot(ctx, rec.Knot)
236236+ newSpindle := deref(rec.Spindle)
237237+ return reconcileKnot(ctx, st, knots, hostname,
238238+ oldKnot, oldSpindle,
239239+ rec.Knot, newSpindle,
240240+ )
241241+242242+ case jsOpDelete:
243243+ // Same shape as the update path, just with no "new" side: we
244244+ // have to read the row out before deleting so we can decide
245245+ // whether deletion freed the last hold on a knot we'd been
246246+ // subscribed to.
247247+ oldKnot, oldSpindle, err := st.GetRepo(ctx, did, c.RKey)
248248+ if err != nil {
249249+ return err
232250 }
251251+ if err := st.DeleteRepo(ctx, did, c.RKey); err != nil {
252252+ return err
253253+ }
254254+ return reconcileKnot(ctx, st, knots, hostname,
255255+ oldKnot, oldSpindle,
256256+ "", "",
257257+ )
258258+ }
259259+ return nil
260260+}
233261262262+// reconcileKnot brings the knot consumer's subscriptions in line with
263263+// the latest store state after a single repo mutation. It is called
264264+// after the mutation has been applied so IsKnotWanted reflects the
265265+// post-mutation truth.
266266+//
267267+// Logic:
268268+// - If the new record names us as its spindle, ensure we're
269269+// subscribed to its knot (AddKnot is idempotent, so calling it on
270270+// an already-watched knot is cheap).
271271+// - If the old record named us as its spindle, check whether any
272272+// other repo still references that knot through us; if not,
273273+// RemoveKnot it. Skip this when the knot didn't change AND the
274274+// spindle didn't move away from us, because then nothing actually
275275+// released our hold.
276276+func reconcileKnot(
277277+ ctx context.Context,
278278+ st *store,
279279+ knots KnotConsumer,
280280+ hostname string,
281281+ oldKnot, oldSpindle string,
282282+ newKnot, newSpindle string,
283283+) error {
284284+ // Tests pass nil for the consumer when they only care about the
285285+ // store mutation half of the handler; tolerate that here so
286286+ // callers don't have to special-case it.
287287+ if knots == nil {
234288 return nil
235235- case jsOpDelete:
236236- // We don't unsubscribe from the knot here: other repos may
237237- // still want us to watch it. A periodic reconciliation pass
238238- // (not yet implemented) is the right place to drop unused
239239- // subscriptions.
240240- return st.DeleteRepo(ctx, did, c.RKey)
289289+ }
290290+291291+ if newSpindle == hostname && newKnot != "" {
292292+ knots.AddKnot(ctx, newKnot)
293293+ }
294294+295295+ // Did we just lose our claim on oldKnot? Two ways that can happen:
296296+ // the spindle field moved off of us, or the knot field moved to a
297297+ // different host. Either is a reason to consider unsubscribing
298298+ // from oldKnot — but only if no *other* repo still has us on it.
299299+ releasedOld := oldSpindle == hostname && oldKnot != "" &&
300300+ (newSpindle != hostname || newKnot != oldKnot)
301301+ if releasedOld {
302302+ stillWanted, err := st.IsKnotWanted(ctx, hostname, oldKnot)
303303+ if err != nil {
304304+ return err
305305+ }
306306+ if !stillWanted {
307307+ knots.RemoveKnot(ctx, oldKnot)
308308+ }
241309 }
242310 return nil
243311}
+161
jetstream_test.go
···310310 t.Fatalf("AddKnot calls = %v, want none", added)
311311 }
312312}
313313+314314+// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a
315315+// repo we'd previously been watching gets its .spindle field flipped to
316316+// some other spindle. Once that's the only repo we had on that knot,
317317+// the reconciliation must call RemoveKnot.
318318+func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) {
319319+ s := newTestStore(t)
320320+ ctx := context.Background()
321321+322322+ const ours = "tack.example"
323323+ const knot = "knot.example"
324324+325325+ // Seed: a repo that names us as its spindle on `knot`.
326326+ if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil {
327327+ t.Fatal(err)
328328+ }
329329+330330+ // Update: same record, now points at a different spindle.
331331+ other := "other.example"
332332+ rec := tangled.Repo{
333333+ Knot: knot,
334334+ Name: "repo-a",
335335+ Spindle: &other,
336336+ CreatedAt: "2026-01-01T00:00:00Z",
337337+ }
338338+ evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
339339+340340+ fake := &fakeKnotConsumer{}
341341+ if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
342342+ t.Fatalf("handle: %v", err)
343343+ }
344344+ if added := fake.Added(); len(added) != 0 {
345345+ t.Fatalf("AddKnot calls = %v, want none", added)
346346+ }
347347+ if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
348348+ t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
349349+ }
350350+}
351351+352352+// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't
353353+// over-eagerly unsubscribe from a knot when one of multiple repos on
354354+// that knot moves away from us. The other repo's subscription must keep
355355+// the knot in our wanted set.
356356+func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) {
357357+ s := newTestStore(t)
358358+ ctx := context.Background()
359359+360360+ const ours = "tack.example"
361361+ const knot = "knot.example"
362362+363363+ // Two repos sharing one knot, both pointed at us.
364364+ if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
365365+ t.Fatal(err)
366366+ }
367367+ if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
368368+ t.Fatal(err)
369369+ }
370370+371371+ // Repo A flips to a different spindle. B still wants us.
372372+ other := "other.example"
373373+ rec := tangled.Repo{
374374+ Knot: knot,
375375+ Name: "a",
376376+ Spindle: &other,
377377+ CreatedAt: "2026-01-01T00:00:00Z",
378378+ }
379379+ evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec)
380380+381381+ fake := &fakeKnotConsumer{}
382382+ if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
383383+ t.Fatalf("handle: %v", err)
384384+ }
385385+ if removed := fake.Removed(); len(removed) != 0 {
386386+ t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot)
387387+ }
388388+}
389389+390390+// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo
391391+// staying with us but changing its .knot field unsubscribes the old
392392+// knot (if no other repo holds it) and subscribes the new one.
393393+func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) {
394394+ s := newTestStore(t)
395395+ ctx := context.Background()
396396+397397+ const ours = "tack.example"
398398+ const oldKnot = "old.example"
399399+ const newKnot = "new.example"
400400+401401+ if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil {
402402+ t.Fatal(err)
403403+ }
404404+405405+ spindle := ours
406406+ rec := tangled.Repo{
407407+ Knot: newKnot,
408408+ Name: "a",
409409+ Spindle: &spindle,
410410+ CreatedAt: "2026-01-01T00:00:00Z",
411411+ }
412412+ evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
413413+414414+ fake := &fakeKnotConsumer{}
415415+ if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
416416+ t.Fatalf("handle: %v", err)
417417+ }
418418+ if added := fake.Added(); len(added) != 1 || added[0] != newKnot {
419419+ t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot)
420420+ }
421421+ if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot {
422422+ t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot)
423423+ }
424424+}
425425+426426+// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on
427427+// a knot we cared about triggers RemoveKnot.
428428+func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) {
429429+ s := newTestStore(t)
430430+ ctx := context.Background()
431431+432432+ const ours = "tack.example"
433433+ const knot = "knot.example"
434434+435435+ if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil {
436436+ t.Fatal(err)
437437+ }
438438+ evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil)
439439+440440+ fake := &fakeKnotConsumer{}
441441+ if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
442442+ t.Fatalf("handle: %v", err)
443443+ }
444444+ if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
445445+ t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
446446+ }
447447+}
448448+449449+// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple
450450+// repos on a knot does not unsubscribe — the survivors still want it.
451451+func TestRepoDeleteKeepsKnotIfShared(t *testing.T) {
452452+ s := newTestStore(t)
453453+ ctx := context.Background()
454454+455455+ const ours = "tack.example"
456456+ const knot = "knot.example"
457457+458458+ if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
459459+ t.Fatal(err)
460460+ }
461461+ if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
462462+ t.Fatal(err)
463463+ }
464464+ evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil)
465465+466466+ fake := &fakeKnotConsumer{}
467467+ if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
468468+ t.Fatalf("handle: %v", err)
469469+ }
470470+ if removed := fake.Removed(); len(removed) != 0 {
471471+ t.Fatalf("RemoveKnot calls = %v, want none", removed)
472472+ }
473473+}
+27-4
knot.go
···1919// Once the build pipeline is wired up this is where pipeline
2020// triggers will be translated into Buildkite builds.
2121//
2222-// The jetstream consumer also gets a back-reference (via the knotAdder
2323-// interface) so it can dynamically subscribe to a new knot the moment a
2424-// matching sh.tangled.repo record arrives, without waiting for a tack
2525-// restart.
2222+// The jetstream consumer also gets a back-reference (via the
2323+// KnotConsumer interface) so it can dynamically subscribe a new knot —
2424+// or, conceptually, unsubscribe one — the moment a matching
2525+// sh.tangled.repo record arrives, without waiting for a tack restart.
26262727import (
2828 "context"
···5050 // a no-op. An empty knot string is ignored. The supplied context
5151 // scopes the dial; cancelling it tears the subscription down.
5252 AddKnot(ctx context.Context, knot string)
5353+5454+ // RemoveKnot stops processing events from the given knot. It is the
5555+ // inverse of AddKnot and must tolerate being called for a knot that
5656+ // was never added (no-op). An empty knot string is ignored.
5757+ //
5858+ // The production implementation is currently a no-op: tangled-core's
5959+ // eventconsumer does not expose a way to drop an individual source's
6060+ // websocket. Tracked upstream as
6161+ // https://tangled.org/did:plc:j5hmlfdrwkvtxm7cjmu7j2is/issues/510
6262+ RemoveKnot(ctx context.Context, knot string)
5363}
54645565// knotConsumer is the production KnotConsumer. It wraps
···116126 }
117127 k.log.Info("adding knot source", "knot", knot)
118128 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
129129+}
130130+131131+// RemoveKnot is currently a no-op — see the interface comment for the
132132+// upstream blocker. We still log at info so operators can see when the
133133+// reconciliation logic *would* have unsubscribed; once eventconsumer
134134+// gains a RemoveSource primitive, swap the body for a real call.
135135+func (k *knotConsumer) RemoveKnot(_ context.Context, knot string) {
136136+ if knot == "" {
137137+ return
138138+ }
139139+ k.log.Info("remove knot source requested (no-op until upstream supports it)",
140140+ "knot", knot,
141141+ )
119142}
120143121144// Stop tears down all knot websocket connections and waits for the
+23-5
knot_fake.go
···66// any future test files (and, if we ever split tack into subpackages, can
77// be promoted to an exported helper without moving code around).
88//
99-// It does no I/O: AddKnot just records the knot it was handed so tests
1010-// can assert on the side effect.
99+// It does no I/O: AddKnot and RemoveKnot just record the knot they were
1010+// handed so tests can assert on the side effect.
11111212import (
1313 "context"
···1515)
16161717// fakeKnotConsumer is an in-memory KnotConsumer suitable for tests. The
1818-// zero value is ready to use; concurrent calls to AddKnot are safe.
1818+// zero value is ready to use; concurrent calls are safe.
1919type fakeKnotConsumer struct {
2020- mu sync.Mutex
2121- added []string
2020+ mu sync.Mutex
2121+ added []string
2222+ removed []string
2223}
23242425// Compile-time interface conformance check — keeps the fake honest if
···3233 f.added = append(f.added, knot)
3334}
34353636+// RemoveKnot records the knot for later inspection via Removed().
3737+func (f *fakeKnotConsumer) RemoveKnot(_ context.Context, knot string) {
3838+ f.mu.Lock()
3939+ defer f.mu.Unlock()
4040+ f.removed = append(f.removed, knot)
4141+}
4242+3543// Added returns a copy of the knots passed to AddKnot, in call order.
3644// A copy is returned so callers can't accidentally mutate the fake's
3745// internal slice while comparing.
···4250 copy(out, f.added)
4351 return out
4452}
5353+5454+// Removed returns a copy of the knots passed to RemoveKnot, in call
5555+// order. See Added() for the rationale behind copying.
5656+func (f *fakeKnotConsumer) Removed() []string {
5757+ f.mu.Lock()
5858+ defer f.mu.Unlock()
5959+ out := make([]string, len(f.removed))
6060+ copy(out, f.removed)
6161+ return out
6262+}
+39
store.go
···194194 return nil
195195}
196196197197+// GetRepo returns the (knot, spindle) currently stored for a (did, rkey)
198198+// pair. Both are returned as empty strings when no row exists; callers
199199+// that need to distinguish "absent" from "stored but empty" should
200200+// pre-check existence themselves.
201201+//
202202+// This exists so applyRepo can read the *previous* spindle/knot of a
203203+// record before applying a mutation, which is what makes it possible to
204204+// detect transitions like "this repo used to be ours, now it isn't" and
205205+// trigger a knot unsubscribe.
206206+func (s *store) GetRepo(ctx context.Context, did, rkey string) (knot, spindle string, err error) {
207207+ err = s.db.QueryRowContext(ctx,
208208+ `SELECT knot, spindle FROM repos WHERE did = ? AND rkey = ?`,
209209+ did, rkey,
210210+ ).Scan(&knot, &spindle)
211211+ if errors.Is(err, sql.ErrNoRows) {
212212+ return "", "", nil
213213+ }
214214+ if err != nil {
215215+ return "", "", fmt.Errorf("get repo: %w", err)
216216+ }
217217+ return knot, spindle, nil
218218+}
219219+220220+// IsKnotWanted reports whether any repo currently stored still names the
221221+// given hostname as its spindle and the given knot as its host. After a
222222+// repo update or delete this is the question we ask to decide whether
223223+// to keep watching that knot or unsubscribe from it.
224224+func (s *store) IsKnotWanted(ctx context.Context, hostname, knot string) (bool, error) {
225225+ var n int
226226+ err := s.db.QueryRowContext(ctx,
227227+ `SELECT COUNT(*) FROM repos WHERE spindle = ? AND knot = ?`,
228228+ hostname, knot,
229229+ ).Scan(&n)
230230+ if err != nil {
231231+ return false, fmt.Errorf("count repos for knot: %w", err)
232232+ }
233233+ return n > 0, nil
234234+}
235235+197236// KnotsForSpindle returns the distinct knot hostnames of all repos that
198237// have declared the given spindle hostname as their CI spindle. The knot
199238// event-stream subscriber uses this to decide which knots to dial.