a love letter to tangled (Android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: Tap event ingestion and handle management

+904 -9
+13
docs/TODO.md
··· 1 + --- 2 + title: To-Dos 3 + updated: 2026-03-23 4 + --- 5 + 6 + A catch-all for ideas, issues/bugs, and future work that doesn't fit into the current specs or tasks. This is a "parking lot." 7 + 8 + ## App 9 + 10 + - Repo stars, forks, etc. are not properly parsed from JSON. 11 + - ATOM/RSS feed link for repos: (`tangled.org/{did}/{repo}/feed.atom`) 12 + 13 + ## API
+8 -8
docs/api/tasks/phase-1-mvp.md
··· 51 51 52 52 ### Tasks 53 53 54 - - [ ] Define Tap event DTOs matching the documented event shape: 54 + - [x] Define Tap event DTOs matching the documented event shape: 55 55 56 56 ```go 57 57 type TapEvent struct { ··· 78 78 } 79 79 ``` 80 80 81 - - [ ] Implement WebSocket client: 81 + - [x] Implement WebSocket client: 82 82 - Connect to `TAP_URL` (e.g., `wss://tap.railway.internal/channel`) 83 83 - HTTP Basic auth with `admin:TAP_AUTH_PASSWORD` 84 84 - Auto-reconnect with exponential backoff 85 85 - Ack protocol: send event `id` back after successful processing 86 - - [ ] Implement ingestion loop: 86 + - [x] Implement ingestion loop: 87 87 1. Receive event from WebSocket 88 88 2. If `type == "identity"` → update handle cache, ack, continue 89 89 3. If `type == "record"` → check collection allowlist ··· 91 91 5. Decode `record.record` via adapter registry 92 92 6. Normalize to `Document` 93 93 7. Upsert to store 94 - 8. Schedule embedding job if eligible (Phase 2) 94 + 8. Schedule embedding job if eligible ([Phase 2](phase-2-semantic.md)) 95 95 9. Persist cursor (event ID) after successful DB commit 96 96 10. Ack the event 97 - - [ ] Implement collection allowlist from `INDEXED_COLLECTIONS` config 98 - - [ ] Handle state events (`sh.tangled.repo.issue.state`, `sh.tangled.repo.pull.status`) → update `record_state` 99 - - [ ] Handle normalization failures: log, skip, advance cursor 100 - - [ ] Handle DB failures: retry with backoff, do not advance cursor 97 + - [x] Implement collection allowlist from `INDEXED_COLLECTIONS` config 98 + - [x] Handle state events (`sh.tangled.repo.issue.state`, `sh.tangled.repo.pull.status`) → update `record_state` 99 + - [x] Handle normalization failures: log, skip, advance cursor 100 + - [x] Handle DB failures: retry with backoff, do not advance cursor 101 101 102 102 ### Verification 103 103
+280
packages/api/internal/ingest/ingest.go
··· 1 1 package ingest 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "math" 8 + "strings" 9 + "time" 10 + 11 + "tangled.org/desertthunder.dev/twister/internal/normalize" 12 + "tangled.org/desertthunder.dev/twister/internal/store" 13 + ) 14 + 15 + const ( 16 + defaultConsumerName = "indexer-tap-v1" 17 + maxDBRetryBackoff = 5 * time.Second 18 + ) 19 + 20 + type client interface { 21 + ReadEvent(ctx context.Context) (normalize.TapRecordEvent, error) 22 + AckEvent(ctx context.Context, id int64) error 23 + Close() error 24 + } 25 + 26 + // Runner ingests Tap events into the store. 27 + type Runner struct { 28 + store store.Store 29 + registry *normalize.Registry 30 + tap client 31 + allowlist allowlist 32 + consumerName string 33 + log *slog.Logger 34 + } 35 + 36 + func NewRunner(st store.Store, registry *normalize.Registry, tap client, indexedCollections string, log *slog.Logger) *Runner { 37 + if log == nil { 38 + log = slog.Default() 39 + } 40 + return &Runner{ 41 + store: st, 42 + registry: registry, 43 + tap: tap, 44 + allowlist: parseAllowlist(indexedCollections), 45 + consumerName: defaultConsumerName, 46 + log: log, 47 + } 48 + } 49 + 50 + func (r *Runner) Run(ctx context.Context) error { 51 + defer r.tap.Close() 52 + 53 + for { 54 + if ctx.Err() != nil { 55 + return nil 56 + } 57 + 58 + event, err := r.tap.ReadEvent(ctx) 59 + if err != nil { 60 + if ctx.Err() != nil { 61 + return nil 62 + } 63 + r.log.Warn("tap read error", slog.String("error", err.Error())) 64 + continue 65 + } 66 + 67 + if err := r.processWithRetry(ctx, event); err != nil { 68 + if ctx.Err() != nil { 69 + return nil 70 + } 71 + return err 72 + } 73 + } 74 + } 75 + 76 + func (r *Runner) processWithRetry(ctx context.Context, event normalize.TapRecordEvent) error { 77 + attempt := 0 78 + for { 79 + if ctx.Err() != nil { 80 + return ctx.Err() 81 + } 82 + 83 + err := r.processEvent(ctx, event) 84 + if err == nil { 85 + return nil 86 + } 87 + 88 + attempt++ 89 + backoff := 
retryBackoff(attempt) 90 + r.log.Warn("ingest retry", 91 + slog.Int64("event_id", event.ID), 92 + slog.Int("attempt", attempt), 93 + slog.Duration("retry_in", backoff), 94 + slog.String("error", err.Error()), 95 + ) 96 + 97 + select { 98 + case <-ctx.Done(): 99 + return ctx.Err() 100 + case <-time.After(backoff): 101 + } 102 + } 103 + } 104 + 105 + func (r *Runner) processEvent(ctx context.Context, event normalize.TapRecordEvent) error { 106 + switch event.Type { 107 + case "identity": 108 + if event.Identity == nil { 109 + return r.advanceCursorAndAck(ctx, event.ID) 110 + } 111 + id := event.Identity 112 + if err := r.store.UpsertIdentityHandle(ctx, id.DID, id.Handle, id.IsActive, id.Status); err != nil { 113 + return err 114 + } 115 + return r.advanceCursorAndAck(ctx, event.ID) 116 + case "record": 117 + return r.processRecordEvent(ctx, event) 118 + default: 119 + return r.advanceCursorAndAck(ctx, event.ID) 120 + } 121 + } 122 + 123 + func (r *Runner) processRecordEvent(ctx context.Context, event normalize.TapRecordEvent) error { 124 + if event.Record == nil { 125 + return r.advanceCursorAndAck(ctx, event.ID) 126 + } 127 + 128 + record := event.Record 129 + if !r.allowlist.match(record.Collection) { 130 + return r.advanceCursorAndAck(ctx, event.ID) 131 + } 132 + 133 + if handler, ok := r.registry.StateHandler(record.Collection); ok { 134 + if record.Action == "delete" { 135 + return r.advanceCursorAndAck(ctx, event.ID) 136 + } 137 + update, err := handler.HandleState(event) 138 + if err != nil { 139 + r.log.Warn("state normalization failed", 140 + slog.Int64("event_id", event.ID), 141 + slog.String("collection", record.Collection), 142 + slog.String("did", record.DID), 143 + slog.String("rkey", record.RKey), 144 + slog.String("error", err.Error()), 145 + ) 146 + return r.advanceCursorAndAck(ctx, event.ID) 147 + } 148 + if err := r.store.UpdateRecordState(ctx, update.SubjectURI, update.State); err != nil { 149 + return err 150 + } 151 + return 
r.advanceCursorAndAck(ctx, event.ID) 152 + } 153 + 154 + adapter, ok := r.registry.Adapter(record.Collection) 155 + if !ok { 156 + return r.advanceCursorAndAck(ctx, event.ID) 157 + } 158 + 159 + switch record.Action { 160 + case "delete": 161 + docID := normalize.StableID(record.DID, record.Collection, record.RKey) 162 + if err := r.store.MarkDeleted(ctx, docID); err != nil { 163 + return err 164 + } 165 + return r.advanceCursorAndAck(ctx, event.ID) 166 + case "create", "update": 167 + if record.Record == nil { 168 + r.log.Warn("record payload missing", 169 + slog.Int64("event_id", event.ID), 170 + slog.String("collection", record.Collection), 171 + slog.String("did", record.DID), 172 + slog.String("rkey", record.RKey), 173 + ) 174 + return r.advanceCursorAndAck(ctx, event.ID) 175 + } 176 + default: 177 + return r.advanceCursorAndAck(ctx, event.ID) 178 + } 179 + 180 + doc, err := adapter.Normalize(event) 181 + if err != nil { 182 + r.log.Warn("normalization failed", 183 + slog.Int64("event_id", event.ID), 184 + slog.String("collection", record.Collection), 185 + slog.String("did", record.DID), 186 + slog.String("rkey", record.RKey), 187 + slog.String("error", err.Error()), 188 + ) 189 + return r.advanceCursorAndAck(ctx, event.ID) 190 + } 191 + 192 + handle, err := r.store.GetIdentityHandle(ctx, record.DID) 193 + if err != nil { 194 + return err 195 + } 196 + if handle != "" { 197 + doc.AuthorHandle = handle 198 + if doc.RecordType == "profile" { 199 + doc.Title = handle 200 + } 201 + } 202 + 203 + if err := r.store.UpsertDocument(ctx, doc); err != nil { 204 + return err 205 + } 206 + 207 + if adapter.Searchable(record.Record) { 208 + if err := r.store.EnqueueEmbeddingJob(ctx, doc.ID); err != nil { 209 + r.log.Warn("embedding enqueue failed", 210 + slog.Int64("event_id", event.ID), 211 + slog.String("document_id", doc.ID), 212 + slog.String("error", err.Error()), 213 + ) 214 + } 215 + } 216 + 217 + return r.advanceCursorAndAck(ctx, event.ID) 218 + } 219 + 220 + func 
(r *Runner) advanceCursorAndAck(ctx context.Context, eventID int64) error { 221 + cursor := fmt.Sprintf("%d", eventID) 222 + if err := r.store.SetSyncState(ctx, r.consumerName, cursor); err != nil { 223 + return err 224 + } 225 + if err := r.tap.AckEvent(ctx, eventID); err != nil { 226 + return err 227 + } 228 + return nil 229 + } 230 + 231 + type allowlist struct { 232 + entries []string 233 + } 234 + 235 + func parseAllowlist(raw string) allowlist { 236 + if strings.TrimSpace(raw) == "" { 237 + return allowlist{entries: nil} 238 + } 239 + parts := strings.FieldsFunc(raw, func(r rune) bool { 240 + return r == ',' || r == ' ' || r == '\n' || r == '\t' 241 + }) 242 + entries := make([]string, 0, len(parts)) 243 + for _, part := range parts { 244 + entry := strings.TrimSpace(part) 245 + if entry == "" { 246 + continue 247 + } 248 + entries = append(entries, entry) 249 + } 250 + return allowlist{entries: entries} 251 + } 252 + 253 + func (a allowlist) match(collection string) bool { 254 + if len(a.entries) == 0 { 255 + return true 256 + } 257 + for _, entry := range a.entries { 258 + if entry == collection { 259 + return true 260 + } 261 + if strings.HasSuffix(entry, "*") { 262 + prefix := strings.TrimSuffix(entry, "*") 263 + if strings.HasPrefix(collection, prefix) { 264 + return true 265 + } 266 + } 267 + } 268 + return false 269 + } 270 + 271 + func retryBackoff(attempt int) time.Duration { 272 + if attempt < 1 { 273 + attempt = 1 274 + } 275 + exponent := math.Pow(2, float64(attempt-1)) 276 + d := time.Duration(float64(time.Second) * exponent) 277 + if d > maxDBRetryBackoff { 278 + return maxDBRetryBackoff 279 + } 280 + return d 281 + }
+254
packages/api/internal/ingest/ingest_test.go
··· 1 + package ingest 2 + 3 + import ( 4 + "context" 5 + "io" 6 + "log/slog" 7 + "testing" 8 + 9 + "tangled.org/desertthunder.dev/twister/internal/normalize" 10 + "tangled.org/desertthunder.dev/twister/internal/store" 11 + ) 12 + 13 + type fakeTapClient struct { 14 + acked []int64 15 + } 16 + 17 + func (f *fakeTapClient) ReadEvent(_ context.Context) (normalize.TapRecordEvent, error) { 18 + return normalize.TapRecordEvent{}, io.EOF 19 + } 20 + 21 + func (f *fakeTapClient) AckEvent(_ context.Context, id int64) error { 22 + f.acked = append(f.acked, id) 23 + return nil 24 + } 25 + 26 + func (f *fakeTapClient) Close() error { return nil } 27 + 28 + type fakeStore struct { 29 + docs map[string]*store.Document 30 + deleted map[string]bool 31 + syncCursor string 32 + recordStates map[string]string 33 + handles map[string]string 34 + enqueued map[string]bool 35 + } 36 + 37 + func newFakeStore() *fakeStore { 38 + return &fakeStore{ 39 + docs: make(map[string]*store.Document), 40 + deleted: make(map[string]bool), 41 + recordStates: make(map[string]string), 42 + handles: make(map[string]string), 43 + enqueued: make(map[string]bool), 44 + } 45 + } 46 + 47 + func (f *fakeStore) UpsertDocument(_ context.Context, doc *store.Document) error { 48 + clone := *doc 49 + f.docs[doc.ID] = &clone 50 + return nil 51 + } 52 + 53 + func (f *fakeStore) GetDocument(_ context.Context, id string) (*store.Document, error) { 54 + return f.docs[id], nil 55 + } 56 + 57 + func (f *fakeStore) MarkDeleted(_ context.Context, id string) error { 58 + f.deleted[id] = true 59 + return nil 60 + } 61 + 62 + func (f *fakeStore) GetSyncState(_ context.Context, _ string) (*store.SyncState, error) { 63 + return nil, nil 64 + } 65 + 66 + func (f *fakeStore) SetSyncState(_ context.Context, _ string, cursor string) error { 67 + f.syncCursor = cursor 68 + return nil 69 + } 70 + 71 + func (f *fakeStore) UpdateRecordState(_ context.Context, subjectURI string, state string) error { 72 + f.recordStates[subjectURI] = 
state 73 + return nil 74 + } 75 + 76 + func (f *fakeStore) UpsertIdentityHandle(_ context.Context, did, handle string, _ bool, _ string) error { 77 + f.handles[did] = handle 78 + return nil 79 + } 80 + 81 + func (f *fakeStore) GetIdentityHandle(_ context.Context, did string) (string, error) { 82 + return f.handles[did], nil 83 + } 84 + 85 + func (f *fakeStore) EnqueueEmbeddingJob(_ context.Context, documentID string) error { 86 + f.enqueued[documentID] = true 87 + return nil 88 + } 89 + 90 + func newRunnerForTest(st *fakeStore, tap *fakeTapClient, indexedCollections string) *Runner { 91 + logger := slog.New(slog.NewTextHandler(io.Discard, nil)) 92 + return NewRunner(st, normalize.NewRegistry(), tap, indexedCollections, logger) 93 + } 94 + 95 + func TestRunner_ProcessIdentityEvent(t *testing.T) { 96 + st := newFakeStore() 97 + tap := &fakeTapClient{} 98 + r := newRunnerForTest(st, tap, "sh.tangled.*") 99 + 100 + event := normalize.TapRecordEvent{ 101 + ID: 101, 102 + Type: "identity", 103 + Identity: &normalize.TapIdentity{ 104 + DID: "did:plc:abc", 105 + Handle: "alice.tangled.org", 106 + IsActive: true, 107 + Status: "active", 108 + }, 109 + } 110 + 111 + if err := r.processEvent(context.Background(), event); err != nil { 112 + t.Fatalf("process identity event: %v", err) 113 + } 114 + if got := st.handles["did:plc:abc"]; got != "alice.tangled.org" { 115 + t.Fatalf("handle: got %q", got) 116 + } 117 + if st.syncCursor != "101" { 118 + t.Fatalf("cursor: got %q, want 101", st.syncCursor) 119 + } 120 + if len(tap.acked) != 1 || tap.acked[0] != 101 { 121 + t.Fatalf("acks: got %#v", tap.acked) 122 + } 123 + } 124 + 125 + func TestRunner_ProcessCreateAndDelete(t *testing.T) { 126 + st := newFakeStore() 127 + st.handles["did:plc:author"] = "author.tangled.org" 128 + tap := &fakeTapClient{} 129 + r := newRunnerForTest(st, tap, "sh.tangled.*") 130 + 131 + createEvent := normalize.TapRecordEvent{ 132 + ID: 201, 133 + Type: "record", 134 + Record: &normalize.TapRecord{ 135 + 
DID: "did:plc:author", 136 + Collection: "sh.tangled.repo", 137 + RKey: "repo1", 138 + Action: "create", 139 + CID: "cid-1", 140 + Record: map[string]any{ 141 + "name": "repo-one", 142 + "description": "test repo", 143 + }, 144 + }, 145 + } 146 + if err := r.processEvent(context.Background(), createEvent); err != nil { 147 + t.Fatalf("process create event: %v", err) 148 + } 149 + 150 + docID := normalize.StableID("did:plc:author", "sh.tangled.repo", "repo1") 151 + doc := st.docs[docID] 152 + if doc == nil { 153 + t.Fatalf("document %q not found", docID) 154 + } 155 + if doc.AuthorHandle != "author.tangled.org" { 156 + t.Fatalf("author handle: got %q", doc.AuthorHandle) 157 + } 158 + if !st.enqueued[docID] { 159 + t.Fatalf("embedding job not enqueued for %q", docID) 160 + } 161 + 162 + deleteEvent := normalize.TapRecordEvent{ 163 + ID: 202, 164 + Type: "record", 165 + Record: &normalize.TapRecord{ 166 + DID: "did:plc:author", 167 + Collection: "sh.tangled.repo", 168 + RKey: "repo1", 169 + Action: "delete", 170 + }, 171 + } 172 + if err := r.processEvent(context.Background(), deleteEvent); err != nil { 173 + t.Fatalf("process delete event: %v", err) 174 + } 175 + if !st.deleted[docID] { 176 + t.Fatalf("expected tombstone for %q", docID) 177 + } 178 + if st.syncCursor != "202" { 179 + t.Fatalf("cursor: got %q, want 202", st.syncCursor) 180 + } 181 + } 182 + 183 + func TestRunner_ProcessStateEvent(t *testing.T) { 184 + st := newFakeStore() 185 + tap := &fakeTapClient{} 186 + r := newRunnerForTest(st, tap, "sh.tangled.*") 187 + 188 + event := normalize.TapRecordEvent{ 189 + ID: 301, 190 + Type: "record", 191 + Record: &normalize.TapRecord{ 192 + DID: "did:plc:abc", 193 + Collection: "sh.tangled.repo.issue.state", 194 + RKey: "state1", 195 + Action: "create", 196 + Record: map[string]any{ 197 + "subject": "at://did:plc:abc/sh.tangled.repo.issue/1", 198 + "status": "closed", 199 + }, 200 + }, 201 + } 202 + 203 + if err := r.processEvent(context.Background(), event); err 
!= nil { 204 + t.Fatalf("process state event: %v", err) 205 + } 206 + if got := st.recordStates["at://did:plc:abc/sh.tangled.repo.issue/1"]; got != "closed" { 207 + t.Fatalf("record state: got %q", got) 208 + } 209 + } 210 + 211 + func TestRunner_NormalizationFailureAdvancesCursor(t *testing.T) { 212 + st := newFakeStore() 213 + tap := &fakeTapClient{} 214 + r := newRunnerForTest(st, tap, "sh.tangled.*") 215 + 216 + event := normalize.TapRecordEvent{ 217 + ID: 401, 218 + Type: "record", 219 + Record: &normalize.TapRecord{ 220 + DID: "did:plc:abc", 221 + Collection: "sh.tangled.repo.issue", 222 + RKey: "bad-issue", 223 + Action: "create", 224 + CID: "cid-bad", 225 + Record: map[string]any{ 226 + "title": "bad issue", 227 + "repo": "not-an-at-uri", 228 + }, 229 + }, 230 + } 231 + 232 + if err := r.processEvent(context.Background(), event); err != nil { 233 + t.Fatalf("process malformed issue event: %v", err) 234 + } 235 + if st.syncCursor != "401" { 236 + t.Fatalf("cursor: got %q, want 401", st.syncCursor) 237 + } 238 + if len(st.docs) != 0 { 239 + t.Fatalf("expected no documents, got %d", len(st.docs)) 240 + } 241 + } 242 + 243 + func TestAllowlistMatching(t *testing.T) { 244 + a := parseAllowlist("sh.tangled.repo, sh.tangled.string sh.tangled.actor.*") 245 + if !a.match("sh.tangled.repo") { 246 + t.Fatal("expected exact match") 247 + } 248 + if !a.match("sh.tangled.actor.profile") { 249 + t.Fatal("expected wildcard prefix match") 250 + } 251 + if a.match("app.bsky.feed.post") { 252 + t.Fatal("unexpected match") 253 + } 254 + }
+9
packages/api/internal/store/migrations/002_identity_handles.sql
-- identity_handles: one row per DID holding the handle last recorded for it.
CREATE TABLE IF NOT EXISTS identity_handles (
    did        TEXT    PRIMARY KEY,
    handle     TEXT    NOT NULL,
    is_active  INTEGER NOT NULL DEFAULT 1,
    status     TEXT,
    updated_at TEXT    NOT NULL
);

