Server tools to backfill, tail, mirror, and verify PLC logs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove log

dawn cc23bc9b 0a0777eb

+154 -156
-1
Cargo.lock
··· 37 37 "governor", 38 38 "http-body-util", 39 39 "k256", 40 - "log", 41 40 "multibase", 42 41 "native-tls", 43 42 "opentelemetry",
-1
Cargo.toml
··· 22 22 futures = "0.3.31" 23 23 governor = "0.10.1" 24 24 http-body-util = "0.1.3" 25 - log = "0.4.28" 26 25 native-tls = "0.2.14" 27 26 opentelemetry = "0.30.0" 28 27 opentelemetry-otlp = { version = "0.30.0" }
+7 -7
src/backfill.rs
··· 25 25 let mut workers: JoinSet<anyhow::Result<()>> = JoinSet::new(); 26 26 27 27 let t_step = Instant::now(); 28 - log::info!( 28 + tracing::info!( 29 29 "fetching backfill for {} weeks with {source_workers} workers...", 30 30 weeks.lock().await.len() 31 31 ); ··· 38 38 workers.spawn(async move { 39 39 while let Some(week) = weeks.lock().await.pop() { 40 40 let when = Into::<Dt>::into(week).to_rfc3339(); 41 - log::trace!("worker {w}: fetching week {when} (-{})", week.n_ago()); 41 + tracing::trace!("worker {w}: fetching week {when} (-{})", week.n_ago()); 42 42 week_to_pages(source.clone(), week, dest.clone()) 43 43 .await 44 - .inspect_err(|e| log::error!("failing week_to_pages: {e}"))?; 44 + .inspect_err(|e| tracing::error!("failing week_to_pages: {e}"))?; 45 45 } 46 - log::info!("done with the weeks ig"); 46 + tracing::info!("done with the weeks ig"); 47 47 Ok(()) 48 48 }); 49 49 } ··· 52 52 53 53 // wait for the big backfill to finish 54 54 while let Some(res) = workers.join_next().await { 55 - res.inspect_err(|e| log::error!("problem joining source workers: {e}"))? 56 - .inspect_err(|e| log::error!("problem *from* source worker: {e}"))?; 55 + res.inspect_err(|e| tracing::error!("problem joining source workers: {e}"))? 56 + .inspect_err(|e| tracing::error!("problem *from* source worker: {e}"))?; 57 57 } 58 - log::info!( 58 + tracing::info!( 59 59 "finished fetching backfill in {:?}. senders remaining: {}", 60 60 t_step.elapsed(), 61 61 dest.strong_count()
+3 -3
src/bin/allegedly.rs
··· 95 95 let matches = Cli::command().get_matches(); 96 96 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 97 97 bin_init(args.command.enable_otel()); 98 - log::info!("{}", logo(name)); 98 + tracing::info!("{}", logo(name)); 99 99 100 100 let globals = args.globals.clone(); 101 101 ··· 116 116 .await 117 117 .expect("to poll upstream") 118 118 }); 119 - log::trace!("ensuring output directory exists"); 119 + tracing::trace!("ensuring output directory exists"); 120 120 create_dir_all(&dest) 121 121 .await 122 122 .expect("to ensure output dir exists"); ··· 143 143 .expect("to write pages to stdout"); 144 144 } 145 145 } 146 - log::info!("whew, {:?}. goodbye!", t0.elapsed()); 146 + tracing::info!("whew, {:?}. goodbye!", t0.elapsed()); 147 147 Ok(()) 148 148 }
+5 -5
src/bin/audit.rs
··· 41 41 while let Some(next) = tasks.join_next().await { 42 42 match next { 43 43 Err(e) if e.is_panic() => { 44 - log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 44 + tracing::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 45 45 return Err(e.into()); 46 46 } 47 47 Err(e) => { 48 - log::error!("a joinset task failed to join: {e}"); 48 + tracing::error!("a joinset task failed to join: {e}"); 49 49 return Err(e.into()); 50 50 } 51 51 Ok(Err(e)) => { 52 - log::error!("a joinset task completed with error: {e}"); 52 + tracing::error!("a joinset task completed with error: {e}"); 53 53 return Err(e); 54 54 } 55 55 Ok(Ok(name)) => { 56 - log::trace!("a task completed: {name:?}. {} left", tasks.len()); 56 + tracing::trace!("a task completed: {name:?}. {} left", tasks.len()); 57 57 } 58 58 } 59 59 } ··· 76 76 async fn main() -> anyhow::Result<()> { 77 77 let args = CliArgs::parse(); 78 78 bin_init(args.instrumentation.enable_opentelemetry); 79 - log::info!("{}", logo("audit")); 79 + tracing::info!("{}", logo("audit")); 80 80 run(args.globals, args.args).await?; 81 81 Ok(()) 82 82 }
+12 -12
src/bin/backfill.rs
··· 99 99 if no_bulk { 100 100 // simple mode, just poll upstream from teh beginning 101 101 if http != DEFAULT_HTTP.parse()? { 102 - log::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 102 + tracing::warn!("ignoring non-default bulk http setting since --no-bulk was set"); 103 103 } 104 104 if let Some(d) = dir { 105 - log::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 105 + tracing::warn!("ignoring bulk dir setting ({d:?}) since --no-bulk was set."); 106 106 } 107 107 if let Some(u) = until { 108 - log::warn!( 108 + tracing::warn!( 109 109 "ignoring `until` setting ({u:?}) since --no-bulk was set. (feature request?)" 110 110 ); 111 111 } ··· 113 113 upstream.set_path("/export"); 114 114 let throttle = Duration::from_millis(upstream_throttle_ms); 115 115 if let Some(fjall_path) = to_fjall { 116 - log::trace!("opening fjall db at {fjall_path:?}..."); 116 + tracing::trace!("opening fjall db at {fjall_path:?}..."); 117 117 let db = FjallDb::open(&fjall_path)?; 118 - log::trace!("opened fjall db"); 118 + tracing::trace!("opened fjall db"); 119 119 120 120 let (poll_tx, poll_out) = mpsc::channel::<SeqPage>(128); // normal/small pages 121 121 let (full_tx, full_out) = mpsc::channel::<SeqPage>(1); // don't need to buffer at this filter ··· 166 166 167 167 // set up sinks 168 168 if let Some(pg_url) = to_postgres { 169 - log::trace!("connecting to postgres..."); 169 + tracing::trace!("connecting to postgres..."); 170 170 let db = Db::new(pg_url.as_str(), postgres_cert).await?; 171 - log::trace!("connected to postgres"); 171 + tracing::trace!("connected to postgres"); 172 172 173 173 tasks.spawn(backfill_to_pg(db.clone(), reset, bulk_out, found_last_tx)); 174 174 if catch_up { ··· 185 185 while let Some(next) = tasks.join_next().await { 186 186 match next { 187 187 Err(e) if e.is_panic() => { 188 - log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 188 + tracing::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 189 189 return Err(e.into()); 190 190 } 191 191 Err(e) => { 192 - log::error!("a joinset task failed to join: {e}"); 192 + tracing::error!("a joinset task failed to join: {e}"); 193 193 return Err(e.into()); 194 194 } 195 195 Ok(Err(e)) => { 196 - log::error!("a joinset task completed with error: {e}"); 196 + tracing::error!("a joinset task completed with error: {e}"); 197 197 return Err(e); 198 198 } 199 199 Ok(Ok(name)) => { 200 - log::trace!("a task completed: {name:?}. {} left", tasks.len()); 200 + tracing::trace!("a task completed: {name:?}. {} left", tasks.len()); 201 201 } 202 202 } 203 203 } ··· 218 218 async fn main() -> anyhow::Result<()> { 219 219 let args = CliArgs::parse(); 220 220 bin_init(false); 221 - log::info!("{}", logo("backfill")); 221 + tracing::info!("{}", logo("backfill")); 222 222 run(args.globals, args.args).await?; 223 223 Ok(()) 224 224 }
+16 -16
src/bin/mirror.rs
··· 105 105 if let Some(ref experimental_domain) = experimental_acme_domain { 106 106 domains.push(experimental_domain.clone()) 107 107 } 108 - log::info!("configuring acme for https at {domains:?}..."); 108 + tracing::info!("configuring acme for https at {domains:?}..."); 109 109 ListenConf::Acme { 110 110 domains, 111 111 cache_path, ··· 127 127 if let Some(fjall_path) = wrap_fjall { 128 128 let db = FjallDb::open(&fjall_path)?; 129 129 if compact_fjall { 130 - log::info!("compacting fjall..."); 130 + tracing::info!("compacting fjall..."); 131 131 db.compact()?; 132 132 } 133 133 134 - log::debug!("getting the latest seq from fjall..."); 134 + tracing::debug!("getting the latest seq from fjall..."); 135 135 let latest_seq = db 136 136 .get_latest()? 137 137 .map(|(seq, _)| seq) 138 138 .expect("there to be at least one op in the db. did you backfill?"); 139 - log::info!("starting seq polling from seq {latest_seq}..."); 139 + tracing::info!("starting seq polling from seq {latest_seq}..."); 140 140 141 141 let (send_page, recv_page) = mpsc::channel::<allegedly::SeqPage>(8); 142 142 ··· 153 153 tasks.spawn(async move { 154 154 let mut current_seq = latest_seq; 155 155 loop { 156 - log::info!("seq polling from seq {current_seq}"); 156 + tracing::info!("seq polling from seq {current_seq}"); 157 157 let (inner_tx, mut inner_rx) = mpsc::channel::<allegedly::SeqPage>(8); 158 158 159 159 // run poller; it ends only when the channel closes ··· 190 190 current_seq = last_seq_from_poll; 191 191 192 192 // switch to streaming 193 - log::info!("caught up at seq {current_seq}, switching to /export/stream"); 193 + tracing::info!("caught up at seq {current_seq}, switching to /export/stream"); 194 194 let (stream_inner_tx, mut stream_inner_rx) = mpsc::channel::<allegedly::SeqPage>(8); 195 195 let stream_task = tokio::spawn(tail_upstream_stream( 196 196 Some(current_seq), ··· 210 210 211 211 // stream ended/errored — loop back to polling to resync 212 212 match stream_task.await { 213 - Ok(Ok(())) => log::info!("stream closed cleanly, resyncing via poll"), 214 - Ok(Err(e)) => log::warn!("stream error: {e}, resyncing via poll"), 215 - Err(e) => log::warn!("stream task join error: {e}"), 213 + Ok(Ok(())) => tracing::info!("stream closed cleanly, resyncing via poll"), 214 + Ok(Err(e)) => tracing::warn!("stream error: {e}, resyncing via poll"), 215 + Err(e) => tracing::warn!("stream task join error: {e}"), 216 216 } 217 217 } 218 218 }); ··· 230 230 ))?; 231 231 let db = Db::new(wrap_pg.as_str(), wrap_pg_cert).await?; 232 232 233 - log::debug!("getting the latest op from the db..."); 233 + tracing::debug!("getting the latest op from the db..."); 234 234 let latest = db 235 235 .get_latest() 236 236 .await? 237 237 .expect("there to be at least one op in the db. did you backfill?"); 238 - log::debug!("starting polling from {latest}..."); 238 + tracing::debug!("starting polling from {latest}..."); 239 239 240 240 let (send_page, recv_page) = mpsc::channel(8); 241 241 ··· 256 256 while let Some(next) = tasks.join_next().await { 257 257 match next { 258 258 Err(e) if e.is_panic() => { 259 - log::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 259 + tracing::error!("a joinset task panicked: {e}. bailing now. (should we panic?)"); 260 260 return Err(e.into()); 261 261 } 262 262 Err(e) => { 263 - log::error!("a joinset task failed to join: {e}"); 263 + tracing::error!("a joinset task failed to join: {e}"); 264 264 return Err(e.into()); 265 265 } 266 266 Ok(Err(e)) => { 267 - log::error!("a joinset task completed with error: {e}"); 267 + tracing::error!("a joinset task completed with error: {e}"); 268 268 return Err(e); 269 269 } 270 270 Ok(Ok(name)) => { 271 - log::trace!("a task completed: {name:?}. {} left", tasks.len()); 271 + tracing::trace!("a task completed: {name:?}. {} left", tasks.len()); 272 272 } 273 273 } 274 274 } ··· 294 294 async fn main() -> anyhow::Result<()> { 295 295 let args = CliArgs::parse(); 296 296 bin_init(args.instrumentation.enable_opentelemetry); 297 - log::info!("{}", logo("mirror")); 297 + tracing::info!("{}", logo("mirror")); 298 298 run(args.globals, args.args, !args.wrap_mode).await?; 299 299 Ok(()) 300 300 }
+5 -5
src/cached_value.rs
··· 16 16 impl<T: Clone> ExpiringValue<T> { 17 17 fn get(&self, now: Instant) -> Option<T> { 18 18 if now <= self.expires { 19 - log::trace!("returning val (fresh for {:?})", self.expires - now); 19 + tracing::trace!("returning val (fresh for {:?})", self.expires - now); 20 20 Some(self.value.clone()) 21 21 } else { 22 - log::trace!("hiding expired val"); 22 + tracing::trace!("hiding expired val"); 23 23 None 24 24 } 25 25 } ··· 50 50 if let Some(v) = val.as_ref().and_then(|v| v.get(now)) { 51 51 return Ok(v); 52 52 } 53 - log::debug!( 53 + tracing::debug!( 54 54 "value {}, fetching...", 55 55 if val.is_some() { 56 56 "expired" ··· 62 62 .fetcher 63 63 .fetch() 64 64 .await 65 - .inspect_err(|e| log::warn!("value fetch failed, next access will retry: {e}"))?; 66 - log::debug!("fetched ok, saving a copy for cache."); 65 + .inspect_err(|e| tracing::warn!("value fetch failed, next access will retry: {e}"))?; 66 + tracing::debug!("fetched ok, saving a copy for cache."); 67 67 *val = Some(ExpiringValue { 68 68 value: new.clone(), 69 69 expires: now + self.validitiy,
+10 -10
src/lib.rs
··· 140 140 if n < 900 { 141 141 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 142 142 let Some(age) = last_age else { 143 - log::info!("full_pages done, empty final page"); 143 + tracing::info!("full_pages done, empty final page"); 144 144 return Ok("full pages (hmm)"); 145 145 }; 146 146 if age <= chrono::TimeDelta::hours(6) { 147 - log::info!("full_pages done, final page of {n} ops"); 147 + tracing::info!("full_pages done, final page of {n} ops"); 148 148 } else { 149 - log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 149 + tracing::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 150 150 } 151 151 return Ok("full pages (cool)"); 152 152 } 153 - log::trace!("full_pages: continuing with page of {n} ops"); 153 + tracing::trace!("full_pages: continuing with page of {n} ops"); 154 154 tx.send(page).await?; 155 155 } 156 156 Err(anyhow::anyhow!( ··· 167 167 if n < 900 { 168 168 let last_age = page.ops.last().map(|op| chrono::Utc::now() - op.created_at); 169 169 let Some(age) = last_age else { 170 - log::info!("full_pages done, empty final page"); 170 + tracing::info!("full_pages done, empty final page"); 171 171 return Ok("full pages (hmm)"); 172 172 }; 173 173 if age <= chrono::TimeDelta::hours(6) { 174 - log::info!("full_pages done, final page of {n} ops"); 174 + tracing::info!("full_pages done, final page of {n} ops"); 175 175 } else { 176 - log::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 176 + tracing::warn!("full_pages finished with small page of {n} ops, but it's {age} old"); 177 177 } 178 178 return Ok("full pages (cool)"); 179 179 } 180 - log::trace!("full_pages: continuing with page of {n} ops"); 180 + tracing::trace!("full_pages: continuing with page of {n} ops"); 181 181 tx.send(page).await?; 182 182 } 183 183 Err(anyhow::anyhow!( ··· 201 201 } 202 202 } 203 203 if let Some(notify) = notify_last_at { 204 - log::trace!("notifying last_at: {last_at:?}"); 204 + tracing::trace!("notifying last_at: {last_at:?}"); 205 205 if notify.send(last_at).is_err() { 206 - log::error!("receiver for last_at dropped, can't notify"); 206 + tracing::error!("receiver for last_at dropped, can't notify"); 207 207 }; 208 208 } 209 209 Ok("pages_to_stdout")
+8 -8
src/mirror/fjall.rs
··· 382 382 while let Some(op) = op_rx.recv().await { 383 383 let json = serde_json::to_string(&op.to_sequenced_json()).unwrap(); 384 384 if let Err(e) = socket.send(Message::Text(json)).await { 385 - log::warn!("closing export stream: {e}"); 385 + tracing::warn!("closing export stream: {e}"); 386 386 return; 387 387 } 388 388 cursor = op.seq + 1; ··· 390 390 391 391 match read.await { 392 392 Ok(Err(e)) => { 393 - log::error!("stream read failed: {e}"); 393 + tracing::error!("stream read failed: {e}"); 394 394 return; 395 395 } 396 396 Err(e) => { 397 - log::error!("stream read task panicked: {e}"); 397 + tracing::error!("stream read task panicked: {e}"); 398 398 return; 399 399 } 400 400 Ok(Ok(())) => {} ··· 429 429 experimental: ExperimentalConf, 430 430 fjall: FjallDb, 431 431 ) -> anyhow::Result<&'static str> { 432 - log::info!("starting fjall mirror server..."); 432 + tracing::info!("starting fjall mirror server..."); 433 433 434 434 let client = Client::builder() 435 435 .user_agent(UA) ··· 461 461 .at("/export/stream", get(export_stream)); 462 462 463 463 if experimental.write_upstream { 464 - log::info!("enabling experimental write forwarding to upstream"); 464 + tracing::info!("enabling experimental write forwarding to upstream"); 465 465 466 466 let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 467 467 let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap())); ··· 514 514 } 515 515 516 516 let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 517 - log::trace!("original request headers: {headers:?}"); 517 + tracing::trace!("original request headers: {headers:?}"); 518 518 headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 519 519 let client_ua = headers 520 520 .get(USER_AGENT) ··· 526 526 .parse() 527 527 .unwrap(), 528 528 ); 529 - log::trace!("adjusted request headers: {headers:?}"); 529 + tracing::trace!("adjusted request headers: {headers:?}"); 530 530 531 531 let mut target = upstream.clone(); 532 532 target.set_path(&did); ··· 538 538 .send() 539 539 .await 540 540 .map_err(|e| { 541 - log::warn!("upstream write fail: {e}"); 541 + tracing::warn!("upstream write fail: {e}"); 542 542 Error::from_string( 543 543 failed_to_reach_named("upstream PLC"), 544 544 StatusCode::BAD_GATEWAY,
+2 -2
src/mirror/mod.rs
··· 184 184 } 185 185 let auto_cert = auto_cert.build().expect("acme config to build"); 186 186 187 - log::trace!("auto_cert: {auto_cert:?}"); 187 + tracing::trace!("auto_cert: {auto_cert:?}"); 188 188 189 189 let notice_task = tokio::task::spawn(run_insecure_notice(ipv6)); 190 190 let listener = TcpListener::bind(if ipv6 { "[::]:443" } else { "0.0.0.0:443" }); 191 191 let app_res = run(app, listener.acme(auto_cert)).await; 192 - log::warn!("server task ended, aborting insecure server task..."); 192 + tracing::warn!("server task ended, aborting insecure server task..."); 193 193 notice_task.abort(); 194 194 app_res?; 195 195 notice_task.await??;
+6 -6
src/mirror/pg.rs
··· 179 179 .send() 180 180 .await 181 181 .map_err(|e| { 182 - log::error!("upstream req fail: {e}"); 182 + tracing::error!("upstream req fail: {e}"); 183 183 Error::from_string( 184 184 failed_to_reach_named("wrapped reference PLC"), 185 185 StatusCode::BAD_GATEWAY, ··· 215 215 } 216 216 217 217 let mut headers: reqwest::header::HeaderMap = req.headers().clone(); 218 - log::trace!("original request headers: {headers:?}"); 218 + tracing::trace!("original request headers: {headers:?}"); 219 219 headers.insert("Host", upstream.host_str().unwrap().parse().unwrap()); 220 220 let client_ua = headers 221 221 .get(USER_AGENT) ··· 227 227 .parse() 228 228 .unwrap(), 229 229 ); 230 - log::trace!("adjusted request headers: {headers:?}"); 230 + tracing::trace!("adjusted request headers: {headers:?}"); 231 231 232 232 let mut target = upstream.clone(); 233 233 target.set_path(&did); ··· 239 239 .send() 240 240 .await 241 241 .map_err(|e| { 242 - log::warn!("upstream write fail: {e}"); 242 + tracing::warn!("upstream write fail: {e}"); 243 243 Error::from_string( 244 244 failed_to_reach_named("upstream PLC"), 245 245 StatusCode::BAD_GATEWAY, ··· 274 274 experimental: ExperimentalConf, 275 275 db: Option<Db>, 276 276 ) -> anyhow::Result<&'static str> { 277 - log::info!("starting server..."); 277 + tracing::info!("starting server..."); 278 278 279 279 let client = Client::builder() 280 280 .user_agent(UA) ··· 306 306 .at("/export", get(proxy)); 307 307 308 308 if experimental.write_upstream { 309 - log::info!("enabling experimental write forwarding to upstream"); 309 + tracing::info!("enabling experimental write forwarding to upstream"); 310 310 311 311 let ip_limiter = IpLimiters::new(Quota::per_hour(10.try_into().unwrap())); 312 312 let did_limiter = CreatePlcOpLimiter::new(Quota::per_hour(4.try_into().unwrap()));
+30 -30
src/plc_fjall.rs
··· 1031 1031 }; 1032 1032 1033 1033 for err in &errors { 1034 - log::warn!("dropping op {} {} (seq {seq}) parse error: {err}", op.did, op.cid); 1034 + tracing::warn!("dropping op {} {} (seq {seq}) parse error: {err}", op.did, op.cid); 1035 1035 } 1036 1036 if !errors.is_empty() { 1037 1037 // if parse failed but not fatal, we just dont store it ··· 1069 1069 .map(|e| e.to_string()) 1070 1070 .collect::<Vec<_>>() 1071 1071 .join("\n"); 1072 - log::warn!("dropping op {} {} (seq {seq}) invalid sig:\n{msg}", op.did, op.cid); 1072 + tracing::warn!("dropping op {} {} (seq {seq}) invalid sig:\n{msg}", op.did, op.cid); 1073 1073 return Ok(0); 1074 1074 } 1075 1075 } 1076 1076 Err(e) => { 1077 - log::warn!("dropping op {} {} (seq {seq}): {e}", op.did, op.cid); 1077 + tracing::warn!("dropping op {} {} (seq {seq}): {e}", op.did, op.cid); 1078 1078 return Ok(0); 1079 1079 } 1080 1080 } 1081 - log::debug!("verified op {} {} (seq {seq})", op.did, op.cid); 1081 + tracing::debug!("verified op {} {} (seq {seq})", op.did, op.cid); 1082 1082 } 1083 1083 1084 1084 let db_op = DbOp { ··· 1104 1104 1105 1105 self.inner.notify_stream.notify_waiters(); 1106 1106 1107 - log::debug!("inserted op {} {} (seq {seq})", op.did, op.cid); 1107 + tracing::debug!("inserted op {} {} (seq {seq})", op.did, op.cid); 1108 1108 Ok(1) 1109 1109 } 1110 1110 ··· 1257 1257 let (seq, by_did_key_bytes) = match (found_seq, found_by_did_key) { 1258 1258 (Some(s), Some(k)) => (s, k), 1259 1259 _ => { 1260 - log::warn!("drop_op: by_did entry not found for {did_str}"); 1260 + tracing::warn!("drop_op: by_did entry not found for {did_str}"); 1261 1261 return Ok(()); 1262 1262 } 1263 1263 }; ··· 1310 1310 }); 1311 1311 let prev_cid_ok = op.operation.prev.is_none() || prev_op.is_some(); 1312 1312 if !prev_cid_ok { 1313 - log::error!("audit: op {did} {cid} prev cid mismatch or missing predecessor, is db corrupted?"); 1313 + tracing::error!("audit: op {did} {cid} prev cid mismatch or missing predecessor, is db corrupted?"); 1314 1314 failed += 1; 1315 1315 send_invalid(); 1316 1316 continue; ··· 1325 1325 .map(|e| e.to_string()) 1326 1326 .collect::<Vec<_>>() 1327 1327 .join("\n "); 1328 - log::warn!("audit: invalid op {} {}:\n {msg}", did, cid); 1328 + tracing::warn!("audit: invalid op {} {}:\n {msg}", did, cid); 1329 1329 failed += 1; 1330 1330 send_invalid(); 1331 1331 } 1332 1332 } 1333 1333 Err(e) => { 1334 - log::warn!("audit: invalid op {} {}: {e}", did, cid); 1334 + tracing::warn!("audit: invalid op {} {}: {e}", did, cid); 1335 1335 failed += 1; 1336 1336 send_invalid(); 1337 1337 } ··· 1437 1437 if reset { 1438 1438 let db = db.clone(); 1439 1439 tokio::task::spawn_blocking(move || db.clear()).await??; 1440 - log::warn!("fjall reset: cleared all data"); 1440 + tracing::warn!("fjall reset: cleared all data"); 1441 1441 } 1442 1442 1443 1443 let mut last_at = None; ··· 1492 1492 } 1493 1493 } 1494 1494 } 1495 - log::debug!("finished receiving bulk pages"); 1495 + tracing::debug!("finished receiving bulk pages"); 1496 1496 1497 1497 if let Some(notify) = notify_last_at { 1498 - log::trace!("notifying last_at: {last_at:?}"); 1498 + tracing::trace!("notifying last_at: {last_at:?}"); 1499 1499 if notify.send(last_at).is_err() { 1500 - log::error!("receiver for last_at dropped, can't notify"); 1500 + tracing::error!("receiver for last_at dropped, can't notify"); 1501 1501 }; 1502 1502 } 1503 1503 1504 1504 tokio::task::spawn_blocking(move || db.persist(PersistMode::SyncAll)).await??; 1505 1505 1506 - log::info!( 1506 + tracing::info!( 1507 1507 "backfill_to_fjall: inserted {ops_inserted} ops in {:?}", 1508 1508 t0.elapsed() 1509 1509 ); ··· 1515 1515 db: FjallDb, 1516 1516 mut pages: mpsc::Receiver<crate::SeqPage>, 1517 1517 ) -> anyhow::Result<&'static str> { 1518 - log::info!("starting seq_pages_to_fjall writer..."); 1518 + tracing::info!("starting seq_pages_to_fjall writer..."); 1519 1519 1520 1520 let t0 = Instant::now(); 1521 1521 let mut ops_inserted: usize = 0; ··· 1523 1523 while let Some(page) = pages.recv().await { 1524 1524 let first_seq = page.ops.first().map(|op| op.seq); 1525 1525 let last_seq = page.ops.last().map(|op| op.seq); 1526 - log::debug!( 1526 + tracing::debug!( 1527 1527 "seq_pages: received page with {} ops, seq {:?}..{:?}", 1528 1528 page.ops.len(), 1529 1529 first_seq, ··· 1534 1534 let count = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> { 1535 1535 let mut count: usize = 0; 1536 1536 for seq_op in &page.ops { 1537 - log::debug!("seq_pages: processing op {} {} (seq {})", seq_op.did, seq_op.cid, seq_op.seq); 1537 + tracing::debug!("seq_pages: processing op {} {} (seq {})", seq_op.did, seq_op.cid, seq_op.seq); 1538 1538 let common_op = CommonOp { 1539 1539 did: seq_op.did.clone(), 1540 1540 cid: seq_op.cid.clone(), ··· 1549 1549 }) 1550 1550 .await??; 1551 1551 if count < page_len { 1552 - log::warn!( 1552 + tracing::warn!( 1553 1553 "seq_pages: page seq {:?}..{:?} inserted {count}/{page_len} ops ({} dropped)", 1554 1554 first_seq, 1555 1555 last_seq, ··· 1559 1559 ops_inserted += count; 1560 1560 } 1561 1561 1562 - log::info!( 1562 + tracing::info!( 1563 1563 "no more seq pages. inserted {ops_inserted} ops in {:?}", 1564 1564 t0.elapsed() 1565 1565 ); ··· 1570 1570 db: FjallDb, 1571 1571 invalid_ops_tx: mpsc::Sender<InvalidOp>, 1572 1572 ) -> anyhow::Result<&'static str> { 1573 - log::info!("starting fjall audit..."); 1573 + tracing::info!("starting fjall audit..."); 1574 1574 let t0 = std::time::Instant::now(); 1575 1575 let (checked, failed) = tokio::task::spawn_blocking(move || db.audit(invalid_ops_tx)).await??; 1576 - log::info!( 1576 + tracing::info!( 1577 1577 "fjall audit complete in {:?}, {checked} ops checked", 1578 1578 t0.elapsed() 1579 1579 ); 1580 1580 if failed > 0 { 1581 - log::error!("audit found {failed} invalid operations"); 1581 + tracing::error!("audit found {failed} invalid operations"); 1582 1582 } 1583 1583 Ok("audit_fjall") 1584 1584 } ··· 1589 1589 only_drop: bool, 1590 1590 mut invalid_ops_rx: mpsc::Receiver<InvalidOp>, 1591 1591 ) -> anyhow::Result<&'static str> { 1592 - log::info!("starting fjall fix ops..."); 1592 + tracing::info!("starting fjall fix ops..."); 1593 1593 let mut fixed_dids = std::collections::HashSet::new(); 1594 1594 let mut count = 0; 1595 1595 ··· 1615 1615 continue; 1616 1616 } 1617 1617 1618 - log::trace!("fetching upstream ops to fix did: {did}"); 1618 + tracing::trace!("fetching upstream ops to fix did: {did}"); 1619 1619 let mut url = upstream.clone(); 1620 1620 url.set_path(&format!("/{did}/log/audit")); 1621 1621 ··· 1626 1626 StatusCode::OK => match resp.json().await { 1627 1627 Ok(ops) => ops, 1628 1628 Err(e) => { 1629 - log::warn!("failed to parse upstream ops for {did}: {e}"); 1629 + tracing::warn!("failed to parse upstream ops for {did}: {e}"); 1630 1630 continue; 1631 1631 } 1632 1632 }, 1633 1633 StatusCode::NOT_FOUND => { 1634 - log::trace!("did not found upstream: {did}"); 1634 + tracing::trace!("did not found upstream: {did}"); 1635 1635 Vec::new() // this essentially means drop the whole did 1636 1636 } 1637 1637 s => { 1638 - log::warn!("failed to fetch upstream for {did}: {s}"); 1638 + tracing::warn!("failed to fetch upstream for {did}: {s}"); 1639 1639 continue; 1640 1640 } 1641 1641 }; 1642 1642 1643 - log::trace!("fetched {} ops for {did}", ops.len()); 1643 + tracing::trace!("fetched {} ops for {did}", ops.len()); 1644 1644 1645 1645 // we drop all ops first just to be safe 1646 1646 let existing = db.ops_for_did(&did)?; ··· 1655 1655 // if we don't skip these we might miss some ops in between 1656 1656 // the latest_at we started with vs the one we ended up with 1657 1657 if op.created_at > latest_at { 1658 - log::trace!( 1658 + tracing::trace!( 1659 1659 "skipping op {} for {did} because it is newer than latest_at {latest_at}", 1660 1660 op.cid 1661 1661 ); ··· 1671 1671 fixed_dids.insert(did); 1672 1672 } 1673 1673 1674 - log::info!("fixed {count} ops"); 1674 + tracing::info!("fixed {count} ops"); 1675 1675 1676 1676 Ok("fix_ops_fjall") 1677 1677 }
+19 -19
src/plc_pg.rs
··· 49 49 pub async fn new(pg_uri: &str, cert: Option<PathBuf>) -> Result<Self, anyhow::Error> { 50 50 // we're going to interact with did-method-plc's database, so make sure 51 51 // it's what we expect: check for db migrations. 52 - log::trace!("checking migrations..."); 52 + tracing::trace!("checking migrations..."); 53 53 54 54 let connector = cert.map(get_tls).transpose()?; 55 55 ··· 77 77 drop(client); 78 78 // make sure the connection worker thing doesn't linger 79 79 conn_task.await??; 80 - log::info!("db connection succeeded and plc migrations appear as expected"); 80 + tracing::info!("db connection succeeded and plc migrations appear as expected"); 81 81 82 82 Ok(Self { 83 83 pg_uri: pg_uri.to_string(), ··· 86 86 } 87 87 88 88 pub async fn connect(&self) -> Result<(Client, JoinHandle<Result<(), PgError>>), PgError> { 89 - log::trace!("connecting postgres..."); 89 + tracing::trace!("connecting postgres..."); 90 90 if let Some(ref connector) = self.cert { 91 91 get_client_and_task(&self.pg_uri, connector.clone()).await 92 92 } else { ··· 115 115 db: Db, 116 116 mut pages: mpsc::Receiver<ExportPage>, 117 117 ) -> anyhow::Result<&'static str> { 118 - log::info!("starting pages_to_pg writer..."); 118 + tracing::info!("starting pages_to_pg writer..."); 119 119 120 120 let (mut client, task) = db.connect().await?; 121 121 ··· 135 135 let mut dids_inserted = 0; 136 136 137 137 while let Some(page) = pages.recv().await { 138 - log::trace!("writing page with {} ops", page.ops.len()); 138 + tracing::trace!("writing page with {} ops", page.ops.len()); 139 139 let tx = client.transaction().await?; 140 140 for op in page.ops { 141 141 ops_inserted += tx ··· 156 156 } 157 157 drop(task); 158 158 159 - log::info!( 159 + tracing::info!( 160 160 "no more pages. inserted {ops_inserted} ops and {dids_inserted} dids in {:?}", 161 161 t0.elapsed() 162 162 ); ··· 194 194 if reset { 195 195 let n = tx.execute(&format!("DELETE FROM {table}"), &[]).await?; 196 196 if n > 0 { 197 - log::warn!("postgres reset: deleted {n} from {table}"); 197 + tracing::warn!("postgres reset: deleted {n} from {table}"); 198 198 } 199 199 } else { 200 200 let n: i64 = tx ··· 206 206 } 207 207 } 208 208 } 209 - log::trace!("tables clean: {:?}", t_step.elapsed()); 209 + tracing::trace!("tables clean: {:?}", t_step.elapsed()); 210 210 211 211 let t_step = Instant::now(); 212 212 tx.execute("ALTER TABLE operations SET UNLOGGED", &[]) 213 213 .await?; 214 214 tx.execute("ALTER TABLE dids SET UNLOGGED", &[]).await?; 215 - log::trace!("set tables unlogged: {:?}", t_step.elapsed()); 215 + tracing::trace!("set tables unlogged: {:?}", t_step.elapsed()); 216 216 217 217 let t_step = Instant::now(); 218 218 tx.execute(r#"DROP INDEX "operations_createdAt_index""#, &[]) 219 219 .await?; 220 220 tx.execute("DROP INDEX operations_did_createdat_idx", &[]) 221 221 .await?; 222 - log::trace!("indexes dropped: {:?}", t_step.elapsed()); 222 + tracing::trace!("indexes dropped: {:?}", t_step.elapsed()); 223 223 224 224 let t_step = Instant::now(); 225 - log::trace!("starting binary COPY IN..."); 225 + tracing::trace!("starting binary COPY IN..."); 226 226 let types = &[ 227 227 Type::TEXT, 228 228 Type::JSONB, ··· 256 256 last_at = last_at.filter(|&l| l >= s.last_at).or(Some(s.last_at)); 257 257 } 258 258 } 259 - log::debug!("finished receiving bulk pages"); 259 + tracing::debug!("finished receiving bulk pages"); 260 260 261 261 if let Some(notify) = notify_last_at { 262 - log::trace!("notifying last_at: {last_at:?}"); 262 + tracing::trace!("notifying last_at: {last_at:?}"); 263 263 if notify.send(last_at).is_err() { 264 - log::error!("receiver for last_at dropped, can't notify"); 264 + tracing::error!("receiver for last_at dropped, can't notify"); 265 265 }; 266 266 } 267 267 268 268 let n = writer.as_mut().finish().await?; 269 - log::trace!("COPY IN wrote {n} ops: {:?}", t_step.elapsed()); 269 + tracing::trace!("COPY IN wrote {n} ops: {:?}", t_step.elapsed()); 270 270 271 271 // CAUTION: these indexes MUST match up exactly with the kysely ones we dropped 272 272 let t_step = Instant::now(); ··· 280 280 &[], 281 281 ) 282 282 .await?; 283 - log::trace!("indexes recreated: {:?}", t_step.elapsed()); 283 + tracing::trace!("indexes recreated: {:?}", t_step.elapsed()); 284 284 285 285 let t_step = Instant::now(); 286 286 let n = tx ··· 289 289 &[], 290 290 ) 291 291 .await?; 292 - log::trace!("INSERT wrote {n} dids: {:?}", t_step.elapsed()); 292 + tracing::trace!("INSERT wrote {n} dids: {:?}", t_step.elapsed()); 293 293 294 294 let t_step = Instant::now(); 295 295 tx.execute("ALTER TABLE dids SET LOGGED", &[]).await?; 296 296 tx.execute("ALTER TABLE operations SET LOGGED", &[]).await?; 297 - log::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 297 + tracing::trace!("set tables LOGGED: {:?}", t_step.elapsed()); 298 298 299 299 tx.commit().await?; 300 300 drop(task); 301 - log::info!("total backfill time: {:?}", t0.elapsed()); 301 + tracing::info!("total backfill time: {:?}", t0.elapsed()); 302 302 303 303 Ok("backfill_to_pg") 304 304 }
+18 -18
src/poll.rs
··· 143 143 use tokio::io::{AsyncBufReadExt, BufReader}; 144 144 use tokio_util::compat::FuturesAsyncReadCompatExt; 145 145 146 - log::trace!("Getting page: {url}"); 146 + tracing::trace!("Getting page: {url}"); 147 147 148 148 let res = CLIENT.get(url).send().await?.error_for_status()?; 149 149 let stream = Box::pin( ··· 165 165 } 166 166 match serde_json::from_str::<Op>(line) { 167 167 Ok(op) => ops.push(op), 168 - Err(e) => log::warn!("failed to parse op: {e} ({line})"), 168 + Err(e) => tracing::warn!("failed to parse op: {e} ({line})"), 169 169 } 170 170 } 171 171 Ok(None) => break, 172 172 Err(e) => { 173 - log::warn!("transport error mid-page: {}; returning partial page", e); 173 + tracing::warn!("transport error mid-page: {}; returning partial page", e); 174 174 break; 175 175 } 176 176 } ··· 216 216 throttle: Duration, 217 217 dest: mpsc::Sender<ExportPage>, 218 218 ) -> anyhow::Result<&'static str> { 219 - log::info!("starting upstream poller at {base} after {after:?}"); 219 + tracing::info!("starting upstream poller at {base} after {after:?}"); 220 220 let mut tick = tokio::time::interval(throttle); 221 221 let mut prev_last: Option<LastOp> = after.map(Into::into); 222 222 let mut boundary_state: Option<PageBoundaryState> = None; ··· 232 232 let (mut page, next_last) = match get_page(url).await { 233 233 Ok(res) => res, 234 234 Err(e) => { 235 - log::warn!("error polling upstream: {e}"); 235 + tracing::warn!("error polling upstream: {e}"); 236 236 continue; 237 237 } 238 238 }; ··· 246 246 match dest.try_send(page) { 247 247 Ok(()) => {} 248 248 Err(mpsc::error::TrySendError::Full(page)) => { 249 - log::warn!("export: destination channel full, awaiting..."); 249 + tracing::warn!("export: destination channel full, awaiting..."); 250 250 dest.send(page).await?; 251 251 } 252 252 e => e?, ··· 263 263 use tokio::io::{AsyncBufReadExt, BufReader}; 264 264 use tokio_util::compat::FuturesAsyncReadCompatExt; 265 265 266 - log::trace!("getting seq page: {url}"); 266 + tracing::trace!("getting seq page: {url}"); 267 267 268 268 let res = CLIENT.get(url).send().await?.error_for_status()?; 269 269 let stream = Box::pin( ··· 285 285 } 286 286 match serde_json::from_str::<SeqOp>(line) { 287 287 Ok(op) => ops.push(op), 288 - Err(e) => log::warn!("failed to parse seq op: {e} ({line})"), 288 + Err(e) => tracing::warn!("failed to parse seq op: {e} ({line})"), 289 289 } 290 290 } 291 291 Ok(None) => break, 292 292 Err(e) => { 293 - log::warn!( 293 + tracing::warn!( 294 294 "transport error mid-seq-page: {}; returning partial page", 295 295 e 296 296 ); ··· 315 315 throttle: Duration, 316 316 dest: mpsc::Sender<SeqPage>, 317 317 ) -> anyhow::Result<&'static str> { 318 - log::info!("starting seq upstream poller at {base} after {after:?}"); 318 + tracing::info!("starting seq upstream poller at {base} after {after:?}"); 319 319 let mut tick = tokio::time::interval(throttle); 320 320 let mut last_seq: u64 = after.unwrap_or(0); 321 321 ··· 329 329 let page = match get_seq_page(url).await { 330 330 Ok(p) => p, 331 331 Err(e) => { 332 - log::warn!("error polling upstream (seq): {e}"); 332 + tracing::warn!("error polling upstream (seq): {e}"); 333 333 continue; 334 334 } 335 335 }; ··· 339 339 } 340 340 341 341 if !page.is_empty() { 342 - log::debug!( 342 + tracing::debug!( 343 343 "seq poll: page with {} ops, seq {}..{}", 344 344 page.ops.len(), 345 345 page.ops.first().map(|op| op.seq).unwrap_or(0), ··· 348 348 match dest.try_send(page) { 349 349 Ok(()) => {} 350 350 Err(mpsc::error::TrySendError::Full(page)) => { 351 - log::warn!("seq poll: destination channel full, awaiting..."); 351 + tracing::warn!("seq poll: destination channel full, awaiting..."); 352 352 dest.send(page).await?; 353 353 } 354 354 e => e?, ··· 388 388 .append_pair("cursor", &seq.to_string()); 389 389 } 390 390 391 - log::info!("connecting to stream: {url}"); 391 + tracing::info!("connecting to stream: {url}"); 392 392 let (mut ws, _) = connect_async(url.as_str()).await?; 393 - log::info!("stream connected"); 393 + tracing::info!("stream connected"); 394 394 395 395 while let Some(msg) = ws.next().await { 396 396 let msg = msg?; 397 397 let text = match msg { 398 398 Message::Text(t) => t, 399 399 Message::Close(_) => { 400 - log::info!("stream closed by server"); 400 + tracing::info!("stream closed by server"); 401 401 break; 402 402 } 403 403 _ => continue, ··· 406 406 let op: SeqOp = match serde_json::from_str(&text) { 407 407 Ok(op) => op, 408 408 Err(e) => { 409 - log::warn!("failed to parse stream event: {e} ({text})"); 409 + tracing::warn!("failed to parse stream event: {e} ({text})"); 410 410 continue; 411 411 } 412 412 }; 413 413 414 414 let page = SeqPage { ops: vec![op] }; 415 415 if dest.send(page).await.is_err() { 416 - log::info!("stream dest channel closed, stopping"); 416 + tracing::info!("stream dest channel closed, stopping"); 417 417 break; 418 418 } 419 419 }
+4 -4
src/ratelimit.rs
··· 61 61 .map_err(|e| e.wait_time_from(CLOCK.now())) 62 62 } 63 63 fn housekeep(&self) { 64 - log::debug!( 64 + tracing::debug!( 65 65 "limiter size before housekeeping: {} dids", 66 66 self.limiter.len() 67 67 ); ··· 125 125 } 126 126 } 127 127 fn housekeep(&self) { 128 - log::debug!( 128 + tracing::debug!( 129 129 "limiter sizes before housekeeping: {}/ip {}/v6_56 {}/v6_48", 130 130 self.per_ip.len(), 131 131 self.ip6_56.len(), ··· 205 205 206 206 match self.limiters.check_key(&key) { 207 207 Ok(_) => { 208 - log::debug!("allowing key {key:?}"); 208 + tracing::debug!("allowing key {key:?}"); 209 209 self.ep.call(req).await 210 210 } 211 211 Err(d) => { 212 212 let wait_time = d.as_secs(); 213 213 214 - log::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 214 + tracing::debug!("rate limit exceeded for {key:?}, quota reset in {wait_time}s"); 215 215 216 216 let res = Response::builder() 217 217 .status(StatusCode::TOO_MANY_REQUESTS)
+9 -9
src/weekly.rs
··· 97 97 async fn reader_for(&self, week: Week) -> anyhow::Result<impl AsyncRead> { 98 98 let FolderSource(dir) = self; 99 99 let path = dir.join(format!("{}.jsonl.gz", week.0)); 100 - log::debug!("opening folder source: {path:?}"); 100 + tracing::debug!("opening folder source: {path:?}"); 101 101 let file = File::open(path) 102 102 .await 103 - .inspect_err(|e| log::error!("failed to open file: {e}"))?; 103 + .inspect_err(|e| tracing::error!("failed to open file: {e}"))?; 104 104 let decoder = GzipDecoder::new(BufReader::new(file)); 105 105 Ok(decoder) 106 106 } ··· 151 151 encoder.shutdown().await?; 152 152 let now = Instant::now(); 153 153 154 - log::info!( 154 + tracing::info!( 155 155 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 156 156 current_week.map(|w| -w.n_ago()).unwrap_or(0), 157 157 current_week.unwrap_or(Week(0)).0, ··· 170 170 week_ops = 0; 171 171 week_t0 = now; 172 172 } 173 - log::trace!("writing: {op:?}"); 173 + tracing::trace!("writing: {op:?}"); 174 174 encoder 175 175 .write_all(serde_json::to_string(&op)?.as_bytes()) 176 176 .await?; ··· 182 182 // don't forget the final file 183 183 encoder.shutdown().await?; 184 184 let now = Instant::now(); 185 - log::info!( 185 + tracing::info!( 186 186 "done week {:3 } ({:10 }): {week_ops:7 } ({:5.0 }/s) ops, {:5 }k total ({:5.0 }/s)", 187 187 current_week.map(|w| -w.n_ago()).unwrap_or(0), 188 188 current_week.unwrap_or(Week(0)).0, ··· 206 206 let reader = match source.reader_for(week).await { 207 207 Ok(r) => r, 208 208 Err(e) => { 209 - log::warn!( 209 + tracing::warn!( 210 210 "week_to_pages reader_for failed {e}, retrying in {}s", 211 211 retry_backoff.as_secs() 212 212 ); ··· 223 223 Ok(Some(c)) => Some(c), 224 224 Ok(None) => None, 225 225 Err(e) => { 226 - log::warn!( 226 + tracing::warn!( 227 227 "failed to get next chunk: {e}, retrying week in {}s", 228 228 retry_backoff.as_secs() 229 229 ); ··· 237 237 .into_iter() 238 238 .filter_map(|s| { 239 239 serde_json::from_str::<Op>(&s) 240 - .inspect_err(|e| log::warn!("failed to parse op: {e} ({s})")) 240 + .inspect_err(|e| tracing::warn!("failed to parse op: {e} ({s})")) 241 241 .ok() 242 242 }) 243 243 .collect(); ··· 248 248 249 249 let page = ExportPage { ops }; 250 250 if let Err(e) = dest.send(page).await { 251 - log::error!("failed to send page (receiver closed): {e}"); 251 + tracing::error!("failed to send page (receiver closed): {e}"); 252 252 return Err(e.into()); 253 253 } 254 254 }