don't
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(jetstream): pass compression option in request headers

Signed-off-by: tjh <x@tjh.dev>

tjh f28ca7d4 1c9dee24

+21 -21
+21 -21
crates/jetstream/src/task.rs
··· 10 10 use tokio::{sync::Notify, time::Instant}; 11 11 use tokio_tungstenite::{ 12 12 connect_async, 13 - tungstenite::{Error as TungsteniteError, Message}, 13 + tungstenite::{ClientRequestBuilder, Error as TungsteniteError, Message, http::Uri}, 14 14 }; 15 15 use tokio_util::sync::CancellationToken; 16 16 ··· 27 27 event_tx: flume::Sender<Bytes>, 28 28 client_rx: flume::Receiver<ClientCommand>, 29 29 metrics: Metrics, 30 - mut instance: url::Url, 30 + instance: url::Url, 31 31 mut options: Options, 32 32 initial_cursor: Option<u128>, 33 33 shutdown: CancellationToken, 34 34 ) { 35 - // Prepare the URL 36 - instance.set_path("/subscribe"); 37 - { 38 - let mut query = instance.query_pairs_mut(); 39 - query.clear(); 40 - #[cfg(feature = "zstd")] 41 - query.append_pair("compress", "true"); 42 - } 43 - 44 35 #[cfg(feature = "zstd")] 45 36 let dictionary = zstd::dict::DecoderDictionary::copy(ZSTD_DICTIONARY); 46 37 ··· 40 49 // How often to check whether the time elapsed since the last message 41 50 // received exceeds THRESHOLD. 42 51 let mut timeout = tokio::time::interval(Duration::from_secs(5)); 43 - // timeout.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 44 52 45 53 'outer: loop { 46 54 let (subscribe_url, require_hello) = match options.subscribe_url(&instance, &cursor) { ··· 48 58 }; 49 59 50 60 tracing::debug!(%subscribe_url, "connecting to jetstream"); 51 - let (socket, _) = match shutdown 52 - .run_until_cancelled(connect_async(subscribe_url.as_str())) 53 - .await 54 - { 61 + let uri: Uri = subscribe_url 62 + .as_str() 63 + .parse() 64 + .expect("URL should be a valid URI"); 65 + 66 + #[cfg(feature = "zstd")] 67 + let request = ClientRequestBuilder::new(uri).with_header("Socket-Encoding", "zstd"); 68 + 69 + #[cfg(not(feature = "zstd"))] 70 + let request = ClientRequestBuilder::new(uri); 71 + 72 + let (socket, _) = match shutdown.run_until_cancelled(connect_async(request)).await { 55 73 Some(Ok(socket)) => socket, 56 74 Some(Err(error)) => { 57 75 match error { ··· 125 127 }; 126 128 127 129 let bytes: Bytes = match message { 130 + #[cfg(feature = "zstd")] 128 131 Message::Text(payload) => { 129 - #[cfg(feature = "zstd")] 130 - tracing::warn!( 131 - ?payload, 132 - "received an uncompressed payload, but 'zstd' feature is enabled" 132 + panic!( 133 + "received uncompressed message but zstd feature is enabled: {}", 134 + payload.as_str() 133 135 ); 134 - 136 + } 137 + #[cfg(not(feature = "zstd"))] 138 + Message::Text(payload) => { 135 139 metrics.modify(|mut data| { 136 140 data.bytes_received_raw += payload.len(); 137 141 data.bytes_received += payload.len();