···11+### OAuth 2.0 Endpoints with AIP
22+33+The AIP server implements the following OAuth 2.0 endpoints:
44+55+- `GET ${AIP_BASE_URL}/oauth/authorize` - Authorization endpoint for OAuth flows
66+- `POST ${AIP_BASE_URL}/oauth/token` - Token endpoint for exchanging
77+ authorization codes for access tokens
88+- `POST ${AIP_BASE_URL}/oauth/par` - Pushed Authorization Request endpoint
99+ (RFC 9126)
1010+- `POST ${AIP_BASE_URL}/oauth/clients/register` - Dynamic Client Registration
1111+ endpoint (RFC 7591)
1212+- `GET ${AIP_BASE_URL}/oauth/atp/callback` - ATProtocol OAuth callback handler
1313+- `GET ${AIP_BASE_URL}/.well-known/oauth-authorization-server` - OAuth server
1414+ metadata discovery (RFC 8414)
1515+- `GET ${AIP_BASE_URL}/.well-known/oauth-protected-resource` - Protected
1616+ resource metadata
1717+- `GET ${AIP_BASE_URL}/.well-known/jwks.json` - JSON Web Key Set for token
1818+ verification
1919+- `GET ${AIP_BASE_URL}/oauth/userinfo` - introspection endpoint returning claims
2020+ info where sub is the user's atproto did
2121+- `GET ${AIP_BASE_URL}/api/atproto/session` - returns atproto session data
2222+2323+## Error Handling
2424+2525+All error strings must use this format:
2626+2727+ error-aip-<domain>-<number> <message>: <details>
2828+2929+Example errors:
3030+3131+- error-slice-resolve-1 Multiple DIDs resolved for method
3232+- error-slice-plc-1 HTTP request failed: https://google.com/ Not Found
3333+- error-slice-key-1 Error decoding key: invalid
3434+3535+Errors should be represented as enums using the `thiserror` library when
3636+possible using `src/errors.rs` as a reference and example.
3737+3838+Avoid creating new errors with the `anyhow!(...)` macro.
3939+4040+## Time, Date, and Duration
4141+4242+Use the `chrono` crate for time, date, and duration logic.
4343+4444+Use the `duration_str` crate for parsing string duration values.
4545+4646+All stored dates and times must be in UTC. UTC should be used whenever
4747+determining the current time and computing values like expiration.
4848+4949+## HTTP Handler Organization
5050+5151+HTTP handlers should be organized as Rust source files in the `src/http`
5252+directory and should have the `handler_` prefix. Each handler should have it's
5353+own request and response types and helper functionality.
5454+5555+Example handler: `handler_index.rs`
5656+5757+- After updating, run `cargo check` to fix errors and warnings
5858+- Don't use dead code, if it's not used remove it
5959+- Ise htmx and hyperscript when possible, if not javascript in script tag is ok
···11+# AT Protocol Indexing Service - Technical Specification
22+33+## Project Overview
44+55+Build a high-performance, scalable indexing service for AT Protocol that
66+automatically generates typed APIs for any lexicon, with intelligent data
77+fetching strategies and real-time synchronization.
88+99+### Core Goals
1010+1111+- **Universal Lexicon Support**: Automatically handle any AT Protocol lexicon
1212+ without manual configuration
1313+- **Multi-Language Client Generation**: Generate typed API clients for
1414+ TypeScript, Rust, Python, Go, etc.
1515+- **High Performance**: Handle millions of records efficiently with smart
1616+ caching and batching
1717+- **Real-time Sync**: Support both bulk imports and live firehose updates
1818+- **Developer Experience**: Hasura-style auto-generated APIs with full type
1919+ safety
2020+2121+## Architecture Overview
2222+2323+### Data Storage Strategy
2424+2525+**Primary Database: PostgreSQL**
2626+2727+- Single source of truth for all indexed records
2828+- Single table approach for maximum flexibility across arbitrary lexicons
2929+- JSONB for complete record storage and sophisticated querying
3030+- Optional partitioning by collection for very high volume deployments
3131+3232+```sql
3333+-- Single table for all AT Protocol records
3434+CREATE TABLE IF NOT EXISTS "record" (
3535+ "uri" TEXT PRIMARY KEY NOT NULL,
3636+ "cid" TEXT NOT NULL,
3737+ "did" TEXT NOT NULL,
3838+ "collection" TEXT NOT NULL,
3939+ "json" JSONB NOT NULL, -- Use JSONB for performance and querying
4040+ "indexedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
4141+);
4242+4343+-- Essential indexes for performance
4444+CREATE INDEX IF NOT EXISTS idx_record_collection ON "record"("collection");
4545+CREATE INDEX IF NOT EXISTS idx_record_did ON "record"("did");
4646+CREATE INDEX IF NOT EXISTS idx_record_indexed_at ON "record"("indexedAt");
4747+CREATE INDEX IF NOT EXISTS idx_record_json_gin ON "record" USING GIN("json");
4848+4949+-- Collection-specific indexes for common queries
5050+CREATE INDEX IF NOT EXISTS idx_record_collection_did ON "record"("collection", "did");
5151+CREATE INDEX IF NOT EXISTS idx_record_cid ON "record"("cid");
5252+```
5353+5454+**Caching Strategy**
5555+5656+- **Redis**: Hot data caching, query result caching, rate limiting
5757+- **Application-level**: Compiled lexicon handlers, parsed schemas
5858+- **CDN**: Public API endpoints with appropriate cache headers
5959+6060+**PostgreSQL JSONB Advantages**
6161+6262+- **GIN indexes**: Fast querying on JSON content with `@>`, `?`, `?&`, `?|`
6363+ operators
6464+- **JSON operators**: Rich querying with `->`, `->>`, `#>`, `#>>` for nested
6565+ access
6666+- **JSON path queries**: Complex nested field access and filtering
6767+- **Performance**: JSONB stored in optimized binary format for fast access
6868+- **Flexibility**: Handle arbitrary lexicon schemas without schema migrations
6969+7070+### Search Implementation
7171+7272+**Hybrid Approach**:
7373+7474+- **PostgreSQL**: Primary queries, exact matches, admin operations, complex
7575+ joins
7676+- **Optional Search Engine**: User-facing search, fuzzy matching, aggregations,
7777+ analytics
7878+7979+**Search Engine Options**:
8080+8181+- **Typesense**: Easy setup, good performance for smaller deployments
8282+- **Meilisearch**: Excellent for instant search experiences
8383+- **Elasticsearch/OpenSearch**: Full-featured for large-scale deployments
8484+8585+## Record Fetching Strategies
8686+8787+### Decision Matrix
8888+8989+| Scenario | Strategy | Reasoning |
9090+| -------------------- | ----------------------- | -------------------------------------------- |
9191+| Initial sync | CAR file download | Most efficient for bulk data |
9292+| Real-time updates | Firehose stream | Live updates as they happen |
9393+| Catch-up sync (<24h) | List + individual fetch | Good for small gaps |
9494+| Catch-up sync (>24h) | CAR file re-download | More efficient than many individual requests |
9595+| Single record update | Individual fetch | Targeted and fast |
9696+9797+### Implementation Strategy
9898+9999+```rust
100100+async fn smart_sync(&self, did: &str) -> Result<()> {
101101+ let last_sync = self.get_last_sync_time(did).await?;
102102+103103+ match last_sync {
104104+ None => self.sync_repo_car(did).await?, // Initial: CAR file
105105+ Some(last) if Utc::now() - last > Duration::hours(24) => {
106106+ self.sync_repo_car(did).await? // Full resync: CAR file
107107+ }
108108+ Some(last) => {
109109+ self.incremental_sync(did, last).await? // Incremental: List + fetch
110110+ }
111111+ }
112112+113113+ Ok(())
114114+}
115115+```
116116+117117+## Dynamic Lexicon System
118118+119119+### Why Single Table Works Better for AT Protocol
120120+121121+**Lexicon characteristics that favor single table:**
122122+123123+- **Runtime schema definition**: Lexicons can be arbitrary and defined by any
124124+ developer
125125+- **Shared metadata**: All records have common fields (CID, timestamp, author,
126126+ etc.)
127127+- **Flexible querying**: Query across different record types seamlessly
128128+- **Unknown schema count**: Could have hundreds of different lexicons
129129+130130+### Unified Query Interface
131131+132132+**Cross-lexicon querying capabilities:**
133133+134134+```sql
135135+-- Posts with specific hashtags
136136+SELECT * FROM "record"
137137+WHERE "collection" = 'app.bsky.feed.post'
138138+AND "json"->>'text' ILIKE '%#atproto%';
139139+140140+-- All records by author across all lexicons
141141+SELECT "collection", COUNT(*) FROM "record"
142142+WHERE "did" = 'did:plc:example'
143143+GROUP BY "collection";
144144+145145+-- Cross-lexicon search for any record with text content
146146+SELECT * FROM "record"
147147+WHERE "json" ? 'text'
148148+AND "json"->>'text' ILIKE '%search term%';
149149+150150+-- Recent records across all collections
151151+SELECT "uri", "collection", "json"->>'$type' as record_type, "indexedAt"
152152+FROM "record"
153153+WHERE "indexedAt" > NOW() - INTERVAL '24 hours'
154154+ORDER BY "indexedAt" DESC;
155155+```
156156+157157+### Schema Management
158158+159159+**Components**:
160160+161161+1. **Lexicon Registry**: Parse and store lexicon definitions for validation
162162+2. **Indexer Lexicons**: Define the indexer's own XRPC procedures with proper
163163+ lexicons
164164+3. **Validation Layer**: Ensure records conform to their lexicon schemas
165165+4. **XRPC Server**: Serve both indexed AT Protocol data and indexer's own
166166+ procedures
167167+5. **Type Generator**: Generate typed interfaces for all lexicons (AT Protocol +
168168+ indexer)
169169+170170+### Dynamic Index Creation
171171+172172+```sql
173173+-- Add lexicon-specific indexes as needed for performance
174174+CREATE INDEX IF NOT EXISTS idx_posts_text ON "record" USING GIN(("json"->'text'))
175175+WHERE "collection" = 'app.bsky.feed.post';
176176+177177+CREATE INDEX IF NOT EXISTS idx_profiles_handle ON "record"(("json"->>'handle'))
178178+WHERE "collection" = 'app.bsky.actor.profile';
179179+180180+-- For very high volume, consider partitioning by collection
181181+CREATE TABLE "record_posts" PARTITION OF "record"
182182+FOR VALUES IN ('app.bsky.feed.post');
183183+184184+-- Composite indexes for common query patterns
185185+CREATE INDEX IF NOT EXISTS idx_record_collection_created_at ON "record"("collection", ("json"->>'createdAt'))
186186+WHERE "json" ? 'createdAt';
187187+```
188188+189189+### Implementation Strategy
190190+191191+```rust
192192+async fn register_lexicon(lexicon: LexiconDoc) -> Result<()> {
193193+ // 1. Store lexicon definition for validation
194194+ self.store_lexicon_schema(lexicon).await?;
195195+196196+ // 2. Create collection-specific indexes if needed
197197+ self.create_performance_indexes(&lexicon.id).await?;
198198+199199+ // 3. Register XRPC handlers for core AT Protocol lexicons
200200+ if lexicon.id.starts_with("com.atproto.") {
201201+ self.register_atproto_handlers(&lexicon.id).await?;
202202+ }
203203+204204+ // 4. Generate TypeScript types for all lexicons (AT Protocol + indexer)
205205+ self.generate_client_types(&lexicon.id).await?;
206206+207207+ Ok(())
208208+}
209209+210210+async fn initialize_indexer_lexicons(&self) -> Result<()> {
211211+ // Define and register the indexer's own XRPC procedures
212212+ let indexer_lexicons = vec![
213213+ self.create_list_records_lexicon(),
214214+ self.create_search_records_lexicon(),
215215+ self.create_get_record_lexicon(),
216216+ // ... other indexer procedures
217217+ ];
218218+219219+ for lexicon in indexer_lexicons {
220220+ self.register_indexer_procedure(lexicon).await?;
221221+ }
222222+223223+ Ok(())
224224+}
225225+```
226226+227227+### Record Validation
228228+229229+**Validation Layer**: Ensure data integrity with lexicon schema validation
230230+231231+```rust
232232+async fn insert_record(&self, record: ATProtoRecord) -> Result<()> {
233233+ // 1. Validate against lexicon schema
234234+ let lexicon = self.get_lexicon_schema(&record.collection).await?;
235235+ self.validate_record_against_schema(&record.json, &lexicon)?;
236236+237237+ // 2. Insert with proper indexing
238238+ sqlx::query!(
239239+ r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexedAt")
240240+ VALUES ($1, $2, $3, $4, $5, $6)
241241+ ON CONFLICT ("uri")
242242+ DO UPDATE SET
243243+ "cid" = EXCLUDED."cid",
244244+ "json" = EXCLUDED."json",
245245+ "indexedAt" = EXCLUDED."indexedAt""#,
246246+ record.uri,
247247+ record.cid,
248248+ record.did,
249249+ record.collection,
250250+ record.json,
251251+ record.indexed_at
252252+ ).execute(&self.db).await?;
253253+254254+ Ok(())
255255+}
256256+257257+// Batch processing for CAR file imports
258258+async fn batch_insert_records(&self, records: &[ATProtoRecord]) -> Result<()> {
259259+ let mut tx = self.db.begin().await?;
260260+261261+ for record in records {
262262+ sqlx::query!(
263263+ r#"INSERT INTO "record" ("uri", "cid", "did", "collection", "json", "indexedAt")
264264+ VALUES ($1, $2, $3, $4, $5, $6)
265265+ ON CONFLICT ("uri")
266266+ DO UPDATE SET
267267+ "cid" = EXCLUDED."cid",
268268+ "json" = EXCLUDED."json",
269269+ "indexedAt" = EXCLUDED."indexedAt""#,
270270+ record.uri,
271271+ record.cid,
272272+ record.did,
273273+ record.collection,
274274+ record.json,
275275+ record.indexed_at
276276+ ).execute(&mut *tx).await?;
277277+ }
278278+279279+ tx.commit().await?;
280280+ Ok(())
281281+}
282282+```
283283+284284+### API Generation Strategy
285285+286286+**XRPC Endpoints** with proper lexicon definitions:
287287+288288+```
289289+GET /xrpc/com.indexer.records.list # List records for collection
290290+GET /xrpc/com.indexer.records.get # Get specific record
291291+POST /xrpc/com.indexer.records.create # Create record
292292+POST /xrpc/com.indexer.records.update # Update record
293293+POST /xrpc/com.indexer.records.delete # Delete record
294294+295295+# Advanced query procedures
296296+GET /xrpc/com.indexer.records.search # Full-text search on record content
297297+GET /xrpc/com.indexer.records.filter # JSON field filtering
298298+GET /xrpc/com.indexer.author.listRecords # All records by author (cross-collection)
299299+GET /xrpc/com.indexer.search.global # Global search across all collections
300300+```
301301+302302+**Lexicon Definitions** for indexer procedures:
303303+304304+```json
305305+{
306306+ "lexicon": 1,
307307+ "id": "com.indexer.records.list",
308308+ "defs": {
309309+ "main": {
310310+ "type": "query",
311311+ "description": "List records for a specific collection",
312312+ "parameters": {
313313+ "collection": {
314314+ "type": "string",
315315+ "description": "Collection/lexicon ID (e.g. app.bsky.feed.post)",
316316+ "required": true
317317+ },
318318+ "author": {
319319+ "type": "string",
320320+ "description": "Filter by author DID"
321321+ },
322322+ "limit": {
323323+ "type": "integer",
324324+ "minimum": 1,
325325+ "maximum": 100,
326326+ "default": 25
327327+ },
328328+ "cursor": {
329329+ "type": "string",
330330+ "description": "Pagination cursor"
331331+ }
332332+ },
333333+ "output": {
334334+ "encoding": "application/json",
335335+ "schema": {
336336+ "type": "object",
337337+ "required": ["records"],
338338+ "properties": {
339339+ "records": {
340340+ "type": "array",
341341+ "items": { "$ref": "#/defs/indexedRecord" }
342342+ },
343343+ "cursor": { "type": "string" }
344344+ }
345345+ }
346346+ }
347347+ },
348348+ "indexedRecord": {
349349+ "type": "object",
350350+ "required": ["uri", "cid", "value", "indexedAt"],
351351+ "properties": {
352352+ "uri": { "type": "string", "format": "at-uri" },
353353+ "cid": { "type": "string" },
354354+ "value": { "type": "unknown" },
355355+ "indexedAt": { "type": "string", "format": "datetime" },
356356+ "collection": { "type": "string" },
357357+ "rkey": { "type": "string" },
358358+ "authorDid": { "type": "string", "format": "did" }
359359+ }
360360+ }
361361+ }
362362+}
363363+```
364364+365365+**Benefits of XRPC + Lexicons**:
366366+367367+- **Native AT Protocol**: Indexer becomes a proper AT Protocol service
368368+- **Discoverable APIs**: Lexicons can be fetched and introspected
369369+- **Type Generation**: Same code generation works for indexer APIs
370370+- **Consistent**: Uses established AT Protocol patterns
371371+- **Composable**: Can be mixed with other AT Protocol services
372372+373373+**XRPC Implementation Examples**:
374374+375375+```rust
376376+// XRPC query handler for listing records
377377+async fn handle_list_records(&self, params: ListRecordsParams) -> Result<ListRecordsOutput> {
378378+ let records = sqlx::query!(
379379+ r#"SELECT "uri", "cid", "did", "collection", "json", "indexedAt"
380380+ FROM "record"
381381+ WHERE "collection" = $1
382382+ AND ($2::text IS NULL OR "did" = $2)
383383+ ORDER BY "indexedAt" DESC
384384+ LIMIT $3"#,
385385+ params.collection,
386386+ params.author,
387387+ params.limit.unwrap_or(25) as i32
388388+ ).fetch_all(&self.db).await?;
389389+390390+ let indexed_records: Vec<IndexedRecord> = records.into_iter().map(|row| {
391391+ IndexedRecord {
392392+ uri: row.uri,
393393+ cid: row.cid,
394394+ did: row.did,
395395+ collection: row.collection,
396396+ value: serde_json::from_str(&row.json.to_string()).unwrap_or_default(),
397397+ indexed_at: row.indexedAt.to_rfc3339(),
398398+ }
399399+ }).collect();
400400+401401+ Ok(ListRecordsOutput {
402402+ records: indexed_records,
403403+ cursor: self.generate_cursor(&records).await?,
404404+ })
405405+}
406406+407407+// XRPC search handler with JSONB queries
408408+async fn handle_search_records(&self, params: SearchParams) -> Result<SearchOutput> {
409409+ let records = sqlx::query!(
410410+ r#"SELECT "uri", "cid", "did", "collection", "json", "indexedAt"
411411+ FROM "record"
412412+ WHERE ($1::text IS NULL OR "collection" = $1)
413413+ AND "json"->>'text' ILIKE $2
414414+ ORDER BY "indexedAt" DESC
415415+ LIMIT $3"#,
416416+ params.collection,
417417+ format!("%{}%", params.query),
418418+ params.limit.unwrap_or(25) as i32
419419+ ).fetch_all(&self.db).await?;
420420+421421+ Ok(SearchOutput {
422422+ records: records.into_iter().map(|row| IndexedRecord {
423423+ uri: row.uri,
424424+ cid: row.cid,
425425+ did: row.did,
426426+ collection: row.collection,
427427+ value: serde_json::from_str(&row.json.to_string()).unwrap_or_default(),
428428+ indexed_at: row.indexedAt.to_rfc3339(),
429429+ }).collect()
430430+ })
431431+}
432432+```
433433+434434+## Multi-Language Client Generation
435435+436436+### Initial Target: TypeScript
437437+438438+**Primary focus**: Generate fully typed TypeScript clients for web applications
439439+and Node.js services
440440+441441+- **Type Safety**: Complete interfaces for all request/response objects
442442+- **Auto-completion**: Full IDE support with generated types
443443+- **Runtime Validation**: Optional runtime type checking
444444+- **Documentation**: Auto-generated JSDoc comments from lexicon descriptions
445445+446446+### Future Language Support
447447+448448+**Planned targets** for multi-language expansion:
449449+450450+- **Rust**: High-performance services, CLI tools
451451+- **Python**: Data analysis, ML workflows, web backends
452452+- **Go**: Microservices, system tools
453453+454454+### Code Generation Pipeline
455455+456456+**Extensible architecture** designed for multiple languages:
457457+458458+```rust
459459+trait CodeGenerator {
460460+ fn generate_client(&self, lexicons: &[LexiconDoc]) -> Result<String>;
461461+ fn generate_types(&self, lexicon: &LexiconDoc) -> Result<String>;
462462+ fn generate_method(&self, nsid: &str, def: &LexiconDef) -> Result<String>;
463463+}
464464+465465+// Initial implementation: TypeScript
466466+impl CodeGenerator for TypeScriptGenerator {
467467+ fn generate_client(&self, lexicons: &[LexiconDoc]) -> Result<String> {
468468+ // Generate TypeScript client with full type safety
469469+ }
470470+}
471471+472472+// Future implementations:
473473+// impl CodeGenerator for RustGenerator { /* ... */ }
474474+// impl CodeGenerator for PythonGenerator { /* ... */ }
475475+// impl CodeGenerator for GoGenerator { /* ... */ }
476476+```
477477+478478+### TypeScript Client Generation
479479+480480+**Type-Safe Generic XRPC Client with Auto-Discovery:**
481481+482482+```typescript
483483+// Registry of all known collections -> their record types
484484+interface CollectionRecordMap {
485485+ // Core AT Protocol (always included)
486486+ "app.bsky.feed.post": PostRecord;
487487+ "app.bsky.actor.profile": ProfileRecord;
488488+ "app.bsky.feed.like": LikeRecord;
489489+490490+ // Dynamically discovered custom lexicons
491491+ "recipes.cooking-app.com": RecipeRecord;
492492+ "tasks.productivity-tool.io": TaskRecord;
493493+ "photos.gallery-app.net": PhotoRecord;
494494+ "someRecord.something-cool.indexer.com": SomeCustomRecord;
495495+}
496496+497497+// Generic input/output types with conditional typing
498498+interface CreateRecordInput<T extends keyof CollectionRecordMap> {
499499+ collection: T;
500500+ repo: string; // The DID that will become the 'did' field
501501+ rkey?: string; // Used to construct the URI
502502+ record: CollectionRecordMap[T]; // Type depends on collection!
503503+}
504504+505505+interface ListRecordsParams<T extends keyof CollectionRecordMap> {
506506+ collection: T;
507507+ author?: string;
508508+ limit?: number;
509509+ cursor?: string;
510510+}
511511+512512+interface ListRecordsOutput<T extends keyof CollectionRecordMap> {
513513+ records: Array<{
514514+ uri: string;
515515+ cid: string;
516516+ did: string; // Author DID
517517+ collection: T;
518518+ value: CollectionRecordMap[T]; // Typed based on collection (parsed from json field)
519519+ indexedAt: string;
520520+ }>;
521521+ cursor?: string;
522522+}
523523+524524+// Generated client class with conditional types
525525+export class ATProtoIndexerClient {
526526+ private client: AxiosInstance;
527527+528528+ constructor(baseURL: string, accessToken?: string) {
529529+ this.client = axios.create({
530530+ baseURL,
531531+ headers: accessToken ? { Authorization: `Bearer ${accessToken}` } : {},
532532+ });
533533+ }
534534+535535+ // Generic method - fully typed based on collection parameter
536536+ async createRecord<T extends keyof CollectionRecordMap>(
537537+ input: CreateRecordInput<T>,
538538+ ): Promise<CreateRecordOutput>;
539539+540540+ // Fallback for unknown collections
541541+ async createRecord(input: {
542542+ collection: string;
543543+ repo: string;
544544+ rkey?: string;
545545+ record: unknown;
546546+ }): Promise<CreateRecordOutput>;
547547+548548+ // Implementation handles both cases
549549+ async createRecord(input: any): Promise<CreateRecordOutput> {
550550+ const response = await this.client.post(
551551+ "/xrpc/com.indexer.records.create",
552552+ input,
553553+ );
554554+ return response.data;
555555+ }
556556+557557+ // Generic typed list method
558558+ async listRecords<T extends keyof CollectionRecordMap>(
559559+ params: ListRecordsParams<T>,
560560+ ): Promise<ListRecordsOutput<T>>;
561561+562562+ // Fallback for unknown collections
563563+ async listRecords(params: {
564564+ collection: string;
565565+ author?: string;
566566+ limit?: number;
567567+ cursor?: string;
568568+ }): Promise<ListRecordsOutput<string>>;
569569+570570+ async listRecords(params: any): Promise<any> {
571571+ const response = await this.client.get("/xrpc/com.indexer.records.list", {
572572+ params,
573573+ });
574574+ return response.data;
575575+ }
576576+577577+ // Convenience methods for popular collections
578578+ async createPost(
579579+ input: Omit<CreateRecordInput<"app.bsky.feed.post">, "collection">,
580580+ ) {
581581+ return this.createRecord({ ...input, collection: "app.bsky.feed.post" });
582582+ }
583583+584584+ async listPosts(
585585+ params: Omit<ListRecordsParams<"app.bsky.feed.post">, "collection">,
586586+ ) {
587587+ return this.listRecords({ ...params, collection: "app.bsky.feed.post" });
588588+ }
589589+590590+ // Auto-generated convenience methods for custom lexicons
591591+ async createRecipe(
592592+ input: Omit<CreateRecordInput<"recipes.cooking-app.com">, "collection">,
593593+ ) {
594594+ return this.createRecord({
595595+ ...input,
596596+ collection: "recipes.cooking-app.com",
597597+ });
598598+ }
599599+600600+ async searchRecords(params: SearchRecordsParams): Promise<SearchOutput> {
601601+ const response = await this.client.get("/xrpc/com.indexer.records.search", {
602602+ params,
603603+ });
604604+ return response.data;
605605+ }
606606+}
607607+```
608608+609609+**Usage Examples with Full Type Safety:**
610610+611611+```typescript
612612+const indexer = new ATProtoIndexerClient("https://indexer.example.com");
613613+614614+// ✅ Fully typed for known collections
615615+await indexer.createPost({
616616+ repo: "did:plc:user123",
617617+ record: {
618618+ $type: "app.bsky.feed.post",
619619+ text: "Hello!",
620620+ createdAt: new Date().toISOString(),
621621+ // TypeScript knows this must be a PostRecord
622622+ },
623623+});
624624+625625+// ✅ Custom lexicon with full typing
626626+await indexer.createRecord({
627627+ collection: "recipes.cooking-app.com",
628628+ repo: "did:plc:chef456",
629629+ record: {
630630+ $type: "recipes.cooking-app.com",
631631+ title: "Pizza",
632632+ ingredients: ["dough", "sauce", "cheese"],
633633+ difficulty: "easy",
634634+ // TypeScript enforces RecipeRecord structure
635635+ },
636636+});
637637+638638+// ✅ Query with same type safety - returns typed results
639639+const posts = await indexer.listPosts({
640640+ author: "did:plc:user123",
641641+ limit: 50,
642642+});
643643+// posts.records[0].value is typed as PostRecord!
644644+645645+// ✅ Unknown collection - falls back gracefully
646646+await indexer.createRecord({
647647+ collection: "new-app.startup.xyz",
648648+ repo: "did:plc:user789",
649649+ record: {
650650+ customField: "value", // No type checking, but still works
651651+ },
652652+});
653653+```
654654+655655+**Auto-Discovery Implementation:**
656656+657657+```rust
658658+// Indexer discovers and registers custom lexicons dynamically
659659+impl ATProtoIndexer {
660660+ async fn discover_lexicons(&self) -> Result<Vec<LexiconDoc>> {
661661+ let mut lexicons = Vec::new();
662662+663663+ // Core AT Protocol lexicons
664664+ lexicons.extend(self.load_core_lexicons().await?);
665665+666666+ // Custom lexicons from indexed records
667667+ let custom_collections = sqlx::query!(
668668+ r#"SELECT DISTINCT "collection" FROM "record"
669669+ WHERE "collection" NOT LIKE 'app.bsky.%'
670670+ AND "collection" NOT LIKE 'com.atproto.%'"#
671671+ ).fetch_all(&self.db).await?;
672672+673673+ for row in custom_collections {
674674+ if let Ok(lexicon) = self.fetch_lexicon_definition(&row.collection).await {
675675+ lexicons.push(lexicon);
676676+ }
677677+ }
678678+679679+ Ok(lexicons)
680680+ }
681681+682682+ async fn fetch_lexicon_definition(&self, nsid: &str) -> Result<LexiconDoc> {
683683+ // Fetch from domain's well-known endpoint
684684+ let domain = nsid.split('.').last().unwrap_or("");
685685+ let lexicon_url = format!("https://{}/.well-known/atproto/lexicon/{}", domain, nsid);
686686+687687+ let response = self.client.get(&lexicon_url).send().await?;
688688+ let lexicon: LexiconDoc = response.json().await?;
689689+ Ok(lexicon)
690690+ }
691691+692692+ async fn regenerate_typescript_client(&self) -> Result<()> {
693693+ let all_lexicons = self.discover_lexicons().await?;
694694+ let typescript_code = self.typescript_generator.generate_client(&all_lexicons)?;
695695+696696+ // Write to file or serve via API endpoint
697697+ self.write_client_code("typescript", &typescript_code).await?;
698698+ Ok(())
699699+ }
700700+701701+ // Get statistics about indexed collections
702702+ async fn get_collection_stats(&self) -> Result<Vec<CollectionStats>> {
703703+ let stats = sqlx::query!(
704704+ r#"SELECT "collection",
705705+ COUNT(*) as record_count,
706706+ COUNT(DISTINCT "did") as unique_authors,
707707+ MIN("indexedAt") as first_indexed,
708708+ MAX("indexedAt") as last_indexed
709709+ FROM "record"
710710+ GROUP BY "collection"
711711+ ORDER BY record_count DESC"#
712712+ ).fetch_all(&self.db).await?;
713713+714714+ Ok(stats.into_iter().map(|row| CollectionStats {
715715+ collection: row.collection,
716716+ record_count: row.record_count.unwrap_or(0) as u64,
717717+ unique_authors: row.unique_authors.unwrap_or(0) as u64,
718718+ first_indexed: row.first_indexed,
719719+ last_indexed: row.last_indexed,
720720+ }).collect())
721721+ }
722722+}
723723+```
724724+725725+**Lexicon Discovery Protocol:**
726726+727727+```json
728728+// GET https://cooking-app.com/.well-known/atproto/lexicon/recipes.cooking-app.com
729729+{
730730+ "lexicon": 1,
731731+ "id": "recipes.cooking-app.com",
732732+ "description": "Recipe sharing lexicon",
733733+ "defs": {
734734+ "main": {
735735+ "type": "record",
736736+ "record": {
737737+ "type": "object",
738738+ "required": ["$type", "title", "ingredients"],
739739+ "properties": {
740740+ "$type": { "const": "recipes.cooking-app.com" },
741741+ "title": { "type": "string" },
742742+ "ingredients": { "type": "array", "items": { "type": "string" } },
743743+ "cookingTime": { "type": "integer" },
744744+ "difficulty": { "type": "string", "enum": ["easy", "medium", "hard"] }
745745+ }
746746+ }
747747+ }
748748+ }
749749+}
750750+```
751751+752752+**Generated CLI with Discovery:**
753753+754754+```bash
755755+# Generate TypeScript client with auto-discovered lexicons
756756+npx atproto-codegen typescript \
757757+ --discover \
758758+ --output ./src/generated/indexer-client.ts \
759759+ --endpoint https://your-indexer.com
760760+761761+# Or specify additional custom lexicons
762762+npx atproto-codegen typescript \
763763+ --lexicons recipes.cooking-app.com,tasks.productivity-tool.io \
764764+ --output ./src/generated/indexer-client.ts \
765765+ --endpoint https://your-indexer.com
766766+```
767767+768768+## Implementation Technology Stack
769769+770770+### Backend: Rust
771771+772772+**Rationale**:
773773+774774+- Zero-copy parsing of CAR files and CBOR data
775775+- Memory safety for long-running indexing processes
776776+- High-performance concurrent processing
777777+- Strong type system prevents runtime errors
778778+- Excellent async ecosystem (Tokio)
779779+780780+### Client Generation: TypeScript (Initial Target)
781781+782782+**Rationale**:
783783+784784+- **Primary ecosystem**: Most AT Protocol developers use JavaScript/TypeScript
785785+- **Immediate value**: Web apps and Node.js services are common use cases
786786+- **Type safety**: Excellent TypeScript support for generated interfaces
787787+- **Developer experience**: Full IDE support with auto-completion
788788+- **Ecosystem compatibility**: Works with React, Next.js, Express, etc.
789789+790790+### Key Dependencies
791791+792792+```toml
793793+[dependencies]
794794+tokio = { version = "1.0", features = ["full"] }
795795+sqlx = { version = "0.7", features = ["postgres", "chrono", "serde_json"] }
796796+serde = { version = "1.0", features = ["derive"] }
797797+reqwest = { version = "0.11", features = ["json", "stream"] }
798798+libipld = { version = "0.16", features = ["dag-cbor", "car"] }
799799+tokio-tungstenite = "0.20" # WebSocket for firehose
800800+redis = { version = "0.23", features = ["tokio-comp"] }
801801+tracing = "0.1"
802802+803803+# Code generation dependencies
804804+handlebars = "4.0" # Template engine for TypeScript generation
805805+```
806806+807807+## Performance Optimizations
808808+809809+### Concurrent Processing
810810+811811+- **Bounded concurrency**: Limit simultaneous CAR file processing
812812+- **Streaming**: Process large CAR files without loading entirely into memory
813813+- **Batching**: Group database operations for better throughput
814814+- **Connection pooling**: Efficient database connection management
815815+816816+### Rate Limiting
817817+818818+```rust
819819+// Token bucket implementation for API rate limiting
820820+struct RateLimiter {
821821+ tokens: Arc<Mutex<f64>>,
822822+ max_tokens: f64,
823823+ refill_rate: f64, // tokens per second
824824+}
825825+```
826826+827827+### Memory Management
828828+829829+- **Streaming CAR processing**: Avoid loading entire repos into memory
830830+- **LRU caches**: Intelligent caching of frequently accessed data
831831+- **Pagination**: Cursor-based pagination for large result sets
832832+833833+## Real-Time Synchronization
834834+835835+### Firehose Integration
836836+837837+```rust
838838+async fn start_firehose_listener(&self) -> Result<()> {
839839+ let (ws_stream, _) = connect_async(
840840+ "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
841841+ ).await?;
842842+843843+ // Process commits in real-time
844844+ while let Some(msg) = read.next().await {
845845+ if let Ok(commit) = self.parse_commit(&msg) {
846846+ self.process_commit(commit).await?;
847847+ }
848848+ }
849849+850850+ Ok(())
851851+}
852852+```
853853+854854+### Sync Strategies
855855+856856+1. **Initial Bootstrap**: Download existing data via CAR files
857857+2. **Real-time Updates**: Process firehose stream for live changes
858858+3. **Periodic Reconciliation**: Compare local state with remote to catch missed
859859+ updates
860860+4. **Backfill**: Handle gaps in data due to downtime
861861+862862+## API Design
863863+864864+### Core Principles
865865+866866+- **RESTful**: Follow REST conventions where applicable
867867+- **Lexicon-Agnostic**: Work with any current or future AT Protocol lexicon
868868+- **Type-Safe**: Generate strongly typed clients
869869+- **Cacheable**: Design for HTTP caching and CDN distribution
870870+- **Paginated**: Support cursor-based pagination for large datasets
871871+872872+### Authentication
873873+874874+- **Optional**: Support authenticated requests for private data
875875+- **Bearer tokens**: Standard AT Protocol authentication
876876+- **Rate limiting**: Per-user and global rate limits
877877+878878+### Response Format
879879+880880+```json
881881+{
882882+ "data": [...],
883883+ "cursor": "next_page_token",
884884+ "count": 42,
885885+ "total": 1337
886886+}
887887+```
888888+889889+## Risk Mitigation
890890+891891+### Data Consistency
892892+893893+- **Idempotent operations**: Safe to retry any indexing operation
894894+- **Checksum validation**: Verify CAR file integrity
895895+- **Reconciliation**: Periodic comparison with authoritative sources
896896+897897+### Scalability
898898+899899+- **Horizontal scaling**: Design for multiple indexer instances
900900+- **Database sharding**: Partition by lexicon type or DID prefix
901901+- **Caching layers**: Multiple levels of caching for performance
902902+903903+### Operational
904904+905905+- **Circuit breakers**: Prevent cascade failures
906906+- **Graceful degradation**: Continue operating with reduced functionality
907907+- **Monitoring**: Comprehensive observability for quick issue detection
+505
api/docs/lexicons_spec.md
···11+Lexicon Lexicon is a schema definition language used to describe atproto
22+records, HTTP endpoints (XRPC), and event stream messages. It builds on top of
33+the atproto Data Model.
44+55+The schema language is similar to JSON Schema and OpenAPI, but includes some
66+atproto-specific features and semantics.
77+88+This specification describes version 1 of the Lexicon definition language.
99+1010+Overview of Types Lexicon Type Data Model Type Category null Null concrete
1111+boolean Boolean concrete integer Integer concrete string String concrete
1212+bytes Bytes concrete cid-link Link concrete blob Blob concrete
1313+array Array container object Object container params container token meta
1414+ref meta union meta unknown meta record primary query primary
1515+procedure primary subscription primary Lexicon Files Lexicons are JSON files
1616+associated with a single NSID. A file contains one or more definitions, each
1717+with a distinct short name. A definition with the name main optionally describes
1818+the "primary" definition for the entire file. A Lexicon with zero definitions is
1919+invalid.
2020+2121+A Lexicon JSON file is an object with the following fields:
2222+2323+lexicon (integer, required): indicates Lexicon language version. In this
2424+version, a fixed value of 1 id (string, required): the NSID of the Lexicon
2525+description (string, optional): short overview of the Lexicon, usually one or
2626+two sentences defs (map of strings-to-objects, required): set of definitions,
2727+each with a distinct name (key) Schema definitions under defs all have a type
2828+field to distinguish their type. A file can have at most one definition with one
2929+of the "primary" types. Primary types should always have the name main. It is
3030+possible for main to describe a non-primary type.
3131+3232+References to specific definitions within a Lexicon use fragment syntax, like
3333+com.example.defs#someView. If a main definition exists, it can be referenced
3434+without a fragment, just using the NSID. For references in the $type fields in
3535+data objects themselves (eg, records or contents of a union), this is a "must"
3636+(use of a #main suffix is invalid). For example, com.example.record not
3737+com.example.record#main.
3838+3939+Related Lexicons are often grouped together in the NSID hierarchy. As a
4040+convention, any definitions used by multiple Lexicons are defined in a dedicated
4141+*.defs Lexicon (eg, com.atproto.server.defs) within the group. A *.defs Lexicon
4242+should generally not include a definition named main, though it is not strictly
4343+invalid to do so.
4444+4545+Primary Type Definitions The primary types are:
4646+4747+query: describes an XRPC Query (HTTP GET) procedure: describes an XRPC Procedure
4848+(HTTP POST) subscription: Event Stream (WebSocket) record: describes an object
4949+that can be stored in a repository record Each primary definition schema object
5050+includes these fields:
5151+5252+type (string, required): the type value (eg, record for records) description
5353+(string, optional): short, usually only a sentence or two Record Type-specific
5454+fields:
5555+5656+key (string, required): specifies the Record Key type record (object, required):
5757+a schema definition with type object, which specifies this type of record Query
5858+and Procedure (HTTP API) Type-specific fields:
5959+6060+parameters (object, optional): a schema definition with type params, describing
6161+the HTTP query parameters for this endpoint output (object, optional): describes
6262+the HTTP response body description (string, optional): short description
6363+encoding (string, required): MIME type for body contents. Use application/json
6464+for JSON responses. schema (object, optional): schema definition, either an
6565+object, a ref, or a union of refs. Used to describe JSON encoded responses,
6666+though schema is optional even for JSON responses. input (object, optional, only
6767+for procedure): describes HTTP request body schema, with the same format as the
6868+output field errors (array of objects, optional): set of string error codes
6969+which might be returned name (string, required): short name for the error type,
7070+with no whitespace description (string, optional): short description, one or two
7171+sentences Subscription (Event Stream) Type-specific fields:
7272+7373+parameters (object, optional): same as Query and Procedure message (object,
7474+optional): specifies what messages can be description (string, optional): short
7575+description schema (object, required): schema definition, which must be a union
7676+of refs errors (array of objects, optional): same as Query and Procedure
7777+Subscription schemas (referenced by the schema field under message) must be a
7878+union of refs, not an object type.
7979+8080+Field Type Definitions As with the primary definitions, every schema object
8181+includes these fields:
8282+8383+type (string, required): fixed value for each type description (string,
8484+optional): short, usually only a sentence or two null No additional fields.
8585+8686+boolean Type-specific fields:
8787+8888+default (boolean, optional): a default value for this field const (boolean,
8989+optional): a fixed (constant) value for this field When included as an HTTP
9090+query parameter, should be rendered as true or false (no quotes).
9191+9292+integer A signed integer number.
9393+9494+Type-specific fields:
9595+9696+minimum (integer, optional): minimum acceptable value maximum (integer,
9797+optional): maximum acceptable value enum (array of integers, optional): a closed
9898+set of allowed values default (integer, optional): a default value for this
9999+field const (integer, optional): a fixed (constant) value for this field string
100100+Type-specific fields:
101101+102102+format (string, optional): string format restriction maxLength (integer,
103103+optional): maximum length of value, in UTF-8 bytes minLength (integer,
104104+optional): minimum length of value, in UTF-8 bytes maxGraphemes (integer,
105105+optional): maximum length of value, counted as Unicode Grapheme Clusters
106106+minGraphemes (integer, optional): minimum length of value, counted as Unicode
107107+Grapheme Clusters knownValues (array of strings, optional): a set of suggested
108108+or common values for this field. Values are not limited to this set (aka, not a
109109+closed enum). enum (array of strings, optional): a closed set of allowed values
110110+default (string, optional): a default value for this field const (string,
111111+optional): a fixed (constant) value for this field Strings are Unicode. For
112112+non-Unicode encodings, use bytes instead. The basic minLength/maxLength
113113+validation constraints are counted as UTF-8 bytes. Note that Javascript stores
114114+strings with UTF-16 by default, and it is necessary to re-encode to count
115115+accurately. The minGraphemes/maxGraphemes validation constraints work with
116116+Grapheme Clusters, which have a complex technical and linguistic definition, but
117117+loosely correspond to "distinct visual characters" like Latin letters, CJK
118118+characters, punctuation, digits, or emoji (which might comprise multiple Unicode
119119+codepoints and many UTF-8 bytes).
120120+121121+format constrains the string format and provides additional semantic context.
122122+Refer to the Data Model specification for the available format types and their
123123+definitions.
124124+125125+const and default are mutually exclusive.
126126+127127+bytes Type-specific fields:
128128+129129+minLength (integer, optional): minimum size of value, as raw bytes with no
130130+encoding maxLength (integer, optional): maximum size of value, as raw bytes with
131131+no encoding cid-link No type-specific fields.
132132+133133+See Data Model spec for CID restrictions.
134134+135135+array Type-specific fields:
136136+137137+items (object, required): describes the schema elements of this array minLength
138138+(integer, optional): minimum count of elements in array maxLength (integer,
139139+optional): maximum count of elements in array In theory arrays have homogeneous
140140+types (meaning every element as the same type). However, with union types this
141141+restriction is meaningless, so implementations can not assume that all the
142142+elements have the same type.
143143+144144+object A generic object schema which can be nested inside other definitions by
145145+reference.
146146+147147+Type-specific fields:
148148+149149+properties (map of strings-to-objects, required): defines the properties
150150+(fields) by name, each with their own schema required (array of strings,
151151+optional): indicates which properties are required nullable (array of strings,
152152+optional): indicates which properties can have null as a value As described in
153153+the data model specification, there is a semantic difference in data between
154154+omitting a field; including the field with the value null; and including the
155155+field with a "false-y" value (false, 0, empty array, etc).
156156+157157+blob Type-specific fields:
158158+159159+accept (array of strings, optional): list of acceptable MIME types. Each may end
160160+in * as a glob pattern (eg, image/*). Use _/_ to indicate that any MIME type is
161161+accepted. maxSize (integer, optional): maximum size in bytes params This is a
162162+limited-scope type which is only ever used for the parameters field on query,
163163+procedure, and subscription primary types. These map to HTTP query parameters.
164164+165165+Type-specific fields:
166166+167167+required (array of strings, optional): same semantics as field on object
168168+properties: similar to properties under object, but can only include the types
169169+boolean, integer, string, and unknown; or an array of one of these types Note
170170+that unlike object, there is no nullable field on params.
171171+172172+token Tokens are empty data values which exist only to be referenced by name.
173173+They are used to define a set of values with specific meanings. The description
174174+field should clarify the meaning of the token. Tokens encode as string data,
175175+with the string being the fully-qualified reference to the token itself (NSID
176176+followed by an optional fragment).
177177+178178+Tokens are similar to the concept of a "symbol" in some programming languages,
179179+distinct from strings, variables, built-in keywords, or other identifiers.
180180+181181+For example, tokens could be defined to represent the state of an entity (in a
182182+state machine), or to enumerate a list of categories.
183183+184184+No type-specific fields.
185185+186186+ref Type-specific fields:
187187+188188+ref (string, required): reference to another schema definition Refs are a
189189+mechanism for re-using a schema definition in multiple places. The ref string
190190+can be a global reference to a Lexicon type definition (an NSID, optionally with
191191+a #-delimited name indicating a definition other than main), or can indicate a
192192+local definition within the same Lexicon file (a # followed by a name).
193193+194194+union Type-specific fields:
195195+196196+refs (array of strings, required): references to schema definitions closed
197197+(boolean, optional): indicates if a union is "open" or "closed". defaults to
198198+false (open union) Unions represent that multiple possible types could be
199199+present at this location in the schema. The references follow the same syntax as
200200+ref, allowing references to both global or local schema definitions. Actual data
201201+will validate against a single specific type: the union does not combine fields
202202+from multiple schemas, or define a new hybrid data type. The different types are
203203+referred to as variants.
204204+205205+By default unions are "open", meaning that future revisions of the schema could
206206+add more types to the list of refs (though can not remove types). This means
207207+that implementations should be permissive when validating, in case they do not
208208+have the most recent version of the Lexicon. The closed flag (boolean) can
209209+indicate that the set of types is fixed and can not be extended in the future.
210210+211211+A union schema definition with no refs is allowed and similar to unknown, as
212212+long as the closed flag is false (the default). The main difference is that the
213213+data would be required to have the $type field. An empty refs list with closed
214214+set to true is an invalid schema.
215215+216216+The schema definitions pointed to by a union are objects or types with a clear
217217+mapping to an object, like a record. All the variants must be represented by a
218218+CBOR map (or JSON Object) and must include a $type field indicating the variant
219219+type. Because the data must be an object, unions can not reference token (which
220220+would correspond to string data).
221221+222222+unknown Indicates than any data object could appear at this location, with no
223223+specific validation. The top-level data must be an object (not a string,
224224+boolean, etc). As with all other data types, the value null is not allowed
225225+unless the field is specifically marked as nullable.
226226+227227+The data object may contain a
228228+$type field indicating the schema of the data, but this is not currently required. The top-level data object must not have the structure of a compound data type, like blob ($type:
229229+blob) or CID link ($link).
230230+231231+The (nested) contents of the data object must still be valid under the atproto
232232+data model. For example, it should not contain floats. Nested compound types
233233+like blobs and CID links should be validated and transformed as expected.
234234+235235+Lexicon designers are strongly recommended to not use unknown fields in record
236236+objects for now.
237237+238238+No type-specific fields.
239239+240240+String Formats Strings can optionally be constrained to one of the following
241241+format types:
242242+243243+at-identifier: either a Handle or a DID, details described below at-uri: AT-URI
244244+cid: CID in string format, details specified in Data Model datetime: timestamp,
245245+details specified below did: generic DID Identifier handle: Handle Identifier
246246+nsid: Namespaced Identifier tid: Timestamp Identifier (TID) record-key: Record
247247+Key, matching the general syntax ("any") uri: generic URI, details specified
248248+below language: language code, details specified below For the various
249249+identifier formats, when doing Lexicon schema validation the most expansive
250250+identifier syntax format should be permitted. Problems with identifiers which do
251251+pass basic syntax validation should be reported as application errors, not
252252+lexicon data validation errors. For example, data with any kind of DID in a did
253253+format string field should pass Lexicon validation, with unsupported DID methods
254254+being raised separately as an application error.
255255+256256+at-identifier A string type which is either a DID (type: did) or a handle
257257+(handle). Mostly used in XRPC query parameters. It is unambiguous whether an
258258+at-identifier is a handle or a DID because a DID always starts with did:, and
259259+the colon character (:) is not allowed in handles.
260260+261261+datetime Full-precision date and time, with timezone information.
262262+263263+This format is intended for use with computer-generated timestamps in the modern
264264+computing era (eg, after the UNIX epoch). If you need to represent historical or
265265+ancient events, ambiguity, or far-future times, a different format is probably
266266+more appropriate. Datetimes before the Current Era (year zero) as specifically
267267+disallowed.
268268+269269+Datetime format standards are notoriously flexible and overlapping. Datetime
270270+strings in atproto should meet the intersecting requirements of the RFC 3339,
271271+ISO 8601, and WHATWG HTML datetime standards.
272272+273273+The character separating "date" and "time" parts must be an upper-case T.
274274+275275+Timezone specification is required. It is strongly preferred to use the UTC
276276+timezone, and to represent the timezone with a simple capital Z suffix
277277+(lower-case is not allowed). While hour/minute suffix syntax (like +01:00 or
278278+-10:30) is supported, "negative zero" (-00:00) is specifically disallowed (by
279279+ISO 8601).
280280+281281+Whole seconds precision is required, and arbitrary fractional precision digits
282282+are allowed. Best practice is to use at least millisecond precision, and to pad
283283+with zeros to the generated precision (eg, trailing :12.340Z instead of
284284+:12.34Z). Not all datetime formatting libraries support trailing zero
285285+formatting. Both millisecond and microsecond precision have reasonable
286286+cross-language support; nanosecond precision does not.
287287+288288+Implementations should be aware when round-tripping records containing datetimes
289289+of two ambiguities: loss-of-precision, and ambiguity with trailing fractional
290290+second zeros. If de-serializing Lexicon records into native types, and then
291291+re-serializing, the string representation may not be the same, which could
292292+result in broken hash references, sanity check failures, or repository update
293293+churn. A safer thing to do is to deserialize the datetime as a simple string,
294294+which ensures round-trip re-serialization.
295295+296296+Implementations "should" validate that the semantics of the datetime are valid.
297297+For example, a month or day 00 is invalid.
298298+299299+Valid examples:
300300+301301+# preferred
302302+303303+1985-04-12T23:20:50.123Z 1985-04-12T23:20:50.123456Z 1985-04-12T23:20:50.120Z
304304+1985-04-12T23:20:50.120000Z
305305+306306+# supported
307307+308308+1985-04-12T23:20:50.12345678912345Z 1985-04-12T23:20:50Z 1985-04-12T23:20:50.0Z
309309+1985-04-12T23:20:50.123+00:00 1985-04-12T23:20:50.123-07:00
310310+311311+Copy Copied! Invalid examples:
312312+313313+1985-04-12 1985-04-12T23:20Z 1985-04-12T23:20:5Z 1985-04-12T23:20:50.123
314314++001985-04-12T23:20:50.123Z 23:20:50.123Z -1985-04-12T23:20:50.123Z
315315+1985-4-12T23:20:50.123Z 01985-04-12T23:20:50.123Z 1985-04-12T23:20:50.123+00
316316+1985-04-12T23:20:50.123+0000
317317+318318+# ISO-8601 strict capitalization
319319+320320+1985-04-12t23:20:50.123Z 1985-04-12T23:20:50.123z
321321+322322+# RFC-3339, but not ISO-8601
323323+324324+1985-04-12T23:20:50.123-00:00 1985-04-12 23:20:50.123Z
325325+326326+# timezone is required
327327+328328+1985-04-12T23:20:50.123
329329+330330+# syntax looks ok, but datetime is not valid
331331+332332+1985-04-12T23:99:50.123Z 1985-00-12T23:20:50.123Z
333333+334334+Copy Copied! uri Flexible to any URI schema, following the generic RFC-3986 on
335335+URIs. This includes, but isn’t limited to: did, https, wss, ipfs (for CIDs),
336336+dns, and of course at. Maximum length in Lexicons is 8 KBytes.
337337+338338+language An IETF Language Tag string, compliant with BCP 47, defined in RFC 5646
339339+("Tags for Identifying Languages"). This is the same standard used to identify
340340+languages in HTTP, HTML, and other web standards. The Lexicon string must
341341+validate as a "well-formed" language tag, as defined in the RFC. Clients should
342342+ignore language strings which are "well-formed" but not "valid" according to the
343343+RFC.
344344+345345+As specified in the RFC, ISO 639 two-character and three-character language
346346+codes can be used on their own, lower-cased, such as ja (Japanese) or ban
347347+(Balinese). Regional sub-tags can be added, like pt-BR (Brazilian Portuguese).
348348+Additional subtags can also be added, such as hy-Latn-IT-arevela.
349349+350350+Language codes generally need to be parsed, normalized, and matched
351351+semantically, not simply string-compared. For example, a search engine might
352352+simplify language tags to ISO 639 codes for indexing and filtering, while a
353353+client application (user agent) would retain the full language code for
354354+presentation (text rendering) locally.
355355+356356+When to use $type Data objects sometimes include a $type field which indicates
357357+their Lexicon type. The general principle is that this field needs to be
358358+included any time there could be ambiguity about the content type when
359359+validating data.
360360+361361+The specific rules are:
362362+363363+record objects must always include $type. While the type is often known from
364364+context (eg, the collection part of the path for records stored in a
365365+repository), record objects can also be passed around outside of repositories
366366+and need to be self-describing union variants must always include $type, except
367367+at the top level of subscription messages Note that blob objects always include
368368+$type, which allows generic processing.
369369+370370+As a reminder, main types must be referenced in $type fields as just the NSID,
371371+not including a #main suffix.
372372+373373+Lexicon Evolution Lexicons are allowed to change over time, within some bounds
374374+to ensure both forwards and backwards compatibility. The basic principle is that
375375+all old data must still be valid under the updated Lexicon, and new data must be
376376+valid under the old Lexicon.
377377+378378+Any new fields must be optional Non-optional fields can not be removed. A best
379379+practice is to retain all fields in the Lexicon and mark them as deprecated if
380380+they are no longer used. Types can not change Fields can not be renamed If
381381+larger breaking changes are necessary, a new Lexicon name must be used.
382382+383383+It can be ambiguous when a Lexicon has been published and becomes "set in
384384+stone". At a minimum, public adoption and implementation by a third party, even
385385+without explicit permission, indicates that the Lexicon has been released and
386386+should not break compatibility. A best practice is to clearly indicate in the
387387+Lexicon type name any experimental or development status. Eg,
388388+com.corp.experimental.newRecord.
389389+390390+Authority and Control The authority for a Lexicon is determined by the NSID, and
391391+rooted in DNS control of the domain authority. That authority has ultimate
392392+control over the Lexicon definition, and responsibility for maintenance and
393393+distribution of Lexicon schema definitions.
394394+395395+In a crisis, such as unintentional loss of DNS control to a bad actor, the
396396+protocol ecosystem could decide to disregard this chain of authority. This
397397+should only be done in exceptional circumstances, and not as a mechanism to
398398+subvert an active authority. The primary mechanism for resolving protocol
399399+disputes is to fork Lexicons in to a new namespace.
400400+401401+Protocol implementations should generally consider data which fails to validate
402402+against the Lexicon to be entirely invalid, and should not try to repair or do
403403+partial processing on the individual piece of data.
404404+405405+Unexpected fields in data which otherwise conforms to the Lexicon should be
406406+ignored. When doing schema validation, they should be treated at worst as
407407+warnings. This is necessary to allow evolution of the schema by the controlling
408408+authority, and to be robust in the case of out-of-date Lexicons.
409409+410410+Third parties can technically insert any additional fields they want into data.
411411+This is not the recommended way to extend applications, but it is not
412412+specifically disallowed. One danger with this is that the Lexicon may be updated
413413+to include fields with the same field names but different types, which would
414414+make existing data invalid.
415415+416416+Lexicon Publication and Resolution Lexicon schemas are published publicly as
417417+records in atproto repositories, using the com.atproto.lexicon.schema type. The
418418+domain name authority for NSIDs to specific atproto repositories (identified by
419419+DID is linked by a DNS TXT record (_lexicon), similar to but distinct from the
420420+handle resolution system.
421421+422422+The com.atproto.lexicon.schema Lexicon itself is very minimal: it only requires
423423+the lexicon integer field, which must be 1 for this version of the Lexicon
424424+language. In practice, same fields as Lexicon Files should be included, along
425425+with $type. The record key is the NSID of the schema.
426426+427427+A summary of record fields:
428428+429429+$type: must be com.atproto.lexicon.schema (as with all atproto records) lexicon:
430430+integer, indicates the overall version of the Lexicon (currently 1) id: the NSID
431431+of this Lexicon. Must be a simple NSID (no fragment), and must match the record
432432+key defs: the schema definitions themselves, as a map-of-objects. Names should
433433+not include a # prefix. description: optional description of the overall schema;
434434+though descriptions are best included on individual defs, not the overall
435435+schema. The com.atproto.lexicon.schema meta-schema is somewhat unlike other
436436+Lexicons, in that it is defined and governed as part of the protocol. Future
437437+versions of the language and protocol might not follow the evolution rules. It
438438+is an intentional decision to not express the Lexicon schema language itself
439439+recursively, using the schema language.
440440+441441+Authority for NSID namespaces is done at the "group" level, meaning that all
442442+NSIDs which differ only by the final "name" part are all published in the same
443443+repository. Lexicon resolution of NSIDs is not hierarchical: DNS TXT records
444444+must be created for each authority section, and resolvers should not recurse up
445445+or down the DNS hierarchy looking for TXT records.
446446+447447+As an example, the NSID edu.university.dept.lab.blogging.getBlogPost has a
448448+"name" getBlogPost. Removing the name and reversing the rest of the NSID gives
449449+an "authority domain name" of blogging.lab.dept.university.edu. To link the
450450+authority to a specific DID (say did:plc:ewvi7nxzyoun6zhxrhs64oiz), a DNS TXT
451451+record with the name _lexicon.blogging.lab.dept.university.edu and value
452452+did=did:plc:ewvi7nxzyoun6zhxrhs64oiz (note the did= prefix) would be created.
453453+Then a record with collection com.atproto.lexicon.schema and record-key
454454+edu.university.dept.lab.blogging.getBlogPost would be created in that account's
455455+repository.
456456+457457+A resolving service would start with the NSID
458458+(edu.university.dept.lab.blogging.getBlogPost) and do a DNS TXT resolution for
459459+_lexicon.blogging.lab.dept.university.edu. Finding the DID, it would proceed
460460+with atproto DID resolution, look for a PDS, and then fetch the relevant record.
461461+The overall AT-URI for the record would be
462462+at://did:plc:ewvi7nxzyoun6zhxrhs64oiz/com.atproto.lexicon.schema/edu.university.dept.lab.blogging.getBlogPost.
463463+464464+If the DNS TXT resolution for _lexicon.blogging.lab.dept.university.edu failed,
465465+the resolving service would NOT try _lexicon.lab.dept.university.edu or
466466+_lexicon.getBlogPost.blogging.lab.dept.university.edu or
467467+_lexicon.university.edu, or any other domain name. The Lexicon resolution would
468468+simply fail.
469469+470470+If another NSID edu.university.dept.lab.blogging.getBlogComments was created, it
471471+would have the same authority name, and must be published in the same atproto
472472+repository (with a different record key). If a Lexicon for
473473+edu.university.dept.lab.gallery.photo was published, a new DNS TXT record would
474474+be required (_lexicon.gallery.lab.dept.university.edu; it could point at the
475475+same repository (DID), or a different repository.
476476+477477+As a simpler example, an NSID app.toy.record would resolve via _lexicon.toy.app.
478478+479479+A single repository can host Lexicons for multiple authority domains, possibly
480480+across multiple registered domains and TLDs. Resolution DNS records can change
481481+over time, moving schema resolution to different repositories, though it may
482482+take time for DNS and cache changes to propagate.
483483+484484+Note that Lexicon record operations are broadcast over repository event streams
485485+("firehose"), but that DNS resolution changes do not (unlike handle changes).
486486+Resolving services should not cache DNS resolution results for long time
487487+periods.
488488+489489+Usage and Implementation Guidelines It should be possible to translate Lexicon
490490+schemas to JSON Schema or OpenAPI and use tools and libraries from those
491491+ecosystems to work with atproto data in JSON format.
492492+493493+Implementations which serialize and deserialize data from JSON or CBOR into
494494+structures derived from specific Lexicons should be aware of the risk of
495495+"clobbering" unexpected fields. For example, if a Lexicon is updated to add a
496496+new (optional) field, old implementations would not be aware of that field, and
497497+might accidentally strip the data when de-serializing and then re-serializing.
498498+Depending on the context, one way to avoid this problem is to retain any "extra"
499499+fields, or to pass-through the original data object instead of re-serializing
500500+it.
501501+502502+Possible Future Changes The validation rules for unexpected additional fields
503503+may change. For example, a mechanism for Lexicons to indicate that the schema is
504504+"closed" and unexpected fields are not allowed, or a convention around field
505505+name prefixes (x-) to indicate unofficial extension.
api/lexicons.zip
This is a binary file and will not be displayed.
+21
api/migrations/001_initial.sql
···11+-- AT Protocol Indexer Database Schema
22+-- Single table approach for maximum flexibility across arbitrary lexicons
33+44+CREATE TABLE IF NOT EXISTS "record" (
55+ "uri" TEXT PRIMARY KEY NOT NULL,
66+ "cid" TEXT NOT NULL,
77+ "did" TEXT NOT NULL,
88+ "collection" TEXT NOT NULL,
99+ "json" JSONB NOT NULL,
1010+ "indexedAt" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
1111+);
1212+1313+-- Essential indexes for performance
1414+CREATE INDEX IF NOT EXISTS idx_record_collection ON "record"("collection");
1515+CREATE INDEX IF NOT EXISTS idx_record_did ON "record"("did");
1616+CREATE INDEX IF NOT EXISTS idx_record_indexed_at ON "record"("indexedAt");
1717+CREATE INDEX IF NOT EXISTS idx_record_json_gin ON "record" USING GIN("json");
1818+1919+-- Collection-specific indexes for common queries
2020+CREATE INDEX IF NOT EXISTS idx_record_collection_did ON "record"("collection", "did");
2121+CREATE INDEX IF NOT EXISTS idx_record_cid ON "record"("cid");
+10
api/migrations/002_lexicons.sql
···11+-- Add lexicons table for storing AT Protocol lexicon schemas
22+CREATE TABLE IF NOT EXISTS "lexicons" (
33+ "nsid" TEXT PRIMARY KEY NOT NULL,
44+ "definitions" JSONB NOT NULL,
55+ "created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
66+ "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
77+);
88+99+CREATE INDEX IF NOT EXISTS idx_lexicons_nsid ON "lexicons"("nsid");
1010+CREATE INDEX IF NOT EXISTS idx_lexicons_definitions ON "lexicons" USING gin("definitions");
+9
api/migrations/003_actors.sql
···11+-- Add actors table for storing AT Protocol actor/profile data
22+CREATE TABLE IF NOT EXISTS "actor" (
33+ "did" TEXT PRIMARY KEY NOT NULL,
44+ "handle" TEXT,
55+ "indexedAt" TEXT NOT NULL
66+);
77+88+CREATE INDEX IF NOT EXISTS idx_actor_handle ON "actor"("handle");
99+CREATE INDEX IF NOT EXISTS idx_actor_indexed_at ON "actor"("indexedAt");
···11+use chrono::{Utc};
22+use reqwest::Client;
33+use serde::Deserialize;
44+use serde_json::Value;
55+use tracing::{error, info};
66+77+use crate::database::Database;
88+use crate::errors::SyncError;
99+use crate::models::{Actor, Record};
1010+use crate::utils::is_primary_collection;
1111+1212+1313+#[derive(Debug, Deserialize)]
1414+struct AtProtoRecord {
1515+ uri: String,
1616+ cid: String,
1717+ value: Value,
1818+}
1919+2020+#[derive(Debug, Deserialize)]
2121+struct ListRecordsResponse {
2222+ records: Vec<AtProtoRecord>,
2323+ cursor: Option<String>,
2424+}
2525+2626+2727+#[derive(Debug, Deserialize)]
2828+struct ListReposByCollectionResponse {
2929+ repos: Vec<RepoRef>,
3030+}
3131+3232+#[derive(Debug, Deserialize)]
3333+struct RepoRef {
3434+ did: String,
3535+}
3636+3737+#[derive(Debug, Deserialize)]
3838+struct DidDocument {
3939+ service: Option<Vec<Service>>,
4040+}
4141+4242+#[derive(Debug, Deserialize)]
4343+struct Service {
4444+ #[serde(rename = "type")]
4545+ service_type: String,
4646+ #[serde(rename = "serviceEndpoint")]
4747+ service_endpoint: String,
4848+}
4949+5050+#[derive(Debug, Clone)]
5151+struct AtpData {
5252+ did: String,
5353+ pds: String,
5454+ handle: Option<String>,
5555+}
5656+5757+#[derive(Clone)]
5858+pub struct SyncService {
5959+ client: Client,
6060+ database: Database,
6161+}
6262+6363+impl SyncService {
6464+ pub fn new(database: Database) -> Self {
6565+ Self {
6666+ client: Client::new(),
6767+ database,
6868+ }
6969+ }
7070+7171+ // Sync using listRecords
7272+ pub async fn sync_repo(&self, did: &str, collections: Option<&[String]>) -> Result<i64, SyncError> {
7373+ info!("🔄 Starting sync for DID: {}", did);
7474+7575+ let total_records = self.listrecords_sync(did, collections).await?;
7676+7777+ info!("✅ Sync completed for {}: {} records", did, total_records);
7878+ Ok(total_records)
7979+ }
8080+8181+8282+ // Sync using listRecords
8383+ async fn listrecords_sync(&self, did: &str, collections: Option<&[String]>) -> Result<i64, SyncError> {
8484+ let collections_to_sync = match collections {
8585+ Some(cols) => cols,
8686+ None => return Ok(0), // No collections specified = no records
8787+ };
8888+8989+ // Get ATP data for this single repo
9090+ let atp_map = self.get_atp_map_for_repos(&[did.to_string()]).await?;
9191+9292+ let mut total_records = 0;
9393+ for collection in collections_to_sync {
9494+ match self.fetch_records_for_repo_collection_with_atp_map(did, collection, &atp_map).await {
9595+ Ok(records) => {
9696+ if !records.is_empty() {
9797+ info!("📋 Fallback sync: {} records for {}/{}", records.len(), did, collection);
9898+ self.database.batch_insert_records(&records).await?;
9999+ total_records += records.len() as i64;
100100+ }
101101+ }
102102+ Err(e) => {
103103+ error!("Failed fallback sync for {}/{}: {}", did, collection, e);
104104+ }
105105+ }
106106+ }
107107+108108+ Ok(total_records)
109109+ }
110110+111111+112112+ pub async fn backfill_collections(&self, collections: &[String], repos: Option<&[String]>) -> Result<(), SyncError> {
113113+ info!("🔄 Starting backfill operation");
114114+ info!("📚 Processing {} collections: {}", collections.len(), collections.join(", "));
115115+116116+ let all_repos = if let Some(provided_repos) = repos {
117117+ info!("📋 Using {} provided repositories", provided_repos.len());
118118+ provided_repos.to_vec()
119119+ } else {
120120+ info!("📊 Fetching repositories for collections...");
121121+ let mut unique_repos = std::collections::HashSet::new();
122122+123123+ // Separate primary and external collections
124124+ let (primary_collections, _external_collections): (Vec<_>, Vec<_>) = collections
125125+ .iter()
126126+ .partition(|collection| is_primary_collection(collection));
127127+128128+ // First, get all repos from primary collections
129129+ let mut primary_repos = std::collections::HashSet::new();
130130+ for collection in &primary_collections {
131131+ match self.get_repos_for_collection(collection).await {
132132+ Ok(repos) => {
133133+ info!("✓ Found {} repositories for primary collection \"{}\"", repos.len(), collection);
134134+ primary_repos.extend(repos);
135135+ },
136136+ Err(e) => {
137137+ error!("Failed to get repos for primary collection {}: {}", collection, e);
138138+ }
139139+ }
140140+ }
141141+142142+ info!("📋 Found {} unique repositories from primary collections", primary_repos.len());
143143+144144+ // Use primary repos for syncing (both primary and external collections)
145145+ unique_repos.extend(primary_repos);
146146+147147+ let repos: Vec<String> = unique_repos.into_iter().collect();
148148+ info!("📋 Processing {} unique repositories", repos.len());
149149+ repos
150150+ };
151151+152152+ // Get ATP data for all repos at once
153153+ info!("🔍 Resolving ATP data for repositories...");
154154+ let atp_map = self.get_atp_map_for_repos(&all_repos).await?;
155155+ info!("✓ Resolved ATP data for {}/{} repositories", atp_map.len(), all_repos.len());
156156+157157+ // Only sync repos that have valid ATP data
158158+ let valid_repos: Vec<String> = atp_map.keys().cloned().collect();
159159+ let failed_resolutions = all_repos.len() - valid_repos.len();
160160+161161+ if failed_resolutions > 0 {
162162+ info!("⚠️ {} repositories failed DID resolution and will be skipped", failed_resolutions);
163163+ }
164164+165165+ info!("🧠 Starting sync for {} repositories...", valid_repos.len());
166166+167167+ // Create parallel fetch tasks for repos with valid ATP data only
168168+ let mut fetch_tasks = Vec::new();
169169+ for repo in &valid_repos {
170170+ for collection in collections {
171171+ let repo_clone = repo.clone();
172172+ let collection_clone = collection.clone();
173173+ let sync_service = self.clone();
174174+ let atp_map_clone = atp_map.clone();
175175+176176+ let task = tokio::spawn(async move {
177177+ match sync_service.fetch_records_for_repo_collection_with_atp_map(&repo_clone, &collection_clone, &atp_map_clone).await {
178178+ Ok(records) => {
179179+ Ok((repo_clone, collection_clone, records))
180180+ }
181181+ Err(e) => {
182182+ // Handle common "not error" scenarios as empty results
183183+ match &e {
184184+ SyncError::ListRecords { status } => {
185185+ if *status == 404 || *status == 400 {
186186+ // Collection doesn't exist for this repo - return empty
187187+ Ok((repo_clone, collection_clone, vec![]))
188188+ } else {
189189+ Err(e)
190190+ }
191191+ }
192192+ SyncError::HttpRequest(_) => {
193193+ // Network errors - treat as empty (like TypeScript version)
194194+ Ok((repo_clone, collection_clone, vec![]))
195195+ }
196196+ _ => Err(e)
197197+ }
198198+ }
199199+ }
200200+ });
201201+ fetch_tasks.push(task);
202202+ }
203203+ }
204204+205205+ info!("📥 Fetching records for repositories and collections...");
206206+ info!("🔧 Debug: Created {} fetch tasks for {} repos × {} collections", fetch_tasks.len(), valid_repos.len(), collections.len());
207207+208208+ // Collect all results
209209+ let mut all_records = Vec::new();
210210+ let mut successful_tasks = 0;
211211+ let mut failed_tasks = 0;
212212+ for task in fetch_tasks {
213213+ match task.await {
214214+ Ok(Ok((_repo, _collection, records))) => {
215215+ all_records.extend(records);
216216+ successful_tasks += 1;
217217+ }
218218+ Ok(Err(_)) => {
219219+ failed_tasks += 1;
220220+ }
221221+ Err(_e) => {
222222+ failed_tasks += 1;
223223+ }
224224+ }
225225+ }
226226+227227+ info!("🔧 Debug: {} successful tasks, {} failed tasks", successful_tasks, failed_tasks);
228228+229229+ let total_records = all_records.len() as i64;
230230+ info!("✓ Fetched {} total records", total_records);
231231+232232+ // Index actors first (like the TypeScript version)
233233+ info!("📝 Indexing actors...");
234234+ self.index_actors(&valid_repos, &atp_map).await?;
235235+ info!("✓ Indexed {} actors", valid_repos.len());
236236+237237+ // Single batch insert for all records
238238+ if !all_records.is_empty() {
239239+ info!("📝 Indexing {} records...", total_records);
240240+ self.database.batch_insert_records(&all_records).await?;
241241+ }
242242+243243+ info!("✅ Backfill complete!");
244244+245245+ Ok(())
246246+ }
247247+248248+ async fn get_repos_for_collection(&self, collection: &str) -> Result<Vec<String>, SyncError> {
249249+ let response = self.client
250250+ .get("https://relay1.us-west.bsky.network/xrpc/com.atproto.sync.listReposByCollection")
251251+ .query(&[("collection", collection)])
252252+ .send()
253253+ .await?;
254254+255255+ if !response.status().is_success() {
256256+ return Err(SyncError::ListRepos { status: response.status().as_u16() });
257257+ }
258258+259259+ let repos_response: ListReposByCollectionResponse = response.json().await?;
260260+ Ok(repos_response.repos.into_iter().map(|r| r.did).collect())
261261+ }
262262+263263+ async fn fetch_records_for_repo_collection_with_atp_map(&self, repo: &str, collection: &str, atp_map: &std::collections::HashMap<String, AtpData>) -> Result<Vec<Record>, SyncError> {
264264+ let atp_data = atp_map.get(repo).ok_or_else(|| SyncError::Generic(format!("No ATP data found for repo: {}", repo)))?;
265265+ self.fetch_records_for_repo_collection(repo, collection, &atp_data.pds).await
266266+ }
267267+268268+ async fn fetch_records_for_repo_collection(&self, repo: &str, collection: &str, pds_url: &str) -> Result<Vec<Record>, SyncError> {
269269+ let mut records = Vec::new();
270270+ let mut cursor: Option<String> = None;
271271+272272+ loop {
273273+ let mut params = vec![("repo", repo), ("collection", collection), ("limit", "100")];
274274+ if let Some(ref c) = cursor {
275275+ params.push(("cursor", c));
276276+ }
277277+278278+ let response = self.client
279279+ .get(&format!("{}/xrpc/com.atproto.repo.listRecords", pds_url))
280280+ .query(¶ms)
281281+ .send()
282282+ .await?;
283283+284284+ if !response.status().is_success() {
285285+ return Err(SyncError::ListRecords { status: response.status().as_u16() });
286286+ }
287287+288288+ let list_response: ListRecordsResponse = response.json().await?;
289289+290290+ for atproto_record in list_response.records {
291291+ let record = Record {
292292+ uri: atproto_record.uri,
293293+ cid: atproto_record.cid,
294294+ did: repo.to_string(),
295295+ collection: collection.to_string(),
296296+ json: atproto_record.value,
297297+ indexed_at: Utc::now(),
298298+ };
299299+ records.push(record);
300300+ }
301301+302302+ cursor = list_response.cursor;
303303+ if cursor.is_none() {
304304+ break;
305305+ }
306306+ }
307307+308308+ Ok(records)
309309+ }
310310+311311+ async fn get_atp_map_for_repos(&self, repos: &[String]) -> Result<std::collections::HashMap<String, AtpData>, SyncError> {
312312+ let mut atp_map = std::collections::HashMap::new();
313313+314314+ for repo in repos {
315315+ if let Ok(atp_data) = self.resolve_atp_data(repo).await {
316316+ atp_map.insert(atp_data.did.clone(), atp_data);
317317+ }
318318+ }
319319+320320+ Ok(atp_map)
321321+ }
322322+323323+ async fn resolve_atp_data(&self, did: &str) -> Result<AtpData, SyncError> {
324324+ let pds = if did.starts_with("did:plc:") {
325325+ // Resolve PLC DID
326326+ let response = self.client
327327+ .get(&format!("https://plc.directory/{}", did))
328328+ .send()
329329+ .await?;
330330+331331+ if response.status().is_success() {
332332+ let did_doc: DidDocument = response.json().await?;
333333+ if let Some(services) = did_doc.service {
334334+ for service in services {
335335+ if service.service_type == "AtprotoPersonalDataServer" {
336336+ return Ok(AtpData {
337337+ did: did.to_string(),
338338+ pds: service.service_endpoint,
339339+ handle: None,
340340+ });
341341+ }
342342+ }
343343+ }
344344+ }
345345+346346+ // Fallback to bsky.social
347347+ "https://bsky.social".to_string()
348348+ } else {
349349+ // Fallback to bsky.social for non-PLC DIDs
350350+ "https://bsky.social".to_string()
351351+ };
352352+353353+ Ok(AtpData {
354354+ did: did.to_string(),
355355+ pds,
356356+ handle: None,
357357+ })
358358+ }
359359+360360+ async fn index_actors(&self, repos: &[String], atp_map: &std::collections::HashMap<String, AtpData>) -> Result<(), SyncError> {
361361+ let mut actors = Vec::new();
362362+ let now = chrono::Utc::now().to_rfc3339();
363363+364364+ for repo in repos {
365365+ if let Some(atp_data) = atp_map.get(repo) {
366366+ actors.push(Actor {
367367+ did: atp_data.did.clone(),
368368+ handle: atp_data.handle.clone(),
369369+ indexed_at: now.clone(),
370370+ });
371371+ }
372372+ }
373373+374374+ if !actors.is_empty() {
375375+ self.database.batch_insert_actors(&actors).await?;
376376+ }
377377+378378+ Ok(())
379379+ }
380380+}
+6
api/src/utils.rs
···11+/// Determines if a collection NSID is considered "primary" vs "external"
22+/// Primary collections are social.grain.* domain
33+/// Everything else is considered external
44+pub fn is_primary_collection(nsid: &str) -> bool {
55+ nsid.starts_with("social.grain.")
66+}