Server tools to backfill, tail, mirror, and verify PLC logs
50
fork

Configure Feed

Select the types of activity you want to include in your feed.

add tracing opentelemetry exporter for web reqs

phil 30992fa9 d66bb7f3

+269 -22
+162
Cargo.lock
··· 39 39 "http-body-util", 40 40 "log", 41 41 "native-tls", 42 + "opentelemetry", 43 + "opentelemetry-otlp", 44 + "opentelemetry_sdk", 42 45 "poem", 43 46 "postgres-native-tls", 44 47 "reqwest", ··· 52 55 "tokio-postgres", 53 56 "tokio-stream", 54 57 "tokio-util", 58 + "tracing", 59 + "tracing-opentelemetry", 55 60 "tracing-subscriber", 56 61 ] 57 62 ··· 1584 1589 ] 1585 1590 1586 1591 [[package]] 1592 + name = "opentelemetry" 1593 + version = "0.30.0" 1594 + source = "registry+https://github.com/rust-lang/crates.io-index" 1595 + checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" 1596 + dependencies = [ 1597 + "futures-core", 1598 + "futures-sink", 1599 + "js-sys", 1600 + "pin-project-lite", 1601 + "thiserror 2.0.16", 1602 + "tracing", 1603 + ] 1604 + 1605 + [[package]] 1606 + name = "opentelemetry-http" 1607 + version = "0.30.0" 1608 + source = "registry+https://github.com/rust-lang/crates.io-index" 1609 + checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" 1610 + dependencies = [ 1611 + "async-trait", 1612 + "bytes", 1613 + "http", 1614 + "opentelemetry", 1615 + "reqwest", 1616 + ] 1617 + 1618 + [[package]] 1619 + name = "opentelemetry-otlp" 1620 + version = "0.30.0" 1621 + source = "registry+https://github.com/rust-lang/crates.io-index" 1622 + checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" 1623 + dependencies = [ 1624 + "http", 1625 + "opentelemetry", 1626 + "opentelemetry-http", 1627 + "opentelemetry-proto", 1628 + "opentelemetry_sdk", 1629 + "prost", 1630 + "reqwest", 1631 + "thiserror 2.0.16", 1632 + "tracing", 1633 + ] 1634 + 1635 + [[package]] 1636 + name = "opentelemetry-proto" 1637 + version = "0.30.0" 1638 + source = "registry+https://github.com/rust-lang/crates.io-index" 1639 + checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" 1640 + dependencies = [ 1641 + "opentelemetry", 1642 + "opentelemetry_sdk", 1643 + "prost", 1644 + "tonic", 1645 + ] 1646 + 1647 + [[package]] 1648 + name = "opentelemetry_sdk" 1649 + version = "0.30.0" 1650 + source = "registry+https://github.com/rust-lang/crates.io-index" 1651 + checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" 1652 + dependencies = [ 1653 + "futures-channel", 1654 + "futures-executor", 1655 + "futures-util", 1656 + "opentelemetry", 1657 + "percent-encoding", 1658 + "rand 0.9.2", 1659 + "serde_json", 1660 + "thiserror 2.0.16", 1661 + "tokio", 1662 + "tokio-stream", 1663 + ] 1664 + 1665 + [[package]] 1587 1666 name = "parking_lot" 1588 1667 version = "0.11.2" 1589 1668 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1666 1745 ] 1667 1746 1668 1747 [[package]] 1748 + name = "pin-project" 1749 + version = "1.1.10" 1750 + source = "registry+https://github.com/rust-lang/crates.io-index" 1751 + checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" 1752 + dependencies = [ 1753 + "pin-project-internal", 1754 + ] 1755 + 1756 + [[package]] 1757 + name = "pin-project-internal" 1758 + version = "1.1.10" 1759 + source = "registry+https://github.com/rust-lang/crates.io-index" 1760 + checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" 1761 + dependencies = [ 1762 + "proc-macro2", 1763 + "quote", 1764 + "syn", 1765 + ] 1766 + 1767 + [[package]] 1669 1768 name = "pin-project-lite" 1670 1769 version = "0.2.16" 1671 1770 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1837 1936 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 1838 1937 dependencies = [ 1839 1938 "unicode-ident", 1939 + ] 1940 + 1941 + [[package]] 1942 + name = "prost" 1943 + version = "0.13.5" 1944 + source = "registry+https://github.com/rust-lang/crates.io-index" 1945 + checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" 1946 + dependencies = [ 1947 + "bytes", 1948 + "prost-derive", 1949 + ] 1950 + 1951 + [[package]] 1952 + name = "prost-derive" 1953 + version = "0.13.5" 1954 + source = "registry+https://github.com/rust-lang/crates.io-index" 1955 + checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" 1956 + dependencies = [ 1957 + "anyhow", 1958 + "itertools", 1959 + "proc-macro2", 1960 + "quote", 1961 + "syn", 1840 1962 ] 1841 1963 1842 1964 [[package]] ··· 2061 2183 "base64", 2062 2184 "bytes", 2063 2185 "encoding_rs", 2186 + "futures-channel", 2064 2187 "futures-core", 2065 2188 "futures-util", 2066 2189 "h2", ··· 2803 2926 ] 2804 2927 2805 2928 [[package]] 2929 + name = "tonic" 2930 + version = "0.13.1" 2931 + source = "registry+https://github.com/rust-lang/crates.io-index" 2932 + checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" 2933 + dependencies = [ 2934 + "async-trait", 2935 + "base64", 2936 + "bytes", 2937 + "http", 2938 + "http-body", 2939 + "http-body-util", 2940 + "percent-encoding", 2941 + "pin-project", 2942 + "prost", 2943 + "tokio-stream", 2944 + "tower-layer", 2945 + "tower-service", 2946 + "tracing", 2947 + ] 2948 + 2949 + [[package]] 2806 2950 name = "tower" 2807 2951 version = "0.5.2" 2808 2952 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2888 3032 "log", 2889 3033 "once_cell", 2890 3034 "tracing-core", 3035 + ] 3036 + 3037 + [[package]] 3038 + name = "tracing-opentelemetry" 3039 + version = "0.31.0" 3040 + source = "registry+https://github.com/rust-lang/crates.io-index" 3041 + checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" 3042 + dependencies = [ 3043 + "js-sys", 3044 + "once_cell", 3045 + "opentelemetry", 3046 + "opentelemetry_sdk", 3047 + "smallvec", 3048 + "tracing", 3049 + "tracing-core", 3050 + "tracing-log", 3051 + "tracing-subscriber", 3052 + "web-time", 2891 3053 ] 2892 3054 2893 3055 [[package]]
+5
Cargo.toml
··· 16 16 http-body-util = "0.1.3" 17 17 log = "0.4.28" 18 18 native-tls = "0.2.14" 19 + opentelemetry = "0.30.0" 20 + opentelemetry-otlp = { version = "0.30.0" } 21 + opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } 19 22 poem = { version = "3.1.12", features = ["acme", "compression"] } 20 23 postgres-native-tls = "0.5.1" 21 24 reqwest = { version = "0.12.23", features = ["stream", "json", "gzip"] } ··· 29 32 tokio-postgres = { version = "0.7.13", features = ["with-chrono-0_4", "with-serde_json-1"] } 30 33 tokio-stream = { version = "0.1.17", features = ["io-util"] } 31 34 tokio-util = { version = "0.7.16", features = ["compat"] } 35 + tracing = "0.1.41" 36 + tracing-opentelemetry = "0.31.0" 32 37 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
+24 -4
src/bin/allegedly.rs
··· 1 - use allegedly::{Dt, bin::GlobalArgs, bin_init, pages_to_stdout, pages_to_weeks, poll_upstream}; 1 + use allegedly::bin::{GlobalArgs, InstrumentationArgs, bin_init}; 2 + use allegedly::{Dt, logo, pages_to_stdout, pages_to_weeks, poll_upstream}; 2 3 use clap::{CommandFactory, Parser, Subcommand}; 3 4 use std::{path::PathBuf, time::Duration, time::Instant}; 4 5 use tokio::fs::create_dir_all; ··· 48 49 Mirror { 49 50 #[command(flatten)] 50 51 args: mirror::Args, 52 + #[command(flatten)] 53 + instrumentation: InstrumentationArgs, 51 54 }, 52 55 /// Wrap any did-method-plc server, without syncing upstream (read-only) 53 56 Wrap { 54 57 #[command(flatten)] 55 58 args: mirror::Args, 59 + #[command(flatten)] 60 + instrumentation: InstrumentationArgs, 56 61 }, 57 62 /// Poll an upstream PLC server and log new ops to stdout 58 63 Tail { ··· 62 67 }, 63 68 } 64 69 70 + impl Commands { 71 + fn enable_otel(&self) -> bool { 72 + match self { 73 + Commands::Mirror { 74 + instrumentation, .. 75 + } 76 + | Commands::Wrap { 77 + instrumentation, .. 78 + } => instrumentation.enable_opentelemetry, 79 + _ => false, 80 + } 81 + } 82 + } 83 + 65 84 #[tokio::main] 66 85 async fn main() -> anyhow::Result<()> { 67 86 let args = Cli::parse(); 68 87 let matches = Cli::command().get_matches(); 69 88 let name = matches.subcommand().map(|(name, _)| name).unwrap_or("???"); 70 - bin_init(name); 89 + bin_init(args.command.enable_otel()); 90 + log::info!("{}", logo(name)); 71 91 72 92 let globals = args.globals.clone(); 73 93 ··· 96 116 .await 97 117 .expect("to write bundles to output files"); 98 118 } 99 - Commands::Mirror { args } => mirror::run(globals, args, true).await?, 100 - Commands::Wrap { args } => mirror::run(globals, args, false).await?, 119 + Commands::Mirror { args, .. } => mirror::run(globals, args, true).await?, 120 + Commands::Wrap { args, .. } => mirror::run(globals, args, false).await?, 101 121 Commands::Tail { after } => { 102 122 let mut url = globals.upstream; 103 123 url.set_path("/export");
+5 -3
src/bin/backfill.rs
··· 1 1 use allegedly::{ 2 - Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, bin::GlobalArgs, 3 - bin_init, full_pages, pages_to_pg, pages_to_stdout, poll_upstream, 2 + Db, Dt, ExportPage, FolderSource, HttpSource, backfill, backfill_to_pg, 3 + bin::{GlobalArgs, bin_init}, 4 + full_pages, logo, pages_to_pg, pages_to_stdout, poll_upstream, 4 5 }; 5 6 use clap::Parser; 6 7 use reqwest::Url; ··· 199 200 #[tokio::main] 200 201 async fn main() -> anyhow::Result<()> { 201 202 let args = CliArgs::parse(); 202 - bin_init("backfill"); 203 + bin_init(false); 204 + log::info!("{}", logo("backfill")); 203 205 run(args.globals, args.args).await?; 204 206 Ok(()) 205 207 }
+28
src/bin/instrumentation/mod.rs
··· 1 + use opentelemetry::trace::TracerProvider as _; 2 + use opentelemetry_otlp::{Protocol, WithExportConfig}; 3 + use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, SdkTracer, SdkTracerProvider}; 4 + use tracing::Subscriber; 5 + use tracing_opentelemetry::OpenTelemetryLayer; 6 + use tracing_subscriber::registry::LookupSpan; 7 + 8 + pub fn otel_layer<S>() -> OpenTelemetryLayer<S, SdkTracer> 9 + where 10 + S: Subscriber + for<'span> LookupSpan<'span>, 11 + { 12 + let exporter = opentelemetry_otlp::SpanExporter::builder() 13 + .with_http() 14 + .with_protocol(Protocol::HttpBinary) 15 + .build() 16 + .expect("to build otel otlp exporter"); 17 + 18 + let provider = SdkTracerProvider::builder() 19 + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( 20 + 1.0, 21 + )))) 22 + .with_id_generator(RandomIdGenerator::default()) 23 + .with_batch_exporter(exporter) 24 + .build(); 25 + 26 + let tracer = provider.tracer("tracing-otel-subscriber"); 27 + tracing_opentelemetry::layer().with_tracer(tracer) 28 + }
+7 -2
src/bin/mirror.rs
··· 1 1 use allegedly::{ 2 - Db, ExperimentalConf, ListenConf, bin::GlobalArgs, bin_init, pages_to_pg, poll_upstream, serve, 2 + Db, ExperimentalConf, ListenConf, 3 + bin::{GlobalArgs, InstrumentationArgs, bin_init}, 4 + logo, pages_to_pg, poll_upstream, serve, 3 5 }; 4 6 use clap::Parser; 5 7 use reqwest::Url; ··· 166 168 #[command(flatten)] 167 169 globals: GlobalArgs, 168 170 #[command(flatten)] 171 + instrumentation: InstrumentationArgs, 172 + #[command(flatten)] 169 173 args: Args, 170 174 /// Run the mirror in wrap mode, no upstream synchronization (read-only) 171 175 #[arg(long, action)] ··· 176 180 #[tokio::main] 177 181 async fn main() -> anyhow::Result<()> { 178 182 let args = CliArgs::parse(); 179 - bin_init("mirror"); 183 + bin_init(args.instrumentation.enable_opentelemetry); 184 + log::info!("{}", logo("mirror")); 180 185 run(args.globals, args.args, !args.wrap_mode).await?; 181 186 Ok(()) 182 187 }
+38
src/bin/mod.rs
··· 1 + mod instrumentation; 2 + 1 3 use reqwest::Url; 4 + use tracing_subscriber::layer::SubscriberExt; 2 5 3 6 #[derive(Debug, Clone, clap::Args)] 4 7 pub struct GlobalArgs { ··· 12 15 #[arg(long, global = true, env = "ALLEGEDLY_UPSTREAM_THROTTLE_MS")] 13 16 #[clap(default_value = "600")] 14 17 pub upstream_throttle_ms: u64, 18 + } 19 + 20 + #[derive(Debug, Default, Clone, clap::Args)] 21 + pub struct InstrumentationArgs { 22 + /// Export traces to an OTLP collector 23 + /// 24 + /// Configure the colletctor via standard env vars: 25 + /// - `OTEL_EXPORTER_OTLP_ENDPOINT` eg "https://api.honeycomb.io/" 26 + /// - `OTEL_EXPORTER_OTLP_HEADERS` eg "x-honeycomb-team=supersecret" 27 + /// - `OTEL_SERVICE_NAME` eg "my-app" 28 + #[arg(long, action, global = true, env = "ALLEGEDLY_ENABLE_OTEL")] 29 + pub enable_opentelemetry: bool, 30 + } 31 + 32 + pub fn bin_init(enable_otlp: bool) { 33 + let filter = tracing_subscriber::EnvFilter::builder() 34 + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) 35 + .from_env_lossy(); 36 + 37 + let stderr_log = tracing_subscriber::fmt::layer() 38 + .with_writer(std::io::stderr) 39 + .pretty(); 40 + 41 + let otel = if enable_otlp { 42 + Some(instrumentation::otel_layer()) 43 + } else { 44 + None 45 + }; 46 + 47 + let subscriber = tracing_subscriber::Registry::default() 48 + .with(filter) 49 + .with(stderr_log) 50 + .with(otel); 51 + 52 + tracing::subscriber::set_global_default(subscriber).expect("to set global tracing subscriber"); 15 53 } 16 54 17 55 #[allow(dead_code)]
-13
src/lib.rs
··· 145 145 env!("CARGO_PKG_VERSION"), 146 146 ) 147 147 } 148 - 149 - pub fn bin_init(name: &str) { 150 - if std::env::var_os("RUST_LOG").is_none() { 151 - unsafe { std::env::set_var("RUST_LOG", "info") }; 152 - } 153 - let filter = tracing_subscriber::EnvFilter::from_default_env(); 154 - tracing_subscriber::fmt() 155 - .with_writer(std::io::stderr) 156 - .with_env_filter(filter) 157 - .init(); 158 - 159 - log::info!("{}", logo(name)); 160 - }