a love letter to tangled (android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: backfill status and ack config

+505 -35
+1
apps/twisted/package.json
··· 15 15 "dependencies": { 16 16 "@atcute/bluesky": "^3.3.0", 17 17 "@atcute/client": "^4.2.1", 18 + "@atcute/oauth-browser-client": "^3.0.0", 18 19 "@atcute/tangled": "^1.0.17", 19 20 "@capacitor/android": "^8.2.0", 20 21 "@capacitor/app": "8.0.1",
+3
docs/api/specs/04-data-pipeline.md
··· 114 114 ### Cursor Persistence Rules 115 115 116 116 - If DB commit fails → cursor does not advance → event will be retried 117 + - After successful DB writes, ack Tap first, then persist cursor for operator-visible resume 118 + - If ack fails → cursor does not advance 119 + - If ack succeeds but cursor persistence fails → retry cursor persistence until successful or process exit 117 120 - If normalization fails → log error, optionally dead-letter, skip → cursor advances 118 121 - If embedding scheduling fails → document remains keyword-searchable → cursor advances 119 122
+4 -1
docs/api/specs/07-graph-backfill.md
··· 59 59 60 60 For each discovered user: 61 61 62 - 1. **Check if already tracked**: Query Tap's `/info/:did` endpoint — if the repo is already tracked and backfilled, skip 62 + 1. **Check Tap status**: Query Tap's `/info/:did` endpoint and classify by status: 63 + - tracked + backfilled: skip 64 + - tracked + backfilling/in-progress: skip and let current backfill finish 65 + - untracked or tracked-without-backfill-state: submit to `/repos/add` 63 66 2. **Register with Tap**: POST to `/repos/add` with the DID — Tap handles the actual repo export and event delivery 64 67 3. **Tap backfill flow**: Tap fetches full repo history from PDS via `com.atproto.sync.getRepo`, then delivers historical events (`live: false`) through the normal WebSocket channel 65 68 4. **Indexer processes normally**: The indexer's existing ingestion loop handles backfill events the same as live events — no special backfill code path needed
+2 -2
docs/api/tasks/phase-1-mvp.md
··· 203 203 204 204 ### Deliverables 205 205 206 - - HTTP server (chi or net/http) 206 + - HTTP server (net/http) 207 207 - `GET /healthz` — liveness 208 208 - `GET /readyz` — readiness (DB connectivity) 209 209 - `GET /search` — keyword search with configurable mode ··· 214 214 215 215 ### Tasks 216 216 217 - - [ ] Set up HTTP server with chi router 217 + - [ ] Set up HTTP server with net/http router 218 218 - [ ] Implement `/healthz` (always 200) and `/readyz` (SELECT 1 against DB) 219 219 - [ ] Implement search repository with FTS queries: 220 220
+7 -7
docs/app/tasks/phase-4.md
··· 3 3 ## OAuth Setup 4 4 5 5 - [ ] Install `@atcute/oauth-browser-client` 6 - - [ ] Host OAuth client metadata JSON at a public URL (or configure for local dev) 6 + - [ ] Host OAuth client metadata JSON at a public URL & configure for local dev 7 7 - [ ] Create `core/auth/oauth.ts` — call `configureOAuth()` with client metadata URL and redirect URI 8 8 - [ ] Create `core/auth/session.ts` — session management: get, list, delete stored sessions 9 9 - [ ] Create `core/auth/store.ts` — Pinia auth store with state machine (idle → authenticating → authenticated → error) ··· 62 62 - [ ] Add reaction button/picker to PR and issue detail views 63 63 - [ ] Show reaction counts grouped by type 64 64 65 - ## Personalized Feed 66 - 67 - - [ ] When signed in, filter activity feed to show activity from followed users and starred repos 68 - - [ ] Add "For You" / "Global" toggle on Activity tab 69 - - [ ] If appview provides a personalized endpoint, use it; otherwise filter client-side 70 - 71 65 ## Profile Tab (Authenticated) 72 66 73 67 - [ ] Wire Profile tab to show current user's profile data 74 68 - [ ] Show pinned repos, stats, starred repos, following list 75 69 - [ ] Add logout button 76 70 - [ ] Add account switcher UI 71 + 72 + ## Personalized Feed 73 + 74 + - [ ] When signed in, filter activity feed to show activity from followed users and starred repos 75 + - [ ] Add "For You" / "Global" toggle on Activity tab 76 + - [ ] If appview provides a personalized endpoint, use it; otherwise filter client-side 77 77 78 78 ## Quality 79 79
+9 -2
packages/api/internal/backfill/backfill.go
··· 83 83 } 84 84 85 85 alreadyTracked := 0 86 + inProgress := 0 86 87 toSubmit := make([]string, 0, len(discovered)) 87 88 for _, user := range discovered { 88 - tracked, err := r.tap.IsTracked(ctx, user.DID) 89 + status, err := r.tap.RepoStatus(ctx, user.DID) 89 90 if err != nil { 90 91 return fmt.Errorf("tap info for %s: %w", user.DID, err) 91 92 } 92 - if tracked { 93 + if status.Tracked && status.Backfilled { 93 94 alreadyTracked++ 94 95 continue 95 96 } 97 + if status.Tracked && status.Backfilling { 98 + inProgress++ 99 + continue 100 + } 96 101 toSubmit = append(toSubmit, user.DID) 97 102 } 98 103 99 104 r.log.Info("tap classification complete", 100 105 slog.Int("already_tracked", alreadyTracked), 106 + slog.Int("backfill_in_progress", inProgress), 101 107 slog.Int("to_submit", len(toSubmit)), 102 108 ) 103 109 ··· 130 136 r.log.Info("backfill complete", 131 137 slog.Int("discovered_total", len(discovered)), 132 138 slog.Int("already_tracked", alreadyTracked), 139 + slog.Int("backfill_in_progress", inProgress), 133 140 slog.Int("submitted", submitted), 134 141 ) 135 142 return nil
+34 -6
packages/api/internal/backfill/backfill_test.go
··· 26 26 } 27 27 28 28 type fakeTapAdmin struct { 29 - tracked map[string]bool 30 - added [][]string 29 + statuses map[string]RepoStatus 30 + added [][]string 31 31 } 32 32 33 - func (f *fakeTapAdmin) IsTracked(_ context.Context, did string) (bool, error) { 34 - return f.tracked[did], nil 33 + func (f *fakeTapAdmin) RepoStatus(_ context.Context, did string) (RepoStatus, error) { 34 + if status, ok := f.statuses[did]; ok { 35 + return status, nil 36 + } 37 + return RepoStatus{Found: false, Tracked: false}, nil 35 38 } 36 39 37 40 func (f *fakeTapAdmin) AddRepos(_ context.Context, dids []string) error { ··· 59 62 }, 60 63 } 61 64 follows := &fakeFollowFetcher{follows: map[string][]string{"did:plc:seed": {"did:plc:f1"}}} 62 - tap := &fakeTapAdmin{tracked: map[string]bool{"did:plc:f1": true}} 65 + tap := &fakeTapAdmin{statuses: map[string]RepoStatus{"did:plc:f1": {Found: true, Tracked: true, Backfilled: true}}} 63 66 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 64 67 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 65 68 r := NewRunnerWithDeps(st, tap, resolver, follows, log) ··· 91 94 func TestRunner_DryRunSkipsMutations(t *testing.T) { 92 95 st := &fakeStore{collaborators: map[string][]string{}} 93 96 follows := &fakeFollowFetcher{follows: map[string][]string{}} 94 - tap := &fakeTapAdmin{tracked: map[string]bool{}} 97 + tap := &fakeTapAdmin{statuses: map[string]RepoStatus{}} 95 98 resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 96 99 log := slog.New(slog.NewTextHandler(io.Discard, nil)) 97 100 r := NewRunnerWithDeps(st, tap, resolver, follows, log) ··· 116 119 t.Fatalf("expected no tap submissions in dry-run, got %#v", tap.added) 117 120 } 118 121 } 122 + 123 + func TestRunner_SkipsInProgressBackfills(t *testing.T) { 124 + st := &fakeStore{collaborators: map[string][]string{}} 125 + follows := &fakeFollowFetcher{follows: map[string][]string{}} 126 + tap := &fakeTapAdmin{statuses: map[string]RepoStatus{ 127 + "did:plc:seed": {Found: true, Tracked: true, Backfilling: true}, 128 + }} 129 + resolver := &fakeResolver{mapping: map[string]string{"alice.tangled.sh": "did:plc:seed"}} 130 + log := slog.New(slog.NewTextHandler(io.Discard, nil)) 131 + r := NewRunnerWithDeps(st, tap, resolver, follows, log) 132 + 133 + dir := t.TempDir() 134 + seedsPath := filepath.Join(dir, "seeds.txt") 135 + if err := os.WriteFile(seedsPath, []byte("alice.tangled.sh\n"), 0o644); err != nil { 136 + t.Fatalf("write seeds: %v", err) 137 + } 138 + 139 + err := r.Run(context.Background(), Options{SeedsPath: seedsPath, MaxHops: 0}) 140 + if err != nil { 141 + t.Fatalf("run backfill: %v", err) 142 + } 143 + if len(tap.added) != 0 { 144 + t.Fatalf("expected no submission for in-progress did, got %#v", tap.added) 145 + } 146 + }
+103 -7
packages/api/internal/backfill/tap_admin.go
··· 6 6 "encoding/base64" 7 7 "encoding/json" 8 8 "fmt" 9 + "io" 9 10 "net/http" 10 11 "net/url" 11 12 "strings" ··· 13 14 ) 14 15 15 16 type tapAdmin interface { 16 - IsTracked(ctx context.Context, did string) (bool, error) 17 + RepoStatus(ctx context.Context, did string) (RepoStatus, error) 17 18 AddRepos(ctx context.Context, dids []string) error 19 + } 20 + 21 + type RepoStatus struct { 22 + Found bool 23 + Tracked bool 24 + Backfilled bool 25 + Backfilling bool 26 + State string 18 27 } 19 28 20 29 // HTTPTapAdmin calls Tap admin endpoints for backfill orchestration. ··· 38 47 }, nil 39 48 } 40 49 41 - func (t *HTTPTapAdmin) IsTracked(ctx context.Context, did string) (bool, error) { 50 + func (t *HTTPTapAdmin) RepoStatus(ctx context.Context, did string) (RepoStatus, error) { 42 51 endpoint := fmt.Sprintf("%s/info/%s", t.baseURL, url.PathEscape(did)) 43 52 req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) 44 53 if err != nil { 45 - return false, fmt.Errorf("build tap info request: %w", err) 54 + return RepoStatus{}, fmt.Errorf("build tap info request: %w", err) 46 55 } 47 56 t.addAuth(req) 48 57 49 58 resp, err := t.client.Do(req) 50 59 if err != nil { 51 - return false, fmt.Errorf("tap info request: %w", err) 60 + return RepoStatus{}, fmt.Errorf("tap info request: %w", err) 52 61 } 53 62 defer resp.Body.Close() 54 63 55 64 if resp.StatusCode == http.StatusNotFound { 56 - return false, nil 65 + return RepoStatus{Found: false}, nil 57 66 } 58 67 if resp.StatusCode < 200 || resp.StatusCode >= 300 { 59 - return false, fmt.Errorf("tap info request failed: status %d", resp.StatusCode) 68 + return RepoStatus{}, fmt.Errorf("tap info request failed: status %d", resp.StatusCode) 69 + } 70 + 71 + body, err := io.ReadAll(resp.Body) 72 + if err != nil { 73 + return RepoStatus{}, fmt.Errorf("read tap info response: %w", err) 74 + } 75 + if len(bytes.TrimSpace(body)) == 0 { 76 + return RepoStatus{Found: true, Tracked: true}, nil 77 + } 78 + 79 + var payload map[string]any 80 + if err := json.Unmarshal(body, &payload); err != nil { 81 + return RepoStatus{}, fmt.Errorf("decode tap info response: %w", err) 82 + } 83 + 84 + status := RepoStatus{Found: true, Tracked: true} 85 + if tracked, ok := boolFromAnyWithPresence(payload, "tracked", "isTracked", "enabled", "registered"); ok { 86 + status.Tracked = tracked 87 + } 88 + status.Backfilled = boolFromAny(payload, "backfilled", "isBackfilled", "complete", "done") 89 + status.Backfilling = boolFromAny(payload, "backfilling", "inProgress", "in_progress", "pendingBackfill") 90 + status.State = stringFromAny(payload, "status", "state") 91 + 92 + if stateImpliesBackfilled(status.State) { 93 + status.Backfilled = true 94 + } 95 + if stateImpliesBackfilling(status.State) { 96 + status.Backfilling = true 60 97 } 61 - return true, nil 98 + return status, nil 62 99 } 63 100 64 101 func (t *HTTPTapAdmin) AddRepos(ctx context.Context, dids []string) error { ··· 126 163 } 127 164 return strings.TrimSuffix(u.String(), "/"), nil 128 165 } 166 + 167 + func boolFromAny(payload map[string]any, keys ...string) bool { 168 + v, _ := boolFromAnyWithPresence(payload, keys...) 169 + return v 170 + } 171 + 172 + func boolFromAnyWithPresence(payload map[string]any, keys ...string) (bool, bool) { 173 + for _, key := range keys { 174 + raw, ok := payload[key] 175 + if !ok { 176 + continue 177 + } 178 + switch v := raw.(type) { 179 + case bool: 180 + return v, true 181 + case float64: 182 + return v != 0, true 183 + case string: 184 + switch strings.ToLower(strings.TrimSpace(v)) { 185 + case "true", "1", "yes", "y", "active", "complete", "done", "backfilled", "backfilling", "in_progress", "in-progress": 186 + return true, true 187 + case "false", "0", "no", "n", "inactive": 188 + return false, true 189 + } 190 + } 191 + } 192 + return false, false 193 + } 194 + 195 + func stringFromAny(payload map[string]any, keys ...string) string { 196 + for _, key := range keys { 197 + raw, ok := payload[key] 198 + if !ok { 199 + continue 200 + } 201 + if value, ok := raw.(string); ok { 202 + return strings.TrimSpace(strings.ToLower(value)) 203 + } 204 + } 205 + return "" 206 + } 207 + 208 + func stateImpliesBackfilled(state string) bool { 209 + switch strings.TrimSpace(strings.ToLower(state)) { 210 + case "backfilled", "complete", "completed", "done", "ready", "synced": 211 + return true 212 + default: 213 + return false 214 + } 215 + } 216 + 217 + func stateImpliesBackfilling(state string) bool { 218 + switch strings.TrimSpace(strings.ToLower(state)) { 219 + case "backfilling", "in-progress", "in_progress", "pending", "queued", "running": 220 + return true 221 + default: 222 + return false 223 + } 224 + }
+86
packages/api/internal/backfill/tap_admin_test.go
··· 1 + package backfill 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + "net/http/httptest" 8 + "testing" 9 + ) 10 + 11 + func TestRepoStatusNotFound(t *testing.T) { 12 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 13 + if r.URL.Path != "/info/did:plc:missing" { 14 + t.Fatalf("unexpected path: %s", r.URL.Path) 15 + } 16 + http.NotFound(w, r) 17 + })) 18 + defer ts.Close() 19 + 20 + admin, err := NewHTTPTapAdmin(ts.URL+"/channel", "") 21 + if err != nil { 22 + t.Fatalf("new admin: %v", err) 23 + } 24 + 25 + status, err := admin.RepoStatus(context.Background(), "did:plc:missing") 26 + if err != nil { 27 + t.Fatalf("repo status: %v", err) 28 + } 29 + if status.Found { 30 + t.Fatalf("expected found=false, got %#v", status) 31 + } 32 + if status.Tracked { 33 + t.Fatalf("expected tracked=false for 404") 34 + } 35 + } 36 + 37 + func TestRepoStatusParsesBackfillState(t *testing.T) { 38 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 39 + if r.URL.Path != "/info/did:plc:abc" { 40 + t.Fatalf("unexpected path: %s", r.URL.Path) 41 + } 42 + w.Header().Set("Content-Type", "application/json") 43 + _, _ = fmt.Fprint(w, `{"tracked":true,"status":"backfilling"}`) 44 + })) 45 + defer ts.Close() 46 + 47 + admin, err := NewHTTPTapAdmin(ts.URL+"/channel", "") 48 + if err != nil { 49 + t.Fatalf("new admin: %v", err) 50 + } 51 + 52 + status, err := admin.RepoStatus(context.Background(), "did:plc:abc") 53 + if err != nil { 54 + t.Fatalf("repo status: %v", err) 55 + } 56 + if !status.Found || !status.Tracked { 57 + t.Fatalf("expected tracked found status, got %#v", status) 58 + } 59 + if !status.Backfilling { 60 + t.Fatalf("expected backfilling=true, got %#v", status) 61 + } 62 + if status.Backfilled { 63 + t.Fatalf("expected backfilled=false, got %#v", status) 64 + } 65 + } 66 + 67 + func TestRepoStatusParsesExplicitTrackedFalse(t *testing.T) { 68 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { 69 + w.Header().Set("Content-Type", "application/json") 70 + _, _ = fmt.Fprint(w, `{"tracked":false}`) 71 + })) 72 + defer ts.Close() 73 + 74 + admin, err := NewHTTPTapAdmin(ts.URL+"/channel", "") 75 + if err != nil { 76 + t.Fatalf("new admin: %v", err) 77 + } 78 + 79 + status, err := admin.RepoStatus(context.Background(), "did:plc:abc") 80 + if err != nil { 81 + t.Fatalf("repo status: %v", err) 82 + } 83 + if status.Tracked { 84 + t.Fatalf("expected tracked=false, got %#v", status) 85 + } 86 + }
+76 -2
packages/api/internal/ingest/ingest.go
··· 5 5 "fmt" 6 6 "log/slog" 7 7 "math" 8 + "strconv" 8 9 "strings" 9 10 "sync" 10 11 "time" ··· 33 34 allowlist allowlist 34 35 consumerName string 35 36 log *slog.Logger 37 + resumeCursor int64 36 38 37 39 statusMu sync.Mutex 38 40 lastCursor string ··· 55 57 56 58 func (r *Runner) Run(ctx context.Context) error { 57 59 defer r.tap.Close() 60 + if err := r.initializeCursor(ctx); err != nil { 61 + return err 62 + } 58 63 59 64 go r.runStatusLogger(ctx) 60 65 ··· 72 77 continue 73 78 } 74 79 80 + if r.shouldSkipEvent(event.ID) { 81 + if err := r.tap.AckEvent(ctx, event.ID); err != nil { 82 + r.log.Warn("tap ack skipped event failed", 83 + slog.Int64("event_id", event.ID), 84 + slog.String("error", err.Error()), 85 + ) 86 + continue 87 + } 88 + r.log.Info("skipped previously-processed event", slog.Int64("event_id", event.ID), slog.Int64("resume_cursor", r.resumeCursor)) 89 + continue 90 + } 91 + 75 92 if err := r.processWithRetry(ctx, event); err != nil { 76 93 if ctx.Err() != nil { 77 94 return nil ··· 81 98 } 82 99 } 83 100 101 + func (r *Runner) initializeCursor(ctx context.Context) error { 102 + state, err := r.store.GetSyncState(ctx, r.consumerName) 103 + if err != nil { 104 + return fmt.Errorf("load sync cursor: %w", err) 105 + } 106 + if state == nil || strings.TrimSpace(state.Cursor) == "" { 107 + r.log.Info("indexer cursor resume disabled", slog.String("reason", "no prior sync_state")) 108 + return nil 109 + } 110 + 111 + cursor, err := strconv.ParseInt(strings.TrimSpace(state.Cursor), 10, 64) 112 + if err != nil { 113 + r.log.Warn("indexer cursor parse failed; resume disabled", 114 + slog.String("cursor", state.Cursor), 115 + slog.String("error", err.Error()), 116 + ) 117 + return nil 118 + } 119 + 120 + r.resumeCursor = cursor 121 + r.statusMu.Lock() 122 + r.lastCursor = state.Cursor 123 + r.statusMu.Unlock() 124 + r.log.Info("indexer cursor resume enabled", slog.Int64("resume_cursor", cursor)) 125 + return nil 126 + } 127 + 128 + func (r *Runner) shouldSkipEvent(eventID int64) bool { 129 + return r.resumeCursor > 0 && eventID <= r.resumeCursor 130 + } 131 + 84 132 func (r *Runner) processWithRetry(ctx context.Context, event normalize.TapRecordEvent) error { 85 133 attempt := 0 86 134 for { ··· 227 275 228 276 func (r *Runner) advanceCursorAndAck(ctx context.Context, eventID int64) error { 229 277 cursor := fmt.Sprintf("%d", eventID) 230 - if err := r.store.SetSyncState(ctx, r.consumerName, cursor); err != nil { 278 + if err := r.tap.AckEvent(ctx, eventID); err != nil { 231 279 return err 232 280 } 233 - if err := r.tap.AckEvent(ctx, eventID); err != nil { 281 + if err := r.persistCursorWithRetry(ctx, cursor, eventID); err != nil { 234 282 return err 235 283 } 236 284 r.markProcessed(cursor) 237 285 return nil 286 + } 287 + 288 + func (r *Runner) persistCursorWithRetry(ctx context.Context, cursor string, eventID int64) error { 289 + attempt := 0 290 + for { 291 + if ctx.Err() != nil { 292 + return ctx.Err() 293 + } 294 + if err := r.store.SetSyncState(ctx, r.consumerName, cursor); err == nil { 295 + return nil 296 + } else { 297 + attempt++ 298 + backoff := retryBackoff(attempt) 299 + r.log.Error("cursor persist failed after ack", 300 + slog.Int64("event_id", eventID), 301 + slog.Int("attempt", attempt), 302 + slog.Duration("retry_in", backoff), 303 + slog.String("error", err.Error()), 304 + ) 305 + select { 306 + case <-ctx.Done(): 307 + return ctx.Err() 308 + case <-time.After(backoff): 309 + } 310 + } 311 + } 238 312 } 239 313 240 314 func (r *Runner) runStatusLogger(ctx context.Context) {
+74 -1
packages/api/internal/ingest/ingest_test.go
··· 12 12 13 13 type fakeTapClient struct { 14 14 acked []int64 15 + onAck func(id int64) 15 16 } 16 17 17 18 func (f *fakeTapClient) ReadEvent(_ context.Context) (normalize.TapRecordEvent, error) { ··· 19 20 } 20 21 21 22 func (f *fakeTapClient) AckEvent(_ context.Context, id int64) error { 23 + if f.onAck != nil { 24 + f.onAck(id) 25 + } 22 26 f.acked = append(f.acked, id) 23 27 return nil 24 28 } ··· 29 33 docs map[string]*store.Document 30 34 deleted map[string]bool 31 35 syncCursor string 36 + initialSync *store.SyncState 32 37 recordStates map[string]string 33 38 handles map[string]string 34 39 enqueued map[string]bool 40 + onSetSync func() 35 41 } 36 42 37 43 func newFakeStore() *fakeStore { ··· 60 66 } 61 67 62 68 func (f *fakeStore) GetSyncState(_ context.Context, _ string) (*store.SyncState, error) { 63 - return nil, nil 69 + if f.initialSync != nil { 70 + state := *f.initialSync 71 + return &state, nil 72 + } 73 + if f.syncCursor == "" { 74 + return nil, nil 75 + } 76 + return &store.SyncState{ConsumerName: "indexer-tap-v1", Cursor: f.syncCursor}, nil 64 77 } 65 78 66 79 func (f *fakeStore) SetSyncState(_ context.Context, _ string, cursor string) error { 80 + if f.onSetSync != nil { 81 + f.onSetSync() 82 + } 67 83 f.syncCursor = cursor 68 84 return nil 69 85 } ··· 264 280 t.Fatal("unexpected match") 265 281 } 266 282 } 283 + 284 + func TestRunner_InitializeCursorResume(t *testing.T) { 285 + st := newFakeStore() 286 + st.initialSync = &store.SyncState{ConsumerName: "indexer-tap-v1", Cursor: "150"} 287 + tap := &fakeTapClient{} 288 + r := newRunnerForTest(st, tap, "sh.tangled.*") 289 + 290 + if err := r.initializeCursor(context.Background()); err != nil { 291 + t.Fatalf("initialize cursor: %v", err) 292 + } 293 + if r.resumeCursor != 150 { 294 + t.Fatalf("resume cursor: got %d want 150", r.resumeCursor) 295 + } 296 + if !r.shouldSkipEvent(149) { 297 + t.Fatalf("expected event 149 to be skipped") 298 + } 299 + if !r.shouldSkipEvent(150) { 300 + t.Fatalf("expected event 150 to be skipped") 301 + } 302 + if r.shouldSkipEvent(151) { 303 + t.Fatalf("expected event 151 to be processed") 304 + } 305 + } 306 + 307 + func TestRunner_AckBeforeCursorPersist(t *testing.T) { 308 + st := newFakeStore() 309 + tap := &fakeTapClient{} 310 + r := newRunnerForTest(st, tap, "sh.tangled.*") 311 + 312 + acked := false 313 + tap.onAck = func(_ int64) { acked = true } 314 + st.onSetSync = func() { 315 + if !acked { 316 + t.Fatalf("cursor persisted before ack") 317 + } 318 + } 319 + 320 + event := normalize.TapRecordEvent{ 321 + ID: 901, 322 + Type: "record", 323 + Record: &normalize.TapRecord{ 324 + DID: "did:plc:author", 325 + Collection: "sh.tangled.repo", 326 + RKey: "repo1", 327 + Action: "create", 328 + CID: "cid-1", 329 + Record: map[string]any{ 330 + "name": "repo-one", 331 + "description": "test repo", 332 + }, 333 + }, 334 + } 335 + 336 + if err := r.processEvent(context.Background(), event); err != nil { 337 + t.Fatalf("process event: %v", err) 338 + } 339 + }
+28 -7
packages/api/internal/tapclient/tapclient.go
··· 8 8 "log/slog" 9 9 "math/rand/v2" 10 10 "net/http" 11 + "os" 11 12 "strconv" 12 13 "strings" 13 14 "sync" ··· 30 31 password string 31 32 log *slog.Logger 32 33 33 - mu sync.Mutex 34 - conn *websocket.Conn 35 - ackAsJSON bool 34 + mu sync.Mutex 35 + conn *websocket.Conn 36 + ackAsJSON bool 37 + disableAcks bool 36 38 } 37 39 38 40 func New(url, password string, log *slog.Logger) *Client { 39 41 if log == nil { 40 42 log = slog.Default() 41 43 } 44 + disableAcks, _ := strconv.ParseBool(strings.TrimSpace(os.Getenv("TAP_DISABLE_ACKS"))) 42 45 return &Client{ 43 - url: url, 44 - password: password, 45 - log: log, 46 - ackAsJSON: true, 46 + url: url, 47 + password: password, 48 + log: log, 49 + ackAsJSON: true, 50 + disableAcks: disableAcks, 47 51 } 48 52 } 49 53 ··· 75 79 } 76 80 77 81 func (c *Client) AckEvent(ctx context.Context, id int64) error { 82 + if c.disableAcks { 83 + return nil 84 + } 85 + 78 86 conn, err := c.ensureConnected(ctx) 79 87 if err != nil { 80 88 return err ··· 90 98 return nil 91 99 } else if isConnectionWriteError(err) { 92 100 c.resetConn(websocket.StatusInternalError, "ack json write failed") 101 + return fmt.Errorf("ack event %d: %w", id, err) 102 + } else if !isAckFormatError(err) { 93 103 return fmt.Errorf("ack event %d: %w", id, err) 94 104 } 95 105 ··· 222 232 strings.Contains(msg, "closed network connection") || 223 233 strings.Contains(msg, "i/o timeout") 224 234 } 235 + 236 + func isAckFormatError(err error) bool { 237 + if err == nil { 238 + return false 239 + } 240 + msg := strings.ToLower(err.Error()) 241 + return strings.Contains(msg, "invalid") || 242 + strings.Contains(msg, "unsupported") || 243 + strings.Contains(msg, "bad payload") || 244 + strings.Contains(msg, "unexpected message") 245 + }
+78
pnpm-lock.yaml
··· 16 16 '@atcute/client': 17 17 specifier: ^4.2.1 18 18 version: 4.2.1 19 + '@atcute/oauth-browser-client': 20 + specifier: ^3.0.0 21 + version: 3.0.0(@atcute/identity@1.1.4) 19 22 '@atcute/tangled': 20 23 specifier: ^1.0.17 21 24 version: 1.0.17 ··· 152 155 '@atcute/client@4.2.1': 153 156 resolution: {integrity: sha512-ZBFM2pW075JtgGFu5g7HHZBecrClhlcNH8GVP9Zz1aViWR+cjjBsTpeE63rJs+FCOHFYlirUyo5L8SGZ4kMINw==} 154 157 158 + '@atcute/identity-resolver@1.2.2': 159 + resolution: {integrity: sha512-eUh/UH4bFvuXS0X7epYCeJC/kj4rbBXfSRumLEH4smMVwNOgTo7cL/0Srty+P/qVPoZEyXdfEbS0PHJyzoXmHw==} 160 + peerDependencies: 161 + '@atcute/identity': ^1.0.0 162 + 155 163 '@atcute/identity@1.1.4': 156 164 resolution: {integrity: sha512-RCw1IqflfuSYCxK5m0lZCm0UnvIzcUnuhngiBhJEJb9a9Mc2SEf1xP3H8N5r8pvEH1LoAYd6/zrvCNU+uy9esw==} 157 165 158 166 '@atcute/lexicons@1.2.9': 159 167 resolution: {integrity: sha512-/RRHm2Cw9o8Mcsrq0eo8fjS9okKYLGfuFwrQ0YoP/6sdSDsXshaTLJsvLlcUcaDaSJ1YFOuHIo3zr2Om2F/16g==} 160 168 169 + '@atcute/multibase@1.2.0': 170 + resolution: {integrity: sha512-ZK2GRra+qIYq9nNuQB52m2ul0hOmCQEtPobGfTSUxm7pF0OGEkWGkWHugFhNEDVzHzTwPxHp6VGotdZFue4lYQ==} 171 + 172 + '@atcute/oauth-browser-client@3.0.0': 173 + resolution: {integrity: sha512-7AbKV8tTe7aRJNJV7gCcWHSVEADb2nr58O1p7dQsf73HSe9pvlBkj/Vk1yjjtH691uAVYkwhHSh0bC7D8XdwJw==} 174 + 175 + '@atcute/oauth-crypto@0.1.0': 176 + resolution: {integrity: sha512-qZYDCNLF/4B6AndYT1rsQelN8621AC5u/sL5PHvlr/qqAbmmUwCBGjEgRSyZtHE1AqD60VNiSMlOgAuEQTSl3w==} 177 + 178 + '@atcute/oauth-keyset@0.1.0': 179 + resolution: {integrity: sha512-+wqT/+I5Lg9VzKnKY3g88+N45xbq+wsdT6bHDGqCVa2u57gRvolFF4dY+weMfc/OX641BIZO6/o+zFtKBsMQnQ==} 180 + 181 + '@atcute/oauth-types@0.1.1': 182 + resolution: {integrity: sha512-u+3KMjse3Uc/9hDyilu1QVN7IpcnjVXgRzhddzBB8Uh6wePHNVBDdi9wQvFTVVA3zmxtMJVptXRyLLg6Ou9bqg==} 183 + 161 184 '@atcute/tangled@1.0.17': 162 185 resolution: {integrity: sha512-YE2KXYSawWbASt/XB6bi0gPGkF089+wsbxQcqtksrSy7j02LGOKUFgX1gfG1ZImv40eQHjFnhLHr1OZVXeflZA==} 163 186 164 187 '@atcute/uint8array@1.1.1': 165 188 resolution: {integrity: sha512-3LsC8XB8TKe9q/5hOA5sFuzGaIFdJZJNewC5OKa3o/eU6+K7JR6see9Zy2JbQERNVnRl11EzbNov1efgLMAs4g==} 189 + 190 + '@atcute/util-fetch@1.0.5': 191 + resolution: {integrity: sha512-qjHj01BGxjSjIFdPiAjSARnodJIIyKxnCMMEcXMESo9TAyND6XZQqrie5fia+LlYWVXdpsTds8uFQwc9jdKTig==} 166 192 167 193 '@atcute/util-text@1.2.0': 168 194 resolution: {integrity: sha512-b8WSh+Z7K601eUFFmTFj8QPKDO8Ic0VDDj63sdKzpkm+ySQKsYT5nXekViGqFVKbyKj1V5FyvZvgXad6/aI4QQ==} ··· 2550 2576 engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} 2551 2577 hasBin: true 2552 2578 2579 + nanoid@5.1.7: 2580 + resolution: {integrity: sha512-ua3NDgISf6jdwezAheMOk4mbE1LXjm1DfMUDMuJf4AqxLFK3ccGpgWizwa5YV7Yz9EpXwEaWoRXSb/BnV0t5dQ==} 2581 + engines: {node: ^18 || >=20} 2582 + hasBin: true 2583 + 2553 2584 native-run@2.0.3: 2554 2585 resolution: {integrity: sha512-U1PllBuzW5d1gfan+88L+Hky2eZx+9gv3Pf6rNBxKbORxi7boHzqiA6QFGSnqMem4j0A9tZ08NMIs5+0m/VS1Q==} 2555 2586 engines: {node: '>=16.0.0'} ··· 3402 3433 '@atcute/identity': 1.1.4 3403 3434 '@atcute/lexicons': 1.2.9 3404 3435 3436 + '@atcute/identity-resolver@1.2.2(@atcute/identity@1.1.4)': 3437 + dependencies: 3438 + '@atcute/identity': 1.1.4 3439 + '@atcute/lexicons': 1.2.9 3440 + '@atcute/util-fetch': 1.0.5 3441 + '@badrap/valita': 0.4.6 3442 + 3405 3443 '@atcute/identity@1.1.4': 3406 3444 dependencies: 3407 3445 '@atcute/lexicons': 1.2.9 ··· 3414 3452 '@standard-schema/spec': 1.1.0 3415 3453 esm-env: 1.2.2 3416 3454 3455 + '@atcute/multibase@1.2.0': 3456 + dependencies: 3457 + '@atcute/uint8array': 1.1.1 3458 + 3459 + '@atcute/oauth-browser-client@3.0.0(@atcute/identity@1.1.4)': 3460 + dependencies: 3461 + '@atcute/client': 4.2.1 3462 + '@atcute/identity-resolver': 1.2.2(@atcute/identity@1.1.4) 3463 + '@atcute/lexicons': 1.2.9 3464 + '@atcute/multibase': 1.2.0 3465 + '@atcute/oauth-crypto': 0.1.0 3466 + '@atcute/oauth-types': 0.1.1 3467 + nanoid: 5.1.7 3468 + transitivePeerDependencies: 3469 + - '@atcute/identity' 3470 + 3471 + '@atcute/oauth-crypto@0.1.0': 3472 + dependencies: 3473 + '@atcute/multibase': 1.2.0 3474 + '@atcute/uint8array': 1.1.1 3475 + '@badrap/valita': 0.4.6 3476 + nanoid: 5.1.7 3477 + 3478 + '@atcute/oauth-keyset@0.1.0': 3479 + dependencies: 3480 + '@atcute/oauth-crypto': 0.1.0 3481 + 3482 + '@atcute/oauth-types@0.1.1': 3483 + dependencies: 3484 + '@atcute/identity': 1.1.4 3485 + '@atcute/lexicons': 1.2.9 3486 + '@atcute/oauth-keyset': 0.1.0 3487 + '@badrap/valita': 0.4.6 3488 + 3417 3489 '@atcute/tangled@1.0.17': 3418 3490 dependencies: 3419 3491 '@atcute/atproto': 3.1.10 3420 3492 '@atcute/lexicons': 1.2.9 3421 3493 3422 3494 '@atcute/uint8array@1.1.1': {} 3495 + 3496 + '@atcute/util-fetch@1.0.5': 3497 + dependencies: 3498 + '@badrap/valita': 0.4.6 3423 3499 3424 3500 '@atcute/util-text@1.2.0': 3425 3501 dependencies: ··· 6028 6104 muggle-string@0.4.1: {} 6029 6105 6030 6106 nanoid@3.3.11: {} 6107 + 6108 + nanoid@5.1.7: {} 6031 6109 6032 6110 native-run@2.0.3: 6033 6111 dependencies: