Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

use configurable bounded channel

phil 83a4e283 9046be86

+9 -2
+9 -2
jetstream/src/lib.rs
··· 167 167 /// When reconnecting, use the time_us from your most recently processed event and maybe 168 168 /// provide a negative buffer (i.e. subtract a few seconds) to ensure gapless playback. 169 169 pub cursor: Option<chrono::DateTime<Utc>>, 170 + /// Maximum size of send channel for jetstream events. 171 + /// 172 + /// If your consuming task can't keep up with every new jetstream event in real-time, 173 + /// you might get disconnected from the server as a "slow consumer". Increasing channel_size 174 + /// can help prevent that if your consumer sometimes pauses, at a cost of higher memory 175 + /// usage while events are buffered. 176 + pub channel_size: usize, 170 177 /// Marker for record deserializable type. 171 178 /// 172 179 /// See examples/arbitrary_record.rs for an example using serde_json::Value ··· 184 191 wanted_dids: Vec::new(), 185 192 compression: JetstreamCompression::None, 186 193 cursor: None, 194 + channel_size: 1024, 187 195 record_type: PhantomData, 188 196 } 189 197 } ··· 266 274 .validate() 267 275 .map_err(ConnectionError::InvalidConfig)?; 268 276 269 - // TODO: Run some benchmarks and look into using a bounded channel instead. 270 - let (send_channel, receive_channel) = flume::unbounded(); 277 + let (send_channel, receive_channel) = flume::bounded(self.config.channel_size); 271 278 272 279 let configured_endpoint = self 273 280 .config