forked from
mitchellh.com/tack
Stitch any CI into Tangled
1package main
2
3// Tests for handleJetstreamEvent. We exercise the full path — collection
4// dispatch, record decoding, store mutation, and cursor advancement —
5// using an in-memory store from store_test.go's newTestStore helper.
6//
7// Events are constructed by hand rather than recorded from a live
8// jetstream so the tests don't need network access and stay fast.
9
10import (
11 "context"
12 "encoding/json"
13 "testing"
14
15 jsmodels "github.com/bluesky-social/jetstream/pkg/models"
16 "tangled.org/core/api/tangled"
17)
18
19// commitEvent builds a jetstream commit event for tests. timeUS is the
20// cursor value the handler should persist after applying. record can be
21// nil for delete operations, which carry no body.
22func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event {
23 var raw json.RawMessage
24 if record != nil {
25 b, err := json.Marshal(record)
26 if err != nil {
27 panic(err) // test helper — callers control the input
28 }
29 raw = b
30 }
31 return &jsmodels.Event{
32 Did: did,
33 TimeUS: timeUS,
34 Kind: jsmodels.EventKindCommit,
35 Commit: &jsmodels.Commit{
36 Operation: op,
37 Collection: collection,
38 RKey: rkey,
39 Record: raw,
40 },
41 }
42}
43
44// requireCursor asserts the persisted cursor equals want. Pulled out
45// because almost every test in this file checks it.
46func requireCursor(t *testing.T, s *store, want int64) {
47 t.Helper()
48 got, err := s.LoadCursor(context.Background())
49 if err != nil {
50 t.Fatalf("load cursor: %v", err)
51 }
52 if got == nil || *got != want {
53 t.Fatalf("cursor = %v, want %d", got, want)
54 }
55}
56
57// TestHandleNonCommitEvent confirms account/identity events are ignored
58// without error and without advancing the cursor — they don't have a
59// TimeUS we want to commit to.
60func TestHandleNonCommitEvent(t *testing.T) {
61 s := newTestStore(t)
62 ctx := context.Background()
63
64 evt := &jsmodels.Event{
65 Did: "did:plc:foo",
66 TimeUS: 100,
67 Kind: jsmodels.EventKindAccount,
68 }
69 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
70 t.Fatalf("handle: %v", err)
71 }
72 got, err := s.LoadCursor(ctx)
73 if err != nil {
74 t.Fatalf("load cursor: %v", err)
75 }
76 if got != nil {
77 t.Fatalf("expected no cursor for non-commit, got %d", *got)
78 }
79}
80
81// TestHandleSpindleMemberCreate exercises the happy path: a create
82// commit lands a row in spindle_members and advances the cursor.
83func TestHandleSpindleMemberCreate(t *testing.T) {
84 s := newTestStore(t)
85 ctx := context.Background()
86
87 rec := tangled.SpindleMember{
88 Instance: "https://spindle.example",
89 Subject: "did:plc:alice",
90 CreatedAt: "2026-01-01T00:00:00Z",
91 }
92 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec)
93 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
94 t.Fatalf("handle: %v", err)
95 }
96
97 var instance, subject string
98 err := s.db.QueryRowContext(ctx,
99 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`,
100 "did:plc:owner", "rk1",
101 ).Scan(&instance, &subject)
102 if err != nil {
103 t.Fatalf("query: %v", err)
104 }
105 if instance != "https://spindle.example" || subject != "did:plc:alice" {
106 t.Fatalf("got (%q,%q)", instance, subject)
107 }
108 requireCursor(t, s, 12345)
109}
110
111// TestHandleSpindleMemberDelete confirms a delete commit removes the
112// previously-persisted row.
113func TestHandleSpindleMemberDelete(t *testing.T) {
114 s := newTestStore(t)
115 ctx := context.Background()
116
117 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil {
118 t.Fatalf("seed: %v", err)
119 }
120 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil)
121 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
122 t.Fatalf("handle: %v", err)
123 }
124 if n := countRows(t, s, "spindle_members"); n != 0 {
125 t.Fatalf("after delete: %d rows, want 0", n)
126 }
127 requireCursor(t, s, 99)
128}
129
130// TestHandleRepoCreateOptionals verifies pointer-typed optional fields
131// on tangled.Repo are derefed correctly (and nils become empty strings)
132// when written to the store.
133func TestHandleRepoCreateOptionals(t *testing.T) {
134 s := newTestStore(t)
135 ctx := context.Background()
136
137 spindle := "https://spindle.example"
138 repoDid := "did:plc:repo"
139 rec := tangled.Repo{
140 Knot: "knot.example",
141 Name: "myrepo",
142 Spindle: &spindle,
143 RepoDid: &repoDid,
144 CreatedAt: "2026-01-01T00:00:00Z",
145 }
146 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec)
147 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
148 t.Fatalf("handle: %v", err)
149 }
150
151 var gotSpindle, gotRepoDid string
152 err := s.db.QueryRowContext(ctx,
153 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
154 "did:plc:owner", "repo1",
155 ).Scan(&gotSpindle, &gotRepoDid)
156 if err != nil {
157 t.Fatalf("query: %v", err)
158 }
159 if gotSpindle != spindle || gotRepoDid != repoDid {
160 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid)
161 }
162
163 // Now a record with both optionals nil — should land as empty
164 // strings, not crash on a nil dereference in deref().
165 rec2 := tangled.Repo{
166 Knot: "knot.example",
167 Name: "other",
168 CreatedAt: "2026-01-01T00:00:00Z",
169 }
170 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2)
171 if err := handleJetstreamEvent(ctx, s, nil, "", evt2); err != nil {
172 t.Fatalf("handle nil-optionals: %v", err)
173 }
174 err = s.db.QueryRowContext(ctx,
175 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`,
176 "did:plc:owner", "repo2",
177 ).Scan(&gotSpindle, &gotRepoDid)
178 if err != nil {
179 t.Fatalf("query nil-optionals: %v", err)
180 }
181 if gotSpindle != "" || gotRepoDid != "" {
182 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid)
183 }
184 requireCursor(t, s, 8)
185}
186
187// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each
188// collection has at least one apply test.
189func TestHandleRepoCollaboratorCreate(t *testing.T) {
190 s := newTestStore(t)
191 ctx := context.Background()
192
193 repo := "myrepo"
194 rec := tangled.RepoCollaborator{
195 Repo: &repo,
196 Subject: "did:plc:carol",
197 CreatedAt: "2026-01-01T00:00:00Z",
198 }
199 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec)
200 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
201 t.Fatalf("handle: %v", err)
202 }
203
204 var subject string
205 err := s.db.QueryRowContext(ctx,
206 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`,
207 "did:plc:owner", "c1",
208 ).Scan(&subject)
209 if err != nil {
210 t.Fatalf("query: %v", err)
211 }
212 if subject != "did:plc:carol" {
213 t.Fatalf("subject = %q", subject)
214 }
215 requireCursor(t, s, 55)
216}
217
218// TestHandleUnknownCollection makes sure a collection we didn't ask for
219// (jetstream filter changes, schema drift) is silently dropped — no
220// error, no row, but cursor still advances so we don't replay it.
221func TestHandleUnknownCollection(t *testing.T) {
222 s := newTestStore(t)
223 ctx := context.Background()
224
225 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"})
226 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
227 t.Fatalf("handle: %v", err)
228 }
229 requireCursor(t, s, 42)
230}
231
232// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to
233// the happy paths: a malformed record body must be logged-and-skipped
234// (not returned as an error that pauses the scheduler) and the cursor
235// must still advance so we don't loop on the same bad event forever.
236func TestHandleBadRecordAdvancesCursor(t *testing.T) {
237 s := newTestStore(t)
238 ctx := context.Background()
239
240 evt := &jsmodels.Event{
241 Did: "did:plc:owner",
242 TimeUS: 1000,
243 Kind: jsmodels.EventKindCommit,
244 Commit: &jsmodels.Commit{
245 Operation: jsOpCreate,
246 Collection: tangled.SpindleMemberNSID,
247 RKey: "broken",
248 Record: json.RawMessage(`{not valid json`),
249 },
250 }
251 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil {
252 t.Fatalf("handle should swallow decode error, got: %v", err)
253 }
254 if n := countRows(t, s, "spindle_members"); n != 0 {
255 t.Fatalf("bad record should not have inserted; got %d rows", n)
256 }
257 requireCursor(t, s, 1000)
258}
259
260// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a
261// sh.tangled.repo whose .spindle field equals our hostname results in a
262// dynamic AddKnot call. This is the hot path for picking up new repos
263// without a tack restart.
264func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) {
265 s := newTestStore(t)
266 ctx := context.Background()
267
268 const ours = "tack.example"
269 spindle := ours
270 rec := tangled.Repo{
271 Knot: "knot.example",
272 Name: "myrepo",
273 Spindle: &spindle,
274 CreatedAt: "2026-01-01T00:00:00Z",
275 }
276 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
277
278 fake := &fakeKnotConsumer{}
279 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
280 t.Fatalf("handle: %v", err)
281 }
282 added := fake.Added()
283 if len(added) != 1 || added[0] != "knot.example" {
284 t.Fatalf("AddKnot calls = %v, want [knot.example]", added)
285 }
286}
287
288// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a
289// *different* spindle do not pull us into watching their knot. Without
290// this guard, tack would dial every knot named in any sh.tangled.repo
291// it sees over the firehose, which is most of them.
292func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) {
293 s := newTestStore(t)
294 ctx := context.Background()
295
296 other := "other-spindle.example"
297 rec := tangled.Repo{
298 Knot: "knot.example",
299 Name: "myrepo",
300 Spindle: &other,
301 CreatedAt: "2026-01-01T00:00:00Z",
302 }
303 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec)
304
305 fake := &fakeKnotConsumer{}
306 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", evt); err != nil {
307 t.Fatalf("handle: %v", err)
308 }
309 if added := fake.Added(); len(added) != 0 {
310 t.Fatalf("AddKnot calls = %v, want none", added)
311 }
312}
313
314// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a
315// repo we'd previously been watching gets its .spindle field flipped to
316// some other spindle. Once that's the only repo we had on that knot,
317// the reconciliation must call RemoveKnot.
318func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) {
319 s := newTestStore(t)
320 ctx := context.Background()
321
322 const ours = "tack.example"
323 const knot = "knot.example"
324
325 // Seed: a repo that names us as its spindle on `knot`.
326 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil {
327 t.Fatal(err)
328 }
329
330 // Update: same record, now points at a different spindle.
331 other := "other.example"
332 rec := tangled.Repo{
333 Knot: knot,
334 Name: "repo-a",
335 Spindle: &other,
336 CreatedAt: "2026-01-01T00:00:00Z",
337 }
338 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
339
340 fake := &fakeKnotConsumer{}
341 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
342 t.Fatalf("handle: %v", err)
343 }
344 if added := fake.Added(); len(added) != 0 {
345 t.Fatalf("AddKnot calls = %v, want none", added)
346 }
347 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
348 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
349 }
350}
351
352// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't
353// over-eagerly unsubscribe from a knot when one of multiple repos on
354// that knot moves away from us. The other repo's subscription must keep
355// the knot in our wanted set.
356func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) {
357 s := newTestStore(t)
358 ctx := context.Background()
359
360 const ours = "tack.example"
361 const knot = "knot.example"
362
363 // Two repos sharing one knot, both pointed at us.
364 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
365 t.Fatal(err)
366 }
367 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
368 t.Fatal(err)
369 }
370
371 // Repo A flips to a different spindle. B still wants us.
372 other := "other.example"
373 rec := tangled.Repo{
374 Knot: knot,
375 Name: "a",
376 Spindle: &other,
377 CreatedAt: "2026-01-01T00:00:00Z",
378 }
379 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec)
380
381 fake := &fakeKnotConsumer{}
382 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
383 t.Fatalf("handle: %v", err)
384 }
385 if removed := fake.Removed(); len(removed) != 0 {
386 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot)
387 }
388}
389
390// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo
391// staying with us but changing its .knot field unsubscribes the old
392// knot (if no other repo holds it) and subscribes the new one.
393func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) {
394 s := newTestStore(t)
395 ctx := context.Background()
396
397 const ours = "tack.example"
398 const oldKnot = "old.example"
399 const newKnot = "new.example"
400
401 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil {
402 t.Fatal(err)
403 }
404
405 spindle := ours
406 rec := tangled.Repo{
407 Knot: newKnot,
408 Name: "a",
409 Spindle: &spindle,
410 CreatedAt: "2026-01-01T00:00:00Z",
411 }
412 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec)
413
414 fake := &fakeKnotConsumer{}
415 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
416 t.Fatalf("handle: %v", err)
417 }
418 if added := fake.Added(); len(added) != 1 || added[0] != newKnot {
419 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot)
420 }
421 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot {
422 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot)
423 }
424}
425
426// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on
427// a knot we cared about triggers RemoveKnot.
428func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) {
429 s := newTestStore(t)
430 ctx := context.Background()
431
432 const ours = "tack.example"
433 const knot = "knot.example"
434
435 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil {
436 t.Fatal(err)
437 }
438 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil)
439
440 fake := &fakeKnotConsumer{}
441 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
442 t.Fatalf("handle: %v", err)
443 }
444 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot {
445 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot)
446 }
447}
448
449// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple
450// repos on a knot does not unsubscribe — the survivors still want it.
451func TestRepoDeleteKeepsKnotIfShared(t *testing.T) {
452 s := newTestStore(t)
453 ctx := context.Background()
454
455 const ours = "tack.example"
456 const knot = "knot.example"
457
458 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil {
459 t.Fatal(err)
460 }
461 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil {
462 t.Fatal(err)
463 }
464 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil)
465
466 fake := &fakeKnotConsumer{}
467 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil {
468 t.Fatalf("handle: %v", err)
469 }
470 if removed := fake.Removed(); len(removed) != 0 {
471 t.Fatalf("RemoveKnot calls = %v, want none", removed)
472 }
473}