Highly ambitious ATProtocol AppView service and sdks
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix log filtering by slice, clean up logs every 6 hrs

+137 -38
+63 -18
api/scripts/generate_typescript.ts
··· 192 192 193 193 ${usageExample} 194 194 195 - ${excludeSlicesClient 196 - ? 'import { SlicesClient, type RecordResponse, type GetRecordsResponse, type CountRecordsResponse, type GetRecordParams, type WhereCondition, type IndexedRecordFields, type SortField, type BlobRef } from "jsr:@slices/client@^0.1.0-alpha.2";' 197 - : 'import { SlicesClient, type RecordResponse, type GetRecordsResponse, type CountRecordsResponse, type GetRecordParams, type WhereCondition, type IndexedRecordFields, type SortField, type GetActorsParams, type GetActorsResponse, type BlobRef, type SliceLevelRecordsParams, type SliceRecordsOutput } from "jsr:@slices/client@^0.1.0-alpha.2";' 195 + ${ 196 + excludeSlicesClient 197 + ? 'import { SlicesClient, type RecordResponse, type GetRecordsResponse, type CountRecordsResponse, type GetRecordParams, type WhereCondition, type IndexedRecordFields, type SortField, type BlobRef } from "jsr:@slices/client@^0.1.0-alpha.3";' 198 + : 'import { SlicesClient, type RecordResponse, type GetRecordsResponse, type CountRecordsResponse, type GetRecordParams, type WhereCondition, type IndexedRecordFields, type SortField, type GetActorsParams, type GetActorsResponse, type BlobRef, type SliceLevelRecordsParams, type SliceRecordsOutput } from "jsr:@slices/client@^0.1.0-alpha.3";' 198 199 } 199 200 import { OAuthClient } from "jsr:@slices/oauth@^0.4.1"; 200 201 ··· 426 427 ], 427 428 }); 428 429 429 - 430 430 sourceFile.addInterface({ 431 431 name: "GetSparklinesParams", 432 432 isExported: true, ··· 442 442 isExported: true, 443 443 properties: [ 444 444 { name: "success", type: "boolean" }, 445 - { name: "sparklines", type: `Record<string, NetworkSlicesSliceDefs["SparklinePoint"][]>` }, 445 + { 446 + name: "sparklines", 447 + type: `Record<string, NetworkSlicesSliceDefs["SparklinePoint"][]>`, 448 + }, 446 449 { name: "message", type: "string", hasQuestionToken: true }, 447 450 ], 448 451 }); ··· 537 540 switch (type) { 538 541 case "string": 539 542 // For knownValues, return the type alias name 540 - if (def.knownValues && Array.isArray(def.knownValues) && def.knownValues.length > 0 && propertyName) { 543 + if ( 544 + def.knownValues && 545 + Array.isArray(def.knownValues) && 546 + def.knownValues.length > 0 && 547 + propertyName 548 + ) { 541 549 // Reference the generated type alias with namespace 542 550 const namespace = nsidToNamespace(currentLexicon); 543 551 return `${namespace}${defNameToPascalCase(propertyName)}`; ··· 744 752 745 753 if (defValue.properties) { 746 754 for (const [propName, propDef] of Object.entries(defValue.properties)) { 747 - const tsType = convertLexiconTypeToTypeScript(propDef as any, lexicon.id, propName); 755 + const tsType = convertLexiconTypeToTypeScript( 756 + propDef as any, 757 + lexicon.id, 758 + propName 759 + ); 748 760 const required = 749 761 defValue.required && defValue.required.includes(propName); 750 762 ··· 783 795 784 796 if (recordDef.properties) { 785 797 for (const [propName, propDef] of Object.entries(recordDef.properties)) { 786 - const tsType = convertLexiconTypeToTypeScript(propDef as any, lexicon.id, propName); 798 + const tsType = convertLexiconTypeToTypeScript( 799 + propDef as any, 800 + lexicon.id, 801 + propName 802 + ); 787 803 const required = isPropertyRequired(recordDef, propName); 788 804 789 805 properties.push({ ··· 880 896 if (lexicon.definitions && typeof lexicon.definitions === "object") { 881 897 for (const [defKey, defValue] of Object.entries(lexicon.definitions)) { 882 898 if (defValue.type === "record" && defValue.record?.properties) { 883 - for (const [propName, propDef] of Object.entries(defValue.record.properties)) { 899 + for (const [propName, propDef] of Object.entries( 900 + defValue.record.properties 901 + )) { 884 902 const prop = propDef as any; 885 - if (prop.type === "string" && prop.knownValues && Array.isArray(prop.knownValues) && prop.knownValues.length > 0) { 903 + if ( 904 + prop.type === "string" && 905 + prop.knownValues && 906 + Array.isArray(prop.knownValues) && 907 + prop.knownValues.length > 0 908 + ) { 886 909 // Generate a type alias for this property, namespaced by lexicon 887 910 const namespace = nsidToNamespace(lexicon.id); 888 911 const pascalPropName = defNameToPascalCase(propName); 889 912 const typeName = `${namespace}${pascalPropName}`; 890 913 891 - const knownValueTypes = prop.knownValues.map((value: string) => `'${value}'`).join('\n | '); 914 + const knownValueTypes = prop.knownValues 915 + .map((value: string) => `'${value}'`) 916 + .join("\n | "); 892 917 const typeDefinition = `${knownValueTypes}\n | (string & {})`; 893 918 894 919 sourceFile.addTypeAlias({ ··· 900 925 } 901 926 } 902 927 } else if (defValue.type === "object" && defValue.properties) { 903 - for (const [propName, propDef] of Object.entries(defValue.properties)) { 928 + for (const [propName, propDef] of Object.entries( 929 + defValue.properties 930 + )) { 904 931 const prop = propDef as any; 905 - if (prop.type === "string" && prop.knownValues && Array.isArray(prop.knownValues) && prop.knownValues.length > 0) { 932 + if ( 933 + prop.type === "string" && 934 + prop.knownValues && 935 + Array.isArray(prop.knownValues) && 936 + prop.knownValues.length > 0 937 + ) { 906 938 // Generate a type alias for this property, namespaced by lexicon 907 939 const namespace = nsidToNamespace(lexicon.id); 908 940 const pascalPropName = defNameToPascalCase(propName); 909 941 const typeName = `${namespace}${pascalPropName}`; 910 942 911 - const knownValueTypes = prop.knownValues.map((value: string) => `'${value}'`).join('\n | '); 943 + const knownValueTypes = prop.knownValues 944 + .map((value: string) => `'${value}'`) 945 + .join("\n | "); 912 946 const typeDefinition = `${knownValueTypes}\n | (string & {})`; 913 947 914 948 sourceFile.addTypeAlias({ ··· 922 956 } else if (defValue.type === "string") { 923 957 // Handle standalone string definitions with knownValues (like labelValue) 924 958 const stringDef = defValue as any; 925 - if (stringDef.knownValues && Array.isArray(stringDef.knownValues) && stringDef.knownValues.length > 0) { 959 + if ( 960 + stringDef.knownValues && 961 + Array.isArray(stringDef.knownValues) && 962 + stringDef.knownValues.length > 0 963 + ) { 926 964 // Generate a type alias for this definition, namespaced by lexicon 927 965 const namespace = nsidToNamespace(lexicon.id); 928 966 const typeName = `${namespace}${defNameToPascalCase(defKey)}`; 929 967 930 - const knownValueTypes = stringDef.knownValues.map((value: string) => `'${value}'`).join('\n | '); 968 + const knownValueTypes = stringDef.knownValues 969 + .map((value: string) => `'${value}'`) 970 + .join("\n | "); 931 971 const typeDefinition = `${knownValueTypes}\n | (string & {})`; 932 972 933 973 sourceFile.addTypeAlias({ ··· 1012 1052 case "string": 1013 1053 // Check if this is a string type with knownValues 1014 1054 const stringDef = defValue as any; 1015 - if (stringDef.knownValues && Array.isArray(stringDef.knownValues) && stringDef.knownValues.length > 0) { 1055 + if ( 1056 + stringDef.knownValues && 1057 + Array.isArray(stringDef.knownValues) && 1058 + stringDef.knownValues.length > 0 1059 + ) { 1016 1060 // This generates a type alias, reference it in the namespace with full name 1017 1061 namespaceProperties.push({ 1018 1062 name: defName, ··· 1453 1497 returnType: "Promise<GetJetstreamLogsResponse>", 1454 1498 isAsync: true, 1455 1499 statements: [ 1456 - `return await this.client.makeRequest<GetJetstreamLogsResponse>('network.slices.slice.getJetstreamLogs', 'GET', params);`, 1500 + `const requestParams = { ...params, slice: this.client.sliceUri };`, 1501 + `return await this.client.makeRequest<GetJetstreamLogsResponse>('network.slices.slice.getJetstreamLogs', 'GET', requestParams);`, 1457 1502 ], 1458 1503 }); 1459 1504
+11 -1
api/src/handler_logs.rs
··· 45 45 State(state): State<AppState>, 46 46 Query(params): Query<LogsQuery>, 47 47 ) -> Result<Json<LogsResponse>, StatusCode> { 48 + // Debug logging to see what slice filter is being used 49 + if let Some(slice) = &params.slice { 50 + tracing::info!("Filtering jetstream logs by slice: {}", slice); 51 + } else { 52 + tracing::info!("No slice filter applied - returning all jetstream logs"); 53 + } 54 + 48 55 match get_jetstream_logs(&state.database_pool, params.slice.as_deref(), params.limit).await { 49 - Ok(logs) => Ok(Json(LogsResponse { logs })), 56 + Ok(logs) => { 57 + tracing::info!("Returning {} jetstream logs", logs.len()); 58 + Ok(Json(LogsResponse { logs })) 59 + }, 50 60 Err(e) => { 51 61 tracing::error!("Failed to get jetstream logs: {}", e); 52 62 Err(StatusCode::INTERNAL_SERVER_ERROR)
+10 -10
api/src/jetstream.rs
··· 204 204 Err(fresh_e) => { 205 205 let message = format!("Validation failed for collection {} in slice {}", commit.collection, slice_uri); 206 206 error!("✗ {}: {}", message, fresh_e); 207 - Logger::global().log_jetstream(LogLevel::Warn, &message, Some(serde_json::json!({ 207 + Logger::global().log_jetstream_with_slice(LogLevel::Warn, &message, Some(serde_json::json!({ 208 208 "collection": commit.collection, 209 209 "slice_uri": slice_uri, 210 210 "did": did 211 - }))); 211 + })), Some(slice_uri)); 212 212 false 213 213 } 214 214 } ··· 250 250 format!("Record updated in {}", commit.collection) 251 251 }; 252 252 let operation = if is_insert { "insert" } else { "update" }; 253 - Logger::global().log_jetstream(LogLevel::Info, &message, Some(serde_json::json!({ 253 + Logger::global().log_jetstream_with_slice(LogLevel::Info, &message, Some(serde_json::json!({ 254 254 "operation": operation, 255 255 "collection": commit.collection, 256 256 "slice_uri": slice_uri, 257 257 "did": did, 258 258 "record_type": "primary" 259 - }))); 259 + })), Some(slice_uri)); 260 260 } 261 261 Err(e) => { 262 262 let message = "Failed to insert/update record"; 263 - Logger::global().log_jetstream(LogLevel::Error, message, Some(serde_json::json!({ 263 + Logger::global().log_jetstream_with_slice(LogLevel::Error, message, Some(serde_json::json!({ 264 264 "operation": "upsert", 265 265 "collection": commit.collection, 266 266 "slice_uri": slice_uri, 267 267 "did": did, 268 268 "error": e.to_string(), 269 269 "record_type": "primary" 270 - }))); 270 + })), Some(slice_uri)); 271 271 return Err(anyhow::anyhow!("Database error: {}", e)); 272 272 } 273 273 } ··· 300 300 format!("Record updated in {}", commit.collection) 301 301 }; 302 302 let operation = if is_insert { "insert" } else { "update" }; 303 - Logger::global().log_jetstream(LogLevel::Info, &message, Some(serde_json::json!({ 303 + Logger::global().log_jetstream_with_slice(LogLevel::Info, &message, Some(serde_json::json!({ 304 304 "operation": operation, 305 305 "collection": commit.collection, 306 306 "slice_uri": slice_uri, 307 307 "did": did, 308 308 "record_type": "external" 309 - }))); 309 + })), Some(slice_uri)); 310 310 } 311 311 Err(e) => { 312 312 let message = "Failed to insert/update record"; 313 - Logger::global().log_jetstream(LogLevel::Error, message, Some(serde_json::json!({ 313 + Logger::global().log_jetstream_with_slice(LogLevel::Error, message, Some(serde_json::json!({ 314 314 "operation": "upsert", 315 315 "collection": commit.collection, 316 316 "slice_uri": slice_uri, 317 317 "did": did, 318 318 "error": e.to_string(), 319 319 "record_type": "external" 320 - }))); 320 + })), Some(slice_uri)); 321 321 return Err(anyhow::anyhow!("Database error: {}", e)); 322 322 } 323 323 }
+46 -6
api/src/logging.rs
··· 127 127 message: &str, 128 128 metadata: Option<Value>, 129 129 ) { 130 + self.log_jetstream_with_slice(level, message, metadata, None); 131 + } 132 + 133 + /// Log a jetstream message with slice context (queued for batch insertion) 134 + pub fn log_jetstream_with_slice( 135 + &self, 136 + level: LogLevel, 137 + message: &str, 138 + metadata: Option<Value>, 139 + slice_uri: Option<&str>, 140 + ) { 130 141 let entry = QueuedLogEntry { 131 142 log_type: LogType::Jetstream.as_str().to_string(), 132 143 job_id: None, 133 144 user_did: None, 134 - slice_uri: None, 145 + slice_uri: slice_uri.map(|s| s.to_string()), 135 146 level: level.as_str().to_string(), 136 147 message: message.to_string(), 137 148 metadata, ··· 301 312 let limit = limit.unwrap_or(100); 302 313 303 314 let rows = if let Some(slice_uri) = slice_filter { 315 + tracing::info!("Querying jetstream logs with slice filter: {}", slice_uri); 304 316 // When filtering by slice, include both slice-specific logs and global connection logs (where slice_uri is NULL) 305 - sqlx::query_as!( 317 + let results = sqlx::query_as!( 306 318 LogEntry, 307 319 r#" 308 320 SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata ··· 316 328 limit 317 329 ) 318 330 .fetch_all(pool) 319 - .await? 331 + .await?; 332 + 333 + tracing::info!("Found {} jetstream logs for slice {}", results.len(), slice_uri); 334 + results 320 335 } else { 321 336 // No filter, return all jetstream logs 322 337 sqlx::query_as!( ··· 383 398 Ok(rows) 384 399 } 385 400 386 - /// Clean up old logs (keep last 30 days for jetstream, 7 days for completed sync jobs) 387 - #[allow(dead_code)] 401 + /// Clean up old logs (keep last 1 day for jetstream, 7 days for completed sync jobs) 388 402 pub async fn cleanup_old_logs(pool: &PgPool) -> Result<u64, sqlx::Error> { 389 403 let result = sqlx::query!( 390 404 r#" 391 405 DELETE FROM logs 392 406 WHERE 393 - (log_type = 'jetstream' AND created_at < NOW() - INTERVAL '30 days') 407 + (log_type = 'jetstream' AND created_at < NOW() - INTERVAL '1 day') 394 408 OR (log_type = 'sync_job' AND created_at < NOW() - INTERVAL '7 days') 395 409 OR (log_type = 'system' AND created_at < NOW() - INTERVAL '7 days') 396 410 "#, ··· 399 413 .await?; 400 414 401 415 Ok(result.rows_affected()) 416 + } 417 + 418 + /// Start a background task that cleans up old logs every 6 hours 419 + pub fn start_log_cleanup_task(pool: PgPool) { 420 + tokio::spawn(async move { 421 + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(6 * 3600)); // Every 6 hours 422 + 423 + info!("Started log cleanup background task (runs every 6 hours)"); 424 + 425 + loop { 426 + interval.tick().await; 427 + 428 + match cleanup_old_logs(&pool).await { 429 + Ok(deleted) => { 430 + if deleted > 0 { 431 + info!("Log cleanup: deleted {} old log entries", deleted); 432 + } else { 433 + info!("Log cleanup: no old logs to delete"); 434 + } 435 + } 436 + Err(e) => { 437 + error!("Failed to cleanup old logs: {}", e); 438 + } 439 + } 440 + } 441 + }); 402 442 }
+4 -1
api/src/main.rs
··· 39 39 use crate::database::Database; 40 40 use crate::errors::AppError; 41 41 use crate::jetstream::JetstreamConsumer; 42 - use crate::logging::{LogLevel, Logger}; 42 + use crate::logging::{LogLevel, Logger, start_log_cleanup_task}; 43 43 44 44 #[derive(Clone)] 45 45 pub struct Config { ··· 97 97 98 98 // Initialize global logger 99 99 Logger::init_global(pool.clone()); 100 + 101 + // Start log cleanup background task 102 + start_log_cleanup_task(pool.clone()); 100 103 101 104 // Start job queue runner 102 105 let pool_for_runner = pool.clone();
+3 -2
frontend/src/client.ts
··· 1 1 // Generated TypeScript client for AT Protocol records 2 - // Generated at: 2025-09-17 17:42:43 UTC 2 + // Generated at: 2025-09-18 02:14:35 UTC 3 3 // Lexicons: 25 4 4 5 5 /** ··· 2189 2189 async getJetstreamLogs( 2190 2190 params: GetJetstreamLogsParams 2191 2191 ): Promise<GetJetstreamLogsResponse> { 2192 + const requestParams = { ...params, slice: this.client.sliceUri }; 2192 2193 return await this.client.makeRequest<GetJetstreamLogsResponse>( 2193 2194 "network.slices.slice.getJetstreamLogs", 2194 2195 "GET", 2195 - params 2196 + requestParams 2196 2197 ); 2197 2198 } 2198 2199