lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

basic cancellation stuff

phil 534ca250 c4b7593b

+75 -3
+1
.gitignore
··· 1 1 /target 2 + /lightrail.db
+1
Cargo.lock
··· 2232 2232 "serde_json", 2233 2233 "thiserror 2.0.18", 2234 2234 "tokio", 2235 + "tokio-util", 2235 2236 "wiremock", 2236 2237 ] 2237 2238
+1
Cargo.toml
··· 18 18 serde = { version = "1", features = ["derive"] } 19 19 thiserror = "2.0.18" 20 20 tokio = { version = "1.49.0", features = ["full"] } 21 + tokio-util = { version = "0.7", features = ["rt"] } 21 22 22 23 [[example]] 23 24 name = "list-repo-collections"
+2
src/error.rs
··· 6 6 Db(#[from] fjall::Error), 7 7 #[error("I/O: {0}")] 8 8 Io(#[from] std::io::Error), 9 + #[error("task panicked: {0}")] 10 + TaskPanic(#[from] tokio::task::JoinError), 9 11 #[error("{0}")] 10 12 Other(String), 11 13 }
+68 -3
src/main.rs
··· 1 - use clap::Parser; 2 1 use std::net::SocketAddr; 3 2 use std::path::PathBuf; 4 3 4 + use clap::Parser; 5 + use tokio_util::sync::CancellationToken; 6 + 5 7 use lightrail::error::Result; 6 8 use lightrail::storage; 9 + use lightrail::sync::{backfill, subscribe_repos}; 7 10 8 11 #[derive(Parser, Debug)] 9 12 #[command(name = "lightrail", about = "listReposByCollection indexing service")] 10 13 struct Args { 11 - /// ATProto relay or PDS host to subscribe to. 14 + /// ATProto relay or PDS host to subscribe to (e.g. bsky.network). 12 15 #[arg(long, env = "LIGHTRAIL_SUBSCRIBE")] 13 16 subscribe: String, 14 17 ··· 24 27 #[tokio::main] 25 28 async fn main() -> Result<()> { 26 29 let args = Args::parse(); 27 - let _db = storage::open(&args.db_path)?; 30 + let db = storage::open(&args.db_path)?; 31 + let token = CancellationToken::new(); 32 + 33 + // Each service runs in its own task. A task that exits with an error 34 + // cancels the token so the other services shut down cleanly. 35 + 36 + let firehose_task = tokio::spawn({ 37 + let token = token.clone(); 38 + let db = db.clone(); 39 + let host = args.subscribe.clone(); 40 + async move { 41 + let mut sub = subscribe_repos::Subscriber::new(host, db); 42 + let result = tokio::select! { 43 + _ = token.cancelled() => return Ok(()), 44 + r = sub.run() => r, 45 + }; 46 + if result.is_err() { 47 + token.cancel(); 48 + } 49 + result 50 + } 51 + }); 52 + 53 + let backfill_task = tokio::spawn({ 54 + let token = token.clone(); 55 + let db = db.clone(); 56 + let host = args.subscribe.clone(); 57 + async move { 58 + let result = tokio::select! { 59 + _ = token.cancelled() => return Ok(()), 60 + r = backfill::run(&host, db) => r, 61 + }; 62 + if result.is_err() { 63 + token.cancel(); 64 + } 65 + result 66 + } 67 + }); 68 + 69 + let server_task = tokio::spawn({ 70 + let token = token.clone(); 71 + let db = db.clone(); 72 + let addr = args.listen; 73 + async move { 74 + let result = tokio::select! { 75 + _ = token.cancelled() => return Ok(()), 76 + r = lightrail::server::serve(addr, db) => r, 77 + }; 78 + if result.is_err() { 79 + token.cancel(); 80 + } 81 + result 82 + } 83 + }); 84 + 85 + tokio::signal::ctrl_c().await?; 86 + eprintln!("Shutting down..."); 87 + token.cancel(); 88 + 89 + let (firehose, backfill, server) = tokio::join!(firehose_task, backfill_task, server_task); 90 + firehose??; 91 + backfill??; 92 + server??; 28 93 29 94 Ok(()) 30 95 }
+2
src/server/handler.rs
··· 2 2 //! 3 3 //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 4 //! directly into the lexicon-generated request types. 5 + //! 6 + //! TODO: xrpc-style error handling 5 7 6 8 use axum::{Json, extract::State, http::StatusCode}; 7 9 use jacquard_api::com_atproto::sync::{