Stitch any CI into Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 473 lines 15 kB view raw
1package main 2 3// Tests for handleJetstreamEvent. We exercise the full path — collection 4// dispatch, record decoding, store mutation, and cursor advancement — 5// using an in-memory store from store_test.go's newTestStore helper. 6// 7// Events are constructed by hand rather than recorded from a live 8// jetstream so the tests don't need network access and stay fast. 9 10import ( 11 "context" 12 "encoding/json" 13 "testing" 14 15 jsmodels "github.com/bluesky-social/jetstream/pkg/models" 16 "tangled.org/core/api/tangled" 17) 18 19// commitEvent builds a jetstream commit event for tests. timeUS is the 20// cursor value the handler should persist after applying. record can be 21// nil for delete operations, which carry no body. 22func commitEvent(timeUS int64, did, collection, op, rkey string, record any) *jsmodels.Event { 23 var raw json.RawMessage 24 if record != nil { 25 b, err := json.Marshal(record) 26 if err != nil { 27 panic(err) // test helper — callers control the input 28 } 29 raw = b 30 } 31 return &jsmodels.Event{ 32 Did: did, 33 TimeUS: timeUS, 34 Kind: jsmodels.EventKindCommit, 35 Commit: &jsmodels.Commit{ 36 Operation: op, 37 Collection: collection, 38 RKey: rkey, 39 Record: raw, 40 }, 41 } 42} 43 44// requireCursor asserts the persisted cursor equals want. Pulled out 45// because almost every test in this file checks it. 46func requireCursor(t *testing.T, s *store, want int64) { 47 t.Helper() 48 got, err := s.LoadCursor(context.Background()) 49 if err != nil { 50 t.Fatalf("load cursor: %v", err) 51 } 52 if got == nil || *got != want { 53 t.Fatalf("cursor = %v, want %d", got, want) 54 } 55} 56 57// TestHandleNonCommitEvent confirms account/identity events are ignored 58// without error and without advancing the cursor — they don't have a 59// TimeUS we want to commit to. 60func TestHandleNonCommitEvent(t *testing.T) { 61 s := newTestStore(t) 62 ctx := context.Background() 63 64 evt := &jsmodels.Event{ 65 Did: "did:plc:foo", 66 TimeUS: 100, 67 Kind: jsmodels.EventKindAccount, 68 } 69 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 70 t.Fatalf("handle: %v", err) 71 } 72 got, err := s.LoadCursor(ctx) 73 if err != nil { 74 t.Fatalf("load cursor: %v", err) 75 } 76 if got != nil { 77 t.Fatalf("expected no cursor for non-commit, got %d", *got) 78 } 79} 80 81// TestHandleSpindleMemberCreate exercises the happy path: a create 82// commit lands a row in spindle_members and advances the cursor. 83func TestHandleSpindleMemberCreate(t *testing.T) { 84 s := newTestStore(t) 85 ctx := context.Background() 86 87 rec := tangled.SpindleMember{ 88 Instance: "https://spindle.example", 89 Subject: "did:plc:alice", 90 CreatedAt: "2026-01-01T00:00:00Z", 91 } 92 evt := commitEvent(12345, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk1", rec) 93 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 94 t.Fatalf("handle: %v", err) 95 } 96 97 var instance, subject string 98 err := s.db.QueryRowContext(ctx, 99 `SELECT instance, subject FROM spindle_members WHERE did = ? AND rkey = ?`, 100 "did:plc:owner", "rk1", 101 ).Scan(&instance, &subject) 102 if err != nil { 103 t.Fatalf("query: %v", err) 104 } 105 if instance != "https://spindle.example" || subject != "did:plc:alice" { 106 t.Fatalf("got (%q,%q)", instance, subject) 107 } 108 requireCursor(t, s, 12345) 109} 110 111// TestHandleSpindleMemberDelete confirms a delete commit removes the 112// previously-persisted row. 113func TestHandleSpindleMemberDelete(t *testing.T) { 114 s := newTestStore(t) 115 ctx := context.Background() 116 117 if err := s.UpsertSpindleMember(ctx, "did:plc:owner", "rk1", "i", "s", "t"); err != nil { 118 t.Fatalf("seed: %v", err) 119 } 120 evt := commitEvent(99, "did:plc:owner", tangled.SpindleMemberNSID, jsOpDelete, "rk1", nil) 121 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 122 t.Fatalf("handle: %v", err) 123 } 124 if n := countRows(t, s, "spindle_members"); n != 0 { 125 t.Fatalf("after delete: %d rows, want 0", n) 126 } 127 requireCursor(t, s, 99) 128} 129 130// TestHandleRepoCreateOptionals verifies pointer-typed optional fields 131// on tangled.Repo are derefed correctly (and nils become empty strings) 132// when written to the store. 133func TestHandleRepoCreateOptionals(t *testing.T) { 134 s := newTestStore(t) 135 ctx := context.Background() 136 137 spindle := "https://spindle.example" 138 repoDid := "did:plc:repo" 139 rec := tangled.Repo{ 140 Knot: "knot.example", 141 Name: "myrepo", 142 Spindle: &spindle, 143 RepoDid: &repoDid, 144 CreatedAt: "2026-01-01T00:00:00Z", 145 } 146 evt := commitEvent(7, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo1", rec) 147 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 148 t.Fatalf("handle: %v", err) 149 } 150 151 var gotSpindle, gotRepoDid string 152 err := s.db.QueryRowContext(ctx, 153 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 154 "did:plc:owner", "repo1", 155 ).Scan(&gotSpindle, &gotRepoDid) 156 if err != nil { 157 t.Fatalf("query: %v", err) 158 } 159 if gotSpindle != spindle || gotRepoDid != repoDid { 160 t.Fatalf("got (%q,%q)", gotSpindle, gotRepoDid) 161 } 162 163 // Now a record with both optionals nil — should land as empty 164 // strings, not crash on a nil dereference in deref(). 165 rec2 := tangled.Repo{ 166 Knot: "knot.example", 167 Name: "other", 168 CreatedAt: "2026-01-01T00:00:00Z", 169 } 170 evt2 := commitEvent(8, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "repo2", rec2) 171 if err := handleJetstreamEvent(ctx, s, nil, "", evt2); err != nil { 172 t.Fatalf("handle nil-optionals: %v", err) 173 } 174 err = s.db.QueryRowContext(ctx, 175 `SELECT spindle, repo_did FROM repos WHERE did = ? AND rkey = ?`, 176 "did:plc:owner", "repo2", 177 ).Scan(&gotSpindle, &gotRepoDid) 178 if err != nil { 179 t.Fatalf("query nil-optionals: %v", err) 180 } 181 if gotSpindle != "" || gotRepoDid != "" { 182 t.Fatalf("nil optionals: got (%q,%q), want both empty", gotSpindle, gotRepoDid) 183 } 184 requireCursor(t, s, 8) 185} 186 187// TestHandleRepoCollaboratorCreate covers the third dispatch arm so each 188// collection has at least one apply test. 189func TestHandleRepoCollaboratorCreate(t *testing.T) { 190 s := newTestStore(t) 191 ctx := context.Background() 192 193 repo := "myrepo" 194 rec := tangled.RepoCollaborator{ 195 Repo: &repo, 196 Subject: "did:plc:carol", 197 CreatedAt: "2026-01-01T00:00:00Z", 198 } 199 evt := commitEvent(55, "did:plc:owner", tangled.RepoCollaboratorNSID, jsOpCreate, "c1", rec) 200 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 201 t.Fatalf("handle: %v", err) 202 } 203 204 var subject string 205 err := s.db.QueryRowContext(ctx, 206 `SELECT subject FROM repo_collaborators WHERE did = ? AND rkey = ?`, 207 "did:plc:owner", "c1", 208 ).Scan(&subject) 209 if err != nil { 210 t.Fatalf("query: %v", err) 211 } 212 if subject != "did:plc:carol" { 213 t.Fatalf("subject = %q", subject) 214 } 215 requireCursor(t, s, 55) 216} 217 218// TestHandleUnknownCollection makes sure a collection we didn't ask for 219// (jetstream filter changes, schema drift) is silently dropped — no 220// error, no row, but cursor still advances so we don't replay it. 221func TestHandleUnknownCollection(t *testing.T) { 222 s := newTestStore(t) 223 ctx := context.Background() 224 225 evt := commitEvent(42, "did:plc:owner", "app.bsky.feed.post", jsOpCreate, "rk", map[string]string{"text": "hi"}) 226 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 227 t.Fatalf("handle: %v", err) 228 } 229 requireCursor(t, s, 42) 230} 231 232// TestHandleBadRecordAdvancesCursor is the failure-mode counterpart to 233// the happy paths: a malformed record body must be logged-and-skipped 234// (not returned as an error that pauses the scheduler) and the cursor 235// must still advance so we don't loop on the same bad event forever. 236func TestHandleBadRecordAdvancesCursor(t *testing.T) { 237 s := newTestStore(t) 238 ctx := context.Background() 239 240 evt := &jsmodels.Event{ 241 Did: "did:plc:owner", 242 TimeUS: 1000, 243 Kind: jsmodels.EventKindCommit, 244 Commit: &jsmodels.Commit{ 245 Operation: jsOpCreate, 246 Collection: tangled.SpindleMemberNSID, 247 RKey: "broken", 248 Record: json.RawMessage(`{not valid json`), 249 }, 250 } 251 if err := handleJetstreamEvent(ctx, s, nil, "", evt); err != nil { 252 t.Fatalf("handle should swallow decode error, got: %v", err) 253 } 254 if n := countRows(t, s, "spindle_members"); n != 0 { 255 t.Fatalf("bad record should not have inserted; got %d rows", n) 256 } 257 requireCursor(t, s, 1000) 258} 259 260// TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 261// sh.tangled.repo whose .spindle field equals our hostname results in a 262// dynamic AddKnot call. This is the hot path for picking up new repos 263// without a tack restart. 264func TestRepoEventSubscribesKnotForOurSpindle(t *testing.T) { 265 s := newTestStore(t) 266 ctx := context.Background() 267 268 const ours = "tack.example" 269 spindle := ours 270 rec := tangled.Repo{ 271 Knot: "knot.example", 272 Name: "myrepo", 273 Spindle: &spindle, 274 CreatedAt: "2026-01-01T00:00:00Z", 275 } 276 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 277 278 fake := &fakeKnotConsumer{} 279 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 280 t.Fatalf("handle: %v", err) 281 } 282 added := fake.Added() 283 if len(added) != 1 || added[0] != "knot.example" { 284 t.Fatalf("AddKnot calls = %v, want [knot.example]", added) 285 } 286} 287 288// TestRepoEventIgnoresKnotForOtherSpindle confirms repos pointing at a 289// *different* spindle do not pull us into watching their knot. Without 290// this guard, tack would dial every knot named in any sh.tangled.repo 291// it sees over the firehose, which is most of them. 292func TestRepoEventIgnoresKnotForOtherSpindle(t *testing.T) { 293 s := newTestStore(t) 294 ctx := context.Background() 295 296 other := "other-spindle.example" 297 rec := tangled.Repo{ 298 Knot: "knot.example", 299 Name: "myrepo", 300 Spindle: &other, 301 CreatedAt: "2026-01-01T00:00:00Z", 302 } 303 evt := commitEvent(1, "did:plc:owner", tangled.RepoNSID, jsOpCreate, "rk", rec) 304 305 fake := &fakeKnotConsumer{} 306 if err := handleJetstreamEvent(ctx, s, fake, "tack.example", evt); err != nil { 307 t.Fatalf("handle: %v", err) 308 } 309 if added := fake.Added(); len(added) != 0 { 310 t.Fatalf("AddKnot calls = %v, want none", added) 311 } 312} 313 314// TestRepoUpdateSpindleAwayFromUsRemovesKnot covers the case where a 315// repo we'd previously been watching gets its .spindle field flipped to 316// some other spindle. Once that's the only repo we had on that knot, 317// the reconciliation must call RemoveKnot. 318func TestRepoUpdateSpindleAwayFromUsRemovesKnot(t *testing.T) { 319 s := newTestStore(t) 320 ctx := context.Background() 321 322 const ours = "tack.example" 323 const knot = "knot.example" 324 325 // Seed: a repo that names us as its spindle on `knot`. 326 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "repo-a", ours, "", "t"); err != nil { 327 t.Fatal(err) 328 } 329 330 // Update: same record, now points at a different spindle. 331 other := "other.example" 332 rec := tangled.Repo{ 333 Knot: knot, 334 Name: "repo-a", 335 Spindle: &other, 336 CreatedAt: "2026-01-01T00:00:00Z", 337 } 338 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 339 340 fake := &fakeKnotConsumer{} 341 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 342 t.Fatalf("handle: %v", err) 343 } 344 if added := fake.Added(); len(added) != 0 { 345 t.Fatalf("AddKnot calls = %v, want none", added) 346 } 347 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 348 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 349 } 350} 351 352// TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared ensures we don't 353// over-eagerly unsubscribe from a knot when one of multiple repos on 354// that knot moves away from us. The other repo's subscription must keep 355// the knot in our wanted set. 356func TestRepoUpdateSpindleAwayFromUsKeepsKnotIfShared(t *testing.T) { 357 s := newTestStore(t) 358 ctx := context.Background() 359 360 const ours = "tack.example" 361 const knot = "knot.example" 362 363 // Two repos sharing one knot, both pointed at us. 364 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 365 t.Fatal(err) 366 } 367 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 368 t.Fatal(err) 369 } 370 371 // Repo A flips to a different spindle. B still wants us. 372 other := "other.example" 373 rec := tangled.Repo{ 374 Knot: knot, 375 Name: "a", 376 Spindle: &other, 377 CreatedAt: "2026-01-01T00:00:00Z", 378 } 379 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk1", rec) 380 381 fake := &fakeKnotConsumer{} 382 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 383 t.Fatalf("handle: %v", err) 384 } 385 if removed := fake.Removed(); len(removed) != 0 { 386 t.Fatalf("RemoveKnot calls = %v, want none (B still wants us on %s)", removed, knot) 387 } 388} 389 390// TestRepoUpdateChangingKnotSwapsSubscription verifies that a repo 391// staying with us but changing its .knot field unsubscribes the old 392// knot (if no other repo holds it) and subscribes the new one. 393func TestRepoUpdateChangingKnotSwapsSubscription(t *testing.T) { 394 s := newTestStore(t) 395 ctx := context.Background() 396 397 const ours = "tack.example" 398 const oldKnot = "old.example" 399 const newKnot = "new.example" 400 401 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", oldKnot, "a", ours, "", "t"); err != nil { 402 t.Fatal(err) 403 } 404 405 spindle := ours 406 rec := tangled.Repo{ 407 Knot: newKnot, 408 Name: "a", 409 Spindle: &spindle, 410 CreatedAt: "2026-01-01T00:00:00Z", 411 } 412 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpUpdate, "rk", rec) 413 414 fake := &fakeKnotConsumer{} 415 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 416 t.Fatalf("handle: %v", err) 417 } 418 if added := fake.Added(); len(added) != 1 || added[0] != newKnot { 419 t.Fatalf("AddKnot calls = %v, want [%s]", added, newKnot) 420 } 421 if removed := fake.Removed(); len(removed) != 1 || removed[0] != oldKnot { 422 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, oldKnot) 423 } 424} 425 426// TestRepoDeleteRemovesKnotWhenLast confirms deleting the last repo on 427// a knot we cared about triggers RemoveKnot. 428func TestRepoDeleteRemovesKnotWhenLast(t *testing.T) { 429 s := newTestStore(t) 430 ctx := context.Background() 431 432 const ours = "tack.example" 433 const knot = "knot.example" 434 435 if err := s.UpsertRepo(ctx, "did:plc:a", "rk", knot, "a", ours, "", "t"); err != nil { 436 t.Fatal(err) 437 } 438 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk", nil) 439 440 fake := &fakeKnotConsumer{} 441 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 442 t.Fatalf("handle: %v", err) 443 } 444 if removed := fake.Removed(); len(removed) != 1 || removed[0] != knot { 445 t.Fatalf("RemoveKnot calls = %v, want [%s]", removed, knot) 446 } 447} 448 449// TestRepoDeleteKeepsKnotIfShared ensures deleting one of multiple 450// repos on a knot does not unsubscribe — the survivors still want it. 451func TestRepoDeleteKeepsKnotIfShared(t *testing.T) { 452 s := newTestStore(t) 453 ctx := context.Background() 454 455 const ours = "tack.example" 456 const knot = "knot.example" 457 458 if err := s.UpsertRepo(ctx, "did:plc:a", "rk1", knot, "a", ours, "", "t"); err != nil { 459 t.Fatal(err) 460 } 461 if err := s.UpsertRepo(ctx, "did:plc:b", "rk2", knot, "b", ours, "", "t"); err != nil { 462 t.Fatal(err) 463 } 464 evt := commitEvent(1, "did:plc:a", tangled.RepoNSID, jsOpDelete, "rk1", nil) 465 466 fake := &fakeKnotConsumer{} 467 if err := handleJetstreamEvent(ctx, s, fake, ours, evt); err != nil { 468 t.Fatalf("handle: %v", err) 469 } 470 if removed := fake.Removed(); len(removed) != 0 { 471 t.Fatalf("RemoveKnot calls = %v, want none", removed) 472 } 473}