a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: replace resumeCursor with highWaterMark for event processing (ack resilience)

+90 -12
+30 -10
packages/api/internal/ingest/ingest.go
··· 37 37 processor *idx.Processor 38 38 consumerName string 39 39 log *slog.Logger 40 - resumeCursor int64 41 40 42 41 statusMu sync.Mutex 42 + highWaterMark int64 43 43 lastCursor string 44 44 processedTick int64 45 45 } ··· 98 98 ) 99 99 continue 100 100 } 101 - r.log.Debug("skipped previously-processed event", slog.Int64("event_id", event.ID), slog.Int64("resume_cursor", r.resumeCursor)) 101 + r.statusMu.Lock() 102 + highWaterMark := r.highWaterMark 103 + r.statusMu.Unlock() 104 + r.log.Debug("skipped previously-processed event", slog.Int64("event_id", event.ID), slog.Int64("resume_cursor", highWaterMark)) 102 105 continue 103 106 } 104 107 ··· 130 133 return nil 131 134 } 132 135 133 - r.resumeCursor = cursor 134 136 r.statusMu.Lock() 137 + r.highWaterMark = cursor 135 138 r.lastCursor = state.Cursor 136 139 r.statusMu.Unlock() 137 140 r.log.Info("indexer cursor resume enabled", slog.Int64("resume_cursor", cursor)) ··· 139 142 } 140 143 141 144 func (r *Runner) shouldSkipEvent(eventID int64) bool { 142 - return r.resumeCursor > 0 && eventID <= r.resumeCursor 145 + r.statusMu.Lock() 146 + defer r.statusMu.Unlock() 147 + return r.highWaterMark > 0 && eventID <= r.highWaterMark 143 148 } 144 149 145 150 func (r *Runner) processWithRetry(ctx context.Context, event normalize.TapRecordEvent) error { ··· 227 232 } 228 233 229 234 func (r *Runner) advanceCursorAndAck(ctx context.Context, eventID int64) error { 230 - cursor := fmt.Sprintf("%d", eventID) 231 - if err := r.persistCursorWithRetry(ctx, cursor, eventID); err != nil { 232 - return err 235 + cursorID, shouldPersist := r.nextCursor(eventID) 236 + cursor := fmt.Sprintf("%d", cursorID) 237 + if shouldPersist { 238 + if err := r.persistCursorWithRetry(ctx, cursor, eventID); err != nil { 239 + return err 240 + } 233 241 } 234 242 if err := r.tap.AckEvent(ctx, eventID); err != nil { 235 243 return err 236 244 } 237 - r.markProcessed(cursor) 245 + r.markProcessed(cursorID) 238 246 return nil 239 247 } 240 248 ··· 293 301 } 294 302 } 295 303 296 - func (r *Runner) markProcessed(cursor string) { 304 + func (r *Runner) nextCursor(eventID int64) (int64, bool) { 297 305 r.statusMu.Lock() 298 - r.lastCursor = cursor 306 + defer r.statusMu.Unlock() 307 + if eventID <= r.highWaterMark { 308 + return r.highWaterMark, false 309 + } 310 + return eventID, true 311 + } 312 + 313 + func (r *Runner) markProcessed(cursor int64) { 314 + r.statusMu.Lock() 315 + if cursor > r.highWaterMark { 316 + r.highWaterMark = cursor 317 + } 318 + r.lastCursor = fmt.Sprintf("%d", r.highWaterMark) 299 319 r.processedTick++ 300 320 r.statusMu.Unlock() 301 321 }
+60 -2
packages/api/internal/ingest/ingest_test.go
··· 270 270 } 271 271 } 272 272 273 + func TestRunner_CursorHighWaterMarkDoesNotRegress(t *testing.T) { 274 + st := newFakeStore() 275 + tap := &fakeTapClient{} 276 + r := newRunnerForTest(st, tap, "sh.tangled.*") 277 + 278 + newer := normalize.TapRecordEvent{ 279 + ID: 200, 280 + Type: "identity", 281 + Identity: &normalize.TapIdentity{ 282 + DID: "did:plc:newer", 283 + Handle: "newer.tangled.org", 284 + }, 285 + } 286 + if err := r.processEvent(context.Background(), newer); err != nil { 287 + t.Fatalf("process newer event: %v", err) 288 + } 289 + if st.syncCursor != "200" { 290 + t.Fatalf("cursor after newer event: got %q want 200", st.syncCursor) 291 + } 292 + 293 + older := normalize.TapRecordEvent{ 294 + ID: 150, 295 + Type: "identity", 296 + Identity: &normalize.TapIdentity{ 297 + DID: "did:plc:older", 298 + Handle: "older.tangled.org", 299 + }, 300 + } 301 + if err := r.processEvent(context.Background(), older); err != nil { 302 + t.Fatalf("process older event: %v", err) 303 + } 304 + 305 + if st.syncCursor != "200" { 306 + t.Fatalf("cursor regressed: got %q want 200", st.syncCursor) 307 + } 308 + if !r.shouldSkipEvent(150) { 309 + t.Fatal("expected older event id to be skipped once a newer cursor is recorded") 310 + } 311 + } 312 + 313 + func TestRunner_InitializeCursorUsesHighWaterMark(t *testing.T) { 314 + st := newFakeStore() 315 + st.initialSync = &store.SyncState{ConsumerName: "indexer-tap-v1", Cursor: "150"} 316 + tap := &fakeTapClient{} 317 + r := newRunnerForTest(st, tap, "sh.tangled.*") 318 + 319 + if err := r.initializeCursor(context.Background()); err != nil { 320 + t.Fatalf("initialize cursor: %v", err) 321 + } 322 + 323 + if !r.shouldSkipEvent(150) { 324 + t.Fatal("expected stored cursor to act as skip high-water mark") 325 + } 326 + if r.shouldSkipEvent(151) { 327 + t.Fatal("did not expect events above the high-water mark to be skipped") 328 + } 329 + } 330 + 273 331 func TestRunner_ProcessStateEvent(t *testing.T) { 274 332 st := newFakeStore() 275 333 tap := &fakeTapClient{} ··· 352 410 if err := r.initializeCursor(context.Background()); err != nil { 353 411 t.Fatalf("initialize cursor: %v", err) 354 412 } 355 - if r.resumeCursor != 150 { 356 - t.Fatalf("resume cursor: got %d want 150", r.resumeCursor) 413 + if r.highWaterMark != 150 { 414 + t.Fatalf("high-water mark: got %d want 150", r.highWaterMark) 357 415 } 358 416 if !r.shouldSkipEvent(149) { 359 417 t.Fatalf("expected event 149 to be skipped")