Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: firehose integration test cases

authored by

Patrick Dewey and committed by tangled.org b8c1dcb7 08d15d00

+660 -19
+33 -11
internal/firehose/consumer.go
··· 17 17 "github.com/rs/zerolog/log" 18 18 ) 19 19 20 + // JetstreamCommit represents the commit data within a Jetstream event. 21 + type JetstreamCommit struct { 22 + Rev string `json:"rev"` 23 + Operation string `json:"operation"` // "create", "update", "delete" 24 + Collection string `json:"collection"` 25 + RKey string `json:"rkey"` 26 + Record json.RawMessage `json:"record,omitempty"` 27 + CID string `json:"cid"` 28 + } 29 + 20 30 // JetstreamEvent represents an event from Jetstream 21 31 type JetstreamEvent struct { 22 - DID string `json:"did"` 23 - TimeUS int64 `json:"time_us"` 24 - Kind string `json:"kind"` // "commit", "identity", "account" 25 - Commit *struct { 26 - Rev string `json:"rev"` 27 - Operation string `json:"operation"` // "create", "update", "delete" 28 - Collection string `json:"collection"` 29 - RKey string `json:"rkey"` 30 - Record json.RawMessage `json:"record,omitempty"` 31 - CID string `json:"cid"` 32 - } `json:"commit,omitempty"` 32 + DID string `json:"did"` 33 + TimeUS int64 `json:"time_us"` 34 + Kind string `json:"kind"` // "commit", "identity", "account" 35 + Commit *JetstreamCommit `json:"commit,omitempty"` 33 36 } 34 37 35 38 // Consumer consumes events from Jetstream and indexes them ··· 321 324 Str("operation", commit.Operation). 322 325 Str("rkey", commit.RKey). 323 326 Msg("firehose: processing event") 327 + 328 + return c.processCommit(event) 329 + } 330 + 331 + // ProcessEvent processes a single Jetstream event through the indexing pipeline. 332 + // Exported for use in integration tests where events are fed from a test PDS 333 + // firehose rather than a live Jetstream connection. 334 + func (c *Consumer) ProcessEvent(event JetstreamEvent) error { 335 + if event.Kind != "commit" || event.Commit == nil { 336 + return nil 337 + } 338 + if !strings.HasPrefix(event.Commit.Collection, "social.arabica.alpha.") { 339 + return nil 340 + } 341 + return c.processCommit(event) 342 + } 343 + 344 + func (c *Consumer) processCommit(event JetstreamEvent) error { 345 + commit := event.Commit 324 346 325 347 switch commit.Operation { 326 348 case "create", "update":
+481
tests/integration/firehose_test.go
··· 1 + package integration 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "net/http" 7 + "net/url" 8 + "testing" 9 + "time" 10 + 11 + "tangled.org/arabica.social/arabica/internal/atproto" 12 + "tangled.org/arabica.social/arabica/internal/firehose" 13 + "tangled.org/arabica.social/arabica/internal/lexicons" 14 + 15 + "github.com/stretchr/testify/assert" 16 + "github.com/stretchr/testify/require" 17 + ) 18 + 19 + const firehoseWait = 5 * time.Second 20 + 21 + // TestFirehose_RecordIndexing verifies that records created via the HTTP API 22 + // are picked up by the testpds firehose and indexed into the FeedIndex. 23 + func TestFirehose_RecordIndexing(t *testing.T) { 24 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 25 + ctx := context.Background() 26 + 27 + // Create a roaster via the HTTP API. 28 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Firehose Roaster", "location", "Portland, OR")), "roaster") 29 + uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey) 30 + 31 + // Wait for the firehose to index the record. 32 + h.WaitForRecord(uri, firehoseWait) 33 + 34 + rec, err := h.FeedIndex.GetRecord(ctx, uri) 35 + require.NoError(t, err) 36 + require.NotNil(t, rec) 37 + assert.Equal(t, h.PrimaryAccount.DID, rec.DID) 38 + assert.Equal(t, atproto.NSIDRoaster, rec.Collection) 39 + assert.Equal(t, rkey, rec.RKey) 40 + assert.Contains(t, string(rec.Record), "Firehose Roaster") 41 + } 42 + 43 + // TestFirehose_MultipleEntityTypes verifies that different entity types are all 44 + // indexed through the firehose pipeline. 45 + func TestFirehose_MultipleEntityTypes(t *testing.T) { 46 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 47 + 48 + roasterRKey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Multi Roaster")), "roaster") 49 + beanRKey := mustRKey(t, h.PostForm("/api/beans", form( 50 + "name", "Multi Bean", 51 + "origin", "Colombia", 52 + "roaster_rkey", roasterRKey, 53 + )), "bean") 54 + grinderRKey := mustRKey(t, h.PostForm("/api/grinders", form("name", "Multi Grinder")), "grinder") 55 + brewerRKey := mustRKey(t, h.PostForm("/api/brewers", form("name", "Multi Brewer")), "brewer") 56 + 57 + uris := map[string]string{ 58 + "roaster": atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, roasterRKey), 59 + "bean": atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDBean, beanRKey), 60 + "grinder": atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDGrinder, grinderRKey), 61 + "brewer": atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDBrewer, brewerRKey), 62 + } 63 + 64 + for label, uri := range uris { 65 + h.WaitForRecord(uri, firehoseWait) 66 + rec, err := h.FeedIndex.GetRecord(context.Background(), uri) 67 + require.NoError(t, err, "%s should be indexed", label) 68 + require.NotNil(t, rec, "%s should be indexed", label) 69 + } 70 + } 71 + 72 + // TestFirehose_DeleteRemovesFromIndex verifies that deleting a record via the 73 + // HTTP API triggers a firehose delete event that removes it from the index. 74 + func TestFirehose_DeleteRemovesFromIndex(t *testing.T) { 75 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 76 + 77 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Delete Me")), "roaster") 78 + uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey) 79 + 80 + // Wait for firehose to index it first. 81 + h.WaitForRecord(uri, firehoseWait) 82 + 83 + // Delete via HTTP API. 84 + resp := h.Delete("/api/roasters/" + rkey) 85 + require.Equal(t, 200, resp.StatusCode, statusErr(resp, ReadBody(t, resp))) 86 + 87 + // Wait for firehose to remove it. 88 + h.WaitForRecordAbsent(uri, firehoseWait) 89 + 90 + rec, err := h.FeedIndex.GetRecord(context.Background(), uri) 91 + assert.NoError(t, err) 92 + assert.Nil(t, rec, "record should be removed from index after delete") 93 + } 94 + 95 + // TestFirehose_LikeCreatesNotification verifies that a like event from the 96 + // firehose creates a notification for the record owner. 97 + func TestFirehose_LikeCreatesNotification(t *testing.T) { 98 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 99 + ctx := context.Background() 100 + 101 + // Alice creates a roaster. 102 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Likeable Roaster")), "roaster") 103 + subjectURI, subjectCID := subjectRefFor(t, h, h.PrimaryAccount, atproto.NSIDRoaster, rkey) 104 + 105 + // Bob likes Alice's roaster. 106 + bob := h.CreateAccount("bob@test.com", "bob.test", "hunter2") 107 + bobClient := h.NewClientForAccount(bob) 108 + 109 + likeResp, err := bobClient.PostForm(h.URL("/api/likes/toggle"), url.Values{ 110 + "subject_uri": {subjectURI}, 111 + "subject_cid": {subjectCID}, 112 + }) 113 + require.NoError(t, err) 114 + require.Equal(t, 200, likeResp.StatusCode, statusErr(likeResp, ReadBody(t, likeResp))) 115 + 116 + // Poll for the like notification. The like count may appear first (via 117 + // HTTP handler write-through), but the notification is only created by 118 + // the firehose consumer, so poll until it shows up. 119 + deadline := time.Now().Add(firehoseWait) 120 + var foundLikeNotif bool 121 + for time.Now().Before(deadline) { 122 + notifs, _, err := h.FeedIndex.GetNotifications(h.PrimaryAccount.DID, 10, "") 123 + require.NoError(t, err) 124 + for _, n := range notifs { 125 + if n.Type == "like" && n.ActorDID == bob.DID && n.SubjectURI == subjectURI { 126 + foundLikeNotif = true 127 + break 128 + } 129 + } 130 + if foundLikeNotif { 131 + break 132 + } 133 + time.Sleep(50 * time.Millisecond) 134 + } 135 + assert.True(t, foundLikeNotif, "Alice should have a like notification from Bob") 136 + assert.Equal(t, 1, h.FeedIndex.GetLikeCount(ctx, subjectURI), "firehose should index the like") 137 + } 138 + 139 + // TestFirehose_CommentCreatesNotification verifies that a comment event from 140 + // the firehose creates a notification for the record owner. 141 + func TestFirehose_CommentCreatesNotification(t *testing.T) { 142 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 143 + ctx := context.Background() 144 + 145 + // Alice creates a roaster. 146 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Commentable Roaster")), "roaster") 147 + subjectURI, subjectCID := subjectRefFor(t, h, h.PrimaryAccount, atproto.NSIDRoaster, rkey) 148 + 149 + // Bob comments on Alice's roaster. 150 + bob := h.CreateAccount("bob@test.com", "bob.test", "hunter2") 151 + bobClient := h.NewClientForAccount(bob) 152 + 153 + commentResp, err := bobClient.PostForm(h.URL("/api/comments"), url.Values{ 154 + "subject_uri": {subjectURI}, 155 + "subject_cid": {subjectCID}, 156 + "text": {"Great roaster!"}, 157 + }) 158 + require.NoError(t, err) 159 + require.Equal(t, 200, commentResp.StatusCode, statusErr(commentResp, ReadBody(t, commentResp))) 160 + 161 + // Wait for the comment to be indexed. 162 + deadline := time.Now().Add(firehoseWait) 163 + for time.Now().Before(deadline) { 164 + if h.FeedIndex.GetCommentCount(ctx, subjectURI) > 0 { 165 + break 166 + } 167 + time.Sleep(50 * time.Millisecond) 168 + } 169 + assert.Equal(t, 1, h.FeedIndex.GetCommentCount(ctx, subjectURI), "firehose should index the comment") 170 + 171 + // Alice should have a notification. 172 + notifs, _, err := h.FeedIndex.GetNotifications(h.PrimaryAccount.DID, 10, "") 173 + require.NoError(t, err) 174 + 175 + var foundCommentNotif bool 176 + for _, n := range notifs { 177 + if n.Type == "comment" && n.ActorDID == bob.DID && n.SubjectURI == subjectURI { 178 + foundCommentNotif = true 179 + break 180 + } 181 + } 182 + assert.True(t, foundCommentNotif, "Alice should have a comment notification from Bob") 183 + } 184 + 185 + // --- Feed query end-to-end --- 186 + 187 + // TestFirehose_FeedQueryReturnsIndexedRecords creates several entity types via 188 + // HTTP, waits for firehose indexing, and verifies they appear in feed queries. 189 + func TestFirehose_FeedQueryReturnsIndexedRecords(t *testing.T) { 190 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 191 + ctx := context.Background() 192 + 193 + // Create entities that should appear in the feed. 194 + roasterRKey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Feed Roaster")), "roaster") 195 + beanRKey := mustRKey(t, h.PostForm("/api/beans", form( 196 + "name", "Feed Bean", 197 + "roaster_rkey", roasterRKey, 198 + )), "bean") 199 + grinderRKey := mustRKey(t, h.PostForm("/api/grinders", form("name", "Feed Grinder")), "grinder") 200 + 201 + // Wait for all records to be indexed. 202 + roasterURI := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, roasterRKey) 203 + beanURI := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDBean, beanRKey) 204 + grinderURI := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDGrinder, grinderRKey) 205 + 206 + h.WaitForRecord(roasterURI, firehoseWait) 207 + h.WaitForRecord(beanURI, firehoseWait) 208 + h.WaitForRecord(grinderURI, firehoseWait) 209 + 210 + // Query the feed — all three should appear. 211 + result, err := h.FeedIndex.GetFeedWithQuery(ctx, firehose.FeedQuery{Limit: 10}) 212 + require.NoError(t, err) 213 + require.NotNil(t, result) 214 + assert.GreaterOrEqual(t, len(result.Items), 3, "feed should contain at least 3 items") 215 + 216 + // Verify specific items are present by checking record types. 217 + types := map[lexicons.RecordType]bool{} 218 + for _, item := range result.Items { 219 + types[item.RecordType] = true 220 + } 221 + assert.True(t, types[lexicons.RecordTypeRoaster], "feed should contain a roaster") 222 + assert.True(t, types[lexicons.RecordTypeBean], "feed should contain a bean") 223 + assert.True(t, types[lexicons.RecordTypeGrinder], "feed should contain a grinder") 224 + } 225 + 226 + // TestFirehose_FeedQueryTypeFilter verifies that feed queries can be filtered 227 + // to a specific record type. 228 + func TestFirehose_FeedQueryTypeFilter(t *testing.T) { 229 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 230 + ctx := context.Background() 231 + 232 + roasterRKey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Filter Roaster")), "roaster") 233 + mustRKey(t, h.PostForm("/api/grinders", form("name", "Filter Grinder")), "grinder") 234 + 235 + roasterURI := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, roasterRKey) 236 + h.WaitForRecord(roasterURI, firehoseWait) 237 + 238 + // Filter to roasters only. 239 + result, err := h.FeedIndex.GetFeedWithQuery(ctx, firehose.FeedQuery{ 240 + Limit: 10, 241 + TypeFilter: lexicons.RecordTypeRoaster, 242 + }) 243 + require.NoError(t, err) 244 + for _, item := range result.Items { 245 + assert.Equal(t, lexicons.RecordTypeRoaster, item.RecordType, 246 + "all items should be roasters when filtered") 247 + } 248 + } 249 + 250 + // TestFirehose_FeedQueryWithLikeAndCommentCounts verifies that feed items 251 + // include accurate like and comment counts populated by the firehose. 252 + func TestFirehose_FeedQueryWithLikeAndCommentCounts(t *testing.T) { 253 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 254 + ctx := context.Background() 255 + 256 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Popular Roaster")), "roaster") 257 + subjectURI, subjectCID := subjectRefFor(t, h, h.PrimaryAccount, atproto.NSIDRoaster, rkey) 258 + 259 + // Bob likes the roaster. 260 + bob := h.CreateAccount("bob@test.com", "bob.test", "hunter2") 261 + bobClient := h.NewClientForAccount(bob) 262 + 263 + likeResp, err := bobClient.PostForm(h.URL("/api/likes/toggle"), url.Values{ 264 + "subject_uri": {subjectURI}, 265 + "subject_cid": {subjectCID}, 266 + }) 267 + require.NoError(t, err) 268 + require.Equal(t, 200, likeResp.StatusCode) 269 + 270 + // Bob comments on the roaster. 271 + commentResp, err := bobClient.PostForm(h.URL("/api/comments"), url.Values{ 272 + "subject_uri": {subjectURI}, 273 + "subject_cid": {subjectCID}, 274 + "text": {"Nice one!"}, 275 + }) 276 + require.NoError(t, err) 277 + require.Equal(t, 200, commentResp.StatusCode) 278 + 279 + // Wait for firehose to process the like and comment. 280 + waitFor(t, firehoseWait, func() bool { 281 + return h.FeedIndex.GetLikeCount(ctx, subjectURI) >= 1 && 282 + h.FeedIndex.GetCommentCount(ctx, subjectURI) >= 1 283 + }) 284 + 285 + // Query the feed and find the roaster item. 286 + result, err := h.FeedIndex.GetFeedWithQuery(ctx, firehose.FeedQuery{ 287 + Limit: 10, 288 + TypeFilter: lexicons.RecordTypeRoaster, 289 + }) 290 + require.NoError(t, err) 291 + 292 + var found *firehose.FeedItem 293 + for _, item := range result.Items { 294 + if item.SubjectURI == subjectURI { 295 + found = item 296 + break 297 + } 298 + } 299 + require.NotNil(t, found, "roaster should appear in feed") 300 + assert.Equal(t, 1, found.LikeCount, "feed item should show 1 like") 301 + assert.Equal(t, 1, found.CommentCount, "feed item should show 1 comment") 302 + } 303 + 304 + // --- Record update via firehose --- 305 + 306 + // TestFirehose_RecordUpdateReflected verifies that updating a record via HTTP 307 + // results in the firehose re-indexing it with the new data. 308 + func TestFirehose_RecordUpdateReflected(t *testing.T) { 309 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 310 + ctx := context.Background() 311 + 312 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Original Name", "location", "NYC")), "roaster") 313 + uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey) 314 + 315 + h.WaitForRecord(uri, firehoseWait) 316 + 317 + // Verify original data is indexed. 318 + rec, err := h.FeedIndex.GetRecord(ctx, uri) 319 + require.NoError(t, err) 320 + assert.Contains(t, string(rec.Record), "Original Name") 321 + 322 + // Update the roaster. 323 + updateResp := h.PutForm("/api/roasters/"+rkey, form("name", "Updated Name", "location", "SF")) 324 + require.Equal(t, 200, updateResp.StatusCode, statusErr(updateResp, ReadBody(t, updateResp))) 325 + 326 + // Wait for the firehose to re-index with the updated data. 327 + waitFor(t, firehoseWait, func() bool { 328 + rec, err := h.FeedIndex.GetRecord(ctx, uri) 329 + if err != nil || rec == nil { 330 + return false 331 + } 332 + return string(rec.Record) != "" && 333 + assert.ObjectsAreEqual("Updated Name", extractField(rec.Record, "name")) 334 + }) 335 + 336 + rec, err = h.FeedIndex.GetRecord(ctx, uri) 337 + require.NoError(t, err) 338 + assert.Contains(t, string(rec.Record), "Updated Name", "index should reflect updated name") 339 + assert.Contains(t, string(rec.Record), "SF", "index should reflect updated location") 340 + assert.NotContains(t, string(rec.Record), "Original Name", "index should not contain old name") 341 + } 342 + 343 + // --- Unlike / uncomment via firehose --- 344 + 345 + // TestFirehose_UnlikeCleansUpIndex verifies that unliking a record (toggling 346 + // the like off) results in the firehose removing the like from the index and 347 + // cleaning up the notification. 348 + func TestFirehose_UnlikeCleansUpIndex(t *testing.T) { 349 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 350 + ctx := context.Background() 351 + 352 + // Alice creates a roaster. 353 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Unlike Roaster")), "roaster") 354 + subjectURI, subjectCID := subjectRefFor(t, h, h.PrimaryAccount, atproto.NSIDRoaster, rkey) 355 + 356 + // Bob likes it. 357 + bob := h.CreateAccount("bob@test.com", "bob.test", "hunter2") 358 + bobClient := h.NewClientForAccount(bob) 359 + 360 + likeResp, err := bobClient.PostForm(h.URL("/api/likes/toggle"), url.Values{ 361 + "subject_uri": {subjectURI}, 362 + "subject_cid": {subjectCID}, 363 + }) 364 + require.NoError(t, err) 365 + require.Equal(t, 200, likeResp.StatusCode) 366 + 367 + // Wait for the like to be fully indexed via firehose. 368 + waitFor(t, firehoseWait, func() bool { 369 + return h.FeedIndex.GetLikeCount(ctx, subjectURI) == 1 370 + }) 371 + assert.True(t, h.FeedIndex.HasUserLiked(ctx, bob.DID, subjectURI)) 372 + 373 + // Bob unlikes it (toggle again). 374 + unlikeResp, err := bobClient.PostForm(h.URL("/api/likes/toggle"), url.Values{ 375 + "subject_uri": {subjectURI}, 376 + "subject_cid": {subjectCID}, 377 + }) 378 + require.NoError(t, err) 379 + require.Equal(t, 200, unlikeResp.StatusCode) 380 + 381 + // Wait for the unlike to propagate through the firehose. The HTTP handler 382 + // does write-through DeleteLike, but the firehose consumer also processes 383 + // the delete event and removes the notification. 384 + waitFor(t, firehoseWait, func() bool { 385 + return h.FeedIndex.GetLikeCount(ctx, subjectURI) == 0 386 + }) 387 + 388 + assert.Equal(t, 0, h.FeedIndex.GetLikeCount(ctx, subjectURI), "like count should be 0 after unlike") 389 + assert.False(t, h.FeedIndex.HasUserLiked(ctx, bob.DID, subjectURI), "Bob should no longer have liked") 390 + 391 + // The notification should have been cleaned up. 392 + notifs, _, err := h.FeedIndex.GetNotifications(h.PrimaryAccount.DID, 10, "") 393 + require.NoError(t, err) 394 + for _, n := range notifs { 395 + if n.Type == "like" && n.ActorDID == bob.DID && n.SubjectURI == subjectURI { 396 + t.Error("like notification should have been removed after unlike") 397 + } 398 + } 399 + } 400 + 401 + // TestFirehose_CommentDeleteCleansUpIndex verifies that deleting a comment 402 + // results in the firehose removing the comment from the index and cleaning up 403 + // the notification. 404 + func TestFirehose_CommentDeleteCleansUpIndex(t *testing.T) { 405 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 406 + ctx := context.Background() 407 + 408 + // Alice creates a roaster. 409 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Uncomment Roaster")), "roaster") 410 + subjectURI, subjectCID := subjectRefFor(t, h, h.PrimaryAccount, atproto.NSIDRoaster, rkey) 411 + 412 + // Bob comments on it. 413 + bob := h.CreateAccount("bob@test.com", "bob.test", "hunter2") 414 + bobClient := h.NewClientForAccount(bob) 415 + 416 + commentResp, err := bobClient.PostForm(h.URL("/api/comments"), url.Values{ 417 + "subject_uri": {subjectURI}, 418 + "subject_cid": {subjectCID}, 419 + "text": {"Delete me later"}, 420 + }) 421 + require.NoError(t, err) 422 + require.Equal(t, 200, commentResp.StatusCode) 423 + 424 + // Wait for the comment to be indexed. 425 + waitFor(t, firehoseWait, func() bool { 426 + return h.FeedIndex.GetCommentCount(ctx, subjectURI) == 1 427 + }) 428 + 429 + // Find the comment rkey. 430 + comments := h.FeedIndex.GetThreadedCommentsForSubject(ctx, subjectURI, 10, "") 431 + require.Len(t, comments, 1) 432 + commentRKey := comments[0].RKey 433 + 434 + // Bob deletes the comment. 435 + deleteReq, err := http.NewRequest("DELETE", h.URL("/api/comments/"+commentRKey), nil) 436 + require.NoError(t, err) 437 + deleteResp, err := bobClient.Do(deleteReq) 438 + require.NoError(t, err) 439 + require.Equal(t, 200, deleteResp.StatusCode) 440 + 441 + // Wait for the comment to be removed from the index via firehose. 442 + waitFor(t, firehoseWait, func() bool { 443 + return h.FeedIndex.GetCommentCount(ctx, subjectURI) == 0 444 + }) 445 + 446 + assert.Equal(t, 0, h.FeedIndex.GetCommentCount(ctx, subjectURI), "comment count should be 0 after delete") 447 + 448 + // The notification should have been cleaned up. 449 + notifs, _, err := h.FeedIndex.GetNotifications(h.PrimaryAccount.DID, 10, "") 450 + require.NoError(t, err) 451 + for _, n := range notifs { 452 + if n.Type == "comment" && n.ActorDID == bob.DID && n.SubjectURI == subjectURI { 453 + t.Error("comment notification should have been removed after delete") 454 + } 455 + } 456 + } 457 + 458 + // --- helpers --- 459 + 460 + // waitFor polls condition until it returns true or the timeout expires. 461 + func waitFor(t *testing.T, timeout time.Duration, condition func() bool) { 462 + t.Helper() 463 + deadline := time.Now().Add(timeout) 464 + for time.Now().Before(deadline) { 465 + if condition() { 466 + return 467 + } 468 + time.Sleep(50 * time.Millisecond) 469 + } 470 + t.Fatal("timed out waiting for condition") 471 + } 472 + 473 + // extractField pulls a top-level string field from a JSON record. 474 + func extractField(record []byte, key string) string { 475 + var m map[string]any 476 + if err := json.Unmarshal(record, &m); err != nil { 477 + return "" 478 + } 479 + v, _ := m[key].(string) 480 + return v 481 + }
+3 -2
tests/integration/go.mod
··· 5 5 replace ( 6 6 github.com/haileyok/cocoon => github.com/ptdewey/cocoon v0.0.0-20260406233545-539d73959ca6 7 7 tangled.org/arabica.social/arabica => ../.. 8 + tangled.org/pdewey.com/atp => ../../../atp 8 9 ) 9 10 10 11 require ( 11 - github.com/bluesky-social/indigo v0.0.0-20260318212431-cbaa83aee9dd 12 - github.com/haileyok/cocoon v0.9.0 12 + github.com/bluesky-social/indigo v0.0.0-20260409212512-2031017ff411 13 13 github.com/ptdewey/shutter v0.2.1 14 14 github.com/rs/zerolog v1.34.0 15 15 github.com/stretchr/testify v1.11.1 ··· 57 57 github.com/gorilla/sessions v1.4.0 // indirect 58 58 github.com/gorilla/websocket v1.5.3 // indirect 59 59 github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect 60 + github.com/haileyok/cocoon v0.9.0 // indirect 60 61 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect 61 62 github.com/hako/durafmt v0.0.0-20210608085754-5c1018a4e16b // indirect 62 63 github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
+2 -4
tests/integration/go.sum
··· 65 65 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= 66 66 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= 67 67 github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= 68 - github.com/bluesky-social/indigo v0.0.0-20260318212431-cbaa83aee9dd h1:FZSMlxClfm7jCA6A/vwTNw5EPxSngPPpK09MxuEx9l0= 69 - github.com/bluesky-social/indigo v0.0.0-20260318212431-cbaa83aee9dd/go.mod h1:VG/LeqLGNI3Ew7lsYixajnZGFfWPv144qbUddh+Oyag= 68 + github.com/bluesky-social/indigo v0.0.0-20260409212512-2031017ff411 h1:zi+cbEqTeVqNsNkp6hFMDD5olYTalW9kdOaJIq/kXio= 69 + github.com/bluesky-social/indigo v0.0.0-20260409212512-2031017ff411/go.mod h1:JqQkz8lrOI6YZivP38GHmtVOTtzsNToITKj1gMpU5Jo= 70 70 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= 71 71 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= 72 72 github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc= ··· 1065 1065 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= 1066 1066 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= 1067 1067 rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= 1068 - tangled.org/pdewey.com/atp v0.0.0-20260407015143-f53954e5e783 h1:2ZJGx2rhmISlDIgcLSmtjt/PKbI3LcGIx3jqZL6o3wE= 1069 - tangled.org/pdewey.com/atp v0.0.0-20260407015143-f53954e5e783/go.mod h1:+ayXDZgZZpb3UF2how+7BmPGLsD3JVJjCzhCk2lrv4o=
+140 -1
tests/integration/harness.go
··· 25 25 26 26 "github.com/bluesky-social/indigo/atproto/atclient" 27 27 "github.com/bluesky-social/indigo/atproto/syntax" 28 - "github.com/haileyok/cocoon/testpds" 28 + "tangled.org/pdewey.com/atp/testpds" 29 29 "github.com/rs/zerolog" 30 30 zlog "github.com/rs/zerolog/log" 31 31 "github.com/stretchr/testify/require" ··· 89 89 Server *httptest.Server 90 90 Handler *handlers.Handler 91 91 FeedIndex *firehose.FeedIndex 92 + Consumer *firehose.Consumer 92 93 SessionCache *atproto.SessionCache 93 94 94 95 // PrimaryAccount is the default account created on harness setup. ··· 106 107 // atpClients maps DID -> atp.Client for direct PDS access in tests. 107 108 atpClients map[string]*atp.Client 108 109 110 + // firehoseCancel stops the firehose bridge goroutine. 111 + firehoseCancel context.CancelFunc 112 + 109 113 cleanup []func() 110 114 } 111 115 ··· 125 129 PrimaryEmail string 126 130 // PrimaryPassword is the password for the default account. Defaults to "hunter2". 127 131 PrimaryPassword string 132 + // EnableFirehose subscribes to the test PDS firehose and feeds events 133 + // through the Consumer → FeedIndex pipeline, so records created via the 134 + // PDS are automatically indexed. Use WaitForRecord to synchronise. 135 + EnableFirehose bool 128 136 } 129 137 130 138 // StartHarness boots a test PDS, creates a primary account, builds the full ··· 221 229 harness.Server = server 222 230 harness.Handler = h 223 231 harness.cleanup = append(harness.cleanup, server.Close, func() { _ = feedIndex.Close() }) 232 + 233 + // Wire up the firehose consumer before creating accounts so events are 234 + // indexed as they happen. 235 + if opts.EnableFirehose { 236 + consumer := firehose.NewConsumer(&firehose.Config{ 237 + WantedCollections: firehose.ArabicaCollections, 238 + }, feedIndex) 239 + harness.Consumer = consumer 240 + 241 + ctx, cancel := context.WithCancel(context.Background()) 242 + harness.firehoseCancel = cancel 243 + harness.cleanup = append(harness.cleanup, cancel) 244 + 245 + ch, err := pds.Subscribe(ctx, 0) 246 + require.NoError(t, err) 247 + 248 + go harness.firehoseBridge(ctx, ch) 249 + } 224 250 225 251 // Create the primary account and register it. 226 252 harness.PrimaryAccount = harness.CreateAccount(opts.PrimaryEmail, opts.PrimaryHandle, opts.PrimaryPassword) ··· 474 500 Password: password, 475 501 AccessJwt: result.AccessJwt, 476 502 } 503 + } 504 + 505 + // --- firehose bridge --- 506 + 507 + // firehoseBridge reads testpds firehose events, fetches records via XRPC, and 508 + // feeds them through the Consumer's event processing pipeline. 509 + func (h *Harness) firehoseBridge(ctx context.Context, ch <-chan testpds.FirehoseEvent) { 510 + for { 511 + select { 512 + case <-ctx.Done(): 513 + return 514 + case evt, ok := <-ch: 515 + if !ok { 516 + return 517 + } 518 + if evt.Commit == nil { 519 + continue 520 + } 521 + for _, op := range evt.Commit.Ops { 522 + parts := strings.SplitN(op.Path, "/", 2) 523 + if len(parts) != 2 { 524 + continue 525 + } 526 + collection, rkey := parts[0], parts[1] 527 + if !strings.HasPrefix(collection, "social.arabica.alpha.") { 528 + continue 529 + } 530 + 531 + event := firehose.JetstreamEvent{ 532 + DID: evt.Commit.Repo, 533 + TimeUS: time.Now().UnixMicro(), 534 + Kind: "commit", 535 + } 536 + 537 + switch op.Action { 538 + case "create", "update": 539 + record, cid, err := h.fetchRecordJSON(evt.Commit.Repo, collection, rkey) 540 + if err != nil { 541 + continue 542 + } 543 + event.Commit = &firehose.JetstreamCommit{ 544 + Operation: op.Action, 545 + Collection: collection, 546 + RKey: rkey, 547 + Record: record, 548 + CID: cid, 549 + } 550 + case "delete": 551 + event.Commit = &firehose.JetstreamCommit{ 552 + Operation: "delete", 553 + Collection: collection, 554 + RKey: rkey, 555 + } 556 + default: 557 + continue 558 + } 559 + 560 + _ = h.Consumer.ProcessEvent(event) 561 + } 562 + } 563 + } 564 + } 565 + 566 + // fetchRecordJSON fetches a record from the test PDS as raw JSON. 567 + func (h *Harness) fetchRecordJSON(did, collection, rkey string) (json.RawMessage, string, error) { 568 + u := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 569 + h.PDS.URL, url.QueryEscape(did), url.QueryEscape(collection), url.QueryEscape(rkey)) 570 + resp, err := http.Get(u) 571 + if err != nil { 572 + return nil, "", err 573 + } 574 + defer resp.Body.Close() 575 + if resp.StatusCode != 200 { 576 + return nil, "", fmt.Errorf("getRecord %d", resp.StatusCode) 577 + } 578 + var result struct { 579 + CID string `json:"cid"` 580 + Value json.RawMessage `json:"value"` 581 + } 582 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 583 + return nil, "", err 584 + } 585 + return result.Value, result.CID, nil 586 + } 587 + 588 + // WaitForRecord polls the FeedIndex until the record with the given AT-URI is 589 + // indexed, or fails the test after timeout. 590 + func (h *Harness) WaitForRecord(uri string, timeout time.Duration) { 591 + h.T.Helper() 592 + deadline := time.Now().Add(timeout) 593 + for time.Now().Before(deadline) { 594 + rec, _ := h.FeedIndex.GetRecord(context.Background(), uri) 595 + if rec != nil { 596 + return 597 + } 598 + time.Sleep(50 * time.Millisecond) 599 + } 600 + h.T.Fatalf("timed out waiting for record %s to be indexed", uri) 601 + } 602 + 603 + // WaitForRecordAbsent polls the FeedIndex until the record with the given 604 + // AT-URI is no longer present, or fails the test after timeout. 605 + func (h *Harness) WaitForRecordAbsent(uri string, timeout time.Duration) { 606 + h.T.Helper() 607 + deadline := time.Now().Add(timeout) 608 + for time.Now().Before(deadline) { 609 + rec, _ := h.FeedIndex.GetRecord(context.Background(), uri) 610 + if rec == nil { 611 + return 612 + } 613 + time.Sleep(50 * time.Millisecond) 614 + } 615 + h.T.Fatalf("timed out waiting for record %s to be removed from index", uri) 477 616 } 478 617 479 618 // statusErr is a small helper for assertions that print useful diagnostics.
+1 -1
tests/integration/pds_test.go
··· 13 13 14 14 "github.com/bluesky-social/indigo/atproto/atclient" 15 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/haileyok/cocoon/testpds" 16 + "tangled.org/pdewey.com/atp/testpds" 17 17 "github.com/stretchr/testify/assert" 18 18 "github.com/stretchr/testify/require" 19 19 "tangled.org/pdewey.com/atp"