···44use crate::errors::DatabaseError;
55use crate::models::{Actor, CollectionStats, IndexedRecord, Record};
6677-// Helper function to get field type from lexicon definition
77+// Helper function to get field type from lexicon definition
88async fn get_field_type_from_lexicon(
99- pool: &sqlx::PgPool,
1010- slice_uri: &str,
1111- collection: &str,
99+ pool: &sqlx::PgPool,
1010+ slice_uri: &str,
1111+ collection: &str,
1212 field: &str
1313) -> Option<String> {
1414 let lexicon_query = sqlx::query!(
1515 r#"
1616 SELECT json->>'definitions' as definitions
1717- FROM record
1717+ FROM record
1818 WHERE collection = 'social.slices.lexicon'
1919- AND json->>'slice' = $1
1919+ AND json->>'slice' = $1
2020 AND json->>'nsid' = $2
2121 AND (json->>'definitions')::jsonb->'main'->>'type' = 'record'
2222 LIMIT 1
···2424 slice_uri,
2525 collection
2626 );
2727-2727+2828 if let Ok(Some(row)) = lexicon_query.fetch_optional(pool).await {
2929 if let Some(definitions_str) = &row.definitions {
3030 if let Ok(definitions) = serde_json::from_str::<serde_json::Value>(definitions_str) {
···3333 .get("record")?
3434 .get("properties")?
3535 .get(field)?;
3636-3636+3737 // Check if it's a datetime field
3838 if let Some(field_type) = field_def.get("type").and_then(|t| t.as_str()) {
3939 if field_type == "string" {
···5454// Async helper function to parse sort parameter with lexicon type information
5555async fn parse_sort_parameter_with_lexicon(
5656 pool: &sqlx::PgPool,
5757- slice_uri: &str,
5757+ slice_uri: &str,
5858 collection: &str,
5959 sort: Option<&str>
6060) -> String {
···6969 "desc" => "DESC",
7070 _ => "ASC", // Default to ASC for any invalid direction
7171 };
7272-7272+7373 // Validate field name to prevent SQL injection
7474 if field.chars().all(|c| c.is_alphanumeric() || c == '_') {
7575 if field == "indexed_at" {
···7777 } else {
7878 // Get field type from lexicon
7979 let field_type = get_field_type_from_lexicon(pool, slice_uri, collection, field).await;
8080-8080+8181 if field_type == Some("datetime".to_string()) {
8282 // For datetime fields, use safe casting that handles invalid dates
8383 // This will cast valid dates and return NULL for invalid ones
···118118 "desc" => "DESC",
119119 _ => "ASC", // Default to ASC for any invalid direction
120120 };
121121-121121+122122 // Validate field name to prevent SQL injection
123123 if field.chars().all(|c| c.is_alphanumeric() || c == '_') {
124124 if field == "indexed_at" {
···162162 // Fallback to plain text for backward compatibility
163163 cursor.to_string()
164164 };
165165-165165+166166 let parts: Vec<&str> = cursor_content.split("::").collect();
167167 if parts.len() != 3 {
168168 return Err("Invalid cursor format".into());
169169 }
170170-170170+171171 let sort_value = parts[0].to_string();
172172 let indexed_at = parts[1].parse::<chrono::DateTime<chrono::Utc>>()?;
173173 let cid = parts[2].to_string();
174174-174174+175175 Ok(ParsedCursor {
176176 sort_value,
177177 indexed_at,
···218218 // For compound cursor filtering, we use tuple comparison:
219219 // WHERE (sort_field, indexed_at, cid) < (cursor_sort, cursor_indexed_at, cursor_cid) for DESC
220220 // WHERE (sort_field, indexed_at, cid) > (cursor_sort, cursor_indexed_at, cursor_cid) for ASC
221221-221221+222222 let comparison_op = if is_desc { "<" } else { ">" };
223223-223223+224224 // Handle different field types for the sort field comparison
225225 let sort_field_expr = if sort_field == "indexed_at" {
226226 sort_field.to_string()
···228228 // For JSON fields, cast to text for comparison
229229 format!("json->>'{}'", sort_field)
230230 };
231231-231231+232232 // Handle NULL values in cursor comparison
233233 if comparison_op == "<" {
234234 // For DESC ordering, we want records where:
···250250// Generate cursor from record and sort parameters
251251fn generate_cursor_from_record(record: &Record, sort: Option<&str>) -> String {
252252 let primary_sort_field = get_primary_sort_field(sort);
253253-253253+254254 // Extract sort value from the record based on the sort field
255255 let sort_value = match primary_sort_field.as_str() {
256256 "indexed_at" => record.indexed_at.to_rfc3339(),
···267267 .unwrap_or_else(|| "NULL".to_string()) // Use "NULL" string for null values to match SQL NULLS LAST behavior
268268 }
269269 };
270270-270270+271271 generate_cursor(&sort_value, record.indexed_at, &record.cid)
272272}
273273···339339 Ok(())
340340 }
341341342342- pub async fn get_existing_record_cids(&self, did: &str, collection: &str) -> Result<std::collections::HashMap<String, String>, DatabaseError> {
342342+ pub async fn get_existing_record_cids_for_slice(&self, did: &str, collection: &str, slice_uri: &str) -> Result<std::collections::HashMap<String, String>, DatabaseError> {
343343 let records = sqlx::query!(
344344 r#"SELECT "uri", "cid"
345345 FROM "record"
346346- WHERE "did" = $1 AND "collection" = $2"#,
346346+ WHERE "did" = $1 AND "collection" = $2 AND "slice_uri" = $3"#,
347347 did,
348348- collection
348348+ collection,
349349+ slice_uri
349350 )
350351 .fetch_all(&self.pool)
351352 .await?;
···489490 COUNT(DISTINCT r.did) as unique_actors
490491 FROM record r
491492 INNER JOIN slice_collections sc ON r.collection = sc.collection_nsid
493493+ WHERE r.slice_uri = $1
492494 GROUP BY r.collection
493495 ORDER BY r.collection
494496 "#,
···540542 SELECT COUNT(*) as count
541543 FROM record r
542544 INNER JOIN slice_collections sc ON r.collection = sc.collection_nsid
545545+ WHERE r.slice_uri = $1
543546 "#,
544547 slice_uri
545548 )
···710713 // For lexicon collection, we filter by slice directly
711714 if collection == "social.slices.lexicon" {
712715 let order_by = parse_sort_parameter(sort); // Use simple parsing for lexicon collections
713713-716716+714717 // Note: For lexicon searches, authors filtering is not commonly used as lexicons are typically not user-specific
715718 // But we include it for completeness
716719 let records = match (cursor, field) {
···720723 let primary_sort_field = get_primary_sort_field(sort);
721724 let is_desc = is_primary_sort_desc(sort);
722725 let cursor_where_clause = build_cursor_where_clause(&parsed_cursor, &primary_sort_field, is_desc);
723723-726726+724727 let query_sql = format!(
725728 "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $4 AND json->>'slice' = $5 AND json->>$6 ILIKE '%' || $7 || '%' {} ORDER BY {} LIMIT $8",
726729 cursor_where_clause, order_by
···837840 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>()
838841 .unwrap_or_else(|_| chrono::Utc::now());
839842 let query_sql = format!(
840840- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5",
843843+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND json->>$3 ILIKE '%' || $4 || '%' AND indexed_at < $5 ORDER BY {} LIMIT $6",
841844 order_by
842845 );
843846 sqlx::query_as::<_, Record>(&query_sql)
844847 .bind(collection)
848848+ .bind(slice_uri)
845849 .bind(field_name)
846850 .bind(query)
847851 .bind(cursor_dt)
···853857 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>()
854858 .unwrap_or_else(|_| chrono::Utc::now());
855859 let query_sql = format!(
856856- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' AND indexed_at < $3 ORDER BY {} LIMIT $4",
860860+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND json::text ILIKE '%' || $3 || '%' AND indexed_at < $4 ORDER BY {} LIMIT $5",
857861 order_by
858862 );
859863 sqlx::query_as::<_, Record>(&query_sql)
860864 .bind(collection)
865865+ .bind(slice_uri)
861866 .bind(query)
862867 .bind(cursor_dt)
863868 .bind(limit as i64)
···866871 },
867872 (None, Some(field_name)) => {
868873 let query_sql = format!(
869869- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json->>$2 ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4",
874874+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND json->>$3 ILIKE '%' || $4 || '%' ORDER BY {} LIMIT $5",
870875 order_by
871876 );
872877 sqlx::query_as::<_, Record>(&query_sql)
873878 .bind(collection)
879879+ .bind(slice_uri)
874880 .bind(field_name)
875881 .bind(query)
876882 .bind(limit as i64)
···879885 },
880886 (None, None) => {
881887 let query_sql = format!(
882882- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND json::text ILIKE '%' || $2 || '%' ORDER BY {} LIMIT $3",
888888+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND json::text ILIKE '%' || $3 || '%' ORDER BY {} LIMIT $4",
883889 order_by
884890 );
885891 sqlx::query_as::<_, Record>(&query_sql)
886892 .bind(collection)
893893+ .bind(slice_uri)
887894 .bind(query)
888895 .bind(limit as i64)
889896 .fetch_all(&self.pool)
···916923 // For lexicon collection, we filter by slice directly
917924 if collection == "social.slices.lexicon" {
918925 let order_by = parse_sort_parameter(sort); // Use simple parsing for lexicon collections
919919-920920-926926+927927+921928 // Determine the author list to use
922929 let author_list: Option<Vec<String>> = if let Some(authors_list) = authors {
923930 Some(authors_list.clone())
···10551062 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>()
10561063 .unwrap_or_else(|_| chrono::Utc::now());
10571064 let query = format!(
10581058- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND indexed_at < $2 AND did = ANY($3) ORDER BY {} LIMIT $4",
10651065+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND indexed_at < $3 AND did = ANY($4) ORDER BY {} LIMIT $5",
10591066 order_by
10601067 );
10611068 sqlx::query_as::<_, Record>(&query)
10621069 .bind(collection)
10701070+ .bind(slice_uri)
10631071 .bind(cursor_dt)
10641072 .bind(author_list)
10651073 .bind(limit as i64)
···10701078 let cursor_dt = cursor_time.parse::<chrono::DateTime<chrono::Utc>>()
10711079 .unwrap_or_else(|_| chrono::Utc::now());
10721080 let query = format!(
10731073- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND indexed_at < $2 ORDER BY {} LIMIT $3",
10811081+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND indexed_at < $3 ORDER BY {} LIMIT $4",
10741082 order_by
10751083 );
10761084 sqlx::query_as::<_, Record>(&query)
10771085 .bind(collection)
10861086+ .bind(slice_uri)
10781087 .bind(cursor_dt)
10791088 .bind(limit as i64)
10801089 .fetch_all(&self.pool)
···10821091 },
10831092 (None, Some(author_list)) => {
10841093 let query = format!(
10851085- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND did = ANY($2) ORDER BY {} LIMIT $3",
10941094+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 AND did = ANY($3) ORDER BY {} LIMIT $4",
10861095 order_by
10871096 );
10881097 sqlx::query_as::<_, Record>(&query)
10891098 .bind(collection)
10991099+ .bind(slice_uri)
10901100 .bind(author_list)
10911101 .bind(limit as i64)
10921102 .fetch_all(&self.pool)
···10941104 },
10951105 (None, None) => {
10961106 let query = format!(
10971097- "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 ORDER BY {} LIMIT $2",
11071107+ "SELECT uri, cid, did, collection, json, indexed_at, slice_uri FROM record WHERE collection = $1 AND slice_uri = $2 ORDER BY {} LIMIT $3",
10981108 order_by
10991109 );
11001110 sqlx::query_as::<_, Record>(&query)
11011111 .bind(collection)
11121112+ .bind(slice_uri)
11021113 .bind(limit as i64)
11031114 .fetch_all(&self.pool)
11041115 .await?
···11551166 "#)
11561167 .fetch_all(&self.pool)
11571168 .await?;
11581158-11691169+11591170 Ok(rows.into_iter().map(|(uri,)| uri).collect())
11601171 }
11611172···11691180 )
11701181 .fetch_all(&self.pool)
11711182 .await?;
11721172-11831183+11731184 Ok(rows.into_iter().map(|row| (row.did, row.slice_uri)).collect())
11741185 }
11751186···11851196 )
11861197 .fetch_optional(&self.pool)
11871198 .await?;
11881188-11991199+11891200 Ok(row.and_then(|r| r.domain))
11901201 }
11911202