Auto-indexing service and GraphQL API for AT Protocol Records
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix did:web resolution, fix batch inserts too many variables issue, backfill all collections when actor enters the system to account for repos that aren't included in listReposByCollection endpoint

+252 -71
+120 -15
server/src/backfill.gleam
··· 98 98 99 99 /// Resolve a DID to get ATP data (PDS endpoint and handle) 100 100 pub fn resolve_did(did: String, plc_url: String) -> Result(AtprotoData, String) { 101 + // Check if this is a did:web DID 102 + case string.starts_with(did, "did:web:") { 103 + True -> resolve_did_web(did) 104 + False -> resolve_did_plc(did, plc_url) 105 + } 106 + } 107 + 108 + /// Resolve a did:web DID by fetching the DID document from HTTPS 109 + fn resolve_did_web(did: String) -> Result(AtprotoData, String) { 110 + // Extract the domain from did:web:example.com 111 + // did:web format: did:web:<domain>[:<port>][:<path>] 112 + let parts = string.split(did, ":") 113 + case parts { 114 + ["did", "web", domain, ..rest] -> { 115 + // Build the URL: https://<domain>/.well-known/did.json 116 + // If there are additional path components, they go before /.well-known/did.json 117 + let base_domain = case rest { 118 + [] -> domain 119 + path_parts -> domain <> "/" <> string.join(path_parts, "/") 120 + } 121 + let url = "https://" <> base_domain <> "/.well-known/did.json" 122 + 123 + case request.to(url) { 124 + Error(_) -> Error("Failed to create request for did:web DID: " <> did) 125 + Ok(req) -> { 126 + case hackney.send(req) { 127 + Error(_) -> Error("Failed to fetch did:web DID data for: " <> did) 128 + Ok(resp) -> { 129 + case resp.status { 130 + 200 -> parse_atproto_data(resp.body, did) 131 + _ -> 132 + Error( 133 + "Failed to resolve DID " 134 + <> did 135 + <> " (status: " 136 + <> string.inspect(resp.status) 137 + <> ")", 138 + ) 139 + } 140 + } 141 + } 142 + } 143 + } 144 + } 145 + _ -> Error("Invalid did:web format: " <> did) 146 + } 147 + } 148 + 149 + /// Resolve a did:plc DID through the PLC directory 150 + fn resolve_did_plc(did: String, plc_url: String) -> Result(AtprotoData, String) { 101 151 let url = plc_url <> "/" <> did 102 152 103 153 case request.to(url) { ··· 266 316 Ok(req) -> { 267 317 case hackney.send(req) { 268 318 Error(err) -> { 269 - // Only log unexpected errors (not TLS/DNS/timeout issues) 319 + // Log all errors with details 270 320 let err_str = string.inspect(err) 271 - case 321 + let is_expected_error = 272 322 string.contains(err_str, "TlsAlert") 273 323 || string.contains(err_str, "Nxdomain") 274 324 || string.contains(err_str, "Timeout") 275 - { 276 - True -> Nil 325 + || string.contains(err_str, "Econnrefused") 326 + || string.contains(err_str, "Closed") 327 + 328 + case is_expected_error { 329 + True -> 330 + logging.log( 331 + logging.Warning, 332 + "[backfill] Network error fetching " 333 + <> repo 334 + <> "/" 335 + <> collection 336 + <> " from " 337 + <> pds_url 338 + <> ": " 339 + <> err_str, 340 + ) 277 341 False -> 278 342 logging.log( 279 343 logging.Error, ··· 281 345 <> repo 282 346 <> "/" 283 347 <> collection 348 + <> " from " 349 + <> pds_url 284 350 <> ": " 285 351 <> err_str, 286 352 ) ··· 293 359 case parse_list_records_response(resp.body, repo, collection) { 294 360 Ok(#(records, next_cursor)) -> { 295 361 let new_acc = list.append(acc, records) 362 + let total_so_far = list.length(new_acc) 363 + 296 364 case next_cursor { 297 - Some(c) -> 365 + Some(c) -> { 366 + logging.log( 367 + logging.Info, 368 + "[backfill] Fetched " 369 + <> string.inspect(list.length(records)) 370 + <> " records (total: " 371 + <> string.inspect(total_so_far) 372 + <> "), continuing with cursor for " 373 + <> repo 374 + <> "/" 375 + <> collection, 376 + ) 298 377 fetch_records_paginated( 299 378 repo, 300 379 collection, ··· 302 381 Some(c), 303 382 new_acc, 304 383 ) 305 - None -> new_acc 384 + } 385 + None -> { 386 + logging.log( 387 + logging.Info, 388 + "[backfill] Completed fetching " 389 + <> string.inspect(total_so_far) 390 + <> " records for " 391 + <> repo 392 + <> "/" 393 + <> collection, 394 + ) 395 + new_acc 396 + } 306 397 } 307 398 } 308 399 Error(err) -> { ··· 324 415 // 302/308: redirect (PDS moved) 325 416 // 403: forbidden (private account) 326 417 // 502/520: bad gateway / cloudflare error (server down) 327 - 400 | 404 | 302 | 308 | 403 | 502 | 520 -> acc 418 + 400 | 404 | 302 | 308 | 403 | 502 | 520 -> { 419 + acc 420 + } 328 421 // Other unexpected errors should be logged 329 422 _ -> { 330 423 logging.log( 331 424 logging.Error, 332 - "[backfill] Failed to fetch records for " 425 + "[backfill] Unexpected status " 426 + <> string.inspect(resp.status) 427 + <> " fetching " 333 428 <> repo 334 429 <> "/" 335 430 <> collection 336 - <> " (status: " 337 - <> string.inspect(resp.status) 431 + <> " from " 432 + <> pds_url 433 + <> " (URL: " 434 + <> url 338 435 <> ")", 339 436 ) 340 437 acc ··· 613 710 }) 614 711 } 615 712 616 - /// Backfill all external collections for a newly discovered actor 713 + /// Backfill all collections (primary and external) for a newly discovered actor 617 714 /// This is called when a new actor is created via Jetstream or GraphQL mutations 618 - pub fn backfill_external_collections_for_actor( 715 + pub fn backfill_collections_for_actor( 619 716 db: sqlight.Connection, 620 717 did: String, 718 + collection_ids: List(String), 621 719 external_collection_ids: List(String), 622 720 plc_url: String, 623 721 ) -> Nil { 722 + let all_collections = list.append(collection_ids, external_collection_ids) 723 + let total_count = list.length(all_collections) 724 + 624 725 logging.log( 625 726 logging.Info, 626 727 "[backfill] Starting background sync for new actor: " 627 728 <> did 628 729 <> " (" 730 + <> string.inspect(total_count) 731 + <> " collections: " 732 + <> string.inspect(list.length(collection_ids)) 733 + <> " primary + " 629 734 <> string.inspect(list.length(external_collection_ids)) 630 - <> " external collections)", 735 + <> " external)", 631 736 ) 632 737 633 738 // Resolve DID to get PDS endpoint 634 739 case resolve_did(did, plc_url) { 635 740 Ok(atp_data) -> { 636 - // Fetch and index records for each external collection 637 - list.each(external_collection_ids, fn(collection) { 741 + // Fetch and index records for all collections (primary + external) 742 + list.each(all_collections, fn(collection) { 638 743 logging.log( 639 744 logging.Info, 640 745 "[backfill] Fetching " <> collection <> " for " <> did,
+95 -45
server/src/database.gleam
··· 521 521 case uris { 522 522 [] -> Ok(dict.new()) 523 523 _ -> { 524 - // Build placeholders for SQL IN clause 525 - let placeholders = 526 - list.map(uris, fn(_) { "?" }) 527 - |> string.join(", ") 524 + // Process in batches to avoid SQL parameter limits (max 999 params) 525 + let batch_size = 900 526 + let batches = list.sized_chunk(uris, batch_size) 528 527 529 - let sql = 530 - " 531 - SELECT uri, cid 532 - FROM record 533 - WHERE uri IN (" <> placeholders <> ") 534 - " 528 + // Process each batch and merge results 529 + use accumulated_dict <- result.try( 530 + list.try_fold(batches, dict.new(), fn(acc_dict, batch) { 531 + // Build placeholders for SQL IN clause 532 + let placeholders = 533 + list.map(batch, fn(_) { "?" }) 534 + |> string.join(", ") 535 535 536 - // Convert URIs to sqlight.Value list 537 - let params = list.map(uris, sqlight.text) 536 + let sql = 537 + " 538 + SELECT uri, cid 539 + FROM record 540 + WHERE uri IN (" <> placeholders <> ") 541 + " 538 542 539 - let decoder = { 540 - use uri <- decode.field(0, decode.string) 541 - use cid <- decode.field(1, decode.string) 542 - decode.success(#(uri, cid)) 543 - } 543 + // Convert URIs to sqlight.Value list 544 + let params = list.map(batch, sqlight.text) 544 545 545 - use results <- result.try(sqlight.query( 546 - sql, 547 - on: conn, 548 - with: params, 549 - expecting: decoder, 550 - )) 546 + let decoder = { 547 + use uri <- decode.field(0, decode.string) 548 + use cid <- decode.field(1, decode.string) 549 + decode.success(#(uri, cid)) 550 + } 551 + 552 + use results <- result.try(sqlight.query( 553 + sql, 554 + on: conn, 555 + with: params, 556 + expecting: decoder, 557 + )) 558 + 559 + // Merge with accumulated dictionary 560 + let batch_dict = dict.from_list(results) 561 + Ok(dict.merge(acc_dict, batch_dict)) 562 + }), 563 + ) 551 564 552 - // Convert list of tuples to Dict 553 - Ok(dict.from_list(results)) 565 + Ok(accumulated_dict) 566 + } 567 + } 568 + } 569 + 570 + /// Gets existing CIDs from the database (checks if any CID exists, regardless of URI) 571 + /// Returns a list of CIDs that exist in the database 572 + fn get_existing_cids_batch( 573 + conn: sqlight.Connection, 574 + cids: List(String), 575 + ) -> Result(List(String), sqlight.Error) { 576 + case cids { 577 + [] -> Ok([]) 578 + _ -> { 579 + // Process in batches to avoid SQL parameter limits (max 999 params) 580 + let batch_size = 900 581 + let batches = list.sized_chunk(cids, batch_size) 582 + 583 + // Process each batch and collect results 584 + use all_results <- result.try( 585 + list.try_fold(batches, [], fn(acc_results, batch) { 586 + // Build placeholders for SQL IN clause 587 + let placeholders = 588 + list.map(batch, fn(_) { "?" }) 589 + |> string.join(", ") 590 + 591 + let sql = 592 + " 593 + SELECT cid 594 + FROM record 595 + WHERE cid IN (" <> placeholders <> ") 596 + " 597 + 598 + let cid_decoder = { 599 + use cid <- decode.field(0, decode.string) 600 + decode.success(cid) 601 + } 602 + 603 + use results <- result.try(sqlight.query( 604 + sql, 605 + on: conn, 606 + with: list.map(batch, sqlight.text), 607 + expecting: cid_decoder, 608 + )) 609 + 610 + // Append to accumulated results 611 + Ok(list.append(acc_results, results)) 612 + }), 613 + ) 614 + 615 + Ok(all_results) 554 616 } 555 617 } 556 618 } ··· 623 685 // Get all URIs from the incoming records 624 686 let uris = list.map(records, fn(record) { record.uri }) 625 687 626 - // Fetch existing CIDs for these URIs 688 + // Fetch existing CIDs for these URIs (batched to avoid SQL parameter limits) 627 689 use existing_cids <- result.try(get_existing_cids(conn, uris)) 628 690 629 691 // Get all CIDs that already exist in the database (for any URI) 630 - let all_incoming_cids = list.map(records, fn(record) { record.cid }) 631 - let check_all_cids_sql = 632 - " 633 - SELECT cid 634 - FROM record 635 - WHERE cid IN (" 636 - <> string.join(list.map(all_incoming_cids, fn(_) { "?" }), ", ") 637 - <> ") 638 - " 639 - 640 - let cid_decoder = { 641 - use cid <- decode.field(0, decode.string) 642 - decode.success(cid) 643 - } 692 + // Check in batches to avoid exceeding SQL parameter limits 693 + let all_incoming_cids = 694 + list.map(records, fn(record) { record.cid }) 695 + |> list.unique() 644 696 645 - use existing_cids_in_db <- result.try(sqlight.query( 646 - check_all_cids_sql, 647 - on: conn, 648 - with: list.map(all_incoming_cids, sqlight.text), 649 - expecting: cid_decoder, 697 + use existing_cids_in_db <- result.try(get_existing_cids_batch( 698 + conn, 699 + all_incoming_cids, 650 700 )) 651 701 652 702 // Create a set of existing CIDs for fast lookup
+4 -2
server/src/event_handler.gleam
··· 92 92 time_us: Int, 93 93 commit: goose.CommitData, 94 94 plc_url: String, 95 + collection_ids: List(String), 95 96 external_collection_ids: List(String), 96 97 ) -> Nil { 97 98 let uri = "at://" <> did <> "/" <> commit.collection <> "/" <> commit.rkey ··· 162 163 // Ensure actor exists before inserting record 163 164 case actor_validator.ensure_actor_exists(db, did, plc_url) { 164 165 Ok(is_new_actor) -> { 165 - // If this is a new actor, synchronously backfill external collections 166 + // If this is a new actor, synchronously backfill all collections 166 167 // This ensures subscription joins have complete data immediately 167 168 // We're already in a spawned process per event, so blocking is fine 168 169 case is_new_actor { ··· 170 171 // Publish stats event for new actor 171 172 stats_pubsub.publish(stats_pubsub.ActorCreated) 172 173 173 - backfill.backfill_external_collections_for_actor( 174 + backfill.backfill_collections_for_actor( 174 175 db, 175 176 did, 177 + collection_ids, 176 178 external_collection_ids, 177 179 plc_url, 178 180 )
+15 -1
server/src/graphql_gleam.gleam
··· 343 343 } 344 344 } 345 345 346 - // Step 4: Determine external collections for backfill 346 + // Step 4: Determine local and external collections for backfill 347 + let collection_ids = 348 + parsed_lexicons 349 + |> list.filter_map(fn(lex) { 350 + case 351 + backfill.nsid_matches_domain_authority(lex.id, domain_authority) 352 + { 353 + True -> Ok(lex.id) 354 + // Local collection, include 355 + False -> Error(Nil) 356 + // External collection, skip 357 + } 358 + }) 359 + 347 360 let external_collection_ids = 348 361 parsed_lexicons 349 362 |> list.filter_map(fn(lex) { ··· 363 376 db: db, 364 377 auth_base_url: auth_base_url, 365 378 plc_url: plc_url, 379 + collection_ids: collection_ids, 366 380 external_collection_ids: external_collection_ids, 367 381 ) 368 382
+5
server/src/jetstream_consumer.gleam
··· 222 222 ) 223 223 224 224 // Start the unified consumer 225 + let local_collections = local_collection_ids 225 226 let ext_collections = external_collection_ids 226 227 let pid = 227 228 process.spawn_unlinked(fn() { ··· 232 233 handle_jetstream_event( 233 234 db, 234 235 event_json, 236 + local_collections, 235 237 ext_collections, 236 238 plc_url, 237 239 ) ··· 275 277 fn handle_jetstream_event( 276 278 db: sqlight.Connection, 277 279 event_json: String, 280 + collection_ids: List(String), 278 281 external_collection_ids: List(String), 279 282 plc_url: String, 280 283 ) -> Nil { ··· 295 298 time_us, 296 299 commit, 297 300 plc_url, 301 + collection_ids, 298 302 external_collection_ids, 299 303 ) 300 304 False -> Nil ··· 308 312 time_us, 309 313 commit, 310 314 plc_url, 315 + collection_ids, 311 316 external_collection_ids, 312 317 ) 313 318 }
+13 -8
server/src/mutation_resolvers.gleam
··· 26 26 db: sqlight.Connection, 27 27 auth_base_url: String, 28 28 plc_url: String, 29 + collection_ids: List(String), 29 30 external_collection_ids: List(String), 30 31 ) 31 32 } ··· 136 137 ctx.plc_url, 137 138 )) 138 139 139 - // If new actor, spawn backfill for external collections 140 + // If new actor, spawn backfill for all collections 140 141 case is_new_actor { 141 142 True -> { 142 143 process.spawn_unlinked(fn() { 143 - backfill.backfill_external_collections_for_actor( 144 + backfill.backfill_collections_for_actor( 144 145 ctx.db, 145 146 user_info.did, 147 + ctx.collection_ids, 146 148 ctx.external_collection_ids, 147 149 ctx.plc_url, 148 150 ) ··· 321 323 ctx.plc_url, 322 324 )) 323 325 324 - // If new actor, spawn backfill for external collections 326 + // If new actor, spawn backfill for all collections 325 327 case is_new_actor { 326 328 True -> { 327 329 process.spawn_unlinked(fn() { 328 - backfill.backfill_external_collections_for_actor( 330 + backfill.backfill_collections_for_actor( 329 331 ctx.db, 330 332 user_info.did, 333 + ctx.collection_ids, 331 334 ctx.external_collection_ids, 332 335 ctx.plc_url, 333 336 ) ··· 480 483 ctx.plc_url, 481 484 )) 482 485 483 - // If new actor, spawn backfill for external collections 486 + // If new actor, spawn backfill for all collections 484 487 case is_new_actor { 485 488 True -> { 486 489 process.spawn_unlinked(fn() { 487 - backfill.backfill_external_collections_for_actor( 490 + backfill.backfill_collections_for_actor( 488 491 ctx.db, 489 492 user_info.did, 493 + ctx.collection_ids, 490 494 ctx.external_collection_ids, 491 495 ctx.plc_url, 492 496 ) ··· 604 608 ctx.plc_url, 605 609 )) 606 610 607 - // If new actor, spawn backfill for external collections 611 + // If new actor, spawn backfill for all collections 608 612 case is_new_actor { 609 613 True -> { 610 614 process.spawn_unlinked(fn() { 611 - backfill.backfill_external_collections_for_actor( 615 + backfill.backfill_collections_for_actor( 612 616 ctx.db, 613 617 user_info.did, 618 + ctx.collection_ids, 614 619 ctx.external_collection_ids, 615 620 ctx.plc_url, 616 621 )