-- Add a nullable timestamp column recording when a dead letter was resolved.
ALTER TABLE dead_letter_hooks
    ADD COLUMN resolved_at TEXT;

-- Index the new column so queries filtering on it can use an index.
CREATE INDEX idx_dead_letter_hooks_resolved_at
    ON dead_letter_hooks (resolved_at);
+13-1
packages/docs/docs/guides/event-logs.md
···7979| `hook.executed` | info | Record AT URI | `lexicon_id` |
8080| `hook.dead_lettered` | error | Record AT URI | `lexicon_id`, `error` |
81818282-Logged when [index hooks](index-hooks.md) run. Dead-lettered events indicate a hook failed all retry attempts.
8282+Logged when [index hooks](index-hooks.md) run. Dead-lettered events indicate a hook failed all retry attempts. You can manage dead letters from the **Data > Dead Letters** page in the dashboard — see [Dead Letters](#dead-letters) below.
83838484### Backfill events
8585···127127Set `EVENT_LOG_RETENTION_DAYS=0` to disable automatic cleanup and keep logs indefinitely.
128128129129See [Configuration](../getting-started/configuration.md) for all environment variables.
130130+131131+## Dead Letters
132132+133133+When an index hook fails after all retry attempts, the event is stored in the dead letters queue. You can manage dead letters from the **Data > Dead Letters** page in the dashboard.
134134+135135+From the dead letters page you can:
136136+137137+- **Retry Hook** — replay the stored record through the index hook (use after fixing a hook script)
138138+- **Re-index** — fetch the record fresh from the PDS and run it through the full indexing pipeline (use when the record may have changed)
139139+- **Dismiss** — mark the dead letter as resolved without retrying
140140+141141+Bulk actions are available for selected rows or all entries matching the current filters.
130142131143## Next steps
132144
+600
src/admin/dead_letters.rs
···11+use axum::{
22+ Json,
33+ extract::{Path, Query, State},
44+};
55+use serde::{Deserialize, Serialize};
66+use serde_json::Value;
77+88+use super::auth::UserAuth;
99+use super::permissions::Permission;
1010+use crate::AppState;
1111+use crate::db::{adapt_sql, now_rfc3339, parse_dt};
1212+use crate::error::AppError;
1313+use crate::lua::{HookEvent, run_hook_once};
1414+use crate::record_handler::RecordEvent;
1515+1616+// ---------------------------------------------------------------------------
1717+// Query / request / response types
1818+// ---------------------------------------------------------------------------
1919+2020+#[derive(Deserialize)]
2121+pub struct ListQuery {
2222+ pub collection: Option<String>,
2323+ pub resolved: Option<String>,
2424+ pub cursor: Option<String>,
2525+ pub limit: Option<i64>,
2626+}
2727+2828+#[derive(Deserialize)]
2929+pub struct CountQuery {
3030+ pub resolved: Option<String>,
3131+}
3232+3333+#[derive(Serialize)]
3434+pub struct DeadLetterSummary {
3535+ pub id: String,
3636+ pub lexicon_id: String,
3737+ pub uri: String,
3838+ pub did: String,
3939+ pub collection: String,
4040+ pub rkey: String,
4141+ pub action: String,
4242+ pub error: String,
4343+ pub attempts: i64,
4444+ pub created_at: chrono::DateTime<chrono::Utc>,
4545+ #[serde(skip_serializing_if = "Option::is_none")]
4646+ pub resolved_at: Option<chrono::DateTime<chrono::Utc>>,
4747+}
4848+4949+#[derive(Serialize)]
5050+pub struct DeadLetterDetail {
5151+ #[serde(flatten)]
5252+ pub summary: DeadLetterSummary,
5353+ pub record: Option<Value>,
5454+}
5555+5656+#[derive(Serialize)]
5757+pub struct ListResponse {
5858+ pub dead_letters: Vec<DeadLetterSummary>,
5959+ #[serde(skip_serializing_if = "Option::is_none")]
6060+ pub cursor: Option<String>,
6161+}
6262+6363+#[derive(Serialize)]
6464+pub struct CountResponse {
6565+ pub count: i64,
6666+}
6767+6868+#[derive(Deserialize)]
6969+pub struct BulkRequest {
7070+ pub ids: Option<Vec<String>>,
7171+ pub all: Option<bool>,
7272+ pub collection: Option<String>,
7373+}
/// Internal row type carrying the columns the retry / reindex / dismiss
/// helpers load for a single unresolved dead letter.
// Not every column is read by every action (e.g. `error` and `attempts`),
// hence the blanket allowance.
#[allow(dead_code)]
struct DeadLetterRow {
    /// Primary key of the dead letter.
    id: String,
    /// Lexicon whose index hook script is looked up on retry.
    lexicon_id: String,
    /// URI handed to the hook on retry.
    uri: String,
    /// DID of the repo the record belongs to.
    did: String,
    /// Collection of the record.
    collection: String,
    /// Record key of the record.
    rkey: String,
    /// Action that was being processed when the hook failed.
    action: String,
    /// Stored record as raw JSON text, if any.
    record: Option<String>,
    /// Last recorded error message.
    error: String,
    /// Attempt counter.
    attempts: i64,
}
8989+9090+// ---------------------------------------------------------------------------
9191+// Handlers
9292+// ---------------------------------------------------------------------------
9393+9494+/// GET /admin/dead-letters
9595+pub(super) async fn list(
9696+ auth: UserAuth,
9797+ State(state): State<AppState>,
9898+ Query(query): Query<ListQuery>,
9999+) -> Result<Json<ListResponse>, AppError> {
100100+ auth.require(Permission::DeadLettersRead).await?;
101101+ let backend = state.db_backend;
102102+ let limit = query.limit.unwrap_or(50).clamp(1, 100);
103103+104104+ let mut sql = String::from(
105105+ "SELECT id, lexicon_id, uri, did, collection, rkey, action, error, attempts, created_at, resolved_at
106106+ FROM dead_letter_hooks WHERE 1=1",
107107+ );
108108+109109+ let resolved_filter = query.resolved.as_deref().unwrap_or("false");
110110+ match resolved_filter {
111111+ "false" => sql.push_str(" AND resolved_at IS NULL"),
112112+ "true" => sql.push_str(" AND resolved_at IS NOT NULL"),
113113+ _ => {} // no filter
114114+ }
115115+116116+ if query.collection.is_some() {
117117+ sql.push_str(" AND collection = ?");
118118+ }
119119+ if query.cursor.is_some() {
120120+ sql.push_str(" AND created_at < ?");
121121+ }
122122+123123+ sql.push_str(" ORDER BY created_at DESC LIMIT ?");
124124+125125+ let sql = adapt_sql(&sql, backend);
126126+127127+ #[allow(clippy::type_complexity)]
128128+ let mut q = sqlx::query_as::<
129129+ _,
130130+ (
131131+ String,
132132+ String,
133133+ String,
134134+ String,
135135+ String,
136136+ String,
137137+ String,
138138+ String,
139139+ i64,
140140+ String,
141141+ Option<String>,
142142+ ),
143143+ >(&sql);
144144+145145+ if let Some(ref collection) = query.collection {
146146+ q = q.bind(collection);
147147+ }
148148+ if let Some(ref cursor) = query.cursor {
149149+ q = q.bind(cursor);
150150+ }
151151+ q = q.bind(limit);
152152+153153+ let rows = q
154154+ .fetch_all(&state.db)
155155+ .await
156156+ .map_err(|e| AppError::Internal(format!("failed to query dead letters: {e}")))?;
157157+158158+ let dead_letters: Vec<DeadLetterSummary> = rows
159159+ .into_iter()
160160+ .map(|row| DeadLetterSummary {
161161+ id: row.0,
162162+ lexicon_id: row.1,
163163+ uri: row.2,
164164+ did: row.3,
165165+ collection: row.4,
166166+ rkey: row.5,
167167+ action: row.6,
168168+ error: row.7,
169169+ attempts: row.8,
170170+ created_at: parse_dt(&row.9),
171171+ resolved_at: row.10.as_deref().map(parse_dt),
172172+ })
173173+ .collect();
174174+175175+ let cursor = if dead_letters.len() as i64 >= limit {
176176+ dead_letters.last().map(|dl| dl.created_at.to_rfc3339())
177177+ } else {
178178+ None
179179+ };
180180+181181+ Ok(Json(ListResponse {
182182+ dead_letters,
183183+ cursor,
184184+ }))
185185+}
186186+187187+/// GET /admin/dead-letters/count
188188+pub(super) async fn count(
189189+ auth: UserAuth,
190190+ State(state): State<AppState>,
191191+ Query(query): Query<CountQuery>,
192192+) -> Result<Json<CountResponse>, AppError> {
193193+ auth.require(Permission::DeadLettersRead).await?;
194194+ let backend = state.db_backend;
195195+196196+ let mut sql = String::from("SELECT COUNT(*) FROM dead_letter_hooks WHERE 1=1");
197197+198198+ let resolved_filter = query.resolved.as_deref().unwrap_or("false");
199199+ match resolved_filter {
200200+ "false" => sql.push_str(" AND resolved_at IS NULL"),
201201+ "true" => sql.push_str(" AND resolved_at IS NOT NULL"),
202202+ _ => {}
203203+ }
204204+205205+ let sql = adapt_sql(&sql, backend);
206206+ let (count,): (i64,) = sqlx::query_as(&sql)
207207+ .fetch_one(&state.db)
208208+ .await
209209+ .map_err(|e| AppError::Internal(format!("failed to count dead letters: {e}")))?;
210210+211211+ Ok(Json(CountResponse { count }))
212212+}
213213+214214+/// GET /admin/dead-letters/{id}
215215+pub(super) async fn detail(
216216+ auth: UserAuth,
217217+ State(state): State<AppState>,
218218+ Path(id): Path<String>,
219219+) -> Result<Json<DeadLetterDetail>, AppError> {
220220+ auth.require(Permission::DeadLettersRead).await?;
221221+ let backend = state.db_backend;
222222+223223+ let sql = adapt_sql(
224224+ "SELECT id, lexicon_id, uri, did, collection, rkey, action, error, attempts, created_at, resolved_at, record
225225+ FROM dead_letter_hooks WHERE id = ?",
226226+ backend,
227227+ );
228228+229229+ #[allow(clippy::type_complexity)]
230230+ let row: (
231231+ String,
232232+ String,
233233+ String,
234234+ String,
235235+ String,
236236+ String,
237237+ String,
238238+ String,
239239+ i64,
240240+ String,
241241+ Option<String>,
242242+ Option<String>,
243243+ ) = sqlx::query_as(&sql)
244244+ .bind(&id)
245245+ .fetch_optional(&state.db)
246246+ .await
247247+ .map_err(|e| AppError::Internal(format!("failed to fetch dead letter: {e}")))?
248248+ .ok_or_else(|| AppError::NotFound(format!("dead letter {id} not found")))?;
249249+250250+ let summary = DeadLetterSummary {
251251+ id: row.0,
252252+ lexicon_id: row.1,
253253+ uri: row.2,
254254+ did: row.3,
255255+ collection: row.4,
256256+ rkey: row.5,
257257+ action: row.6,
258258+ error: row.7,
259259+ attempts: row.8,
260260+ created_at: parse_dt(&row.9),
261261+ resolved_at: row.10.as_deref().map(parse_dt),
262262+ };
263263+264264+ let record = row.11.as_deref().and_then(|r| serde_json::from_str(r).ok());
265265+266266+ Ok(Json(DeadLetterDetail { summary, record }))
267267+}
268268+269269+/// POST /admin/dead-letters/{id}/dismiss
270270+pub(super) async fn dismiss(
271271+ auth: UserAuth,
272272+ State(state): State<AppState>,
273273+ Path(id): Path<String>,
274274+) -> Result<Json<Value>, AppError> {
275275+ auth.require(Permission::DeadLettersManage).await?;
276276+ let dl = fetch_dead_letter_for_action(&state, &id).await?;
277277+ if dl.id.is_empty() {
278278+ return Err(AppError::NotFound(format!("dead letter {id} not found")));
279279+ }
280280+ mark_resolved(&state, &id).await?;
281281+ Ok(Json(serde_json::json!({ "ok": true })))
282282+}
283283+284284+/// POST /admin/dead-letters/{id}/retry
285285+pub(super) async fn retry(
286286+ auth: UserAuth,
287287+ State(state): State<AppState>,
288288+ Path(id): Path<String>,
289289+) -> Result<Json<Value>, AppError> {
290290+ auth.require(Permission::DeadLettersManage).await?;
291291+ retry_single(&state, &id).await?;
292292+ Ok(Json(serde_json::json!({ "ok": true })))
293293+}
294294+295295+/// POST /admin/dead-letters/{id}/reindex
296296+pub(super) async fn reindex(
297297+ auth: UserAuth,
298298+ State(state): State<AppState>,
299299+ Path(id): Path<String>,
300300+) -> Result<Json<Value>, AppError> {
301301+ auth.require(Permission::DeadLettersManage).await?;
302302+ reindex_single(&state, &id).await?;
303303+ Ok(Json(serde_json::json!({ "ok": true })))
304304+}
305305+306306+/// POST /admin/dead-letters/bulk/dismiss
307307+pub(super) async fn bulk_dismiss(
308308+ auth: UserAuth,
309309+ State(state): State<AppState>,
310310+ Json(body): Json<BulkRequest>,
311311+) -> Result<Json<Value>, AppError> {
312312+ auth.require(Permission::DeadLettersManage).await?;
313313+ let backend = state.db_backend;
314314+ let now = now_rfc3339();
315315+316316+ if body.all == Some(true) {
317317+ let mut sql =
318318+ String::from("UPDATE dead_letter_hooks SET resolved_at = ? WHERE resolved_at IS NULL");
319319+ if body.collection.is_some() {
320320+ sql.push_str(" AND collection = ?");
321321+ }
322322+ let sql = adapt_sql(&sql, backend);
323323+ let mut q = sqlx::query(&sql).bind(&now);
324324+ if let Some(ref collection) = body.collection {
325325+ q = q.bind(collection);
326326+ }
327327+ q.execute(&state.db)
328328+ .await
329329+ .map_err(|e| AppError::Internal(format!("bulk dismiss failed: {e}")))?;
330330+ } else if let Some(ref ids) = body.ids {
331331+ for id in ids {
332332+ let sql = adapt_sql(
333333+ "UPDATE dead_letter_hooks SET resolved_at = ? WHERE id = ? AND resolved_at IS NULL",
334334+ backend,
335335+ );
336336+ sqlx::query(&sql)
337337+ .bind(&now)
338338+ .bind(id)
339339+ .execute(&state.db)
340340+ .await
341341+ .map_err(|e| AppError::Internal(format!("bulk dismiss failed for {id}: {e}")))?;
342342+ }
343343+ } else {
344344+ return Err(AppError::BadRequest(
345345+ "must provide 'ids' or 'all: true'".into(),
346346+ ));
347347+ }
348348+349349+ Ok(Json(serde_json::json!({ "ok": true })))
350350+}
351351+352352+/// POST /admin/dead-letters/bulk/retry
353353+pub(super) async fn bulk_retry(
354354+ auth: UserAuth,
355355+ State(state): State<AppState>,
356356+ Json(body): Json<BulkRequest>,
357357+) -> Result<Json<Value>, AppError> {
358358+ auth.require(Permission::DeadLettersManage).await?;
359359+ let ids = resolve_bulk_ids(&state, &body).await?;
360360+ for id in &ids {
361361+ retry_single(&state, id).await?;
362362+ }
363363+ Ok(Json(serde_json::json!({ "ok": true })))
364364+}
365365+366366+/// POST /admin/dead-letters/bulk/reindex
367367+pub(super) async fn bulk_reindex(
368368+ auth: UserAuth,
369369+ State(state): State<AppState>,
370370+ Json(body): Json<BulkRequest>,
371371+) -> Result<Json<Value>, AppError> {
372372+ auth.require(Permission::DeadLettersManage).await?;
373373+ let ids = resolve_bulk_ids(&state, &body).await?;
374374+ for id in &ids {
375375+ reindex_single(&state, id).await?;
376376+ }
377377+ Ok(Json(serde_json::json!({ "ok": true })))
378378+}
379379+380380+// ---------------------------------------------------------------------------
381381+// Helper functions
382382+// ---------------------------------------------------------------------------
383383+384384+/// Fetch an unresolved dead letter by ID, returning an error if not found or already resolved.
385385+async fn fetch_dead_letter_for_action(
386386+ state: &AppState,
387387+ id: &str,
388388+) -> Result<DeadLetterRow, AppError> {
389389+ let backend = state.db_backend;
390390+ let sql = adapt_sql(
391391+ "SELECT id, lexicon_id, uri, did, collection, rkey, action, record, error, attempts
392392+ FROM dead_letter_hooks WHERE id = ? AND resolved_at IS NULL",
393393+ backend,
394394+ );
395395+396396+ let row: (
397397+ String,
398398+ String,
399399+ String,
400400+ String,
401401+ String,
402402+ String,
403403+ String,
404404+ Option<String>,
405405+ String,
406406+ i64,
407407+ ) = sqlx::query_as(&sql)
408408+ .bind(id)
409409+ .fetch_optional(&state.db)
410410+ .await
411411+ .map_err(|e| AppError::Internal(format!("failed to fetch dead letter: {e}")))?
412412+ .ok_or_else(|| {
413413+ AppError::NotFound(format!("dead letter {id} not found or already resolved"))
414414+ })?;
415415+416416+ Ok(DeadLetterRow {
417417+ id: row.0,
418418+ lexicon_id: row.1,
419419+ uri: row.2,
420420+ did: row.3,
421421+ collection: row.4,
422422+ rkey: row.5,
423423+ action: row.6,
424424+ record: row.7,
425425+ error: row.8,
426426+ attempts: row.9,
427427+ })
428428+}
429429+430430+/// Mark a dead letter as resolved.
431431+async fn mark_resolved(state: &AppState, id: &str) -> Result<(), AppError> {
432432+ let backend = state.db_backend;
433433+ let now = now_rfc3339();
434434+ let sql = adapt_sql(
435435+ "UPDATE dead_letter_hooks SET resolved_at = ? WHERE id = ?",
436436+ backend,
437437+ );
438438+ sqlx::query(&sql)
439439+ .bind(&now)
440440+ .bind(id)
441441+ .execute(&state.db)
442442+ .await
443443+ .map_err(|e| AppError::Internal(format!("failed to mark dead letter resolved: {e}")))?;
444444+ Ok(())
445445+}
446446+447447+/// Update the error message and increment attempts.
448448+async fn update_error(state: &AppState, id: &str, error: &str) -> Result<(), AppError> {
449449+ let backend = state.db_backend;
450450+ let sql = adapt_sql(
451451+ "UPDATE dead_letter_hooks SET error = ?, attempts = attempts + 1 WHERE id = ?",
452452+ backend,
453453+ );
454454+ sqlx::query(&sql)
455455+ .bind(error)
456456+ .bind(id)
457457+ .execute(&state.db)
458458+ .await
459459+ .map_err(|e| AppError::Internal(format!("failed to update dead letter error: {e}")))?;
460460+ Ok(())
461461+}
462462+463463+/// Resolve a BulkRequest into a list of dead letter IDs.
464464+async fn resolve_bulk_ids(state: &AppState, body: &BulkRequest) -> Result<Vec<String>, AppError> {
465465+ if body.all == Some(true) {
466466+ let backend = state.db_backend;
467467+ let mut sql = String::from("SELECT id FROM dead_letter_hooks WHERE resolved_at IS NULL");
468468+ if body.collection.is_some() {
469469+ sql.push_str(" AND collection = ?");
470470+ }
471471+ let sql = adapt_sql(&sql, backend);
472472+ let mut q = sqlx::query_as::<_, (String,)>(&sql);
473473+ if let Some(ref collection) = body.collection {
474474+ q = q.bind(collection);
475475+ }
476476+ let rows = q
477477+ .fetch_all(&state.db)
478478+ .await
479479+ .map_err(|e| AppError::Internal(format!("failed to resolve bulk ids: {e}")))?;
480480+ Ok(rows.into_iter().map(|r| r.0).collect())
481481+ } else if let Some(ref ids) = body.ids {
482482+ Ok(ids.clone())
483483+ } else {
484484+ Err(AppError::BadRequest(
485485+ "must provide 'ids' or 'all: true'".into(),
486486+ ))
487487+ }
488488+}
489489+490490+/// Fetch the index_hook script directly from the lexicons table, bypassing the in-memory registry.
491491+async fn get_index_hook_from_db(state: &AppState, lexicon_id: &str) -> Result<Option<String>, AppError> {
492492+ let backend = state.db_backend;
493493+ let sql = adapt_sql(
494494+ "SELECT index_hook FROM lexicons WHERE id = ?",
495495+ backend,
496496+ );
497497+ let row: Option<(Option<String>,)> = sqlx::query_as(&sql)
498498+ .bind(lexicon_id)
499499+ .fetch_optional(&state.db)
500500+ .await
501501+ .map_err(|e| AppError::Internal(format!("failed to fetch index hook: {e}")))?;
502502+ Ok(row.and_then(|r| r.0))
503503+}
504504+505505+/// Retry a single dead letter by re-running its hook script.
506506+async fn retry_single(state: &AppState, id: &str) -> Result<(), AppError> {
507507+ let dl = fetch_dead_letter_for_action(state, id).await?;
508508+509509+ let script = get_index_hook_from_db(state, &dl.lexicon_id)
510510+ .await?
511511+ .ok_or_else(|| {
512512+ AppError::NotFound(format!(
513513+ "no index hook found for lexicon {}",
514514+ dl.lexicon_id
515515+ ))
516516+ })?;
517517+518518+ let record: Option<Value> = dl
519519+ .record
520520+ .as_deref()
521521+ .and_then(|r| serde_json::from_str(r).ok());
522522+523523+ let event = HookEvent {
524524+ state,
525525+ lexicon_id: &dl.lexicon_id,
526526+ script: &script,
527527+ action: &dl.action,
528528+ uri: &dl.uri,
529529+ did: &dl.did,
530530+ collection: &dl.collection,
531531+ rkey: &dl.rkey,
532532+ record: record.as_ref(),
533533+ };
534534+535535+ match run_hook_once(&event).await {
536536+ Ok(_) => {
537537+ mark_resolved(state, id).await?;
538538+ Ok(())
539539+ }
540540+ Err(e) => {
541541+ update_error(state, id, &e).await?;
542542+ Err(AppError::Internal(format!(
543543+ "retry failed for dead letter {id}: {e}"
544544+ )))
545545+ }
546546+ }
547547+}
548548+549549+/// Reindex a single dead letter by fetching the record fresh from the PDS.
550550+async fn reindex_single(state: &AppState, id: &str) -> Result<(), AppError> {
551551+ let dl = fetch_dead_letter_for_action(state, id).await?;
552552+553553+ let pds_endpoint =
554554+ crate::profile::resolve_pds_endpoint(&state.http, &state.config.plc_url, &dl.did).await?;
555555+556556+ let url = format!(
557557+ "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
558558+ pds_endpoint, dl.did, dl.collection, dl.rkey
559559+ );
560560+561561+ let resp = state
562562+ .http
563563+ .get(&url)
564564+ .send()
565565+ .await
566566+ .map_err(|e| AppError::Internal(format!("failed to fetch record from PDS: {e}")))?;
567567+568568+ if !resp.status().is_success() {
569569+ let status = resp.status();
570570+ let body = resp.text().await.unwrap_or_else(|_| "unknown error".into());
571571+ return Err(AppError::Internal(format!(
572572+ "PDS returned {status} fetching record: {body}"
573573+ )));
574574+ }
575575+576576+ let body: Value = resp
577577+ .json()
578578+ .await
579579+ .map_err(|e| AppError::Internal(format!("failed to parse PDS response: {e}")))?;
580580+581581+ let record = body.get("value").cloned();
582582+ let cid = body
583583+ .get("cid")
584584+ .and_then(|v| v.as_str())
585585+ .map(|s| s.to_string());
586586+587587+ let event = RecordEvent {
588588+ did: dl.did.clone(),
589589+ collection: dl.collection.clone(),
590590+ rkey: dl.rkey.clone(),
591591+ action: dl.action.clone(),
592592+ record,
593593+ cid,
594594+ };
595595+596596+ crate::record_handler::handle_record_event(state, &event).await;
597597+ mark_resolved(state, id).await?;
598598+599599+ Ok(())
600600+}
···969969/// Returns `Ok(None)` when `handle()` returns nil (meaning "skip indexing"),
970970/// `Ok(Some(value))` when it returns a table (use that as the record), or
971971/// `Ok(Some(original))` for other non-nil types.
972972-async fn run_hook_once(event: &HookEvent<'_>) -> Result<Option<Value>, String> {
972972+pub async fn run_hook_once(event: &HookEvent<'_>) -> Result<Option<Value>, String> {
973973 let lua = sandbox::create_sandbox().map_err(|e| format!("failed to create Lua VM: {e}"))?;
974974 let backend = event.state.db_backend;
975975
+1-1
src/lua/mod.rs
···99mod xrpc_api;
10101111pub(crate) use execute::{
1212- HookEvent, execute_hook_script, execute_procedure_script, execute_query_script,
1212+ HookEvent, execute_hook_script, execute_procedure_script, execute_query_script, run_hook_once,
1313};
1414pub(crate) use sandbox::validate_script;