-- Secondary index for looking rows up by handle.
CREATE INDEX IF NOT EXISTS idx_identity_handles_handle
    ON identity_handles (handle);
+48
packages/api/internal/store/sql_store.go
··· 130 130 return nil 131 131 } 132 132 133 + func (s *SQLStore) UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error { 134 + now := time.Now().UTC().Format(time.RFC3339) 135 + _, err := s.db.ExecContext(ctx, ` 136 + INSERT INTO identity_handles (did, handle, is_active, status, updated_at) 137 + VALUES (?, ?, ?, ?, ?) 138 + ON CONFLICT(did) DO UPDATE SET 139 + handle = excluded.handle, 140 + is_active = excluded.is_active, 141 + status = excluded.status, 142 + updated_at = excluded.updated_at`, 143 + did, handle, isActive, status, now, 144 + ) 145 + if err != nil { 146 + return fmt.Errorf("upsert identity handle: %w", err) 147 + } 148 + return nil 149 + } 150 + 151 + func (s *SQLStore) GetIdentityHandle(ctx context.Context, did string) (string, error) { 152 + var handle sql.NullString 153 + err := s.db.QueryRowContext(ctx, `SELECT handle FROM identity_handles WHERE did = ?`, did).Scan(&handle) 154 + if errors.Is(err, sql.ErrNoRows) { 155 + return "", nil 156 + } 157 + if err != nil { 158 + return "", fmt.Errorf("get identity handle: %w", err) 159 + } 160 + return handle.String, nil 161 + } 162 + 163 + func (s *SQLStore) EnqueueEmbeddingJob(ctx context.Context, documentID string) error { 164 + now := time.Now().UTC().Format(time.RFC3339) 165 + _, err := s.db.ExecContext(ctx, ` 166 + INSERT INTO embedding_jobs (document_id, status, attempts, last_error, scheduled_at, updated_at) 167 + VALUES (?, 'pending', 0, NULL, ?, ?) 168 + ON CONFLICT(document_id) DO UPDATE SET 169 + status = 'pending', 170 + last_error = NULL, 171 + scheduled_at = excluded.scheduled_at, 172 + updated_at = excluded.updated_at`, 173 + documentID, now, now, 174 + ) 175 + if err != nil { 176 + return fmt.Errorf("enqueue embedding job: %w", err) 177 + } 178 + return nil 179 + } 180 + 133 181 func scanDocument(row *sql.Row) (*Document, error) { 134 182 doc := &Document{} 135 183 var (
+3
packages/api/internal/store/store.go
··· 48 48 GetSyncState(ctx context.Context, consumer string) (*SyncState, error) 49 49 SetSyncState(ctx context.Context, consumer string, cursor string) error 50 50 UpdateRecordState(ctx context.Context, subjectURI string, state string) error 51 + UpsertIdentityHandle(ctx context.Context, did, handle string, isActive bool, status string) error 52 + GetIdentityHandle(ctx context.Context, did string) (string, error) 53 + EnqueueEmbeddingJob(ctx context.Context, documentID string) error 51 54 }
+80
packages/api/internal/store/store_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "os" 6 7 "path/filepath" 7 8 "testing" ··· 154 155 } 155 156 if err := st.UpdateRecordState(ctx, uri, "closed"); err != nil { 156 157 t.Fatalf("update record state to closed: %v", err) 158 + } 159 + }) 160 + 161 + t.Run("identity handle upsert and get", func(t *testing.T) { 162 + const did = "did:plc:identity1" 163 + 164 + handle, err := st.GetIdentityHandle(ctx, did) 165 + if err != nil { 166 + t.Fatalf("get missing identity handle: %v", err) 167 + } 168 + if handle != "" { 169 + t.Fatalf("expected empty missing handle, got %q", handle) 170 + } 171 + 172 + if err := st.UpsertIdentityHandle(ctx, did, "alice.tangled.org", true, "active"); err != nil { 173 + t.Fatalf("upsert identity handle: %v", err) 174 + } 175 + 176 + handle, err = st.GetIdentityHandle(ctx, did) 177 + if err != nil { 178 + t.Fatalf("get identity handle: %v", err) 179 + } 180 + if handle != "alice.tangled.org" { 181 + t.Fatalf("handle: got %q, want %q", handle, "alice.tangled.org") 182 + } 183 + 184 + if err := st.UpsertIdentityHandle(ctx, did, "alice2.tangled.org", false, "inactive"); err != nil { 185 + t.Fatalf("update identity handle: %v", err) 186 + } 187 + 188 + handle, err = st.GetIdentityHandle(ctx, did) 189 + if err != nil { 190 + t.Fatalf("get identity handle after update: %v", err) 191 + } 192 + if handle != "alice2.tangled.org" { 193 + t.Fatalf("handle after update: got %q, want %q", handle, "alice2.tangled.org") 194 + } 195 + }) 196 + 197 + t.Run("enqueue embedding job is idempotent", func(t *testing.T) { 198 + doc := &store.Document{ 199 + ID: "did:plc:embed|sh.tangled.string|abc", 200 + DID: "did:plc:embed", 201 + Collection: "sh.tangled.string", 202 + RKey: "abc", 203 + ATURI: "at://did:plc:embed/sh.tangled.string/abc", 204 + CID: "bafyreienqueue", 205 + RecordType: "string", 206 + Title: "foo.go", 207 + Body: "package main", 208 + } 209 + if err := st.UpsertDocument(ctx, doc); err != nil { 210 + t.Fatalf("upsert doc for 
embedding queue: %v", err) 211 + } 212 + 213 + if err := st.EnqueueEmbeddingJob(ctx, doc.ID); err != nil { 214 + t.Fatalf("enqueue embedding job: %v", err) 215 + } 216 + if err := st.EnqueueEmbeddingJob(ctx, doc.ID); err != nil { 217 + t.Fatalf("enqueue embedding job second call: %v", err) 218 + } 219 + 220 + row := db.QueryRowContext(ctx, `SELECT status, attempts, last_error FROM embedding_jobs WHERE document_id = ?`, doc.ID) 221 + var ( 222 + status string 223 + attempts int 224 + lastError sql.NullString 225 + ) 226 + if err := row.Scan(&status, &attempts, &lastError); err != nil { 227 + t.Fatalf("query embedding job: %v", err) 228 + } 229 + if status != "pending" { 230 + t.Fatalf("status: got %q, want pending", status) 231 + } 232 + if attempts != 0 { 233 + t.Fatalf("attempts: got %d, want 0", attempts) 234 + } 235 + if lastError.Valid { 236 + t.Fatalf("last_error: got %q, want NULL", lastError.String) 157 237 } 158 238 }) 159 239 }
+180
packages/api/internal/tapclient/tapclient.go
··· 1 1 package tapclient 2 + 3 + import ( 4 + "context" 5 + "encoding/base64" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "math/rand/v2" 10 + "net/http" 11 + "strconv" 12 + "sync" 13 + "time" 14 + 15 + "github.com/coder/websocket" 16 + "tangled.org/desertthunder.dev/twister/internal/normalize" 17 + ) 18 + 19 + const ( 20 + minReconnectBackoff = 500 * time.Millisecond 21 + maxReconnectBackoff = 10 * time.Second 22 + ) 23 + 24 + // Client receives Tap events over WebSocket and sends acks after processing. 25 + type Client struct { 26 + url string 27 + password string 28 + log *slog.Logger 29 + 30 + mu sync.Mutex 31 + conn *websocket.Conn 32 + ackAsJSON bool 33 + } 34 + 35 + func New(url, password string, log *slog.Logger) *Client { 36 + if log == nil { 37 + log = slog.Default() 38 + } 39 + return &Client{ 40 + url: url, 41 + password: password, 42 + log: log, 43 + ackAsJSON: true, 44 + } 45 + } 46 + 47 + func (c *Client) ReadEvent(ctx context.Context) (normalize.TapRecordEvent, error) { 48 + for { 49 + conn, err := c.ensureConnected(ctx) 50 + if err != nil { 51 + return normalize.TapRecordEvent{}, err 52 + } 53 + 54 + _, data, err := conn.Read(ctx) 55 + if err != nil { 56 + c.log.Warn("tap read failed", slog.String("error", err.Error())) 57 + c.resetConn(websocket.StatusInternalError, "read failed") 58 + if ctx.Err() != nil { 59 + return normalize.TapRecordEvent{}, ctx.Err() 60 + } 61 + continue 62 + } 63 + 64 + var event normalize.TapRecordEvent 65 + if err := json.Unmarshal(data, &event); err != nil { 66 + c.log.Warn("tap decode failed", slog.String("error", err.Error())) 67 + continue 68 + } 69 + 70 + return event, nil 71 + } 72 + } 73 + 74 + func (c *Client) AckEvent(ctx context.Context, id int64) error { 75 + conn, err := c.ensureConnected(ctx) 76 + if err != nil { 77 + return err 78 + } 79 + 80 + c.mu.Lock() 81 + ackAsJSON := c.ackAsJSON 82 + c.mu.Unlock() 83 + 84 + if ackAsJSON { 85 + payload, _ := json.Marshal(map[string]int64{"id": id}) 86 + if err := 
conn.Write(ctx, websocket.MessageText, payload); err == nil { 87 + return nil 88 + } 89 + 90 + c.log.Warn("tap ack json failed; trying plain id", slog.Int64("event_id", id)) 91 + plain := []byte(strconv.FormatInt(id, 10)) 92 + if err := conn.Write(ctx, websocket.MessageText, plain); err != nil { 93 + c.resetConn(websocket.StatusInternalError, "ack failed") 94 + return fmt.Errorf("ack event %d: %w", id, err) 95 + } 96 + 97 + c.mu.Lock() 98 + c.ackAsJSON = false 99 + c.mu.Unlock() 100 + return nil 101 + } 102 + 103 + if err := conn.Write(ctx, websocket.MessageText, []byte(strconv.FormatInt(id, 10))); err != nil { 104 + c.resetConn(websocket.StatusInternalError, "ack failed") 105 + return fmt.Errorf("ack event %d: %w", id, err) 106 + } 107 + return nil 108 + } 109 + 110 + func (c *Client) Close() error { 111 + c.mu.Lock() 112 + defer c.mu.Unlock() 113 + if c.conn == nil { 114 + return nil 115 + } 116 + err := c.conn.Close(websocket.StatusNormalClosure, "shutdown") 117 + c.conn = nil 118 + return err 119 + } 120 + 121 + func (c *Client) ensureConnected(ctx context.Context) (*websocket.Conn, error) { 122 + c.mu.Lock() 123 + if c.conn != nil { 124 + conn := c.conn 125 + c.mu.Unlock() 126 + return conn, nil 127 + } 128 + c.mu.Unlock() 129 + 130 + backoff := minReconnectBackoff 131 + for { 132 + if ctx.Err() != nil { 133 + return nil, ctx.Err() 134 + } 135 + 136 + h := http.Header{} 137 + if c.password != "" { 138 + token := base64.StdEncoding.EncodeToString([]byte("admin:" + c.password)) 139 + h.Set("Authorization", "Basic "+token) 140 + } 141 + 142 + conn, _, err := websocket.Dial(ctx, c.url, &websocket.DialOptions{HTTPHeader: h}) 143 + if err == nil { 144 + c.mu.Lock() 145 + if c.conn == nil { 146 + c.conn = conn 147 + } else { 148 + _ = conn.Close(websocket.StatusNormalClosure, "duplicate") 149 + } 150 + existing := c.conn 151 + c.mu.Unlock() 152 + c.log.Info("tap connected") 153 + return existing, nil 154 + } 155 + 156 + c.log.Warn("tap connect failed", 
slog.String("error", err.Error()), slog.Duration("retry_in", backoff)) 157 + 158 + jitter := time.Duration(rand.Int64N(int64(backoff / 2))) 159 + wait := backoff + jitter 160 + select { 161 + case <-ctx.Done(): 162 + return nil, ctx.Err() 163 + case <-time.After(wait): 164 + } 165 + 166 + backoff *= 2 167 + if backoff > maxReconnectBackoff { 168 + backoff = maxReconnectBackoff 169 + } 170 + } 171 + } 172 + 173 + func (c *Client) resetConn(status websocket.StatusCode, reason string) { 174 + c.mu.Lock() 175 + defer c.mu.Unlock() 176 + if c.conn == nil { 177 + return 178 + } 179 + _ = c.conn.Close(status, reason) 180 + c.conn = nil 181 + }
+29 -1
packages/api/main.go
··· 10 10 11 11 "github.com/spf13/cobra" 12 12 "tangled.org/desertthunder.dev/twister/internal/config" 13 + "tangled.org/desertthunder.dev/twister/internal/ingest" 14 + "tangled.org/desertthunder.dev/twister/internal/normalize" 13 15 "tangled.org/desertthunder.dev/twister/internal/observability" 16 + "tangled.org/desertthunder.dev/twister/internal/store" 17 + "tangled.org/desertthunder.dev/twister/internal/tapclient" 14 18 ) 15 19 16 20 var ( ··· 81 85 } 82 86 log := observability.NewLogger(cfg) 83 87 log.Info("starting indexer", slog.String("service", "indexer"), slog.String("version", version)) 88 + 89 + if cfg.TapURL == "" { 90 + return fmt.Errorf("TAP_URL is required for indexer") 91 + } 92 + 93 + db, err := store.Open(cfg.TursoURL, cfg.TursoToken) 94 + if err != nil { 95 + return fmt.Errorf("open database: %w", err) 96 + } 97 + defer db.Close() 98 + 99 + if err := store.Migrate(db); err != nil { 100 + return fmt.Errorf("migrate database: %w", err) 101 + } 102 + 103 + st := store.New(db) 104 + registry := normalize.NewRegistry() 105 + tap := tapclient.New(cfg.TapURL, cfg.TapAuthPassword, log) 106 + runner := ingest.NewRunner(st, registry, tap, cfg.IndexedCollections, log) 107 + 84 108 ctx, cancel := baseContext() 85 109 defer cancel() 86 - <-ctx.Done() 110 + 111 + if err := runner.Run(ctx); err != nil { 112 + return fmt.Errorf("run indexer: %w", err) 113 + } 114 + 87 115 log.Info("shutting down indexer") 88 116 return nil 89 117 },