Stitch any CI into Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

knot listening

+435 -22
+3
go.mod
··· 11 11 ) 12 12 13 13 require ( 14 + github.com/avast/retry-go/v4 v4.6.1 // indirect 14 15 github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect 15 16 github.com/beorn7/perks v1.0.1 // indirect 16 17 github.com/bluesky-social/indigo v0.0.0-20260220055544-bf41e2ee75ab // indirect ··· 20 21 github.com/charmbracelet/x/ansi v0.8.0 // indirect 21 22 github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd // indirect 22 23 github.com/charmbracelet/x/term v0.2.1 // indirect 24 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect 23 25 github.com/go-logfmt/logfmt v0.6.1 // indirect 24 26 github.com/goccy/go-json v0.10.5 // indirect 25 27 github.com/ipfs/go-cid v0.6.0 // indirect ··· 41 43 github.com/prometheus/client_model v0.6.2 // indirect 42 44 github.com/prometheus/common v0.67.5 // indirect 43 45 github.com/prometheus/procfs v0.19.2 // indirect 46 + github.com/redis/go-redis/v9 v9.7.3 // indirect 44 47 github.com/rivo/uniseg v0.4.7 // indirect 45 48 github.com/spaolacci/murmur3 v1.1.0 // indirect 46 49 github.com/whyrusleeping/cbor-gen v0.3.1 // indirect
+6
go.sum
··· 1 + github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk= 2 + github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA= 1 3 github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= 2 4 github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= 3 5 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= ··· 22 24 github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= 23 25 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= 24 26 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 27 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= 28 + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= 25 29 github.com/go-logfmt/logfmt v0.6.1 h1:4hvbpePJKnIzH1B+8OR/JPbTx37NktoI9LE2QZBBkvE= 26 30 github.com/go-logfmt/logfmt v0.6.1/go.mod h1:EV2pOAQoZaT1ZXZbqDl5hrymndi4SY9ED9/z6CO0XAk= 27 31 github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= ··· 76 80 github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= 77 81 github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= 78 82 github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= 83 + github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= 84 + github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= 79 85 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= 80 86 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= 81 87 github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+29 -12
jetstream.go
··· 51 51 // 52 52 // The logger is pulled from ctx (see log.go); falls back to slog.Default() 53 53 // if none is attached. 54 - func startJetstream(ctx context.Context, cfg config, st *store) error { 54 + func startJetstream(ctx context.Context, cfg config, st *store, knots KnotConsumer) error { 55 55 logger := loggerFrom(ctx).With("component", "jetstream") 56 56 57 57 // `wantedCollections` is a server-side filter: jetstream will only send ··· 69 69 clientCfg.WebsocketURL = cfg.JetstreamURL 70 70 clientCfg.WantedCollections = collections 71 71 72 - // The handler closes over `st` and `logger` so the scheduler signature 73 - // stays plain `func(ctx, *Event) error` — no need for a method 74 - // receiver or a global. 72 + // The handler closes over `st`, `knots` and the spindle hostname so 73 + // the scheduler signature stays plain `func(ctx, *Event) error` and 74 + // applyCommit can hand the knot consumer new sources as soon as 75 + // matching repo records arrive. 75 76 handler := func(ctx context.Context, evt *jsmodels.Event) error { 76 - return handleJetstreamEvent(ctx, st, evt) 77 + return handleJetstreamEvent(ctx, st, knots, cfg.Hostname, evt) 77 78 } 78 79 79 80 // Re-attach the component-scoped logger so handler — which the ··· 136 137 // applies the event to the store and advances the persisted cursor. Any 137 138 // returned error is logged by the scheduler but does not tear down the 138 139 // connection — the next event will retry the cursor write implicitly. 139 - func handleJetstreamEvent(ctx context.Context, st *store, evt *jsmodels.Event) error { 140 + func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 140 141 // We only care about commits, which are the actual record CRUD 141 142 // operations on a user's PDS. Account/identity events are ignored 142 143 // for now; if we ever care about handle changes we can add them. ··· 148 149 // Dispatch on collection. Unknown collections shouldn't happen given 149 150 // our wantedCollections filter, but be defensive — jetstream may 150 151 // send schema changes ahead of us updating the filter. 151 - if err := applyCommit(ctx, st, evt); err != nil { 152 + if err := applyCommit(ctx, st, knots, hostname, evt); err != nil { 152 153 logger.Error("apply commit", 153 154 "err", err, 154 155 "did", evt.Did, ··· 175 176 176 177 // applyCommit routes a commit to the right store mutation based on its 177 178 // collection NSID and operation. 178 - func applyCommit(ctx context.Context, st *store, evt *jsmodels.Event) error { 179 + func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 179 180 c := evt.Commit 180 181 switch c.Collection { 181 182 case tangled.SpindleMemberNSID: 182 183 return applySpindleMember(ctx, st, evt.Did, c) 183 184 case tangled.RepoNSID: 184 - return applyRepo(ctx, st, evt.Did, c) 185 + return applyRepo(ctx, st, knots, hostname, evt.Did, c) 185 186 case tangled.RepoCollaboratorNSID: 186 187 return applyRepoCollaborator(ctx, st, evt.Did, c) 187 188 default: ··· 207 208 return nil 208 209 } 209 210 210 - func applyRepo(ctx context.Context, st *store, did string, c *jsmodels.Commit) error { 211 + func applyRepo(ctx context.Context, st *store, knots KnotConsumer, hostname string, did string, c *jsmodels.Commit) error { 211 212 switch c.Operation { 212 213 case jsOpCreate, jsOpUpdate: 213 214 var rec tangled.Repo 214 215 if err := json.Unmarshal(c.Record, &rec); err != nil { 215 216 return fmt.Errorf("decode repo: %w", err) 216 217 } 217 - return st.UpsertRepo(ctx, did, c.RKey, 218 + if err := st.UpsertRepo(ctx, did, c.RKey, 218 219 rec.Knot, rec.Name, 219 220 deref(rec.Spindle), deref(rec.RepoDid), 220 221 rec.CreatedAt, 221 - ) 222 + ); err != nil { 223 + return err 224 + } 225 + 226 + // If this repo just declared us as its spindle, start (or 227 + // continue) listening to its knot for pipeline triggers. The 228 + // knot consumer dedupes on its own so this is safe to call 229 + // even on update events that don't change the spindle field. 230 + if knots != nil && rec.Spindle != nil && *rec.Spindle == hostname && rec.Knot != "" { 231 + knots.AddKnot(ctx, rec.Knot) 232 + } 233 + 234 + return nil 222 235 case jsOpDelete: 236 + // We don't unsubscribe from the knot here: other repos may 237 + // still want us to watch it. A periodic reconciliation pass 238 + // (not yet implemented) is the right place to drop unused 239 + // subscriptions. 223 240 return st.DeleteRepo(ctx, did, c.RKey) 224 241 } 225 242 return nil
+62 -8
jetstream_test.go
··· 66 66 TimeUS: 100, 67 67 Kind: jsmodels.EventKindAccount, 68 68 } 69 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 69 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 70 70 t.Fatalf("handle: %v", err) 71 71 } 72 72 got, err := s.LoadCursor(ctx) ··· 90 90 CreatedAt: "2026-01-01T00:00:00Z", 91 91 } 92 92 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 93 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 93 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 94 94 t.Fatalf("handle: %v", err) 95 95 } 96 96 ··· 118 118 t.Fatalf("seed: %v", err) 119 119 } 120 120 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 121 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 121 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 122 122 t.Fatalf("handle: %v", err) 123 123 } 124 124 if n := countRows(t, s, "spindle_members"); n != 0 { ··· 144 144 CreatedAt: "2026-01-01T00:00:00Z", 145 145 } 146 146 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 147 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 147 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 148 148 t.Fatalf("handle: %v", err) 149 149 } 150 150 ··· 168 168 CreatedAt: "2026-01-01T00:00:00Z", 169 169 } 170 170 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 171 - if err := handleJetstreamEvent(ctx, s, evt2); err != nil { 171 + if err := handleJetstreamEvent(ctx, s, nil, "", evt2); err != nil { 172 172 t.Fatalf("handle nil-optionals: %v", err) 173 173 } 174 174 err = s.db.QueryRowContext(ctx, ··· 197 197 CreatedAt: "2026-01-01T00:00:00Z", 198 198 } 199 199 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 200 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 200 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 201 201 t.Fatalf("handle: %v", err) 202 202 } 203 203 ··· 223 223 ctx := context.Background() 224 224 225 225 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 226 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 226 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 227 227 t.Fatalf("handle: %v", err) 228 228 } 229 229 requireCursor(t, s, 42) ··· 248 248 Record: json.RawMessage(`{not valid json`), 249 249 }, 250 250 } 251 - if err := handleJetstreamEvent(ctx, s, evt); err != nil { 251 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 252 252 t.Fatalf("handle should swallow decode error, got: %v", err) 253 253 } 254 254 if n := countRows(t, s, "spindle_members"); n != 0 { ··· 256 256 } 257 257 requireCursor(t, s, 1000) 258 258 } 259 + 260 + // TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 261 + // sh.tangled.repo whose .spindle field equals our hostname results in a 262 + // dynamic AddKnot call. This is the hot path for picking up new repos 263 + // without a tack restart. 264 + func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) { 265 + s := newTestStore(t) 266 + ctx := context.Background() 267 + 268 + const ours = "tack.example" 269 + spindle := ours 270 + rec := tangled.Repo{ 271 + Knot: "knot.example", 272 + Name: "myrepo", 273 + Spindle: &spindle, 274 + CreatedAt: "2026-01-01T00:00:00Z", 275 + } 276 + evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 277 + 278 + fake := &fakeKnotConsumer{} 279 + if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 280 + t.Fatalf("handle: %v", err) 281 + } 282 + added := fake.Added() 283 + if len(added) != 1 || added[0] != "knot.example" { 284 + t.Fatalf("AddKnot calls = %v, want [knot.example]", added) 285 + } 286 + } 287 + 288 + // TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 289 + // *different* spindle do not pull us into watching their knot. Without 290 + // this guard, tack would dial every knot named in any sh.tangled.repo 291 + // it sees over the firehose, which is most of them. 292 + func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) { 293 + s := newTestStore(t) 294 + ctx := context.Background() 295 + 296 + other := "other-spindle.example" 297 + rec := tangled.Repo{ 298 + Knot: "knot.example", 299 + Name: "myrepo", 300 + Spindle: &other, 301 + CreatedAt: "2026-01-01T00:00:00Z", 302 + } 303 + evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 304 + 305 + fake := &fakeKnotConsumer{} 306 + if err := handleJetstreamEvent(ctx, s, fake, "tack.example", evt); err != nil { 307 + t.Fatalf("handle: %v", err) 308 + } 309 + if added := fake.Added(); len(added) != 0 { 310 + t.Fatalf("AddKnot calls = %v, want none", added) 311 + } 312 + }
+187
knot.go
··· 1 + package main 2 + 3 + // Knot event-stream subscriber. 4 + // 5 + // Tangled knot servers expose a websocket at ws[s]://<knot>/events that 6 + // streams JSON-wrapped record events for that knot, including the 7 + // sh.tangled.pipeline trigger records that drive CI. Pipeline triggers do 8 + // *not* come over the AT Proto firehose (jetstream); the knot publishes 9 + // them itself, so as a spindle we have to dial each knot whose repos have 10 + // pointed at us. 11 + // 12 + // We use tangled-core's `eventconsumer` package, which already handles 13 + // per-source connection management, retries, ordered processing and 14 + // cursor tracking. We hand it: 15 + // 16 + // 1. The initial set of knots, derived from previously-observed 17 + // sh.tangled.repo records that named us as their .spindle field. 18 + // 2. A ProcessFunc that, for now, simply logs every received event. 19 + // Once the build pipeline is wired up this is where pipeline 20 + // triggers will be translated into Buildkite builds. 21 + // 22 + // The jetstream consumer also gets a back-reference (via the knotAdder 23 + // interface) so it can dynamically subscribe to a new knot the moment a 24 + // matching sh.tangled.repo record arrives, without waiting for a tack 25 + // restart. 26 + 27 + import ( 28 + "context" 29 + "encoding/json" 30 + "fmt" 31 + "log/slog" 32 + 33 + "tangled.org/core/api/tangled" 34 + "tangled.org/core/eventconsumer" 35 + ) 36 + 37 + // KnotConsumer is the surface area the rest of tack uses to interact 38 + // with the knot event-stream subscriber. It exists so we can test 39 + // knot interactions from tests. 40 + // 41 + // Implementations must be safe for concurrent use: AddKnot is invoked 42 + // from the jetstream goroutine while the consumer's worker goroutines 43 + // are independently processing inbound knot events. 44 + // 45 + // The fake at knot_fake.go provides a no-network implementation suitable 46 + // for use from tests. 47 + type KnotConsumer interface { 48 + // AddKnot subscribes to the given knot's /events websocket if not 49 + // already subscribed. Calling with the same knot more than once is 50 + // a no-op. An empty knot string is ignored. The supplied context 51 + // scopes the dial; cancelling it tears the subscription down. 52 + AddKnot(ctx context.Context, knot string) 53 + } 54 + 55 + // knotConsumer is the production KnotConsumer. It wraps 56 + // eventconsumer.Consumer with the small surface the rest of tack actually 57 + // wants: AddKnot for dynamic subscription, plus a Stop lifecycle hook 58 + // owned by main. 59 + // 60 + // Wrapping (instead of exposing *eventconsumer.Consumer directly) keeps 61 + // callers from importing eventconsumer just to construct a KnotSource, 62 + // and lets us swap or extend the underlying transport later. 63 + type knotConsumer struct { 64 + c *eventconsumer.Consumer 65 + log *slog.Logger 66 + } 67 + 68 + // Compile-time interface conformance check. 69 + var _ KnotConsumer = (*knotConsumer)(nil) 70 + 71 + // startKnotConsumer builds a knot event consumer pre-loaded with every 72 + // knot already known to the store, starts its connection loops in the 73 + // background, and returns the wrapper. The consumer keeps running until 74 + // ctx is cancelled. 75 + // 76 + // Cursor persistence is intentionally in-memory for now: we only log 77 + // events, so re-receiving a few seconds of pipeline triggers after a 78 + // restart is harmless. When we start translating triggers into real 79 + // Buildkite builds, this should switch to a SQLite-backed cursor store 80 + // to avoid duplicate builds. 81 + func startKnotConsumer(ctx context.Context, cfg config, st *store) (*knotConsumer, error) { 82 + logger := loggerFrom(ctx).With("component", "knotconsumer") 83 + 84 + knots, err := st.KnotsForSpindle(ctx, cfg.Hostname) 85 + if err != nil { 86 + return nil, fmt.Errorf("load known knots: %w", err) 87 + } 88 + 89 + kc := &knotConsumer{log: logger} 90 + 91 + ccfg := eventconsumer.NewConsumerConfig() 92 + ccfg.Logger = logger 93 + ccfg.Dev = cfg.Dev 94 + ccfg.ProcessFunc = kc.process 95 + for _, k := range knots { 96 + ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{} 97 + logger.Info("seeding knot source", "knot", k) 98 + } 99 + kc.c = eventconsumer.NewConsumer(*ccfg) 100 + 101 + // Start workers + per-source connection loops. Consumer.Start is 102 + // non-blocking; the goroutines it spawns observe ctx for shutdown. 103 + kc.c.Start(ctx) 104 + logger.Info("knot consumer started", "initial_knots", len(knots)) 105 + 106 + return kc, nil 107 + } 108 + 109 + // AddKnot subscribes to a knot we hadn't been watching before. Safe to 110 + // call repeatedly: eventconsumer.Consumer.AddSource deduplicates by the 111 + // source's Key (the knot hostname), so passing the same knot twice is a 112 + // no-op. 113 + func (k *knotConsumer) AddKnot(ctx context.Context, knot string) { 114 + if knot == "" { 115 + return 116 + } 117 + k.log.Info("adding knot source", "knot", knot) 118 + k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot)) 119 + } 120 + 121 + // Stop tears down all knot websocket connections and waits for the 122 + // consumer's goroutines to exit. It must be called exactly once. 123 + func (k *knotConsumer) Stop() { 124 + k.c.Stop() 125 + } 126 + 127 + // process is the ProcessFunc handed to eventconsumer. It runs once per 128 + // inbound message, on a worker goroutine. For now we only care about 129 + // pipeline records — everything else is logged at debug and dropped. 130 + // 131 + // Returning an error only logs it (the consumer keeps reading); the 132 + // cursor is advanced before the ProcessFunc runs, so a returned error 133 + // does *not* cause a replay. 134 + func (k *knotConsumer) process(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 135 + switch msg.Nsid { 136 + case tangled.PipelineNSID: 137 + var p tangled.Pipeline 138 + if err := json.Unmarshal(msg.EventJson, &p); err != nil { 139 + k.log.Error("decode pipeline", 140 + "err", err, 141 + "knot", src.Key(), 142 + "rkey", msg.Rkey, 143 + ) 144 + return err 145 + } 146 + 147 + // Pull a couple of fields out of the trigger metadata for log 148 + // context. They're all optional in the schema, so each one is 149 + // guarded — we want a noisy log entry, not a nil-deref. 150 + var ( 151 + triggerKind string 152 + repoDid string 153 + repoName string 154 + ) 155 + if p.TriggerMetadata != nil { 156 + triggerKind = p.TriggerMetadata.Kind 157 + if p.TriggerMetadata.Repo != nil { 158 + repoDid = p.TriggerMetadata.Repo.Did 159 + if p.TriggerMetadata.Repo.Repo != nil { 160 + repoName = *p.TriggerMetadata.Repo.Repo 161 + } 162 + } 163 + } 164 + 165 + k.log.Info("pipeline event", 166 + "knot", src.Key(), 167 + "rkey", msg.Rkey, 168 + "trigger", triggerKind, 169 + "repo_did", repoDid, 170 + "repo", repoName, 171 + "workflows", len(p.Workflows), 172 + ) 173 + 174 + default: 175 + // Knots may publish other record types over the same stream; we 176 + // don't care about them yet. Debug-only so it's available when 177 + // chasing "why isn't my pipeline firing" but doesn't drown out 178 + // info-level logs. 179 + k.log.Debug("ignored knot event", 180 + "knot", src.Key(), 181 + "nsid", msg.Nsid, 182 + "rkey", msg.Rkey, 183 + ) 184 + } 185 + 186 + return nil 187 + }
+44
knot_fake.go
··· 1 + package main 2 + 3 + // Test fake for KnotConsumer. 4 + // 5 + // Lives in a non-_test.go file so it can be referenced from tests across 6 + // any future test files (and, if we ever split tack into subpackages, can 7 + // be promoted to an exported helper without moving code around). 8 + // 9 + // It does no I/O: AddKnot just records the knot it was handed so tests 10 + // can assert on the side effect. 11 + 12 + import ( 13 + "context" 14 + "sync" 15 + ) 16 + 17 + // fakeKnotConsumer is an in-memory KnotConsumer suitable for tests. The 18 + // zero value is ready to use; concurrent calls to AddKnot are safe. 19 + type fakeKnotConsumer struct { 20 + mu sync.Mutex 21 + added []string 22 + } 23 + 24 + // Compile-time interface conformance check — keeps the fake honest if 25 + // the KnotConsumer surface ever grows a new method. 26 + var _ KnotConsumer = (*fakeKnotConsumer)(nil) 27 + 28 + // AddKnot records the knot for later inspection via Added(). 29 + func (f *fakeKnotConsumer) AddKnot(_ context.Context, knot string) { 30 + f.mu.Lock() 31 + defer f.mu.Unlock() 32 + f.added = append(f.added, knot) 33 + } 34 + 35 + // Added returns a copy of the knots passed to AddKnot, in call order. 36 + // A copy is returned so callers can't accidentally mutate the fake's 37 + // internal slice while comparing. 38 + func (f *fakeKnotConsumer) Added() []string { 39 + f.mu.Lock() 40 + defer f.mu.Unlock() 41 + out := make([]string, len(f.added)) 42 + copy(out, f.added) 43 + return out 44 + }
+29 -2
main.go
··· 21 21 // spindle without surprises. 22 22 type config struct { 23 23 Addr string 24 + Hostname string 24 25 OwnerDID string 25 26 JetstreamURL string 26 27 DBPath string 28 + // Dev flips the knot event-stream scheme from wss:// to ws://. 29 + // Useful when running against a local knot during development. 30 + Dev bool 27 31 } 28 32 29 33 func loadConfig() (config, error) { 30 34 cfg := config{ 31 35 Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 36 + Hostname: os.Getenv("TACK_HOSTNAME"), 32 37 OwnerDID: os.Getenv("TACK_OWNER_DID"), 33 38 JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 34 39 DBPath: envOr("TACK_DB_PATH", "tack.db"), 40 + Dev: os.Getenv("TACK_DEV") != "", 35 41 } 36 42 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 37 43 flag.Parse() ··· 40 46 if cfg.OwnerDID == "" { 41 47 return cfg, errors.New("TACK_OWNER_DID is required") 42 48 } 49 + 50 + // Hostname identifies *us* in sh.tangled.repo records (the .spindle 51 + // field). Without it we have no way to know which repos point at us 52 + // and therefore which knots we should subscribe to for pipeline 53 + // triggers — so we refuse to start rather than silently subscribe to 54 + // nothing. 55 + if cfg.Hostname == "" { 56 + return cfg, errors.New("TACK_HOSTNAME is required") 57 + } 58 + 43 59 return cfg, nil 44 60 } 45 61 ··· 92 108 }() 93 109 logger.Info("store open", "path", cfg.DBPath) 94 110 95 - // Start the JetStream listener in the background. 96 - if err := startJetstream(ctx, cfg, st); err != nil { 111 + // Start the knot event-stream consumer first so the jetstream 112 + // loop has somewhere to register newly-observed knots into. 113 + knots, err := startKnotConsumer(ctx, cfg, st) 114 + if err != nil { 115 + logger.Error("failed to start knot consumer", "err", err) 116 + os.Exit(1) 117 + } 118 + defer knots.Stop() 119 + 120 + // Start the JetStream listener in the background. It hands the knot 121 + // consumer any new knot referenced by an incoming sh.tangled.repo 122 + // record so we don't have to wait for a restart to pick it up. 123 + if err := startJetstream(ctx, cfg, st, knots); err != nil { 97 124 logger.Error("failed to start jetstream consumer", "err", err) 98 125 os.Exit(1) 99 126 }
+30
store.go
··· 194 194 return nil 195 195 } 196 196 197 + // KnotsForSpindle returns the distinct knot hostnames of all repos that 198 + // have declared the given spindle hostname as their CI spindle. The knot 199 + // event-stream subscriber uses this to decide which knots to dial. 200 + // 201 + // Returns an empty slice (not nil) when nothing matches, so callers can 202 + // range over the result without a nil check. 203 + func (s *store) KnotsForSpindle(ctx context.Context, hostname string) ([]string, error) { 204 + rows, err := s.db.QueryContext(ctx, 205 + `SELECT DISTINCT knot FROM repos WHERE spindle = ? AND knot <> ''`, 206 + hostname, 207 + ) 208 + if err != nil { 209 + return nil, fmt.Errorf("query knots: %w", err) 210 + } 211 + defer rows.Close() 212 + 213 + out := []string{} 214 + for rows.Next() { 215 + var k string 216 + if err := rows.Scan(&k); err != nil { 217 + return nil, fmt.Errorf("scan knot: %w", err) 218 + } 219 + out = append(out, k) 220 + } 221 + if err := rows.Err(); err != nil { 222 + return nil, fmt.Errorf("iterate knots: %w", err) 223 + } 224 + return out, nil 225 + } 226 + 197 227 // DeleteRepoCollaborator removes a collaborator record by its ATProto 198 228 // identity. 199 229 func (s *store) DeleteRepoCollaborator(ctx context.Context, did, rkey string) error {
+45
store_test.go
··· 239 239 } 240 240 } 241 241 242 + // TestKnotsForSpindle verifies the query returns only knots from repos 243 + // whose .spindle field matches the given hostname, and that duplicate 244 + // knots collapse to a single entry. 245 + func TestKnotsForSpindle(t *testing.T) { 246 + s := newTestStore(t) 247 + ctx := context.Background() 248 + 249 + const ours = "tack.example" 250 + const other = "other.example" 251 + 252 + // Two repos on the same knot pointing at us — should collapse to 1. 253 + if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", "knot1.example", "repo-a", ours, "", "t"); err != nil { 254 + t.Fatal(err) 255 + } 256 + if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", "knot1.example", "repo-b", ours, "", "t"); err != nil { 257 + t.Fatal(err) 258 + } 259 + // A second knot pointing at us. 260 + if err := s.UpsertRepo(ctx, "did:plc:c", "rk3", "knot2.example", "repo-c", ours, "", "t"); err != nil { 261 + t.Fatal(err) 262 + } 263 + // A repo pointing at a different spindle — must be excluded. 264 + if err := s.UpsertRepo(ctx, "did:plc:d", "rk4", "knot3.example", "repo-d", other, "", "t"); err != nil { 265 + t.Fatal(err) 266 + } 267 + // A repo with no spindle declared — must be excluded. 268 + if err := s.UpsertRepo(ctx, "did:plc:e", "rk5", "knot4.example", "repo-e", "", "", "t"); err != nil { 269 + t.Fatal(err) 270 + } 271 + 272 + got, err := s.KnotsForSpindle(ctx, ours) 273 + if err != nil { 274 + t.Fatalf("KnotsForSpindle: %v", err) 275 + } 276 + want := map[string]struct{}{"knot1.example": {}, "knot2.example": {}} 277 + if len(got) != len(want) { 278 + t.Fatalf("got %v, want %v", got, want) 279 + } 280 + for _, k := range got { 281 + if _, ok := want[k]; !ok { 282 + t.Fatalf("unexpected knot %q in %v", k, got) 283 + } 284 + } 285 + } 286 + 242 287 // countRows is a small SELECT COUNT(*) helper used by lifecycle tests 243 288 // to verify deletes actually removed the row. Table name is interpolated 244 289 // directly because callers pass a constant from the schema, not user