···44psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
55 CREATE USER aip WITH PASSWORD 'aip';
66 CREATE DATABASE aip OWNER aip;
77+ CREATE USER tap WITH PASSWORD 'tap';
88+ CREATE DATABASE tap OWNER tap;
79EOSQL
+188-3
src/admin/backfill.rs
···11use axum::Json;
22use axum::extract::State;
33use axum::http::StatusCode;
44+use serde::Deserialize;
45use serde_json::Value;
5667use crate::AppState;
78use crate::error::AppError;
99+use crate::tap;
810911use super::auth::AdminAuth;
1012use super::types::{BackfillJob, CreateBackfillBody};
11131212-/// POST /admin/backfill — create a new backfill job.
1414+// ---------------------------------------------------------------------------
1515+// Relay discovery (reused from old backfill module)
1616+// ---------------------------------------------------------------------------
1717+1818+#[derive(Deserialize)]
1919+struct ListReposResponse {
2020+ repos: Vec<RepoEntry>,
2121+ cursor: Option<String>,
2222+}
2323+2424+#[derive(Deserialize)]
2525+struct RepoEntry {
2626+ did: String,
2727+}
2828+2929+/// Discover all DIDs that have records in `collection` via the relay's
3030+/// `com.atproto.sync.listReposByCollection` endpoint. Paginates until done.
3131+async fn list_repos_by_collection(
3232+ http: &reqwest::Client,
3333+ relay_url: &str,
3434+ collection: &str,
3535+) -> Result<Vec<String>, String> {
3636+ let base = relay_url.trim_end_matches('/');
3737+ let mut dids = Vec::new();
3838+ let mut cursor: Option<String> = None;
3939+4040+ loop {
4141+ let mut url = format!(
4242+ "{base}/xrpc/com.atproto.sync.listReposByCollection?collection={collection}&limit=1000"
4343+ );
4444+ if let Some(ref c) = cursor {
4545+ url.push_str(&format!("&cursor={c}"));
4646+ }
4747+4848+ let resp = http
4949+ .get(&url)
5050+ .send()
5151+ .await
5252+ .map_err(|e| format!("relay request failed: {e}"))?;
5353+5454+ if !resp.status().is_success() {
5555+ return Err(format!("relay returned {}", resp.status()));
5656+ }
5757+5858+ let body: ListReposResponse = resp
5959+ .json()
6060+ .await
6161+ .map_err(|e| format!("invalid relay response: {e}"))?;
6262+6363+ let page_count = body.repos.len();
6464+ for repo in body.repos {
6565+ dids.push(repo.did);
6666+ }
6767+6868+ match body.cursor {
6969+ Some(c) if page_count > 0 => cursor = Some(c),
7070+ _ => break,
7171+ }
7272+ }
7373+7474+ Ok(dids)
7575+}
7676+7777+// ---------------------------------------------------------------------------
7878+// Admin handlers
7979+// ---------------------------------------------------------------------------
8080+8181+/// POST /admin/backfill — create a backfill job, discover repos, and add them to Tap.
1382pub(super) async fn create_backfill(
1483 State(state): State<AppState>,
1584 _admin: AdminAuth,
1685 Json(body): Json<CreateBackfillBody>,
1786) -> Result<(StatusCode, Json<Value>), AppError> {
8787+ // Create a backfill_jobs record for tracking/audit.
1888 let row: (String,) = sqlx::query_as(
1989 "INSERT INTO backfill_jobs (collection, did) VALUES ($1, $2) RETURNING id::text",
2090 )
···2494 .await
2595 .map_err(|e| AppError::Internal(format!("failed to create backfill job: {e}")))?;
26969797+ let job_id = row.0.clone();
9898+9999+ // Mark as running.
100100+ let _ = sqlx::query(
101101+ "UPDATE backfill_jobs SET status = 'running', started_at = NOW() WHERE id::text = $1",
102102+ )
103103+ .bind(&job_id)
104104+ .execute(&state.db)
105105+ .await;
106106+107107+ // Determine target collections.
108108+ let collections: Vec<String> = if let Some(ref col) = body.collection {
109109+ vec![col.clone()]
110110+ } else {
111111+ let rows: Vec<(String,)> = sqlx::query_as(
112112+ "SELECT id FROM lexicons WHERE backfill = TRUE AND lexicon_json->'defs'->'main'->>'type' = 'record'",
113113+ )
114114+ .fetch_all(&state.db)
115115+ .await
116116+ .map_err(|e| AppError::Internal(format!("failed to query backfill-eligible lexicons: {e}")))?;
117117+ rows.into_iter().map(|(id,)| id).collect()
118118+ };
119119+120120+ if collections.is_empty() {
121121+ let _ = sqlx::query(
122122+ "UPDATE backfill_jobs SET status = 'completed', completed_at = NOW(), error = 'no backfill-eligible collections' WHERE id::text = $1",
123123+ )
124124+ .bind(&job_id)
125125+ .execute(&state.db)
126126+ .await;
127127+128128+ return Ok((
129129+ StatusCode::CREATED,
130130+ Json(serde_json::json!({
131131+ "id": job_id,
132132+ "status": "completed",
133133+ "error": "no backfill-eligible collections",
134134+ })),
135135+ ));
136136+ }
137137+138138+ // Discover repos and add them to Tap.
139139+ let mut all_dids = Vec::new();
140140+141141+ for collection in &collections {
142142+ let dids = if let Some(ref did) = body.did {
143143+ vec![did.clone()]
144144+ } else {
145145+ match list_repos_by_collection(&state.http, &state.config.relay_url, collection).await {
146146+ Ok(dids) => dids,
147147+ Err(e) => {
148148+ tracing::warn!(collection, error = %e, "failed to discover repos, skipping");
149149+ continue;
150150+ }
151151+ }
152152+ };
153153+154154+ all_dids.extend(dids);
155155+ }
156156+157157+ // Deduplicate DIDs.
158158+ all_dids.sort();
159159+ all_dids.dedup();
160160+161161+ let total_repos = all_dids.len() as i32;
162162+163163+ // Update job with total repos.
164164+ let _ = sqlx::query("UPDATE backfill_jobs SET total_repos = $2 WHERE id::text = $1")
165165+ .bind(&job_id)
166166+ .bind(total_repos)
167167+ .execute(&state.db)
168168+ .await;
169169+170170+ // Add repos to Tap in batches.
171171+ if !all_dids.is_empty() {
172172+ for chunk in all_dids.chunks(1000) {
173173+ if let Err(e) = tap::add_repos(
174174+ &state.http,
175175+ &state.config.tap_url,
176176+ state.config.tap_admin_password.as_deref(),
177177+ chunk,
178178+ )
179179+ .await
180180+ {
181181+ tracing::warn!(error = %e, "failed to add repos to tap");
182182+ let _ = sqlx::query(
183183+ "UPDATE backfill_jobs SET status = 'failed', completed_at = NOW(), error = $2 WHERE id::text = $1",
184184+ )
185185+ .bind(&job_id)
186186+ .bind(&e)
187187+ .execute(&state.db)
188188+ .await;
189189+190190+ return Ok((
191191+ StatusCode::CREATED,
192192+ Json(serde_json::json!({
193193+ "id": job_id,
194194+ "status": "failed",
195195+ "error": e,
196196+ })),
197197+ ));
198198+ }
199199+ }
200200+ }
201201+202202+ // Mark as completed (Tap handles the actual backfill asynchronously).
203203+ let _ = sqlx::query(
204204+ "UPDATE backfill_jobs SET status = 'completed', completed_at = NOW(), processed_repos = $2 WHERE id::text = $1",
205205+ )
206206+ .bind(&job_id)
207207+ .bind(total_repos)
208208+ .execute(&state.db)
209209+ .await;
210210+27211 Ok((
28212 StatusCode::CREATED,
29213 Json(serde_json::json!({
3030- "id": row.0,
3131- "status": "pending",
214214+ "id": job_id,
215215+ "status": "completed",
216216+ "total_repos": total_repos,
32217 })),
33218 ))
34219}
+5-5
src/admin/lexicons.rs
···1010use super::auth::AdminAuth;
1111use super::types::{LexiconSummary, UploadLexiconBody};
12121313-/// Send the current record collection list to the Jetstream task so it
1414-/// reconnects with the updated filter.
1515-async fn notify_jetstream(state: &AppState) {
1313+/// Send the current record collection list to the Tap task so it
1414+/// syncs the updated filter.
1515+async fn notify_collections(state: &AppState) {
1616 let collections = state.lexicons.get_record_collections().await;
1717 let _ = state.collections_tx.send(collections);
1818}
···9393 state.lexicons.upsert(parsed).await;
94949595 if is_record {
9696- notify_jetstream(&state).await;
9696+ notify_collections(&state).await;
9797 }
98989999 let status = if revision == 1 {
···197197 }
198198199199 state.lexicons.remove(&id).await;
200200- notify_jetstream(&state).await;
200200+ notify_collections(&state).await;
201201202202 Ok(StatusCode::NO_CONTENT)
203203}
+5-5
src/admin/network_lexicons.rs
···1111use super::auth::AdminAuth;
1212use super::types::{AddNetworkLexiconBody, NetworkLexiconSummary};
13131414-/// Send the current record collection list to the Jetstream task so it
1515-/// reconnects with the updated filter.
1616-async fn notify_jetstream(state: &AppState) {
1414+/// Send the current record collection list to the Tap task so it
1515+/// syncs the updated filter.
1616+async fn notify_collections(state: &AppState) {
1717 let collections = state.lexicons.get_record_collections().await;
1818 let _ = state.collections_tx.send(collections);
1919}
···9595 state.lexicons.upsert(parsed).await;
96969797 if is_record {
9898- notify_jetstream(&state).await;
9898+ notify_collections(&state).await;
9999 }
100100101101 Ok((
···165165 .await;
166166167167 state.lexicons.remove(&nsid).await;
168168- notify_jetstream(&state).await;
168168+ notify_collections(&state).await;
169169170170 Ok(StatusCode::NO_CONTENT)
171171}
-373
src/backfill.rs
···11-use serde::Deserialize;
22-use serde_json::Value;
33-use sqlx::PgPool;
44-use std::sync::Arc;
55-use tokio::sync::Semaphore;
66-use tracing::{debug, error, info, warn};
77-88-use crate::profile;
99-1010-// ---------------------------------------------------------------------------
1111-// Relay / PDS response types
1212-// ---------------------------------------------------------------------------
1313-1414-#[derive(Deserialize)]
1515-struct ListReposResponse {
1616- repos: Vec<RepoEntry>,
1717- cursor: Option<String>,
1818-}
1919-2020-#[derive(Deserialize)]
2121-struct RepoEntry {
2222- did: String,
2323-}
2424-2525-#[derive(Deserialize)]
2626-struct ListRecordsResponse {
2727- records: Vec<RecordEntry>,
2828- cursor: Option<String>,
2929-}
3030-3131-#[derive(Deserialize)]
3232-struct RecordEntry {
3333- uri: String,
3434- cid: String,
3535- value: Value,
3636-}
3737-3838-// ---------------------------------------------------------------------------
3939-// Relay discovery
4040-// ---------------------------------------------------------------------------
4141-4242-/// Discover all DIDs that have records in `collection` via the relay's
4343-/// `com.atproto.sync.listReposByCollection` endpoint. Paginates until done.
4444-async fn list_repos_by_collection(
4545- http: &reqwest::Client,
4646- relay_url: &str,
4747- collection: &str,
4848-) -> Result<Vec<String>, String> {
4949- let base = relay_url.trim_end_matches('/');
5050- let mut dids = Vec::new();
5151- let mut cursor: Option<String> = None;
5252-5353- loop {
5454- let mut url = format!(
5555- "{base}/xrpc/com.atproto.sync.listReposByCollection?collection={collection}&limit=1000"
5656- );
5757- if let Some(ref c) = cursor {
5858- url.push_str(&format!("&cursor={c}"));
5959- }
6060-6161- let resp = http
6262- .get(&url)
6363- .send()
6464- .await
6565- .map_err(|e| format!("relay request failed: {e}"))?;
6666-6767- if !resp.status().is_success() {
6868- return Err(format!("relay returned {}", resp.status()));
6969- }
7070-7171- let body: ListReposResponse = resp
7272- .json()
7373- .await
7474- .map_err(|e| format!("invalid relay response: {e}"))?;
7575-7676- let page_count = body.repos.len();
7777- for repo in body.repos {
7878- dids.push(repo.did);
7979- }
8080-8181- match body.cursor {
8282- Some(c) if page_count > 0 => cursor = Some(c),
8383- _ => break,
8484- }
8585- }
8686-8787- Ok(dids)
8888-}
8989-9090-// ---------------------------------------------------------------------------
9191-// PDS record fetching
9292-// ---------------------------------------------------------------------------
9393-9494-/// Fetch all records for a DID + collection from their PDS via
9595-/// `com.atproto.repo.listRecords`. Paginates until done.
9696-async fn fetch_records(
9797- http: &reqwest::Client,
9898- pds_url: &str,
9999- did: &str,
100100- collection: &str,
101101-) -> Result<Vec<(String, String, String, Value)>, String> {
102102- let base = pds_url.trim_end_matches('/');
103103- let mut records = Vec::new();
104104- let mut cursor: Option<String> = None;
105105-106106- loop {
107107- let mut url = format!(
108108- "{base}/xrpc/com.atproto.repo.listRecords?repo={did}&collection={collection}&limit=100"
109109- );
110110- if let Some(ref c) = cursor {
111111- url.push_str(&format!("&cursor={c}"));
112112- }
113113-114114- let resp = http
115115- .get(&url)
116116- .send()
117117- .await
118118- .map_err(|e| format!("PDS listRecords failed: {e}"))?;
119119-120120- if !resp.status().is_success() {
121121- return Err(format!("PDS returned {} for {did}", resp.status()));
122122- }
123123-124124- let body: ListRecordsResponse = resp
125125- .json()
126126- .await
127127- .map_err(|e| format!("invalid PDS listRecords response: {e}"))?;
128128-129129- let page_count = body.records.len();
130130- for entry in body.records {
131131- let rkey = entry
132132- .uri
133133- .split('/')
134134- .next_back()
135135- .unwrap_or_default()
136136- .to_string();
137137- records.push((entry.uri, rkey, entry.cid, entry.value));
138138- }
139139-140140- match body.cursor {
141141- Some(c) if page_count > 0 => cursor = Some(c),
142142- _ => break,
143143- }
144144- }
145145-146146- Ok(records)
147147-}
148148-149149-// ---------------------------------------------------------------------------
150150-// Job runner
151151-// ---------------------------------------------------------------------------
152152-153153-/// Run a single backfill job: discover repos, fetch records, upsert into DB.
154154-async fn run_job(
155155- db: &PgPool,
156156- http: &reqwest::Client,
157157- relay_url: &str,
158158- plc_url: &str,
159159- job_id: &str,
160160-) -> Result<(), String> {
161161- // Fetch the job
162162- let job: (Option<String>, Option<String>) =
163163- sqlx::query_as("SELECT collection, did FROM backfill_jobs WHERE id::text = $1")
164164- .bind(job_id)
165165- .fetch_one(db)
166166- .await
167167- .map_err(|e| format!("failed to fetch job: {e}"))?;
168168-169169- let (job_collection, job_did) = job;
170170-171171- // Mark as running
172172- let _ = sqlx::query(
173173- "UPDATE backfill_jobs SET status = 'running', started_at = NOW() WHERE id::text = $1",
174174- )
175175- .bind(job_id)
176176- .execute(db)
177177- .await;
178178-179179- // Determine target collections
180180- let collections: Vec<String> = if let Some(ref col) = job_collection {
181181- vec![col.clone()]
182182- } else {
183183- // All backfill-eligible collections
184184- let rows: Vec<(String,)> = sqlx::query_as(
185185- "SELECT id FROM lexicons WHERE backfill = TRUE AND lexicon_json->'defs'->'main'->>'type' = 'record'",
186186- )
187187- .fetch_all(db)
188188- .await
189189- .map_err(|e| format!("failed to query backfill-eligible lexicons: {e}"))?;
190190- rows.into_iter().map(|(id,)| id).collect()
191191- };
192192-193193- if collections.is_empty() {
194194- let _ = sqlx::query(
195195- "UPDATE backfill_jobs SET status = 'completed', completed_at = NOW(), error = 'no backfill-eligible collections' WHERE id::text = $1",
196196- )
197197- .bind(job_id)
198198- .execute(db)
199199- .await;
200200- return Ok(());
201201- }
202202-203203- info!(job = job_id, ?collections, "starting backfill");
204204-205205- let semaphore = Arc::new(Semaphore::new(8));
206206- let mut total_repos = 0i32;
207207- let mut processed_repos = 0i32;
208208- let mut total_records = 0i32;
209209-210210- for collection in &collections {
211211- // Discover DIDs
212212- let dids = if let Some(ref did) = job_did {
213213- vec![did.clone()]
214214- } else {
215215- match list_repos_by_collection(http, relay_url, collection).await {
216216- Ok(dids) => dids,
217217- Err(e) => {
218218- warn!(collection, error = %e, "failed to discover repos, skipping");
219219- continue;
220220- }
221221- }
222222- };
223223-224224- total_repos += dids.len() as i32;
225225- let _ = sqlx::query("UPDATE backfill_jobs SET total_repos = $2 WHERE id::text = $1")
226226- .bind(job_id)
227227- .bind(total_repos)
228228- .execute(db)
229229- .await;
230230-231231- // Process each DID concurrently (bounded by semaphore)
232232- let mut tasks = Vec::new();
233233-234234- for did in dids {
235235- let permit = semaphore.clone().acquire_owned().await.unwrap();
236236- let http = http.clone();
237237- let db = db.clone();
238238- let collection = collection.clone();
239239-240240- let plc_url = plc_url.to_string();
241241- let task = tokio::spawn(async move {
242242- let _permit = permit;
243243- backfill_repo(&db, &http, &plc_url, &did, &collection).await
244244- });
245245- tasks.push(task);
246246- }
247247-248248- for task in tasks {
249249- match task.await {
250250- Ok(Ok(count)) => {
251251- total_records += count;
252252- processed_repos += 1;
253253- }
254254- Ok(Err(e)) => {
255255- warn!(error = %e, "repo backfill failed");
256256- processed_repos += 1;
257257- }
258258- Err(e) => {
259259- warn!(error = %e, "repo backfill task panicked");
260260- processed_repos += 1;
261261- }
262262- }
263263-264264- // Update progress periodically
265265- let _ = sqlx::query(
266266- "UPDATE backfill_jobs SET processed_repos = $2, total_records = $3 WHERE id::text = $1",
267267- )
268268- .bind(job_id)
269269- .bind(processed_repos)
270270- .bind(total_records)
271271- .execute(db)
272272- .await;
273273- }
274274- }
275275-276276- // Mark completed
277277- let _ = sqlx::query(
278278- "UPDATE backfill_jobs SET status = 'completed', completed_at = NOW(), processed_repos = $2, total_records = $3 WHERE id::text = $1",
279279- )
280280- .bind(job_id)
281281- .bind(processed_repos)
282282- .bind(total_records)
283283- .execute(db)
284284- .await;
285285-286286- info!(
287287- job = job_id,
288288- processed_repos, total_records, "backfill completed"
289289- );
290290- Ok(())
291291-}
292292-293293-/// Backfill a single repo's records for a collection. Returns the number of
294294-/// records upserted.
295295-async fn backfill_repo(
296296- db: &PgPool,
297297- http: &reqwest::Client,
298298- plc_url: &str,
299299- did: &str,
300300- collection: &str,
301301-) -> Result<i32, String> {
302302- // Resolve PDS
303303- let pds = profile::resolve_pds_endpoint(http, plc_url, did)
304304- .await
305305- .map_err(|e| format!("PDS resolution failed for {did}: {e}"))?;
306306-307307- // Fetch records
308308- let records = fetch_records(http, &pds, did, collection).await?;
309309- let count = records.len() as i32;
310310-311311- debug!(did, collection, count, "fetched records from PDS");
312312-313313- // Upsert into DB
314314- for (uri, rkey, cid, value) in records {
315315- let _ = sqlx::query(
316316- r#"
317317- INSERT INTO records (uri, did, collection, rkey, record, cid)
318318- VALUES ($1, $2, $3, $4, $5, $6)
319319- ON CONFLICT (uri) DO UPDATE
320320- SET record = EXCLUDED.record,
321321- cid = EXCLUDED.cid
322322- "#,
323323- )
324324- .bind(&uri)
325325- .bind(did)
326326- .bind(collection)
327327- .bind(&rkey)
328328- .bind(&value)
329329- .bind(&cid)
330330- .execute(db)
331331- .await
332332- .map_err(|e| format!("DB upsert failed for {uri}: {e}"))?;
333333- }
334334-335335- Ok(count)
336336-}
337337-338338-// ---------------------------------------------------------------------------
339339-// Background worker
340340-// ---------------------------------------------------------------------------
341341-342342-/// Spawn a background task that polls for pending backfill jobs and runs them.
343343-pub fn spawn_worker(db: PgPool, http: reqwest::Client, relay_url: String, plc_url: String) {
344344- tokio::spawn(async move {
345345- info!("backfill worker started");
346346- loop {
347347- // Poll for a pending job
348348- let job: Option<(String,)> = sqlx::query_as(
349349- "SELECT id::text FROM backfill_jobs WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1",
350350- )
351351- .fetch_optional(&db)
352352- .await
353353- .unwrap_or(None);
354354-355355- if let Some((job_id,)) = job {
356356- info!(job = %job_id, "picked up backfill job");
357357- if let Err(e) = run_job(&db, &http, &relay_url, &plc_url, &job_id).await {
358358- error!(job = %job_id, error = %e, "backfill job failed");
359359- let _ = sqlx::query(
360360- "UPDATE backfill_jobs SET status = 'failed', completed_at = NOW(), error = $2 WHERE id::text = $1",
361361- )
362362- .bind(&job_id)
363363- .bind(&e)
364364- .execute(&db)
365365- .await;
366366- }
367367- } else {
368368- // No pending jobs, wait before polling again
369369- tokio::time::sleep(std::time::Duration::from_secs(5)).await;
370370- }
371371- }
372372- });
373373-}
···11-use futures_util::StreamExt;
22-use serde::Deserialize;
33-use serde_json::Value;
44-use sqlx::PgPool;
55-use std::sync::Arc;
66-use std::sync::atomic::{AtomicI64, Ordering};
77-use tokio::sync::watch;
88-use tokio_tungstenite::tungstenite::Message;
99-1010-use crate::lexicon::{LexiconRegistry, ParsedLexicon, ProcedureAction};
1111-1212-// ---------------------------------------------------------------------------
1313-// Jetstream event types
1414-// ---------------------------------------------------------------------------
1515-1616-#[derive(Deserialize)]
1717-struct JetstreamEvent {
1818- did: String,
1919- time_us: i64,
2020- kind: String,
2121- commit: Option<JetstreamCommit>,
2222-}
2323-2424-#[derive(Deserialize)]
2525-struct JetstreamCommit {
2626- operation: String,
2727- collection: String,
2828- rkey: String,
2929- record: Option<Value>,
3030- cid: Option<String>,
3131-}
3232-3333-// ---------------------------------------------------------------------------
3434-// Public API
3535-// ---------------------------------------------------------------------------
3636-3737-/// The static collection we always watch for lexicon schema updates.
3838-const LEXICON_SCHEMA_COLLECTION: &str = "com.atproto.lexicon.schema";
3939-4040-/// Spawn a background task that subscribes to the Jetstream firehose and
4141-/// indexes records for collections specified by the watch channel.
4242-///
4343-/// When the collection list is empty, the task idles without connecting.
4444-/// When collections change, it disconnects and reconnects with the new filter.
4545-pub fn spawn(
4646- db: PgPool,
4747- jetstream_url: String,
4848- mut collections_rx: watch::Receiver<Vec<String>>,
4949- lexicons: LexiconRegistry,
5050- collections_tx: watch::Sender<Vec<String>>,
5151-) {
5252- tokio::spawn(async move {
5353- let cursor: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
5454-5555- loop {
5656- // Wait until we have at least one collection to subscribe to.
5757- let collections = collections_rx.borrow_and_update().clone();
5858- if collections.is_empty() {
5959- tracing::info!("no collections configured, jetstream idle");
6060- // Block until the collection list changes.
6161- if collections_rx.changed().await.is_err() {
6262- // Sender dropped — shut down.
6363- tracing::info!("jetstream watch channel closed, shutting down");
6464- return;
6565- }
6666- continue;
6767- }
6868-6969- // Always include the lexicon schema collection alongside the dynamic ones.
7070- let mut wanted = collections.clone();
7171- if !wanted.contains(&LEXICON_SCHEMA_COLLECTION.to_string()) {
7272- wanted.push(LEXICON_SCHEMA_COLLECTION.to_string());
7373- }
7474-7575- // Connect and process events. If the collection list changes
7676- // mid-stream, `run` returns so we can reconnect with new filters.
7777- match run(
7878- &db,
7979- &jetstream_url,
8080- &cursor,
8181- &wanted,
8282- &mut collections_rx,
8383- &lexicons,
8484- &collections_tx,
8585- )
8686- .await
8787- {
8888- Ok(()) => {
8989- tracing::info!("jetstream reconnecting due to collection change");
9090- }
9191- Err(e) => {
9292- tracing::warn!("jetstream disconnected: {e}");
9393- tokio::time::sleep(std::time::Duration::from_secs(2)).await;
9494- tracing::info!("reconnecting to jetstream...");
9595- }
9696- }
9797- }
9898- });
9999-}
100100-101101-// ---------------------------------------------------------------------------
102102-// Connection loop
103103-// ---------------------------------------------------------------------------
104104-105105-async fn run(
106106- db: &PgPool,
107107- jetstream_url: &str,
108108- cursor: &Arc<AtomicI64>,
109109- collections: &[String],
110110- collections_rx: &mut watch::Receiver<Vec<String>>,
111111- lexicons: &LexiconRegistry,
112112- collections_tx: &watch::Sender<Vec<String>>,
113113-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
114114- let wanted: String = collections
115115- .iter()
116116- .map(|c| format!("wantedCollections={c}"))
117117- .collect::<Vec<_>>()
118118- .join("&");
119119-120120- let mut url = format!("{jetstream_url}?{wanted}");
121121-122122- let last = cursor.load(Ordering::Relaxed);
123123- if last > 0 {
124124- // Rewind 5 seconds for gapless playback.
125125- let rewound = last - 5_000_000;
126126- url.push_str(&format!("&cursor={rewound}"));
127127- tracing::info!(cursor = rewound, "resuming jetstream with cursor");
128128- }
129129-130130- tracing::info!(collections = ?collections, "connecting to jetstream");
131131-132132- let (ws, _) = tokio_tungstenite::connect_async(&url).await?;
133133- tracing::info!("connected to jetstream");
134134-135135- let (_, mut read) = ws.split();
136136-137137- loop {
138138- tokio::select! {
139139- msg = read.next() => {
140140- let msg = match msg {
141141- Some(Ok(m)) => m,
142142- Some(Err(e)) => return Err(e.into()),
143143- None => break,
144144- };
145145-146146- let text = match msg {
147147- Message::Text(t) => t,
148148- Message::Close(_) => break,
149149- _ => continue,
150150- };
151151-152152- let event: JetstreamEvent = match serde_json::from_str(&text) {
153153- Ok(e) => e,
154154- Err(e) => {
155155- tracing::debug!("skipping unparseable event: {e}");
156156- continue;
157157- }
158158- };
159159-160160- // Update cursor.
161161- cursor.store(event.time_us, Ordering::Relaxed);
162162-163163- if event.kind != "commit" {
164164- continue;
165165- }
166166-167167- let commit = match event.commit {
168168- Some(c) => c,
169169- None => continue,
170170- };
171171-172172- let uri = format!(
173173- "at://{}/{}/{}",
174174- event.did, commit.collection, commit.rkey,
175175- );
176176-177177- // Handle lexicon schema events for tracked network lexicons.
178178- if commit.collection == LEXICON_SCHEMA_COLLECTION {
179179- handle_lexicon_schema_event(
180180- db,
181181- lexicons,
182182- collections_tx,
183183- &event.did,
184184- &commit,
185185- )
186186- .await;
187187- continue;
188188- }
189189-190190- match commit.operation.as_str() {
191191- "create" | "update" => {
192192- let record = match commit.record {
193193- Some(r) => r,
194194- None => continue,
195195- };
196196- let cid = commit.cid.unwrap_or_default();
197197-198198- if let Err(e) = sqlx::query(
199199- r#"
200200- INSERT INTO records (uri, did, collection, rkey, record, cid, indexed_at)
201201- VALUES ($1, $2, $3, $4, $5, $6, NOW())
202202- ON CONFLICT (uri) DO UPDATE
203203- SET record = EXCLUDED.record,
204204- cid = EXCLUDED.cid,
205205- indexed_at = NOW()
206206- "#,
207207- )
208208- .bind(&uri)
209209- .bind(&event.did)
210210- .bind(&commit.collection)
211211- .bind(&commit.rkey)
212212- .bind(&record)
213213- .bind(&cid)
214214- .execute(db)
215215- .await
216216- {
217217- tracing::warn!(uri = %uri, "failed to upsert record: {e}");
218218- }
219219- }
220220- "delete" => {
221221- if let Err(e) = sqlx::query("DELETE FROM records WHERE uri = $1")
222222- .bind(&uri)
223223- .execute(db)
224224- .await
225225- {
226226- tracing::warn!(uri = %uri, "failed to delete record: {e}");
227227- }
228228- }
229229- _ => {}
230230- }
231231- }
232232- // If the collection list changes, break out to reconnect.
233233- _ = collections_rx.changed() => {
234234- tracing::info!("collection filter changed, will reconnect");
235235- return Ok(());
236236- }
237237- }
238238- }
239239-240240- Ok(())
241241-}
242242-243243-// ---------------------------------------------------------------------------
244244-// Lexicon schema event handler
245245-// ---------------------------------------------------------------------------
246246-247247-/// Handle a `com.atproto.lexicon.schema` commit event for tracked network lexicons.
248248-async fn handle_lexicon_schema_event(
249249- db: &PgPool,
250250- lexicons: &LexiconRegistry,
251251- collections_tx: &watch::Sender<Vec<String>>,
252252- did: &str,
253253- commit: &JetstreamCommit,
254254-) {
255255- let nsid = &commit.rkey;
256256-257257- // Check if this NSID is one we're tracking and the DID matches the authority.
258258- let tracked: Option<(Option<String>,)> = sqlx::query_as(
259259- "SELECT target_collection FROM network_lexicons WHERE nsid = $1 AND authority_did = $2",
260260- )
261261- .bind(nsid)
262262- .bind(did)
263263- .fetch_optional(db)
264264- .await
265265- .unwrap_or(None);
266266-267267- let target_collection = match tracked {
268268- Some((tc,)) => tc,
269269- None => return, // Not a tracked network lexicon.
270270- };
271271-272272- match commit.operation.as_str() {
273273- "create" | "update" => {
274274- let record = match &commit.record {
275275- Some(r) => r,
276276- None => return,
277277- };
278278-279279- let parsed = match ParsedLexicon::parse(
280280- record.clone(),
281281- 1,
282282- target_collection.clone(),
283283- ProcedureAction::Upsert,
284284- ) {
285285- Ok(p) => p,
286286- Err(e) => {
287287- tracing::warn!(nsid, "failed to parse lexicon schema event: {e}");
288288- return;
289289- }
290290- };
291291-292292- let is_record = parsed.lexicon_type == crate::lexicon::LexiconType::Record;
293293-294294- // Upsert into lexicons table.
295295- if let Err(e) = sqlx::query(
296296- r#"
297297- INSERT INTO lexicons (id, lexicon_json, backfill, target_collection)
298298- VALUES ($1, $2, false, $3)
299299- ON CONFLICT (id) DO UPDATE SET
300300- lexicon_json = EXCLUDED.lexicon_json,
301301- target_collection = EXCLUDED.target_collection,
302302- revision = lexicons.revision + 1,
303303- updated_at = NOW()
304304- "#,
305305- )
306306- .bind(nsid)
307307- .bind(record)
308308- .bind(&target_collection)
309309- .execute(db)
310310- .await
311311- {
312312- tracing::warn!(nsid, "failed to upsert lexicon from event: {e}");
313313- return;
314314- }
315315-316316- // Update last_fetched_at.
317317- let _ =
318318- sqlx::query("UPDATE network_lexicons SET last_fetched_at = NOW() WHERE nsid = $1")
319319- .bind(nsid)
320320- .execute(db)
321321- .await;
322322-323323- lexicons.upsert(parsed).await;
324324- tracing::info!(nsid, "updated network lexicon from jetstream event");
325325-326326- if is_record {
327327- let collections = lexicons.get_record_collections().await;
328328- let _ = collections_tx.send(collections);
329329- }
330330- }
331331- "delete" => {
332332- // Remove from lexicons table and registry.
333333- let _ = sqlx::query("DELETE FROM lexicons WHERE id = $1")
334334- .bind(nsid)
335335- .execute(db)
336336- .await;
337337-338338- let was_present = lexicons.remove(nsid).await;
339339- if was_present {
340340- tracing::info!(nsid, "removed network lexicon from jetstream delete event");
341341- let collections = lexicons.get_record_collections().await;
342342- let _ = collections_tx.send(collections);
343343- }
344344- }
345345- _ => {}
346346- }
347347-}
+1-2
src/lib.rs
···11pub mod admin;
22pub mod auth;
33-pub mod backfill;
43pub mod config;
54pub mod error;
66-pub mod jetstream;
75pub mod lexicon;
86pub mod profile;
97pub mod repo;
108pub mod resolve;
119pub mod server;
1010+pub mod tap;
1211pub mod xrpc;
13121413use config::Config;
+23-9
src/main.rs
···11use happyview::config::Config;
22use happyview::lexicon::{LexiconRegistry, ParsedLexicon, ProcedureAction};
33use happyview::resolve::{fetch_lexicon_from_pds, resolve_nsid_authority};
44-use happyview::{AppState, backfill, jetstream, server};
44+use happyview::{AppState, server, tap};
55use tokio::sync::watch;
66use tracing::{info, warn};
77···107107 }
108108109109 let initial_collections = lexicons.get_record_collections().await;
110110+ let initial_collections_for_sync = initial_collections.clone();
110111 let (collections_tx, collections_rx) = watch::channel(initial_collections);
111112112113 let state = AppState {
···117118 collections_tx,
118119 };
119120120120- jetstream::spawn(
121121+ // Sync initial collections to Tap on startup.
122122+ {
123123+ let mut wanted = initial_collections_for_sync;
124124+ if !wanted.contains(&"com.atproto.lexicon.schema".to_string()) {
125125+ wanted.push("com.atproto.lexicon.schema".to_string());
126126+ }
127127+ if let Err(e) = tap::sync_collections(
128128+ &state.http,
129129+ &config.tap_url,
130130+ config.tap_admin_password.as_deref(),
131131+ &wanted,
132132+ )
133133+ .await
134134+ {
135135+ warn!("failed to sync initial collections to tap: {e}");
136136+ }
137137+ }
138138+139139+ tap::spawn(
121140 state.db.clone(),
122122- config.jetstream_url.clone(),
141141+ config.tap_url.clone(),
142142+ config.tap_admin_password.clone(),
123143 collections_rx,
124144 state.lexicons.clone(),
125145 state.collections_tx.clone(),
126126- );
127127- backfill::spawn_worker(
128128- state.db.clone(),
129129- state.http.clone(),
130130- config.relay_url.clone(),
131131- config.plc_url.clone(),
132146 );
133147134148 let app = server::router(state);
+475
src/tap.rs
···11+use futures_util::{SinkExt, StreamExt};
22+use serde::Deserialize;
33+use serde_json::Value;
44+use sqlx::PgPool;
55+use tokio::sync::watch;
66+use tokio_tungstenite::tungstenite::Message;
77+use tokio_tungstenite::tungstenite::client::IntoClientRequest;
88+99+use crate::lexicon::{LexiconRegistry, ParsedLexicon, ProcedureAction};
1010+1111+// ---------------------------------------------------------------------------
1212+// Tap event types (matches Tap's outbox JSON format)
1313+// ---------------------------------------------------------------------------
1414+1515+#[derive(Deserialize)]
1616+struct TapEvent {
1717+ id: u64,
1818+ #[serde(rename = "type")]
1919+ event_type: String,
2020+ record: Option<TapRecordEvent>,
2121+ identity: Option<TapIdentityEvent>,
2222+}
2323+2424+#[derive(Deserialize)]
2525+struct TapRecordEvent {
2626+ did: String,
2727+ collection: String,
2828+ rkey: String,
2929+ action: String,
3030+ record: Option<Value>,
3131+ cid: Option<String>,
3232+ #[allow(dead_code)]
3333+ live: Option<bool>,
3434+}
3535+3636+#[derive(Deserialize)]
3737+#[allow(dead_code)]
3838+struct TapIdentityEvent {
3939+ did: String,
4040+ handle: Option<String>,
4141+ #[serde(rename = "isActive")]
4242+ is_active: Option<bool>,
4343+ status: Option<String>,
4444+}
4545+4646+// ---------------------------------------------------------------------------
4747+// Tap HTTP client helpers
4848+// ---------------------------------------------------------------------------
4949+5050+async fn tap_put(
5151+ http: &reqwest::Client,
5252+ tap_url: &str,
5353+ path: &str,
5454+ password: Option<&str>,
5555+ body: &Value,
5656+) -> Result<(), String> {
5757+ let url = format!("{}{}", tap_url.trim_end_matches('/'), path);
5858+ let mut req = http.put(&url).json(body);
5959+ if let Some(pw) = password {
6060+ req = req.basic_auth("admin", Some(pw));
6161+ }
6262+ let resp = req
6363+ .send()
6464+ .await
6565+ .map_err(|e| format!("tap HTTP request failed: {e}"))?;
6666+ if !resp.status().is_success() {
6767+ let status = resp.status();
6868+ let body = resp.text().await.unwrap_or_default();
6969+ return Err(format!("tap returned {status}: {body}"));
7070+ }
7171+ Ok(())
7272+}
7373+7474+async fn tap_post(
7575+ http: &reqwest::Client,
7676+ tap_url: &str,
7777+ path: &str,
7878+ password: Option<&str>,
7979+ body: &Value,
8080+) -> Result<(), String> {
8181+ let url = format!("{}{}", tap_url.trim_end_matches('/'), path);
8282+ let mut req = http.post(&url).json(body);
8383+ if let Some(pw) = password {
8484+ req = req.basic_auth("admin", Some(pw));
8585+ }
8686+ let resp = req
8787+ .send()
8888+ .await
8989+ .map_err(|e| format!("tap HTTP request failed: {e}"))?;
9090+ if !resp.status().is_success() {
9191+ let status = resp.status();
9292+ let body = resp.text().await.unwrap_or_default();
9393+ return Err(format!("tap returned {status}: {body}"));
9494+ }
9595+ Ok(())
9696+}
9797+9898+/// Sync Tap's collection filters and signal collections with HappyView's
9999+/// current record collections.
100100+pub async fn sync_collections(
101101+ http: &reqwest::Client,
102102+ tap_url: &str,
103103+ tap_admin_password: Option<&str>,
104104+ collections: &[String],
105105+) -> Result<(), String> {
106106+ let body = serde_json::json!({ "collections": collections });
107107+ tap_put(
108108+ http,
109109+ tap_url,
110110+ "/collection-filters",
111111+ tap_admin_password,
112112+ &body,
113113+ )
114114+ .await?;
115115+ tap_put(
116116+ http,
117117+ tap_url,
118118+ "/signal-collections",
119119+ tap_admin_password,
120120+ &body,
121121+ )
122122+ .await?;
123123+ Ok(())
124124+}
125125+126126+/// Add repos to Tap for backfill via POST /repos/add.
127127+pub async fn add_repos(
128128+ http: &reqwest::Client,
129129+ tap_url: &str,
130130+ tap_admin_password: Option<&str>,
131131+ dids: &[String],
132132+) -> Result<(), String> {
133133+ let body = serde_json::json!({ "dids": dids });
134134+ tap_post(http, tap_url, "/repos/add", tap_admin_password, &body).await
135135+}
136136+137137+// ---------------------------------------------------------------------------
138138+// Public API
139139+// ---------------------------------------------------------------------------
/// Collection NSID of lexicon schema records.
///
/// It is always appended to the collection list synced to Tap, and record
/// events in this collection are routed to the lexicon schema handler instead
/// of being stored in the `records` table.
const LEXICON_SCHEMA_COLLECTION: &str = "com.atproto.lexicon.schema";
143143+144144+/// Spawn a background task that connects to Tap's WebSocket channel and
145145+/// processes record + identity events. Replaces both jetstream and backfill.
146146+///
147147+/// When the collection list changes (via `collections_rx`), the task syncs
148148+/// the updated filters to Tap's HTTP API.
149149+pub fn spawn(
150150+ db: PgPool,
151151+ tap_url: String,
152152+ tap_admin_password: Option<String>,
153153+ mut collections_rx: watch::Receiver<Vec<String>>,
154154+ lexicons: LexiconRegistry,
155155+ collections_tx: watch::Sender<Vec<String>>,
156156+) {
157157+ let http = reqwest::Client::new();
158158+159159+ tokio::spawn(async move {
160160+ loop {
161161+ // Build WebSocket URL from HTTP URL.
162162+ let ws_url = build_ws_url(&tap_url);
163163+164164+ match run(
165165+ &db,
166166+ &http,
167167+ &tap_url,
168168+ tap_admin_password.as_deref(),
169169+ &ws_url,
170170+ &mut collections_rx,
171171+ &lexicons,
172172+ &collections_tx,
173173+ )
174174+ .await
175175+ {
176176+ Ok(()) => {
177177+ tracing::info!("tap reconnecting due to collection change");
178178+ }
179179+ Err(e) => {
180180+ tracing::warn!("tap disconnected: {e}");
181181+ tokio::time::sleep(std::time::Duration::from_secs(2)).await;
182182+ tracing::info!("reconnecting to tap...");
183183+ }
184184+ }
185185+ }
186186+ });
187187+}
/// Derive the WebSocket URL of Tap's `/channel` endpoint from its base URL.
///
/// Trailing slashes are stripped, then the scheme is mapped: `https://`
/// becomes `wss://` and `http://` becomes `ws://`. A URL that already uses
/// `ws://` or `wss://` is kept as-is, and anything else (e.g. a bare
/// `host:port`) is assumed to be plain `ws://`.
fn build_ws_url(tap_url: &str) -> String {
    let base = tap_url.trim_end_matches('/');
    let ws_base = if let Some(rest) = base.strip_prefix("https://") {
        format!("wss://{rest}")
    } else if let Some(rest) = base.strip_prefix("http://") {
        format!("ws://{rest}")
    } else if base.starts_with("ws://") || base.starts_with("wss://") {
        // Already a WebSocket URL; prefixing another scheme would corrupt it.
        base.to_string()
    } else {
        format!("ws://{base}")
    };
    format!("{ws_base}/channel")
}
200200+201201+// ---------------------------------------------------------------------------
202202+// Connection loop
203203+// ---------------------------------------------------------------------------
204204+205205+#[allow(clippy::too_many_arguments)]
206206+async fn run(
207207+ db: &PgPool,
208208+ http: &reqwest::Client,
209209+ tap_url: &str,
210210+ tap_admin_password: Option<&str>,
211211+ ws_url: &str,
212212+ collections_rx: &mut watch::Receiver<Vec<String>>,
213213+ lexicons: &LexiconRegistry,
214214+ collections_tx: &watch::Sender<Vec<String>>,
215215+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
216216+ tracing::info!(url = %ws_url, "connecting to tap");
217217+218218+ let mut request = ws_url.to_string().into_client_request()?;
219219+ if let Some(pw) = tap_admin_password {
220220+ use base64::Engine;
221221+ let encoded = base64::engine::general_purpose::STANDARD.encode(format!("admin:{pw}"));
222222+ request
223223+ .headers_mut()
224224+ .insert("Authorization", format!("Basic {encoded}").parse().unwrap());
225225+ }
226226+227227+ let (ws, _): (
228228+ tokio_tungstenite::WebSocketStream<
229229+ tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
230230+ >,
231231+ _,
232232+ ) = tokio_tungstenite::connect_async(request).await?;
233233+ tracing::info!("connected to tap");
234234+235235+ let (mut write, mut read) = ws.split();
236236+237237+ loop {
238238+ tokio::select! {
239239+ msg = read.next() => {
240240+ let msg = match msg {
241241+ Some(Ok(m)) => m,
242242+ Some(Err(e)) => return Err(e.into()),
243243+ None => break,
244244+ };
245245+246246+ let text = match msg {
247247+ Message::Text(t) => t,
248248+ Message::Close(_) => break,
249249+ _ => continue,
250250+ };
251251+252252+ let event: TapEvent = match serde_json::from_str(&text) {
253253+ Ok(e) => e,
254254+ Err(e) => {
255255+ tracing::debug!("skipping unparseable tap event: {e}");
256256+ continue;
257257+ }
258258+ };
259259+260260+ let event_id = event.id;
261261+262262+ match event.event_type.as_str() {
263263+ "record" => {
264264+ if let Some(record) = event.record {
265265+ handle_record_event(db, lexicons, collections_tx, &record).await;
266266+ }
267267+ }
268268+ "identity" => {
269269+ if let Some(identity) = event.identity {
270270+ tracing::debug!(
271271+ did = %identity.did,
272272+ handle = ?identity.handle,
273273+ "received identity event from tap"
274274+ );
275275+ }
276276+ }
277277+ other => {
278278+ tracing::debug!(event_type = %other, "unknown tap event type");
279279+ }
280280+ }
281281+282282+ // Ack the event.
283283+ let ack = serde_json::json!({ "type": "ack", "id": event_id });
284284+ if let Err(e) = write.send(Message::Text(ack.to_string().into())).await {
285285+ tracing::warn!("failed to send ack: {e}");
286286+ return Err(e.into());
287287+ }
288288+ }
289289+ // If the collection list changes, sync to Tap and continue.
290290+ _ = collections_rx.changed() => {
291291+ let collections = collections_rx.borrow_and_update().clone();
292292+ tracing::info!(?collections, "collection filter changed, syncing to tap");
293293+294294+ // Always include the lexicon schema collection.
295295+ let mut wanted = collections;
296296+ if !wanted.contains(&LEXICON_SCHEMA_COLLECTION.to_string()) {
297297+ wanted.push(LEXICON_SCHEMA_COLLECTION.to_string());
298298+ }
299299+300300+ if let Err(e) = sync_collections(http, tap_url, tap_admin_password, &wanted).await {
301301+ tracing::warn!("failed to sync collections to tap: {e}");
302302+ }
303303+ }
304304+ }
305305+ }
306306+307307+ Ok(())
308308+}
309309+310310+// ---------------------------------------------------------------------------
311311+// Record event handler
312312+// ---------------------------------------------------------------------------
313313+314314+async fn handle_record_event(
315315+ db: &PgPool,
316316+ lexicons: &LexiconRegistry,
317317+ collections_tx: &watch::Sender<Vec<String>>,
318318+ record: &TapRecordEvent,
319319+) {
320320+ let uri = format!("at://{}/{}/{}", record.did, record.collection, record.rkey,);
321321+322322+ // Handle lexicon schema events for tracked network lexicons.
323323+ if record.collection == LEXICON_SCHEMA_COLLECTION {
324324+ handle_lexicon_schema_event(db, lexicons, collections_tx, &record.did, record).await;
325325+ return;
326326+ }
327327+328328+ match record.action.as_str() {
329329+ "create" | "update" => {
330330+ let rec = match &record.record {
331331+ Some(r) => r,
332332+ None => return,
333333+ };
334334+ let cid = record.cid.as_deref().unwrap_or_default();
335335+336336+ if let Err(e) = sqlx::query(
337337+ r#"
338338+ INSERT INTO records (uri, did, collection, rkey, record, cid, indexed_at)
339339+ VALUES ($1, $2, $3, $4, $5, $6, NOW())
340340+ ON CONFLICT (uri) DO UPDATE
341341+ SET record = EXCLUDED.record,
342342+ cid = EXCLUDED.cid,
343343+ indexed_at = NOW()
344344+ "#,
345345+ )
346346+ .bind(&uri)
347347+ .bind(&record.did)
348348+ .bind(&record.collection)
349349+ .bind(&record.rkey)
350350+ .bind(rec)
351351+ .bind(cid)
352352+ .execute(db)
353353+ .await
354354+ {
355355+ tracing::warn!(uri = %uri, "failed to upsert record: {e}");
356356+ }
357357+ }
358358+ "delete" => {
359359+ if let Err(e) = sqlx::query("DELETE FROM records WHERE uri = $1")
360360+ .bind(&uri)
361361+ .execute(db)
362362+ .await
363363+ {
364364+ tracing::warn!(uri = %uri, "failed to delete record: {e}");
365365+ }
366366+ }
367367+ _ => {}
368368+ }
369369+}
370370+371371+// ---------------------------------------------------------------------------
372372+// Lexicon schema event handler
373373+// ---------------------------------------------------------------------------
374374+375375+/// Handle a `com.atproto.lexicon.schema` record event for tracked network lexicons.
376376+async fn handle_lexicon_schema_event(
377377+ db: &PgPool,
378378+ lexicons: &LexiconRegistry,
379379+ collections_tx: &watch::Sender<Vec<String>>,
380380+ did: &str,
381381+ record: &TapRecordEvent,
382382+) {
383383+ let nsid = &record.rkey;
384384+385385+ // Check if this NSID is one we're tracking and the DID matches the authority.
386386+ let tracked: Option<(Option<String>,)> = sqlx::query_as(
387387+ "SELECT target_collection FROM network_lexicons WHERE nsid = $1 AND authority_did = $2",
388388+ )
389389+ .bind(nsid)
390390+ .bind(did)
391391+ .fetch_optional(db)
392392+ .await
393393+ .unwrap_or(None);
394394+395395+ let target_collection = match tracked {
396396+ Some((tc,)) => tc,
397397+ None => return, // Not a tracked network lexicon.
398398+ };
399399+400400+ match record.action.as_str() {
401401+ "create" | "update" => {
402402+ let rec = match &record.record {
403403+ Some(r) => r,
404404+ None => return,
405405+ };
406406+407407+ let parsed = match ParsedLexicon::parse(
408408+ rec.clone(),
409409+ 1,
410410+ target_collection.clone(),
411411+ ProcedureAction::Upsert,
412412+ ) {
413413+ Ok(p) => p,
414414+ Err(e) => {
415415+ tracing::warn!(nsid, "failed to parse lexicon schema event: {e}");
416416+ return;
417417+ }
418418+ };
419419+420420+ let is_record = parsed.lexicon_type == crate::lexicon::LexiconType::Record;
421421+422422+ // Upsert into lexicons table.
423423+ if let Err(e) = sqlx::query(
424424+ r#"
425425+ INSERT INTO lexicons (id, lexicon_json, backfill, target_collection)
426426+ VALUES ($1, $2, false, $3)
427427+ ON CONFLICT (id) DO UPDATE SET
428428+ lexicon_json = EXCLUDED.lexicon_json,
429429+ target_collection = EXCLUDED.target_collection,
430430+ revision = lexicons.revision + 1,
431431+ updated_at = NOW()
432432+ "#,
433433+ )
434434+ .bind(nsid)
435435+ .bind(rec)
436436+ .bind(&target_collection)
437437+ .execute(db)
438438+ .await
439439+ {
440440+ tracing::warn!(nsid, "failed to upsert lexicon from event: {e}");
441441+ return;
442442+ }
443443+444444+ // Update last_fetched_at.
445445+ let _ =
446446+ sqlx::query("UPDATE network_lexicons SET last_fetched_at = NOW() WHERE nsid = $1")
447447+ .bind(nsid)
448448+ .execute(db)
449449+ .await;
450450+451451+ lexicons.upsert(parsed).await;
452452+ tracing::info!(nsid, "updated network lexicon from tap event");
453453+454454+ if is_record {
455455+ let collections = lexicons.get_record_collections().await;
456456+ let _ = collections_tx.send(collections);
457457+ }
458458+ }
459459+ "delete" => {
460460+ // Remove from lexicons table and registry.
461461+ let _ = sqlx::query("DELETE FROM lexicons WHERE id = $1")
462462+ .bind(nsid)
463463+ .execute(db)
464464+ .await;
465465+466466+ let was_present = lexicons.remove(nsid).await;
467467+ if was_present {
468468+ tracing::info!(nsid, "removed network lexicon from tap delete event");
469469+ let collections = lexicons.get_record_collections().await;
470470+ let _ = collections_tx.send(collections);
471471+ }
472472+ }
473473+ _ => {}
474474+ }
475475+}
+2-1
tests/common/app.rs
···3535 port: 0,
3636 database_url: String::new(), // not used — pool is already connected
3737 aip_url: mock_url.clone(),
3838- jetstream_url: String::new(),
3838+ tap_url: "http://localhost:2480".into(),
3939+ tap_admin_password: None,
3940 relay_url: mock_url.clone(),
4041 plc_url: mock_url.clone(),
4142 static_dir: "./web/out".into(),