very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] return json if body was json or accept header is json in /repos/*

dawn b3d49c28 41c076cf

+58 -21
+6 -4
README.md
··· 255 255 256 256 <small>[<- back to toc](#table-of-contents)</small> 257 257 258 - - `GET /repos`: get an NDJSON stream of repositories and their sync status. supports pagination and filtering: 258 + all `/repos` endpoints that return lists respond with NDJSON by default. send `Accept: application/json` or `Content-Type: application/json` to get a JSON array instead. 259 + 260 + - `GET /repos`: get a list of repositories and their sync status. supports pagination and filtering: 259 261 - `limit`: max results (default 100, max 1000) 260 262 - `cursor`: opaque key for paginating. 261 263 - `partition`: `all` (default), `pending` (backfill queue), or `resync` (retries) ··· 264 266 available before the repo has been backfilled once at least). 265 267 - `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 266 268 only affects repositories that are not known or are untracked. 267 - returns a JSON array of the DIDs that were tracked. 269 + returns a list of the DIDs that were queued for backfill. 268 270 - `DELETE /repos`: untrack repositories. 269 271 accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 270 272 only affects repositories that are currently tracked. 271 - returns a JSON array of the DIDs that were untracked. 273 + returns a list of the DIDs that were untracked. 272 274 - `POST /repos/resync`: force a new backfill for one or more repositories. 273 275 accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 274 276 only affects repositories hydrant already knows about. 275 - returns a JSON array of the DIDs that were queued. 277 + returns a list of the DIDs that were queued. 276 278 277 279 ### database operations 278 280
+51 -17
src/api/repos.rs
··· 4 4 Json, Router, 5 5 body::Body, 6 6 extract::{Path, Query, State}, 7 - http::{StatusCode, header}, 7 + http::{HeaderMap, StatusCode, header}, 8 8 response::{IntoResponse, Response}, 9 9 routing::{delete, get, post, put}, 10 10 }; ··· 35 35 pub async fn handle_get_repos( 36 36 State(hydrant): State<Hydrant>, 37 37 Query(params): Query<GetReposParams>, 38 + headers: HeaderMap, 38 39 ) -> Result<Response, (StatusCode, String)> { 39 40 let limit = params.limit.unwrap_or(100).min(1000); 40 41 let partition = params.partition.unwrap_or_else(|| "all".to_string()); ··· 113 114 .await 114 115 .map_err(internal)??; 115 116 117 + if prefers_json(&headers) { 118 + return Ok(Json(items).into_response()); 119 + } 120 + 116 121 use futures::StreamExt; 117 122 118 123 let stream = futures::stream::iter(items.into_iter().map(|item| { ··· 144 149 145 150 pub async fn handle_put_repos( 146 151 State(hydrant): State<Hydrant>, 147 - req: axum::extract::Request, 148 - ) -> Result<Json<Vec<String>>, (StatusCode, String)> { 149 - let items = parse_body(req).await?; 152 + headers: HeaderMap, 153 + body: Body, 154 + ) -> Result<Response, (StatusCode, String)> { 155 + let items = parse_body(body, &headers).await?; 150 156 151 157 let dids: Vec<Did<'static>> = items 152 158 .into_iter() ··· 159 165 .await 160 166 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 161 167 162 - Ok(Json(queued.into_iter().map(|d| d.to_string()).collect())) 168 + Ok(did_list_response(queued, &headers)) 163 169 } 164 170 165 171 pub async fn handle_delete_repos( 166 172 State(hydrant): State<Hydrant>, 167 - req: axum::extract::Request, 168 - ) -> Result<Json<Vec<String>>, (StatusCode, String)> { 169 - let items = parse_body(req).await?; 173 + headers: HeaderMap, 174 + body: Body, 175 + ) -> Result<Response, (StatusCode, String)> { 176 + let items = parse_body(body, &headers).await?; 170 177 171 178 let dids: Vec<Did<'static>> = items 172 179 .into_iter() ··· 179 186 .await 180 187 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 181 188 182 - Ok(Json(untracked.into_iter().map(|d| d.to_string()).collect())) 189 + Ok(did_list_response(untracked, &headers)) 183 190 } 184 191 185 192 pub async fn handle_post_resync( 186 193 State(hydrant): State<Hydrant>, 187 - req: axum::extract::Request, 188 - ) -> Result<Json<Vec<String>>, (StatusCode, String)> { 189 - let items = parse_body(req).await?; 194 + headers: HeaderMap, 195 + body: Body, 196 + ) -> Result<Response, (StatusCode, String)> { 197 + let items = parse_body(body, &headers).await?; 190 198 191 199 let dids: Vec<Did<'static>> = items 192 200 .into_iter() ··· 199 207 .await 200 208 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 201 209 202 - Ok(Json(queued.into_iter().map(|d| d.to_string()).collect())) 210 + Ok(did_list_response(queued, &headers)) 203 211 } 204 212 205 - async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> { 206 - let content_type = req 207 - .headers() 213 + fn prefers_json(headers: &HeaderMap) -> bool { 214 + let contains_json = |h: axum::http::HeaderName| { 215 + headers 216 + .get(h) 217 + .and_then(|v| v.to_str().ok()) 218 + .is_some_and(|v| v.contains("application/json")) 219 + }; 220 + contains_json(header::ACCEPT) || contains_json(header::CONTENT_TYPE) 221 + } 222 + 223 + fn did_list_response(dids: Vec<Did<'static>>, headers: &HeaderMap) -> Response { 224 + if prefers_json(headers) { 225 + let body: Vec<String> = dids.into_iter().map(|d| d.to_string()).collect(); 226 + Json(body).into_response() 227 + } else { 228 + let body = dids 229 + .iter() 230 + .filter_map(|d| serde_json::to_string(&d.as_str()).ok()) 231 + .map(|s| format!("{s}\n")) 232 + .collect::<String>(); 233 + ([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response() 234 + } 235 + } 236 + 237 + async fn parse_body( 238 + body: Body, 239 + headers: &HeaderMap, 240 + ) -> Result<Vec<RepoRequest>, (StatusCode, String)> { 241 + let content_type = headers 208 242 .get(header::CONTENT_TYPE) 209 243 .and_then(|h| h.to_str().ok()) 210 244 .unwrap_or("") 211 245 .to_string(); 212 246 213 - let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 247 + let body_bytes = axum::body::to_bytes(body, usize::MAX) 214 248 .await 215 249 .map_err(bad_request)?; 216 250
+1
src/db/types.rs
··· 368 368 ); 369 369 370 370 impl DidKey<'_> { 371 + #[allow(dead_code)] 371 372 pub fn from_did_key(s: &str) -> miette::Result<Self> { 372 373 let multibase_str = s 373 374 .strip_prefix("did:key:")