a love letter to tangled (Android, iOS, and a search API)
19
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: tangled links & pds + handle <-> DID resolution

+225 -38
+1 -2
apps/twisted/src/features/activity/ActivityPage.vue
··· 193 193 194 194 onIonViewWillLeave(() => { 195 195 client.disconnect(); 196 - status.value = "connecting"; // Reset so next enter shows "connecting" 196 + status.value = "connecting"; 197 197 }); 198 198 199 199 onUnmounted(() => { ··· 217 217 client.disconnect(); 218 218 status.value = "connecting"; 219 219 client.connect(); 220 - // Complete the refresher after a short delay 221 220 await new Promise<void>((resolve) => setTimeout(resolve, 1000)); 222 221 (event.target as HTMLIonRefresherElement).complete(); 223 222 }
+1 -1
apps/twisted/src/mocks/repos.ts
··· 120 120 121 121 const README_CONTENT = `# twisted 122 122 123 - A mobile companion reader for [Tangled](https://tangled.sh), built with Ionic Vue and Capacitor. 123 + A mobile companion reader for [Tangled](https://tangled.org), built with Ionic Vue and Capacitor. 124 124 125 125 ## Features 126 126
+3 -1
apps/twisted/src/services/tangled/repo-assets.ts
··· 79 79 const objectUrl = createObjectUrlFromBlobContent(blob); 80 80 if (objectUrl) return { url: objectUrl, revoke: true }; 81 81 } catch { 82 - // Fall back to the public raw URL if the XRPC lookup fails. 82 + console.warn( 83 + `Failed to fetch blob for ${context.owner}/${context.repo} at ${repoPath}, falling back to public URL.`, 84 + ); 83 85 } 84 86 85 87 return { url: buildPublicRawUrl(context, repoPath), revoke: false };
+4 -4
docs/roadmap.md
··· 20 20 - Follow AT URI (desertthunder.dev follows npmx): `at://did:plc:xg2vq45muivyy3xwatcehspu/sh.tangled.graph.follow/3mhofstanru22` 21 21 - Star AT URI (desertthunder.dev stars microcosm-rs): `at://did:plc:lulmyldiq4sb2ikags5sfb25/sh.tangled.repo/3lvsxzinfz222` 22 22 - ~~Add `just` targets for smoke-test runs locally and against a remote base URL~~ directly invoking the scripts is fine. 23 - - [ ] Reuse the existing normalization and upsert path for on-demand indexing jobs 24 - - [ ] Trigger indexing jobs from repo, issue, PR, profile, and similar fetch handlers 25 - - [ ] Add dedupe, retries, and observability for indexing jobs 23 + - [x] Reuse the existing normalization and upsert path for on-demand indexing jobs 24 + - [x] Trigger indexing jobs from repo, issue, PR, profile, and similar fetch handlers 25 + - [x] Add dedupe, retries, and observability for indexing jobs 26 26 - [ ] Add a JetStream cache consumer with a persisted timestamp cursor 27 27 - [ ] Seed the JetStream cursor to `now - 24h` on first boot and rewind slightly on reconnect 28 28 - [ ] Store and serve bounded recent activity from the local cache 29 29 - [ ] Keep Tap as the authoritative indexing and bulk backfill path 30 - - [ ] Define a controlled backfill and repo-resync playbook for recovery 30 + - [ ] Define a controlled backfill and repo-resync playbook for recovery (`docs/references/resync.md`) 31 31 32 32 ## API: Constellation Integration 33 33
+17 -1
packages/api/README.md
··· 142 142 twister api # Start the HTTP API server 143 143 twister indexer # Start the Tap firehose consumer 144 144 twister backfill # Seed the index from upstream APIs 145 - twister reindex # Re-process existing documents 145 + twister reindex # Re-process existing documents (re-syncs FTS) 146 + twister enrich # Backfill RepoName, AuthorHandle, WebURL on existing documents 146 147 ``` 148 + 149 + ### enrich 150 + 151 + Resolves missing `author_handle`, `repo_name`, and `web_url` fields on documents already 152 + in the database. Run this after deploying enrichment changes or when search results show 153 + documents with empty author handles. 154 + 155 + ```sh 156 + twister enrich --local # all documents 157 + twister enrich --local --collection sh.tangled.repo 158 + twister enrich --local --did did:plc:abc123 159 + twister enrich --local --dry-run # preview without writing 160 + ``` 161 + 162 + Flags: `--collection`, `--did`, `--document`, `--dry-run`, `--concurrency` (default 5). 147 163 148 164 ## Proxy endpoints 149 165
+9 -3
packages/api/internal/api/actors.go
··· 22 22 // issueEntry extends recordEntry with pre-joined issue state. 23 23 type issueEntry struct { 24 24 recordEntry 25 - State string `json:"state"` // "open" or "closed" 25 + // "open" or "closed" 26 + State string `json:"state"` 26 27 } 27 28 28 29 // pullEntry extends recordEntry with pre-joined pull status. 29 30 type pullEntry struct { 30 31 recordEntry 31 - Status string `json:"status"` // "open", "merged", or "closed" 32 + // "open", "merged", or "closed" 33 + Status string `json:"status"` 32 34 } 33 35 34 36 // actorContext holds resolved identity for a request. 35 37 type actorContext struct { 36 38 DID string `json:"did"` 37 39 Handle string `json:"handle"` 38 - PDS string `json:"pds"` // full URL, e.g. "https://bsky.social" 40 + // full URL, e.g. "https://bsky.social" 41 + PDS string `json:"pds"` 39 42 } 40 43 41 44 // repoContext extends actorContext with the repo's knot host and AT URI. ··· 64 67 identity, err := s.xrpc.ResolveIdentity(ctx, did) 65 68 if err != nil { 66 69 return nil, fmt.Errorf("resolve identity %q: %w", did, err) 70 + } 71 + if identity.PDS == "" { 72 + return nil, fmt.Errorf("no atproto pds in did document for %q", did) 67 73 } 68 74 69 75 return &actorContext{
+121 -1
packages/api/internal/api/readthrough.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "log/slog" 8 + "sync" 8 9 "time" 9 10 10 11 "tangled.org/desertthunder.dev/twister/internal/normalize" ··· 12 13 "tangled.org/desertthunder.dev/twister/internal/xrpc" 13 14 ) 14 15 15 - const readThroughIdlePoll = 1 * time.Second 16 + const ( 17 + readThroughIdlePoll = 1 * time.Second 18 + readThroughStatusInterval = 30 * time.Second 19 + maxIndexingAttempts = 10 20 + ) 16 21 17 22 func (s *Server) runReadThroughIndexer(ctx context.Context) { 18 23 ticker := time.NewTicker(readThroughIdlePoll) 19 24 defer ticker.Stop() 25 + 26 + var mu sync.Mutex 27 + var processedTick int64 28 + go s.runIndexerStatusLogger(ctx, &mu, &processedTick) 20 29 21 30 s.log.Info("read-through indexer worker started") 22 31 for { ··· 45 54 } 46 55 47 56 if err := s.processReadThroughJob(ctx, job); err != nil { 57 + if job.Attempts+1 >= maxIndexingAttempts { 58 + s.log.Error("read-through job exceeded max attempts; discarding", 59 + slog.String("document_id", job.DocumentID), 60 + slog.Int("attempts", job.Attempts+1), 61 + slog.String("last_error", err.Error()), 62 + ) 63 + _ = s.store.CompleteIndexingJob(ctx, job.DocumentID) 64 + continue 65 + } 48 66 nextDelay := retryDelay(job.Attempts + 1) 49 67 nextAt := time.Now().UTC().Add(nextDelay).Format(time.RFC3339) 50 68 retryErr := s.store.RetryIndexingJob(ctx, job.DocumentID, nextAt, truncateErr(err)) ··· 71 89 ) 72 90 continue 73 91 } 92 + 93 + s.log.Debug("read-through job completed", slog.String("document_id", job.DocumentID)) 94 + mu.Lock() 95 + processedTick++ 96 + mu.Unlock() 97 + } 98 + } 99 + 100 + func (s *Server) runIndexerStatusLogger(ctx context.Context, mu *sync.Mutex, processedTick *int64) { 101 + ticker := time.NewTicker(readThroughStatusInterval) 102 + defer ticker.Stop() 103 + for { 104 + select { 105 + case <-ctx.Done(): 106 + return 107 + case <-ticker.C: 108 + mu.Lock() 109 + n := *processedTick 110 + *processedTick = 0 111 + mu.Unlock() 112 + 113 + pending, err := 
s.store.CountPendingIndexingJobs(ctx) 114 + if err != nil { 115 + s.log.Warn("read-through status: count failed", slog.String("error", err.Error())) 116 + continue 117 + } 118 + s.log.Info("read-through indexer status", 119 + slog.Int64("jobs_processed", n), 120 + slog.Int64("jobs_pending", pending), 121 + ) 122 + } 74 123 } 75 124 } 76 125 ··· 124 173 } 125 174 } 126 175 176 + s.enrichDocument(ctx, doc, record) 177 + 127 178 if err := s.store.UpsertDocument(ctx, doc); err != nil { 128 179 return fmt.Errorf("upsert document: %w", err) 129 180 } ··· 137 188 } 138 189 } 139 190 return nil 191 + } 192 + 193 + // enrichDocument fills RepoName, AuthorHandle, and WebURL via XRPC when possible. 194 + // Failures are logged but never block indexing. 195 + func (s *Server) enrichDocument(ctx context.Context, doc *store.Document, record map[string]any) { 196 + if s.xrpc == nil { 197 + return 198 + } 199 + 200 + if doc.RepoDID != "" && doc.RepoName == "" { 201 + repoURI := repoURIFromRecord(record) 202 + if repoURI != "" { 203 + _, _, repoRKey, err := normalize.ParseATURI(repoURI) 204 + if err == nil && repoRKey != "" { 205 + name, err := s.xrpc.ResolveRepoName(ctx, doc.RepoDID, repoRKey) 206 + if err == nil { 207 + doc.RepoName = name 208 + } else { 209 + s.log.Debug("read-through enrich: resolve repo name failed", 210 + slog.String("doc_id", doc.ID), 211 + slog.String("repo_did", doc.RepoDID), 212 + slog.String("error", err.Error()), 213 + ) 214 + } 215 + } 216 + } 217 + } 218 + 219 + if doc.AuthorHandle == "" && doc.DID != "" { 220 + info, err := s.xrpc.ResolveIdentity(ctx, doc.DID) 221 + if err == nil && info.Handle != "" { 222 + doc.AuthorHandle = info.Handle 223 + if doc.RecordType == "profile" { 224 + doc.Title = info.Handle 225 + } 226 + } else if err != nil { 227 + s.log.Debug("read-through enrich: resolve author handle failed", 228 + slog.String("doc_id", doc.ID), 229 + slog.String("did", doc.DID), 230 + slog.String("error", err.Error()), 231 + ) 232 + } 233 + } 234 
+ 235 + if doc.WebURL == "" { 236 + ownerHandle := doc.AuthorHandle 237 + if doc.RepoDID != "" && doc.RepoDID != doc.DID { 238 + if h, err := s.store.GetIdentityHandle(ctx, doc.RepoDID); err == nil && h != "" { 239 + ownerHandle = h 240 + } else if info, err := s.xrpc.ResolveIdentity(ctx, doc.RepoDID); err == nil && info.Handle != "" { 241 + ownerHandle = info.Handle 242 + } 243 + } 244 + doc.WebURL = xrpc.BuildWebURL(ownerHandle, doc.RepoName, doc.RecordType, doc.RKey) 245 + } 246 + } 247 + 248 + // repoURIFromRecord extracts the repo AT-URI from common record fields. 249 + // Issues store it in rec["repo"]; pulls store it in rec["target"]["repo"]. 250 + func repoURIFromRecord(record map[string]any) string { 251 + if uri, _ := record["repo"].(string); uri != "" { 252 + return uri 253 + } 254 + if target, _ := record["target"].(map[string]any); target != nil { 255 + if uri, _ := target["repo"].(string); uri != "" { 256 + return uri 257 + } 258 + } 259 + return "" 140 260 } 141 261 142 262 func (s *Server) enqueueXRPCRecord(ctx context.Context, uri, cid string, value map[string]any) {
+1 -1
packages/api/internal/backfill/backfill.go
··· 189 189 func (r *Runner) resolveSeeds(ctx context.Context, entries []seedEntry) ([]string, map[string]string, error) { 190 190 seen := map[string]bool{} 191 191 seeds := make([]string, 0, len(entries)) 192 - handles := make(map[string]string) // did → handle 192 + handles := make(map[string]string) 193 193 for _, entry := range entries { 194 194 if entry.isDID { 195 195 seen[entry.raw] = true
+1 -1
packages/api/internal/config/config.go
··· 82 82 IdentityServiceURL: envOrDefault("IDENTITY_SERVICE_URL", "https://public.api.bsky.app"), 83 83 XRPCTimeout: envDuration("XRPC_TIMEOUT", 15*time.Second), 84 84 ConstellationURL: envOrDefault("CONSTELLATION_URL", "https://constellation.microcosm.blue"), 85 - ConstellationUserAgent: envOrDefault("CONSTELLATION_USER_AGENT", "twister/1.0 (https://tangled.sh; Owais <desertthunder.dev@gmail.com>)"), 85 + ConstellationUserAgent: envOrDefault("CONSTELLATION_USER_AGENT", "twister/1.0 (https://tangled.org/desertthunder.dev/twisted; Owais <desertthunder.dev@gmail.com>)"), 86 86 ConstellationTimeout: envDuration("CONSTELLATION_TIMEOUT", 10*time.Second), 87 87 ConstellationCacheTTL: envDuration("CONSTELLATION_CACHE_TTL", 5*time.Minute), 88 88 OAuthClientID: os.Getenv("OAUTH_CLIENT_ID"),
+4
packages/api/internal/ingest/ingest_test.go
··· 143 143 return int64(len(f.docs)), nil 144 144 } 145 145 146 + func (f *fakeStore) CountPendingIndexingJobs(_ context.Context) (int64, error) { 147 + return 0, nil 148 + } 149 + 146 150 func (f *fakeStore) Ping(_ context.Context) error { 147 151 return nil 148 152 }
+8 -4
packages/api/internal/reindex/reindex.go
··· 12 12 13 13 // Options controls which documents are reindexed. 14 14 type Options struct { 15 - Collection string // reindex documents in this collection only 16 - DID string // reindex documents authored by this DID only 17 - DocumentID string // reindex a single document by stable ID 18 - DryRun bool // log intended work without writing 15 + // reindex documents in this collection only 16 + Collection string 17 + // reindex documents authored by this DID only 18 + DID string 19 + // reindex a single document by stable ID 20 + DocumentID string 21 + // log intended work without writing 22 + DryRun bool 19 23 } 20 24 21 25 // Result summarises the outcome of a reindex run.
+9
packages/api/internal/store/sql_store.go
··· 460 460 return n, nil 461 461 } 462 462 463 + func (s *SQLStore) CountPendingIndexingJobs(ctx context.Context) (int64, error) { 464 + var n int64 465 + err := s.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM indexing_jobs WHERE status = 'pending'`).Scan(&n) 466 + if err != nil { 467 + return 0, fmt.Errorf("count pending indexing jobs: %w", err) 468 + } 469 + return n, nil 470 + } 471 + 463 472 func (s *SQLStore) Ping(ctx context.Context) error { 464 473 return s.db.PingContext(ctx) 465 474 }
+1
packages/api/internal/store/store.go
··· 96 96 GetFollowSubjects(ctx context.Context, did string) ([]string, error) 97 97 GetRepoCollaborators(ctx context.Context, repoOwnerDID string) ([]string, error) 98 98 CountDocuments(ctx context.Context) (int64, error) 99 + CountPendingIndexingJobs(ctx context.Context) (int64, error) 99 100 Ping(ctx context.Context) error 100 101 }
+1 -1
packages/api/internal/view/templates/docs/index.html
··· 1 1 {{define "title"}}API Docs &mdash; Twister{{end}} 2 2 {{define "content"}} 3 3 <h1>API Documentation</h1> 4 - <p>Twister exposes a public JSON API for searching indexed <a href="https://tangled.sh" target="_blank" rel="noopener">Tangled</a> content. No authentication is required for read endpoints.</p> 4 + <p>Twister exposes a public JSON API for searching indexed <a href="https://tangled.org" target="_blank" rel="noopener">Tangled</a> content. No authentication is required for read endpoints.</p> 5 5 6 6 <h2>Base URL</h2> 7 7 <pre><code>https://&lt;your-twister-domain&gt;</code></pre>
+1 -1
packages/api/internal/view/templates/layout.html
··· 25 25 </main> 26 26 <footer class="footer"> 27 27 <div class="footer-inner"> 28 - <span>Twister &mdash; search for <a href="https://tangled.sh" target="_blank" rel="noopener">Tangled</a></span> 28 + <span>Twister &mdash; search for <a href="https://tangled.org" target="_blank" rel="noopener">Tangled</a></span> 29 29 </div> 30 30 </footer> 31 31 {{block "scripts" .}}{{end}}
+1 -3
packages/api/internal/xrpc/did.go
··· 77 77 } 78 78 79 79 // ResolveIdentity resolves a DID to its PDS endpoint and handle. 80 + // Both fields are best-effort: callers that require PDS should check info.PDS != "". 80 81 func (c *Client) ResolveIdentity(ctx context.Context, did string) (*IdentityInfo, error) { 81 82 doc, err := c.ResolveDIDDoc(ctx, did) 82 83 if err != nil { ··· 90 91 info.PDS = strings.TrimSpace(svc.ServiceEndpoint) 91 92 break 92 93 } 93 - } 94 - if info.PDS == "" { 95 - return nil, fmt.Errorf("no atproto pds endpoint in did document for %s", did) 96 94 } 97 95 98 96 for _, aka := range doc.AlsoKnownAs {
+25
packages/api/internal/xrpc/did_test.go
··· 85 85 } 86 86 } 87 87 88 + func TestResolveIdentity_NoPDS(t *testing.T) { 89 + doc := DIDDocument{ 90 + ID: "did:plc:nopds", 91 + AlsoKnownAs: []string{"at://alice.test"}, 92 + Service: []DIDService{}, 93 + } 94 + 95 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 96 + json.NewEncoder(w).Encode(doc) 97 + })) 98 + defer srv.Close() 99 + 100 + c := NewClient(WithPLCDirectory(srv.URL)) 101 + info, err := c.ResolveIdentity(context.Background(), "did:plc:nopds") 102 + if err != nil { 103 + t.Fatal("ResolveIdentity should succeed even with no PDS service:", err) 104 + } 105 + if info.Handle != "alice.test" { 106 + t.Errorf("expected handle alice.test, got %q", info.Handle) 107 + } 108 + if info.PDS != "" { 109 + t.Errorf("expected empty PDS, got %q", info.PDS) 110 + } 111 + } 112 + 88 113 func TestResolveDIDDoc_UnsupportedMethod(t *testing.T) { 89 114 c := NewClient() 90 115 _, err := c.ResolveDIDDoc(context.Background(), "did:key:z123")
+3
packages/api/internal/xrpc/records.go
··· 120 120 if err != nil { 121 121 return "", fmt.Errorf("resolve pds for %s: %w", repo, err) 122 122 } 123 + if info.PDS == "" { 124 + return "", fmt.Errorf("no atproto pds in did document for %s", repo) 125 + } 123 126 return info.PDS, nil 124 127 }
+7 -7
packages/api/internal/xrpc/repo.go
··· 31 31 return name, nil 32 32 } 33 33 34 - // BuildWebURL builds a canonical tangled.sh URL for a record. 34 + // BuildWebURL builds a canonical tangled.org URL for a record. 35 35 // recordType should be one of: "repo", "issue", "pull", "issue_comment", "pull_comment", "profile". 36 36 func BuildWebURL(ownerHandle, repoName, recordType, rkey string) string { 37 37 if ownerHandle == "" { ··· 41 41 42 42 switch recordType { 43 43 case "profile": 44 - return fmt.Sprintf("https://tangled.sh/%s", owner) 44 + return fmt.Sprintf("https://tangled.org/%s", owner) 45 45 case "repo": 46 46 if repoName == "" { 47 47 return "" 48 48 } 49 - return fmt.Sprintf("https://tangled.sh/%s/%s", owner, repoName) 49 + return fmt.Sprintf("https://tangled.org/%s/%s", owner, repoName) 50 50 case "issue": 51 51 if repoName == "" || rkey == "" { 52 52 return "" 53 53 } 54 - return fmt.Sprintf("https://tangled.sh/%s/%s/issues/%s", owner, repoName, rkey) 54 + return fmt.Sprintf("https://tangled.org/%s/%s/issues/%s", owner, repoName, rkey) 55 55 case "pull": 56 56 if repoName == "" || rkey == "" { 57 57 return "" 58 58 } 59 - return fmt.Sprintf("https://tangled.sh/%s/%s/pulls/%s", owner, repoName, rkey) 59 + return fmt.Sprintf("https://tangled.org/%s/%s/pulls/%s", owner, repoName, rkey) 60 60 case "issue_comment": 61 61 if repoName == "" { 62 62 return "" 63 63 } 64 - return fmt.Sprintf("https://tangled.sh/%s/%s/issues", owner, repoName) 64 + return fmt.Sprintf("https://tangled.org/%s/%s/issues", owner, repoName) 65 65 case "pull_comment": 66 66 if repoName == "" { 67 67 return "" 68 68 } 69 - return fmt.Sprintf("https://tangled.sh/%s/%s/pulls", owner, repoName) 69 + return fmt.Sprintf("https://tangled.org/%s/%s/pulls", owner, repoName) 70 70 default: 71 71 return "" 72 72 }
+7 -7
packages/api/internal/xrpc/repo_test.go
··· 7 7 owner, repo, recordType, rkey string 8 8 want string 9 9 }{ 10 - {"alice.test", "myrepo", "repo", "", "https://tangled.sh/alice.test/myrepo"}, 11 - {"alice.test", "myrepo", "issue", "123", "https://tangled.sh/alice.test/myrepo/issues/123"}, 12 - {"alice.test", "myrepo", "pull", "456", "https://tangled.sh/alice.test/myrepo/pulls/456"}, 13 - {"alice.test", "myrepo", "issue_comment", "789", "https://tangled.sh/alice.test/myrepo/issues"}, 14 - {"alice.test", "myrepo", "pull_comment", "789", "https://tangled.sh/alice.test/myrepo/pulls"}, 15 - {"alice.test", "", "profile", "", "https://tangled.sh/alice.test"}, 16 - {"@alice.test", "myrepo", "repo", "", "https://tangled.sh/alice.test/myrepo"}, 10 + {"alice.test", "myrepo", "repo", "", "https://tangled.org/alice.test/myrepo"}, 11 + {"alice.test", "myrepo", "issue", "123", "https://tangled.org/alice.test/myrepo/issues/123"}, 12 + {"alice.test", "myrepo", "pull", "456", "https://tangled.org/alice.test/myrepo/pulls/456"}, 13 + {"alice.test", "myrepo", "issue_comment", "789", "https://tangled.org/alice.test/myrepo/issues"}, 14 + {"alice.test", "myrepo", "pull_comment", "789", "https://tangled.org/alice.test/myrepo/pulls"}, 15 + {"alice.test", "", "profile", "", "https://tangled.org/alice.test"}, 16 + {"@alice.test", "myrepo", "repo", "", "https://tangled.org/alice.test/myrepo"}, 17 17 {"", "myrepo", "repo", "", ""}, 18 18 {"alice.test", "", "repo", "", ""}, 19 19 {"alice.test", "myrepo", "unknown", "", ""},