Highly ambitious ATProtocol AppView service and sdks
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add type-safe sort params based on sortable lexicon fields, wire up listRecords and searchRecords to use new sort param, actually return a cursor now base64 encoded

Chad Miller 96d779bb 1f608eb2

+560 -250
+3 -2
api/Cargo.toml
··· 34 34 # UUID generation 35 35 uuid = { version = "1.0", features = ["v4", "serde"] } 36 36 37 + # Base64 encoding for cursors 38 + base64 = "0.22" 39 + 37 40 # Environment variables 38 41 dotenvy = "0.15" 39 42 ··· 45 48 atproto-identity = "0.11.2" 46 49 atproto-oauth = "0.11.2" 47 50 48 - # Base64 encoding/decoding 49 - base64 = "0.22" 50 51 51 52 # Middleware for HTTP requests with retry logic 52 53 reqwest-middleware = { version = "0.4.2", features = ["json", "multipart"] }
+4 -4
api/scripts/codegen.sh
··· 1 1 #!/bin/bash 2 2 3 3 # Collect all lexicon files and format them for the TypeScript generator 4 - echo "Collecting lexicons from ./lexicons..." 4 + echo "Collecting lexicons from ../lexicons..." 5 5 6 6 # Create a temporary JSON array of lexicons 7 7 temp_file=$(mktemp) 8 8 echo "[" > "$temp_file" 9 9 10 10 first=true 11 - for lexicon_file in $(find ./lexicons -name "*.json" -type f); do 11 + for lexicon_file in $(find ../lexicons -name "*.json" -type f); do 12 12 if [ "$first" = false ]; then 13 13 echo "," >> "$temp_file" 14 14 fi ··· 32 32 33 33 # Generate the TypeScript client 34 34 echo "Generating TypeScript client..." 35 - deno run --allow-all api/scripts/generate-typescript.ts "$(cat "$temp_file")" > api/generated_client.ts 35 + deno run --allow-all ./scripts/generate-typescript.ts "$(cat "$temp_file")" > ./generated_client.ts 36 36 37 37 # Clean up 38 38 rm "$temp_file" 39 39 40 - echo "✅ Generated TypeScript client at api/generated_client.ts" 40 + echo "✅ Generated TypeScript client at ./generated_client.ts"
+48 -5
api/scripts/generate-typescript.ts
··· 121 121 122 122 ${usageExample} 123 123 124 - import { OAuthClient } from "@slices/oauth"; 124 + import { OAuthClient } from "jsr:@slices/oauth@^0.3.2"; 125 125 126 126 `; 127 127 ··· 154 154 ], 155 155 }); 156 156 157 - // ListRecordsParams interface 157 + // ListRecordsParams interface (generic with sort fields) 158 158 sourceFile.addInterface({ 159 159 name: "ListRecordsParams", 160 + typeParameters: [{ name: "TSortField", constraint: "string", default: "string" }], 160 161 isExported: true, 161 162 properties: [ 162 163 { name: "author", type: "string", hasQuestionToken: true }, 163 164 { name: "limit", type: "number", hasQuestionToken: true }, 164 165 { name: "cursor", type: "string", hasQuestionToken: true }, 166 + { name: "sort", type: "`${TSortField}:${'asc' | 'desc'}` | `${TSortField}:${'asc' | 'desc'},${TSortField}:${'asc' | 'desc'}`", hasQuestionToken: true }, 165 167 ], 166 168 }); 167 169 ··· 172 174 properties: [{ name: "uri", type: "string" }], 173 175 }); 174 176 175 - // SearchRecordsParams interface 177 + // SearchRecordsParams interface (generic with sort fields) 176 178 sourceFile.addInterface({ 177 179 name: "SearchRecordsParams", 180 + typeParameters: [{ name: "TSortField", constraint: "string", default: "string" }], 178 181 isExported: true, 179 182 properties: [ 180 183 { name: "query", type: "string" }, 181 184 { name: "field", type: "string", hasQuestionToken: true }, 182 185 { name: "limit", type: "number", hasQuestionToken: true }, 183 186 { name: "cursor", type: "string", hasQuestionToken: true }, 187 + { name: "sort", type: "`${TSortField}:${'asc' | 'desc'}` | `${TSortField}:${'asc' | 'desc'},${TSortField}:${'asc' | 'desc'}`", hasQuestionToken: true }, 184 188 ], 185 189 }); 186 190 ··· 464 468 return Boolean(recordObj.required && recordObj.required.includes(propName)); 465 469 } 466 470 471 + // Helper function to check if a field type is sortable 472 + function isFieldSortable(propDef: any): boolean { 473 + // Check for direct types 474 + if (propDef.type) { 475 + const sortableTypes = ['string', 'integer', 'number', 'datetime']; 476 + if (sortableTypes.includes(propDef.type)) { 477 + return true; 478 + } 479 + } 480 + 481 + // Check for format-based types (datetime strings, etc.) 482 + if (propDef.format) { 483 + const sortableFormats = ['datetime', 'at-identifier', 'at-uri']; 484 + if (sortableFormats.includes(propDef.format)) { 485 + return true; 486 + } 487 + } 488 + 489 + // Arrays, objects, blobs, and complex types are not sortable 490 + return false; 491 + } 492 + 467 493 // Add lexicon-specific interfaces 468 494 function addLexiconInterfaces(): void { 469 495 for (const lexicon of lexicons) { ··· 479 505 hasQuestionToken: boolean; 480 506 docs?: string[]; 481 507 }> = []; 508 + const fieldNames: string[] = []; 482 509 483 510 if (recordDef.properties) { 484 511 for (const [propName, propDef] of Object.entries( ··· 494 521 // Add JSDoc comment if description exists 495 522 docs: propDef.description ? [propDef.description] : undefined, 496 523 }); 524 + 525 + // Collect sortable field names for sort type 526 + if (isFieldSortable(propDef)) { 527 + fieldNames.push(propName); 528 + } 497 529 } 498 530 } 499 531 532 + // Add the record interface 500 533 sourceFile.addInterface({ 501 534 name: interfaceName, 502 535 isExported: true, 503 536 properties: properties, 504 537 }); 538 + 539 + // Add sort fields type union for this record 540 + if (fieldNames.length > 0) { 541 + sourceFile.addTypeAlias({ 542 + name: `${interfaceName}SortFields`, 543 + isExported: true, 544 + type: fieldNames.map(f => `"${f}"`).join(" | "), 545 + }); 546 + } 505 547 } 506 548 } 507 549 } ··· 715 757 for (const [key, value] of Object.entries(obj)) { 716 758 if (key === "_recordType") { 717 759 // Add collection operations for this record type 760 + const sortFieldsType = `${value}SortFields`; 718 761 methods.push({ 719 762 name: "listRecords", 720 763 parameters: [ 721 764 { 722 765 name: "params", 723 - type: "ListRecordsParams", 766 + type: `ListRecordsParams<${sortFieldsType}>`, 724 767 hasQuestionToken: true, 725 768 }, 726 769 ], ··· 733 776 }); 734 777 methods.push({ 735 778 name: "searchRecords", 736 - parameters: [{ name: "params", type: "SearchRecordsParams" }], 779 + parameters: [{ name: "params", type: `SearchRecordsParams<${sortFieldsType}>` }], 737 780 returnType: `Promise<ListRecordsResponse<${value}>>`, 738 781 }); 739 782 // Add create, update, delete methods
+494 -232
api/src/database.rs
··· 1 1 use sqlx::PgPool; 2 + use base64::{Engine as _, engine::general_purpose}; 2 3 3 4 use crate::errors::DatabaseError; 4 5 use crate::models::{Actor, CollectionStats, IndexedRecord, Record}; 5 6 7 + // Helper function to get field type from lexicon definition 8 + async fn get_field_type_from_lexicon( 9 + pool: &sqlx::PgPool, 10 + slice_uri: &str, 11 + collection: &str, 12 + field: &str 13 + ) -> Option<String> { 14 + let lexicon_query = sqlx::query!( 15 + r#" 16 + SELECT json->>'definitions' as definitions 17 + FROM record 18 + WHERE collection = 'social.slices.lexicon' 19 + AND json->>'slice' = $1 20 + AND json->>'nsid' = $2 21 + AND (json->>'definitions')::jsonb->'main'->>'type' = 'record' 22 + LIMIT 1 23 + "#, 24 + slice_uri, 25 + collection 26 + ); 27 + 28 + if let Ok(Some(row)) = lexicon_query.fetch_optional(pool).await { 29 + if let Some(definitions_str) = &row.definitions { 30 + if let Ok(definitions) = serde_json::from_str::<serde_json::Value>(definitions_str) { 31 + let field_def = definitions 32 + .get("main")? 33 + .get("record")? 34 + .get("properties")? 35 + .get(field)?; 36 + 37 + // Check if it's a datetime field 38 + if let Some(field_type) = field_def.get("type").and_then(|t| t.as_str()) { 39 + if field_type == "string" { 40 + if let Some(format) = field_def.get("format").and_then(|f| f.as_str()) { 41 + if format == "datetime" { 42 + return Some("datetime".to_string()); 43 + } 44 + } 45 + } 46 + return Some(field_type.to_string()); 47 + } 48 + } 49 + } 50 + } 51 + None 52 + } 53 + 54 + // Async helper function to parse sort parameter with lexicon type information 55 + async fn parse_sort_parameter_with_lexicon( 56 + pool: &sqlx::PgPool, 57 + slice_uri: &str, 58 + collection: &str, 59 + sort: Option<&str> 60 + ) -> String { 61 + match sort { 62 + Some(sort_str) => { 63 + let mut order_clauses = Vec::new(); 64 + for sort_item in sort_str.split(',') { 65 + let parts: Vec<&str> = sort_item.trim().split(':').collect(); 66 + if parts.len() == 2 { 67 + let field = parts[0].trim(); 68 + let direction = match parts[1].trim().to_lowercase().as_str() { 69 + "desc" => "DESC", 70 + _ => "ASC", // Default to ASC for any invalid direction 71 + }; 72 + 73 + // Validate field name to prevent SQL injection 74 + if field.chars().all(|c| c.is_alphanumeric() || c == '_') { 75 + if field == "indexed_at" { 76 + order_clauses.push(format!("{field} {direction}")); 77 + } else { 78 + // Get field type from lexicon 79 + let field_type = get_field_type_from_lexicon(pool, slice_uri, collection, field).await; 80 + 81 + if field_type == Some("datetime".to_string()) { 82 + // For datetime fields, use safe casting that handles invalid dates 83 + // This will cast valid dates and return NULL for invalid ones 84 + order_clauses.push(format!("(CASE WHEN json->>'{field}' ~ '^\\d{{4}}-\\d{{2}}-\\d{{2}}T\\d{{2}}:\\d{{2}}:\\d{{2}}' THEN (json->>'{field}')::timestamptz ELSE NULL END) {direction} NULLS LAST")); 85 + } else { 86 + // For other JSON fields, handle NULLs properly with text sorting 87 + order_clauses.push(format!("json->>'{field}' {direction} NULLS LAST")); 88 + } 89 + } 90 + } 91 + } 92 + } 93 + if !order_clauses.is_empty() { 94 + // Always add indexed_at as tie-breaker if not already included 95 + let has_indexed_at = order_clauses.iter().any(|clause| clause.contains("indexed_at")); 96 + if !has_indexed_at { 97 + order_clauses.push("indexed_at DESC".to_string()); 98 + } 99 + order_clauses.join(", ") 100 + } else { 101 + "indexed_at DESC".to_string() // Default sort 102 + } 103 + } 104 + None => "indexed_at DESC".to_string() // Default sort 105 + } 106 + } 107 + 108 + // Helper function to parse sort parameter and build ORDER BY clause (fallback without lexicon) 109 + fn parse_sort_parameter(sort: Option<&str>) -> String { 110 + match sort { 111 + Some(sort_str) => { 112 + let mut order_clauses = Vec::new(); 113 + for sort_item in sort_str.split(',') { 114 + let parts: Vec<&str> = sort_item.trim().split(':').collect(); 115 + if parts.len() == 2 { 116 + let field = parts[0].trim(); 117 + let direction = match parts[1].trim().to_lowercase().as_str() { 118 + "desc" => "DESC", 119 + _ => "ASC", // Default to ASC for any invalid direction 120 + }; 121 + 122 + // Validate field name to prevent SQL injection 123 + if field.chars().all(|c| c.is_alphanumeric() || c == '_') { 124 + if field == "indexed_at" { 125 + order_clauses.push(format!("{field} {direction}")); 126 + } else { 127 + // For JSON fields, handle NULLs properly with text sorting 128 + // (No lexicon lookup in this fallback function) 129 + order_clauses.push(format!("json->>'{field}' {direction} NULLS LAST")); 130 + } 131 + } 132 + } 133 + } 134 + if !order_clauses.is_empty() { 135 + // Always add indexed_at as tie-breaker if not already included 136 + let has_indexed_at = order_clauses.iter().any(|clause| clause.contains("indexed_at")); 137 + if !has_indexed_at { 138 + order_clauses.push("indexed_at DESC".to_string()); 139 + } 140 + order_clauses.join(", ") 141 + } else { 142 + "indexed_at DESC".to_string() // Default sort 143 + } 144 + } 145 + None => "indexed_at DESC".to_string() // Default sort 146 + } 147 + } 148 + 149 + // Cursor utilities for ATProto-style pagination 150 + #[derive(Debug)] 151 + struct ParsedCursor { 152 + sort_value: String, 153 + indexed_at: chrono::DateTime<chrono::Utc>, 154 + cid: String, 155 + } 156 + 157 + fn parse_cursor(cursor: &str) -> Result<ParsedCursor, Box<dyn std::error::Error + Send + Sync>> { 158 + // First try to decode from base64 159 + let cursor_content = if let Ok(decoded) = general_purpose::URL_SAFE_NO_PAD.decode(cursor) { 160 + String::from_utf8(decoded)? 161 + } else { 162 + // Fallback to plain text for backward compatibility 163 + cursor.to_string() 164 + }; 165 + 166 + let parts: Vec<&str> = cursor_content.split("::").collect(); 167 + if parts.len() != 3 { 168 + return Err("Invalid cursor format".into()); 169 + } 170 + 171 + let sort_value = parts[0].to_string(); 172 + let indexed_at = parts[1].parse::<chrono::DateTime<chrono::Utc>>()?; 173 + let cid = parts[2].to_string(); 174 + 175 + Ok(ParsedCursor { 176 + sort_value, 177 + indexed_at, 178 + cid, 179 + }) 180 + } 181 + 182 + fn generate_cursor(sort_value: &str, indexed_at: chrono::DateTime<chrono::Utc>, cid: &str) -> String { 183 + let cursor_content = format!("{}::{}::{}", sort_value, indexed_at.to_rfc3339(), cid); 184 + general_purpose::URL_SAFE_NO_PAD.encode(cursor_content) 185 + } 186 + 187 + // Extract the primary sort field from sort parameter for cursor generation 188 + fn get_primary_sort_field(sort: Option<&str>) -> String { 189 + match sort { 190 + Some(sort_str) => { 191 + // Get the first sort field (primary) 192 + let first_sort = sort_str.split(',').next().unwrap_or("indexed_at"); 193 + let parts: Vec<&str> = first_sort.trim().split(':').collect(); 194 + parts[0].trim().to_string() 195 + } 196 + None => "indexed_at".to_string(), 197 + } 198 + } 199 + 200 + // Check if the primary sort field is descending 201 + fn is_primary_sort_desc(sort: Option<&str>) -> bool { 202 + match sort { 203 + Some(sort_str) => { 204 + let first_sort = sort_str.split(',').next().unwrap_or("indexed_at:desc"); 205 + let parts: Vec<&str> = first_sort.trim().split(':').collect(); 206 + if parts.len() > 1 { 207 + parts[1].trim().to_lowercase() == "desc" 208 + } else { 209 + true // Default to DESC 210 + } 211 + } 212 + None => true, // Default sort is DESC 213 + } 214 + } 215 + 216 + // Build cursor WHERE clause for compound cursor filtering 217 + fn build_cursor_where_clause(_parsed_cursor: &ParsedCursor, sort_field: &str, is_desc: bool) -> String { 218 + // For compound cursor filtering, we use tuple comparison: 219 + // WHERE (sort_field, indexed_at, cid) < (cursor_sort, cursor_indexed_at, cursor_cid) for DESC 220 + // WHERE (sort_field, indexed_at, cid) > (cursor_sort, cursor_indexed_at, cursor_cid) for ASC 221 + 222 + let comparison_op = if is_desc { "<" } else { ">" }; 223 + 224 + // Handle different field types for the sort field comparison 225 + let sort_field_expr = if sort_field == "indexed_at" { 226 + sort_field.to_string() 227 + } else { 228 + // For JSON fields, cast to text for comparison 229 + format!("json->>'{}'", sort_field) 230 + }; 231 + 232 + // Handle NULL values in cursor comparison 233 + if comparison_op == "<" { 234 + // For DESC ordering, we want records where: 235 + // 1. sort_field is NULL and cursor is not NULL (NULLs come first in DESC) 236 + // 2. OR both are not NULL and tuple comparison 237 + format!( 238 + "AND (({} IS NULL AND $1 != 'NULL') OR ({} IS NOT NULL AND $1 != 'NULL' AND ({}, indexed_at, cid) {} ($1, $2::timestamptz, $3)))", 239 + sort_field_expr, sort_field_expr, sort_field_expr, comparison_op 240 + ) 241 + } else { 242 + // For ASC ordering, standard tuple comparison works better 243 + format!( 244 + "AND ({}, indexed_at, cid) {} ($1, $2::timestamptz, $3)", 245 + sort_field_expr, comparison_op 246 + ) 247 + } 248 + } 249 + 250 + // Generate cursor from record and sort parameters 251 + fn generate_cursor_from_record(record: &Record, sort: Option<&str>) -> String { 252 + let primary_sort_field = get_primary_sort_field(sort); 253 + 254 + // Extract sort value from the record based on the sort field 255 + let sort_value = match primary_sort_field.as_str() { 256 + "indexed_at" => record.indexed_at.to_rfc3339(), 257 + field => { 258 + // Extract field value from JSON 259 + record.json.get(field) 260 + .and_then(|v| match v { 261 + serde_json::Value::String(s) if !s.is_empty() => Some(s.clone()), 262 + serde_json::Value::Number(n) => Some(n.to_string()), 263 + serde_json::Value::Bool(b) => Some(b.to_string()), 264 + serde_json::Value::Null => None, // Explicitly handle null 265 + _ => None, 266 + }) 267 + .unwrap_or_else(|| "NULL".to_string()) // Use "NULL" string for null values to match SQL NULLS LAST behavior 268 + } 269 + }; 270 + 271 + generate_cursor(&sort_value, record.indexed_at, &record.cid) 272 + } 273 + 6 274 #[derive(Clone)] 7 275 pub struct Database { 8 276 pool: PgPool, ··· 324 592 query: &str, 325 593 field: Option<&str>, 326 594 limit: Option<i32>, 327 - cursor: Option<&str> 328 - ) -> Result<Vec<Record>, DatabaseError> { 595 + cursor: Option<&str>, 596 + sort: Option<&str> 597 + ) -> Result<(Vec<Record>, Option<String>), DatabaseError> { 329 598 let limit = limit.unwrap_or(50).min(100); // Cap at 100 330 599 331 600 // For lexicon collection, we filter by slice directly 332 601 if collection == "social.slices.lexicon" { 602 + let order_by = parse_sort_parameter(sort); // Use simple parsing for lexicon collections 333 603 let records = match (cursor, field) { 334 - (Some(cursor_time), Some(field_name)) => { 335 - let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 336 - .unwrap_or_else(|_| chrono::Utc::now()); 337 - sqlx::query_as!( 338 - Record, 339 - r#" 340 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 341 - FROM record 342 - WHERE collection = $1 AND json->>'slice' = $2 343 - AND json->>$3 ILIKE '%' || $4 || '%' 344 - AND "indexed_at" < $5 345 - ORDER BY "indexed_at" DESC 346 - LIMIT $6 347 - "#, 348 - collection, 349 - slice_uri, 350 - field_name, 351 - query, 352 - cursor_dt, 353 - limit as i64 354 - ) 355 - .fetch_all(&self.pool) 356 - .await? 604 + (Some(cursor_str), Some(field_name)) => { 605 + // Try to parse compound cursor, fallback to old cursor format 606 + if let Ok(parsed_cursor) = parse_cursor(cursor_str) { 607 + let primary_sort_field = get_primary_sort_field(sort); 608 + let is_desc = is_primary_sort_desc(sort); 609 + let cursor_where_clause = build_cursor_where_clause(&parsed_cursor, &primary_sort_field, is_desc); 610 + 611 + let query_sql = format!( 612 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $4 AND json->>'slice' = $5 AND json->>$6 ILIKE '%' || $7 || '%' {} ORDER BY {} LIMIT $8", 613 + cursor_where_clause, order_by 614 + ); 615 + sqlx::query_as::<_, Record>(&query_sql) 616 + .bind(&parsed_cursor.sort_value) // $1 617 + .bind(&parsed_cursor.indexed_at) // $2 618 + .bind(&parsed_cursor.cid) // $3 619 + .bind(collection) // $4 620 + .bind(slice_uri) // $5 621 + .bind(field_name) // $6 622 + .bind(query) // $7 623 + .bind(limit as i64) // $8 624 + .fetch_all(&self.pool) 625 + .await? 626 + } else { 627 + // Fallback to old cursor format (indexed_at only) 628 + let cursor_dt = cursor_str.parse::<chrono::DateTime<chrono::Utc>>() 629 + .unwrap_or_else(|_| chrono::Utc::now()); 630 + let query_sql = format!( 631 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json->>$3 ILIKE '%' || $4 || '%' AND indexed_at < $5 ORDER BY {} LIMIT $6", 632 + order_by 633 + ); 634 + sqlx::query_as::<_, Record>(&query_sql) 635 + .bind(collection) 636 + .bind(slice_uri) 637 + .bind(field_name) 638 + .bind(query) 639 + .bind(cursor_dt) 640 + .bind(limit as i64) 641 + .fetch_all(&self.pool) 642 + .await? 643 + } 357 644 }, 358 645 (Some(cursor_time), None) => { 359 646 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 360 647 .unwrap_or_else(|_| chrono::Utc::now()); 361 - sqlx::query_as!( 362 - Record, 363 - r#" 364 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 365 - FROM record 366 - WHERE collection = $1 AND json->>'slice' = $2 367 - AND json::text ILIKE '%' || $3 || '%' 368 - AND "indexed_at" < $4 369 - ORDER BY "indexed_at" DESC 370 - LIMIT $5 371 - "#, 372 - collection, 373 - slice_uri, 374 - query, 375 - cursor_dt, 376 - limit as i64 377 - ) 378 - .fetch_all(&self.pool) 379 - .await? 648 + let query_sql = format!( 649 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json::text ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5", 650 + order_by 651 + ); 652 + sqlx::query_as::<_, Record>(&query_sql) 653 + .bind(collection) 654 + .bind(slice_uri) 655 + .bind(query) 656 + .bind(cursor_dt) 657 + .bind(limit as i64) 658 + .fetch_all(&self.pool) 659 + .await? 380 660 }, 381 661 (None, Some(field_name)) => { 382 - sqlx::query_as!( 383 - Record, 384 - r#" 385 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 386 - FROM record 387 - WHERE collection = $1 AND json->>'slice' = $2 388 - AND json->>$3 ILIKE '%' || $4 || '%' 389 - ORDER BY "indexed_at" DESC 390 - LIMIT $5 391 - "#, 392 - collection, 393 - slice_uri, 394 - field_name, 395 - query, 396 - limit as i64 397 - ) 398 - .fetch_all(&self.pool) 399 - .await? 662 + let query_sql = format!( 663 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json->>$3 ILIKE '%' || $4 || '%' ORDER BY {} LIMIT $5", 664 + order_by 665 + ); 666 + sqlx::query_as::<_, Record>(&query_sql) 667 + .bind(collection) 668 + .bind(slice_uri) 669 + .bind(field_name) 670 + .bind(query) 671 + .bind(limit as i64) 672 + .fetch_all(&self.pool) 673 + .await? 400 674 }, 401 675 (None, None) => { 402 - sqlx::query_as!( 403 - Record, 404 - r#" 405 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 406 - FROM record 407 - WHERE collection = $1 AND json->>'slice' = $2 408 - AND json::text ILIKE '%' || $3 || '%' 409 - ORDER BY "indexed_at" DESC 410 - LIMIT $4 411 - "#, 412 - collection, 413 - slice_uri, 414 - query, 415 - limit as i64 416 - ) 417 - .fetch_all(&self.pool) 418 - .await? 676 + let query_sql = format!( 677 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>'slice' = $2 AND json::text ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4", 678 + order_by 679 + ); 680 + sqlx::query_as::<_, Record>(&query_sql) 681 + .bind(collection) 682 + .bind(slice_uri) 683 + .bind(query) 684 + .bind(limit as i64) 685 + .fetch_all(&self.pool) 686 + .await? 419 687 }, 420 688 }; 421 - return Ok(records); 689 + let cursor = if records.is_empty() { 690 + None 691 + } else { 692 + records.last().map(|record| generate_cursor_from_record(record, sort)) 693 + }; 694 + return Ok((records, cursor)); 422 695 } else { 423 696 // For other collections, verify the collection belongs to this slice's lexicons 424 697 let collection_exists = sqlx::query!( ··· 438 711 439 712 if collection_exists.is_none() { 440 713 // Collection not found in this slice's lexicons 441 - return Ok(vec![]); 714 + return Ok((vec![], None)); 442 715 } 443 716 } 717 + 718 + // Get lexicon-aware ORDER BY clause for non-lexicon collections 719 + let order_by = parse_sort_parameter_with_lexicon(&self.pool, slice_uri, collection, sort).await; 444 720 445 721 // Now search the records with cursor-based pagination 446 722 let records = match (cursor, field) { 447 723 (Some(cursor_time), Some(field_name)) => { 448 724 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 449 725 .unwrap_or_else(|_| chrono::Utc::now()); 450 - sqlx::query_as!( 451 - Record, 452 - r#" 453 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 454 - FROM record 455 - WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' AND "indexed_at" < $4 456 - ORDER BY "indexed_at" DESC 457 - LIMIT $5 458 - "#, 459 - collection, 460 - field_name, 461 - query, 462 - cursor_dt, 463 - limit as i64 464 - ) 465 - .fetch_all(&self.pool) 466 - .await? 726 + let query_sql = format!( 727 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5", 728 + order_by 729 + ); 730 + sqlx::query_as::<_, Record>(&query_sql) 731 + .bind(collection) 732 + .bind(field_name) 733 + .bind(query) 734 + .bind(cursor_dt) 735 + .bind(limit as i64) 736 + .fetch_all(&self.pool) 737 + .await? 467 738 }, 468 739 (Some(cursor_time), None) => { 469 740 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 470 741 .unwrap_or_else(|_| chrono::Utc::now()); 471 - sqlx::query_as!( 472 - Record, 473 - r#" 474 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 475 - FROM record 476 - WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' AND "indexed_at" < $3 477 - ORDER BY "indexed_at" DESC 478 - LIMIT $4 479 - "#, 480 - collection, 481 - query, 482 - cursor_dt, 483 - limit as i64 484 - ) 485 - .fetch_all(&self.pool) 486 - .await? 742 + let query_sql = format!( 743 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' AND indexed_at < $3 ORDER BY {} LIMIT $4", 744 + order_by 745 + ); 746 + sqlx::query_as::<_, Record>(&query_sql) 747 + .bind(collection) 748 + .bind(query) 749 + .bind(cursor_dt) 750 + .bind(limit as i64) 751 + .fetch_all(&self.pool) 752 + .await? 487 753 }, 488 754 (None, Some(field_name)) => { 489 - sqlx::query_as!( 490 - Record, 491 - r#" 492 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 493 - FROM record 494 - WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' 495 - ORDER BY "indexed_at" DESC 496 - LIMIT $4 497 - "#, 498 - collection, 499 - field_name, 500 - query, 501 - limit as i64 502 - ) 503 - .fetch_all(&self.pool) 504 - .await? 755 + let query_sql = format!( 756 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4", 757 + order_by 758 + ); 759 + sqlx::query_as::<_, Record>(&query_sql) 760 + .bind(collection) 761 + .bind(field_name) 762 + .bind(query) 763 + .bind(limit as i64) 764 + .fetch_all(&self.pool) 765 + .await? 505 766 }, 506 767 (None, None) => { 507 - sqlx::query_as!( 508 - Record, 509 - r#" 510 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 511 - FROM record 512 - WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' 513 - ORDER BY "indexed_at" DESC 514 - LIMIT $3 515 - "#, 516 - collection, 517 - query, 518 - limit as i64 519 - ) 520 - .fetch_all(&self.pool) 521 - .await? 768 + let query_sql = format!( 769 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' ORDER BY {} LIMIT $3", 770 + order_by 771 + ); 772 + sqlx::query_as::<_, Record>(&query_sql) 773 + .bind(collection) 774 + .bind(query) 775 + .bind(limit as i64) 776 + .fetch_all(&self.pool) 777 + .await? 522 778 }, 523 779 }; 524 780 525 - Ok(records) 781 + // Generate cursor from the last record if there are any records 782 + let cursor = if records.is_empty() { 783 + None 784 + } else { 785 + records.last().map(|record| generate_cursor_from_record(record, sort)) 786 + }; 787 + 788 + Ok((records, cursor)) 526 789 } 527 790 528 791 pub async fn get_slice_collection_records( ··· 531 794 collection: &str, 532 795 repo: Option<&str>, 533 796 limit: Option<i32>, 534 - cursor: Option<&str> 535 - ) -> Result<Vec<Record>, DatabaseError> { 797 + cursor: Option<&str>, 798 + sort: Option<&str> 799 + ) -> Result<(Vec<Record>, Option<String>), DatabaseError> { 536 800 let limit = limit.unwrap_or(50).min(100); // Cap at 100 537 801 538 802 // For lexicon collection, we filter by slice directly 539 803 if collection == "social.slices.lexicon" { 804 + let order_by = parse_sort_parameter(sort); // Use simple parsing for lexicon collections 540 805 // For lexicon records, filter by slice 541 806 let records = match (cursor, repo) { 542 807 (Some(cursor_time), Some(repo_did)) => { ··· 581 846 .await? 582 847 }, 583 848 (None, Some(repo_did)) => { 584 - sqlx::query_as!( 585 - Record, 849 + let sql = format!( 586 850 r#" 587 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 851 + SELECT uri, cid, did, collection, json, indexed_at 588 852 FROM record 589 853 WHERE collection = $1 AND json->>'slice' = $2 AND did = $3 590 - ORDER BY "indexed_at" DESC 854 + ORDER BY {} 591 855 LIMIT $4 592 856 "#, 593 - collection, 594 - slice_uri, 595 - repo_did, 596 - limit as i64 597 - ) 598 - .fetch_all(&self.pool) 599 - .await? 857 + order_by 858 + ); 859 + sqlx::query_as::<_, Record>(&sql) 860 + .bind(collection) 861 + .bind(slice_uri) 862 + .bind(repo_did) 863 + .bind(limit as i64) 864 + .fetch_all(&self.pool) 865 + .await? 600 866 }, 601 867 (None, None) => { 602 - sqlx::query_as!( 603 - Record, 868 + let sql = format!( 604 869 r#" 605 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 870 + SELECT uri, cid, did, collection, json, indexed_at 606 871 FROM record 607 872 WHERE collection = $1 AND json->>'slice' = $2 608 - ORDER BY "indexed_at" DESC 873 + ORDER BY {} 609 874 LIMIT $3 610 875 "#, 611 - collection, 612 - slice_uri, 613 - limit as i64 614 - ) 615 - .fetch_all(&self.pool) 616 - .await? 876 + order_by 877 + ); 878 + sqlx::query_as::<_, Record>(&sql) 879 + .bind(collection) 880 + .bind(slice_uri) 881 + .bind(limit as i64) 882 + .fetch_all(&self.pool) 883 + .await? 617 884 }, 618 885 }; 619 - return Ok(records); 886 + let cursor = if records.is_empty() { 887 + None 888 + } else { 889 + records.last().map(|record| generate_cursor_from_record(record, sort)) 890 + }; 891 + return Ok((records, cursor)); 620 892 } else { 621 893 // For other collections, verify the collection belongs to this slice's lexicons 622 894 let collection_exists = sqlx::query!( ··· 636 908 637 909 if collection_exists.is_none() { 638 910 // Collection not found in this slice's lexicons 639 - return Ok(vec![]); 911 + return Ok((vec![], None)); 640 912 } 641 913 } 642 914 915 + // Get lexicon-aware ORDER BY clause for non-lexicon collections 916 + let order_by = parse_sort_parameter_with_lexicon(&self.pool, slice_uri, collection, sort).await; 917 + 643 918 // Now fetch the records with cursor-based pagination 644 919 let records = match (cursor, repo) { 645 920 (Some(cursor_time), Some(repo_did)) => { 646 921 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 647 922 .unwrap_or_else(|_| chrono::Utc::now()); 648 - sqlx::query_as!( 649 - Record, 650 - r#" 651 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 652 - FROM record 653 - WHERE collection = $1 AND "indexed_at" < $2 AND did = $3 654 - ORDER BY "indexed_at" DESC 655 - LIMIT $4 656 - "#, 657 - collection, 658 - cursor_dt, 659 - repo_did, 660 - limit as i64 661 - ) 662 - .fetch_all(&self.pool) 663 - .await? 923 + let query = format!( 924 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND indexed_at < $2 AND did = $3 ORDER BY {} LIMIT $4", 925 + order_by 926 + ); 927 + sqlx::query_as::<_, Record>(&query) 928 + .bind(collection) 929 + .bind(cursor_dt) 930 + .bind(repo_did) 931 + .bind(limit as i64) 932 + .fetch_all(&self.pool) 933 + .await? 664 934 }, 665 935 (Some(cursor_time), None) => { 666 936 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>() 667 937 .unwrap_or_else(|_| chrono::Utc::now()); 668 - sqlx::query_as!( 669 - Record, 670 - r#" 671 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 672 - FROM record 673 - WHERE collection = $1 AND "indexed_at" < $2 674 - ORDER BY "indexed_at" DESC 675 - LIMIT $3 676 - "#, 677 - collection, 678 - cursor_dt, 679 - limit as i64 680 - ) 681 - .fetch_all(&self.pool) 682 - .await? 938 + let query = format!( 939 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND indexed_at < $2 ORDER BY {} LIMIT $3", 940 + order_by 941 + ); 942 + sqlx::query_as::<_, Record>(&query) 943 + .bind(collection) 944 + .bind(cursor_dt) 945 + .bind(limit as i64) 946 + .fetch_all(&self.pool) 947 + .await? 683 948 }, 684 949 (None, Some(repo_did)) => { 685 - sqlx::query_as!( 686 - Record, 687 - r#" 688 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 689 - FROM record 690 - WHERE collection = $1 AND did = $2 691 - ORDER BY "indexed_at" DESC 692 - LIMIT $3 693 - "#, 694 - collection, 695 - repo_did, 696 - limit as i64 697 - ) 698 - .fetch_all(&self.pool) 699 - .await? 950 + let query = format!( 951 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 AND did = $2 ORDER BY {} LIMIT $3", 952 + order_by 953 + ); 954 + sqlx::query_as::<_, Record>(&query) 955 + .bind(collection) 956 + .bind(repo_did) 957 + .bind(limit as i64) 958 + .fetch_all(&self.pool) 959 + .await? 700 960 }, 701 961 (None, None) => { 702 - sqlx::query_as!( 703 - Record, 704 - r#" 705 - SELECT uri, cid, did, collection, json, "indexed_at" as indexed_at 706 - FROM record 707 - WHERE collection = $1 708 - ORDER BY "indexed_at" DESC 709 - LIMIT $2 710 - "#, 711 - collection, 712 - limit as i64 713 - ) 714 - .fetch_all(&self.pool) 715 - .await? 962 + let query = format!( 963 + "SELECT uri, cid, did, collection, json, indexed_at FROM record WHERE collection = $1 ORDER BY {} LIMIT $2", 964 + order_by 965 + ); 966 + sqlx::query_as::<_, Record>(&query) 967 + .bind(collection) 968 + .bind(limit as i64) 969 + .fetch_all(&self.pool) 970 + .await? 716 971 }, 717 972 }; 718 973 719 - Ok(records) 974 + // Generate cursor from the last record if there are any records 975 + let cursor = if records.is_empty() { 976 + None 977 + } else { 978 + records.last().map(|record| generate_cursor_from_record(record, sort)) 979 + }; 980 + 981 + Ok((records, cursor)) 720 982 } 721 983 722 984 }
+3 -3
api/src/handler_records.rs
··· 22 22 } 23 23 24 24 async fn get_slice_collection_records(state: &AppState, params: &SliceRecordsParams) -> Result<SliceRecordsOutput, Box<dyn std::error::Error + Send + Sync>> { 25 - let records = state.database.get_slice_collection_records( 25 + let (records, cursor) = state.database.get_slice_collection_records( 26 26 &params.slice, 27 27 &params.collection, 28 28 params.repo.as_deref(), 29 29 params.limit, 30 30 params.cursor.as_deref(), 31 + None, // No sort parameter for this endpoint yet 31 32 ).await?; 32 33 33 34 // Transform Record to IndexedRecord for the response ··· 40 41 indexed_at: record.indexed_at.to_rfc3339(), 41 42 }).collect(); 42 43 43 - // Use the last record's indexed_at as cursor for pagination 44 - let cursor = indexed_records.last().map(|r| r.indexed_at.clone()); 44 + // Cursor is now generated by the database layer 45 45 46 46 Ok(SliceRecordsOutput { 47 47 success: true,
+8 -4
api/src/handler_xrpc_dynamic.rs
··· 17 17 pub limit: Option<i32>, 18 18 pub cursor: Option<String>, 19 19 pub slice: String, 20 + pub sort: Option<String>, 20 21 } 21 22 22 23 #[derive(Deserialize)] ··· 32 33 pub field: Option<String>, 33 34 pub limit: Option<i32>, 34 35 pub cursor: Option<String>, 36 + pub sort: Option<String>, 35 37 } 36 38 37 39 ··· 97 99 dynamic_params.author.as_deref(), 98 100 dynamic_params.limit, 99 101 dynamic_params.cursor.as_deref(), 102 + dynamic_params.sort.as_deref(), 100 103 ).await { 101 - Ok(records) => { 104 + Ok((records, cursor)) => { 102 105 let indexed_records: Vec<crate::models::IndexedRecord> = records.into_iter().map(|record| crate::models::IndexedRecord { 103 106 uri: record.uri, 104 107 cid: record.cid, ··· 110 113 111 114 let output = ListRecordsOutput { 112 115 records: indexed_records, 113 - cursor: None, // TODO: implement cursor pagination 116 + cursor 114 117 }; 115 118 let json_value = serde_json::to_value(output) 116 119 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; ··· 137 140 search_params.field.as_deref(), 138 141 search_params.limit, 139 142 search_params.cursor.as_deref(), 143 + search_params.sort.as_deref(), 140 144 ).await { 141 - Ok(records) => { 145 + Ok((records, cursor)) => { 142 146 let indexed_records: Vec<crate::models::IndexedRecord> = records.into_iter().map(|record| crate::models::IndexedRecord { 143 147 uri: record.uri, 144 148 cid: record.cid, ··· 150 154 151 155 let output = ListRecordsOutput { 152 156 records: indexed_records, 153 - cursor: None, // TODO: implement cursor pagination 157 + cursor 154 158 }; 155 159 let json_value = serde_json::to_value(output) 156 160 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;