a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: remove event skipping

+5 -52
+1 -22
packages/api/internal/ingest/ingest.go
··· 90 90 continue 91 91 } 92 92 93 - if r.shouldSkipEvent(event.ID) { 94 - if err := r.tap.AckEvent(ctx, event.ID); err != nil { 95 - r.log.Warn("tap ack skipped event failed", 96 - slog.Int64("event_id", event.ID), 97 - slog.String("error", err.Error()), 98 - ) 99 - continue 100 - } 101 - r.statusMu.Lock() 102 - highWaterMark := r.highWaterMark 103 - r.statusMu.Unlock() 104 - r.log.Debug("skipped previously-processed event", slog.Int64("event_id", event.ID), slog.Int64("resume_cursor", highWaterMark)) 105 - continue 106 - } 107 - 108 93 if err := r.processWithRetry(ctx, event); err != nil { 109 94 if ctx.Err() != nil { 110 95 return nil ··· 137 122 r.highWaterMark = cursor 138 123 r.lastCursor = state.Cursor 139 124 r.statusMu.Unlock() 140 - r.log.Info("indexer cursor resume enabled", slog.Int64("resume_cursor", cursor)) 125 + r.log.Info("indexer cursor state loaded", slog.Int64("cursor", cursor)) 141 126 return nil 142 - } 143 - 144 - func (r *Runner) shouldSkipEvent(eventID int64) bool { 145 - r.statusMu.Lock() 146 - defer r.statusMu.Unlock() 147 - return r.highWaterMark > 0 && eventID <= r.highWaterMark 148 127 } 149 128 150 129 func (r *Runner) processWithRetry(ctx context.Context, event normalize.TapRecordEvent) error {
+4 -30
packages/api/internal/ingest/ingest_test.go
··· 305 305 if st.syncCursor != "200" { 306 306 t.Fatalf("cursor regressed: got %q want 200", st.syncCursor) 307 307 } 308 - if !r.shouldSkipEvent(150) { 309 - t.Fatal("expected older event id to be skipped once a newer cursor is recorded") 308 + if r.highWaterMark != 200 { 309 + t.Fatalf("high-water mark: got %d want 200", r.highWaterMark) 310 310 } 311 311 } 312 312 ··· 320 320 t.Fatalf("initialize cursor: %v", err) 321 321 } 322 322 323 - if !r.shouldSkipEvent(150) { 324 - t.Fatal("expected stored cursor to act as skip high-water mark") 325 - } 326 - if r.shouldSkipEvent(151) { 327 - t.Fatal("did not expect events above the high-water mark to be skipped") 323 + if r.highWaterMark != 150 { 324 + t.Fatalf("high-water mark: got %d want 150", r.highWaterMark) 328 325 } 329 326 } 330 327 ··· 398 395 } 399 396 if policy.Allows(store.IndexSourceTap, "app.bsky.feed.post") { 400 397 t.Fatal("unexpected match") 401 - } 402 - } 403 - 404 - func TestRunner_InitializeCursorResume(t *testing.T) { 405 - st := newFakeStore() 406 - st.initialSync = &store.SyncState{ConsumerName: "indexer-tap-v1", Cursor: "150"} 407 - tap := &fakeTapClient{} 408 - r := newRunnerForTest(st, tap, "sh.tangled.*") 409 - 410 - if err := r.initializeCursor(context.Background()); err != nil { 411 - t.Fatalf("initialize cursor: %v", err) 412 - } 413 - if r.highWaterMark != 150 { 414 - t.Fatalf("high-water mark: got %d want 150", r.highWaterMark) 415 - } 416 - if !r.shouldSkipEvent(149) { 417 - t.Fatalf("expected event 149 to be skipped") 418 - } 419 - if !r.shouldSkipEvent(150) { 420 - t.Fatalf("expected event 150 to be skipped") 421 - } 422 - if r.shouldSkipEvent(151) { 423 - t.Fatalf("expected event 151 to be processed") 424 398 } 425 399 } 426 400