Bevy+Ratutui powered Monitoring of Pico-Strike devices
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Better tcp loop, initial rpc impl

+226 -88
+29 -1
src/device.rs
··· 31 31 pub ip: IpAddr, 32 32 } 33 33 34 + #[derive(Debug, Component, Default, Clone, Copy)] 35 + pub struct DeviceDetector { 36 + pub blip_threshold: usize, 37 + pub blip_size: usize, 38 + pub max_duty: u16, 39 + pub duty: u16, 40 + } 41 + 34 42 fn on_remove_device(mut world: DeferredWorld, context: HookContext) { 35 43 let component = world 36 44 .entity(context.entity) ··· 110 118 levels: Vec<Entity>, 111 119 } 112 120 121 + #[derive(Debug)] 122 + pub enum ConnectionState { 123 + Disconnected, 124 + Connecting, 125 + Connected, 126 + } 127 + 128 + impl core::fmt::Display for ConnectionState { 129 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 130 + match self { 131 + Self::Disconnected => write!(f, "Disconnected"), 132 + Self::Connecting => write!(f, "Connecting"), 133 + Self::Connected => write!(f, "Connected"), 134 + } 135 + } 136 + } 137 + 113 138 #[derive(Debug, Resource)] 114 - pub struct ConnectedDevice(pub Entity); 139 + pub struct ConnectedDevice { 140 + pub device: Entity, 141 + pub connection_state: ConnectionState, 142 + } 115 143 116 144 fn register_devices( 117 145 incoming: Res<DiscoverResponse>,
+59 -15
src/net.rs
··· 1 1 use core::net::IpAddr; 2 2 use std::{ 3 - io::Read, 3 + io::{Read, Write}, 4 4 net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpStream, UdpSocket}, 5 5 time::Duration, 6 6 }; ··· 27 27 #[derive(Debug, Resource)] 28 28 pub struct DiscoverResponse(pub Receiver<InstanceDetails>); 29 29 30 + pub enum StrikeUpdateState { 31 + Disconnected, 32 + Connecting, 33 + Connected, 34 + Updating(striker_proto::StrikerResponse), 35 + } 36 + 30 37 #[derive(Debug, Resource)] 31 - pub struct StrikeUpdates(pub Receiver<striker_proto::StrikerResponse>); 38 + pub struct StrikeUpdates(pub Receiver<StrikeUpdateState>); 32 39 33 40 #[derive(Debug, Resource)] 34 - pub struct StrikeConnect(pub Sender<StrikeConnection>); 41 + pub struct StrikeRequests(pub Sender<striker_proto::StrikerRequest>); 35 42 36 - pub enum StrikeConnection { 43 + #[derive(Debug, Resource)] 44 + pub struct StrikeActions(pub Sender<StrikeAction>); 45 + 46 + pub enum StrikeAction { 37 47 Connect(SocketAddr), 38 48 Disconnect, 39 49 } ··· 136 146 pub fn setup_strike_connection(mut commands: Commands) { 137 147 let io = IoTaskPool::get(); 138 148 139 - let (signal_tx, signal_rx) = async_channel::bounded(1); 149 + let (signal_tx, signal_rx) = async_channel::bounded(2); 150 + let (req_tx, req_rx) = async_channel::bounded(1); 140 151 let (resp_tx, resp_rx) = async_channel::bounded(64); 141 152 142 153 io.spawn(async move { 143 154 let mut read_buf = vec![0u8; 4096]; 155 + let mut write_buf = vec![0u8; 4096]; 144 156 145 - while let Ok(StrikeConnection::Connect(addr)) = signal_rx.recv().await { 157 + while let Ok(StrikeAction::Connect(addr)) = signal_rx.recv().await { 146 158 let net_fut = async { 147 159 loop { 160 + resp_tx.send(StrikeUpdateState::Connecting).await.ok(); 148 161 let Ok(stream) = Async::<TcpStream>::connect(addr).await else { 149 162 Timer::after(Duration::from_secs(1)).await; 150 163 continue; 151 164 }; 152 165 153 - while let Ok(read) = stream.read_with(|mut a| a.read(&mut read_buf)).await { 154 - let Ok(data) = striker_proto::receive_response(&mut read_buf[..read]) else { continue }; 166 + resp_tx.send(StrikeUpdateState::Connected).await.ok(); 167 + stream.write_with(|s| s.set_nodelay(true)).await.ok(); 155 168 156 - resp_tx 157 - .send(data) 158 - .await 159 - .ok(); 160 - } 169 + let read_fut = async { 170 + while let Ok(read) = stream.read_with(|mut a| a.read(&mut read_buf)).await { 171 + let Ok(data) = striker_proto::receive_response(&mut read_buf[..read]) 172 + else { 173 + continue; 174 + }; 175 + 176 + if resp_tx.send(StrikeUpdateState::Updating(data)).await.is_err() { 177 + break; 178 + } 179 + } 180 + }; 181 + 182 + let write_fut = async { 183 + while let Ok(req) = req_rx.recv().await { 184 + let Ok(payload) = striker_proto::send_request(req, &mut write_buf) 185 + else { 186 + continue; 187 + }; 188 + 189 + if stream.write_with(|mut s| s.write(payload)).await.is_err() { 190 + break; 191 + } 192 + } 193 + }; 194 + 195 + (read_fut, write_fut).race().await; 196 + 197 + stream 198 + .write_with(|s| s.shutdown(std::net::Shutdown::Both)) 199 + .await 200 + .ok(); 201 + 202 + break; 161 203 } 162 204 }; 163 205 ··· 165 207 while signal_rx 166 208 .recv() 167 209 .await 168 - .is_ok_and(|strike| !matches!(strike, StrikeConnection::Disconnect)) 210 + .is_ok_and(|strike| !matches!(strike, StrikeAction::Disconnect)) 169 211 { 170 212 } 171 213 }; 172 214 173 215 (net_fut, cancel_fut).race().await; 216 + resp_tx.send(StrikeUpdateState::Disconnected).await.ok(); 174 217 } 175 218 }) 176 219 .detach(); 177 220 178 - commands.insert_resource(StrikeConnect(signal_tx)); 221 + commands.insert_resource(StrikeActions(signal_tx)); 179 222 commands.insert_resource(StrikeUpdates(resp_rx)); 223 + commands.insert_resource(StrikeRequests(req_tx)); 180 224 } 181 225 182 226 pub struct NetPlugin;
+16 -11
src/views/home.rs
··· 15 15 use bevy_ratatui::RatatuiContext; 16 16 use ratatui::{ 17 17 layout::{Constraint, HorizontalAlignment, Layout}, 18 - style::{Color, Style}, 18 + style::{Color, Style, Stylize}, 19 19 widgets::{Block, List, ListDirection, ListItem, ListState, Padding, Paragraph}, 20 20 }; 21 21 22 22 use crate::{ 23 - device::{ConnectedDevice, Device, DeviceSocket}, 23 + device::{ConnectedDevice, ConnectionState, Device, DeviceSocket}, 24 24 messages::StrikeMessage, 25 25 net::MdnsSignaler, 26 26 state::AppState, ··· 77 77 .iter(world) 78 78 .nth(offset) 79 79 .unwrap(); 80 - world.insert_resource(ConnectedDevice(device)); 80 + world.insert_resource(ConnectedDevice { 81 + device, 82 + connection_state: ConnectionState::Disconnected, 83 + }); 81 84 let mut next_state = world.resource_mut::<NextState<AppState>>(); 82 85 next_state.set(AppState::Monitoring); 83 86 ··· 114 117 .title("Striker") 115 118 .title_alignment(HorizontalAlignment::Center) 116 119 .border_style(Color::LightBlue), 117 - ); 120 + ).white(); 118 121 119 122 let items = q_devices.iter().map(|(name, addr)| { 120 123 ListItem::new(format!( ··· 125 128 126 129 let help_text = "Keys: 'q'/ESC Quit, 's' Toggle Search, UP/DOWN Choose Device(s), ENTER/SPACE Connect to Device"; 127 130 128 - let help = Paragraph::new(help_text).block( 129 - Block::bordered() 130 - .padding(Padding::horizontal(2)) 131 - .border_style(Color::LightBlue), 132 - ); 131 + let help = Paragraph::new(help_text) 132 + .block( 133 + Block::bordered() 134 + .padding(Padding::horizontal(2)) 135 + .border_style(Color::LightBlue), 136 + ).white(); 133 137 134 138 let list = List::new(items) 135 139 .highlight_symbol(">> ") 136 140 .highlight_spacing(ratatui::widgets::HighlightSpacing::Always) 137 - .highlight_style(Style::new().bg(Color::DarkGray)) 141 + .highlight_style(Style::new().on_dark_gray().black()) 138 142 .direction(ListDirection::TopToBottom) 139 143 .block( 140 144 Block::bordered() ··· 142 146 .title_alignment(HorizontalAlignment::Center) 143 147 .padding(Padding::new(2, 2, 1, 1)) 144 148 .border_style(Color::LightBlue), 145 - ); 149 + ) 150 + .white(); 146 151 147 152 frame.render_widget(paragraph, top); 148 153 frame.render_stateful_widget(list, mid, &mut list_state.0);
+122 -61
src/views/monitoring.rs
··· 22 22 text::{Line, Span}, 23 23 widgets::{Axis, Block, Chart, Dataset, Padding, Paragraph}, 24 24 }; 25 - use striker_proto::{StrikerResponse, Update}; 25 + use striker_proto::{Response, StrikerRequest, StrikerResponse, Update}; 26 26 27 27 use crate::{ 28 28 device::{ 29 - ConnectedDevice, Device, DeviceSocket, SignalAverage, SignalPeaks, SignalSource, Signals, 30 - StormLevel, StormLevels, StormSignal, StormSource, Timestamp, 29 + ConnectedDevice, ConnectionState, Device, DeviceDetector, DeviceSocket, SignalAverage, 30 + SignalPeaks, SignalSource, Signals, StormLevel, StormLevels, StormSignal, StormSource, 31 + Timestamp, 31 32 }, 32 33 messages::StrikeMessage, 33 - net::{StrikeConnect, StrikeConnection, StrikeUpdates}, 34 + net::{StrikeAction, StrikeActions, StrikeRequests, StrikeUpdateState, StrikeUpdates}, 34 35 state::AppState, 35 36 }; 36 37 ··· 53 54 54 55 pub fn enter_monitoring( 55 56 connected: Res<ConnectedDevice>, 56 - signal: Res<StrikeConnect>, 57 + signal: Res<StrikeActions>, 58 + request: Res<StrikeRequests>, 57 59 q_devices: Query<&DeviceSocket, With<Device>>, 58 60 mut commands: Commands, 59 61 ) -> Result { 60 - let addr = q_devices.get(connected.0)?; 62 + let addr = q_devices.get(connected.device)?; 61 63 62 64 signal 63 65 .0 64 - .try_send(StrikeConnection::Connect(SocketAddr::new( 65 - addr.ip, addr.port, 66 - )))?; 66 + .try_send(StrikeAction::Connect(SocketAddr::new(addr.ip, addr.port)))?; 67 + 68 + request.0.force_send(StrikerRequest { 69 + request: striker_proto::Request::DetectorInfo, 70 + })?; 67 71 68 72 commands.insert_resource(DataClear(Timer::from_seconds( 69 73 60.0 * 1.0, ··· 73 77 Ok(()) 74 78 } 75 79 76 - pub fn exit_monitoring(signal: Res<StrikeConnect>, mut commands: Commands) -> Result { 77 - signal.0.try_send(StrikeConnection::Disconnect)?; 80 + pub fn exit_monitoring(signal: Res<StrikeActions>, mut commands: Commands) -> Result { 81 + signal.0.try_send(StrikeAction::Disconnect)?; 78 82 commands.remove_resource::<DataClear>(); 79 83 80 84 Ok(()) ··· 107 111 108 112 pub fn update_device_data( 109 113 updates: Res<StrikeUpdates>, 110 - connected: Res<ConnectedDevice>, 114 + mut connected: ResMut<ConnectedDevice>, 111 115 mut commands: Commands, 112 116 ) -> Result { 113 - let mut entity = commands.entity(connected.0); 117 + let mut entity = commands.entity(connected.device); 114 118 115 - while let Ok(StrikerResponse::Update(update)) = updates.0.try_recv() { 119 + while let Ok(update) = updates.0.try_recv() { 116 120 match update { 117 - Update::Warning { timestamp, level } => { 118 - entity.with_related::<StormSource>(( 119 - Timestamp::from_seconds(timestamp)?, 120 - StormLevel(level), 121 - )); 121 + StrikeUpdateState::Connected => { 122 + connected.connection_state = ConnectionState::Connected; 122 123 } 123 - Update::Strike { 124 - timestamp, 125 - peaks, 126 - samples, 127 - average, 128 - } => { 129 - entity.with_related::<SignalSource>(( 130 - Timestamp::from_microseconds(timestamp)?, 131 - SignalPeaks::new(peaks), 132 - StormSignal::new(samples), 133 - SignalAverage::new(average), 134 - )); 124 + StrikeUpdateState::Connecting => { 125 + connected.connection_state = ConnectionState::Connecting; 126 + } 127 + StrikeUpdateState::Updating(response) => match response { 128 + StrikerResponse::Response(Response::DetectorInfo { 129 + blip_threshold, 130 + blip_size, 131 + max_duty, 132 + duty, 133 + }) => { 134 + entity.insert(DeviceDetector { 135 + blip_threshold, 136 + blip_size, 137 + max_duty, 138 + duty, 139 + }); 140 + } 141 + StrikerResponse::Update(Update::Warning { timestamp, level }) => { 142 + entity.with_related::<StormSource>(( 143 + Timestamp::from_seconds(timestamp)?, 144 + StormLevel(level), 145 + )); 146 + } 147 + StrikerResponse::Update(Update::Strike { 148 + timestamp, 149 + peaks, 150 + samples, 151 + average, 152 + }) => { 153 + entity.with_related::<SignalSource>(( 154 + Timestamp::from_microseconds(timestamp)?, 155 + SignalPeaks::new(peaks), 156 + StormSignal::new(samples), 157 + SignalAverage::new(average), 158 + )); 159 + } 160 + _ => {} 161 + }, 162 + StrikeUpdateState::Disconnected => { 163 + connected.connection_state = ConnectionState::Disconnected; 135 164 } 136 165 } 137 166 } ··· 142 171 pub fn monitoring_view( 143 172 mut context: ResMut<RatatuiContext>, 144 173 connected: Res<ConnectedDevice>, 145 - q_devices: Query<(&Name, Option<&StormLevels>, Option<&Signals>), With<Device>>, 146 - q_levels: Query<(&Timestamp, &StormLevel)>, 174 + q_devices: Query< 175 + ( 176 + &Name, 177 + Option<&DeviceDetector>, 178 + Option<&StormLevels>, 179 + Option<&Signals>, 180 + ), 181 + With<Device>, 182 + >, 183 + q_levels: Query<&StormLevel>, 147 184 q_signals: Query<(&Timestamp, &StormSignal)>, 148 185 ) -> Result { 149 186 context.draw(|frame| { ··· 154 191 ]) 155 192 .areas(frame.area()); 156 193 157 - let [left, center, right] = Layout::horizontal([ 194 + let [top_left, top_mid_left, top_mid_right, top_right] = Layout::horizontal([ 195 + Constraint::Fill(1), 158 196 Constraint::Fill(1), 159 197 Constraint::Fill(1), 160 198 Constraint::Fill(1), 161 199 ]) 162 200 .areas(top); 163 201 164 - let (device, levels, signals) = q_devices.get(connected.0).unwrap(); 202 + let [chart_block, detector_block] = 203 + Layout::vertical([Constraint::Fill(1), Constraint::Length(3)]).areas(mid); 204 + 205 + let (device, detector, levels, signals) = q_devices.get(connected.device).unwrap(); 165 206 166 207 let latest_level = levels.and_then(|c| { 167 208 c.last() ··· 172 213 .and_then(|s| s.last().copied()) 173 214 .and_then(|entity| q_signals.get(entity).ok()); 174 215 175 - let name = Paragraph::new(device.as_str()).block( 176 - Block::bordered() 177 - .padding(Padding::horizontal(2)) 178 - .title("Device") 179 - .title_alignment(HorizontalAlignment::Center) 180 - .border_style(Color::LightGreen), 181 - ); 216 + let name = Paragraph::new(device.as_str()) 217 + .block( 218 + Block::bordered() 219 + .padding(Padding::horizontal(2)) 220 + .title("Device") 221 + .title_alignment(HorizontalAlignment::Center) 222 + .border_style(Color::LightGreen), 223 + ) 224 + .white(); 182 225 183 - let timestamp_signal = latest_signal.map(|(t, _)| t); 184 - let timestamp_level = latest_level.map(|(t, _)| t); 185 - 186 - let timestamp = match (timestamp_signal, timestamp_level) { 187 - (Some(signal), Some(level)) => signal 188 - .duration_since(**level) 189 - .is_positive() 190 - .then_some(signal) 191 - .or(Some(level)), 192 - (Some(time), None) | (None, Some(time)) => Some(time), 193 - _ => None, 194 - }; 226 + let timestamp = latest_signal.map(|(t, _)| t); 195 227 196 228 let timestamp = Paragraph::new(Line::from_iter( 197 229 timestamp.map(|t| Span::from(format!("{}", t.0))), 198 230 )) 231 + .white() 199 232 .block( 200 233 Block::bordered() 201 234 .padding(Padding::horizontal(2)) ··· 207 240 let warn_level = Paragraph::new(Line::from_iter( 208 241 Some(Span::raw("Level: ")) 209 242 .into_iter() 210 - .chain(latest_level.map(|(_, s)| Span::from(format!("{}", s.0)))), 243 + .chain(latest_level.map(|s| Span::from(format!("{}", s.0)))), 211 244 )) 245 + .white() 212 246 .block( 213 247 Block::bordered() 214 248 .padding(Padding::horizontal(2)) ··· 217 251 .border_style(Color::LightGreen), 218 252 ); 219 253 254 + let connection = Paragraph::new(connected.connection_state.to_string()) 255 + .white() 256 + .block( 257 + Block::bordered() 258 + .padding(Padding::horizontal(2)) 259 + .title("State") 260 + .title_alignment(HorizontalAlignment::Center) 261 + .border_style(Color::LightGreen), 262 + ); 263 + 220 264 let data = latest_signal 221 265 .map(|(_, samples)| { 222 266 samples ··· 250 294 .y_axis(Axis::default().bounds([-y_bounds, y_bounds])) 251 295 .block(block); 252 296 253 - let help = Paragraph::new("Keys: 'q'/ESC Quit, BACKSPACE Return to Device select").block( 297 + let help = Paragraph::new("Keys: 'q'/ESC Quit, BACKSPACE Return to Device select") 298 + .block( 299 + Block::bordered() 300 + .padding(Padding::horizontal(2)) 301 + .border_style(Color::LightGreen), 302 + ) 303 + .white(); 304 + 305 + let detector = detector.copied().unwrap_or_default(); 306 + 307 + let detector_info = Paragraph::new(format!( 308 + "Threshold: {}, Blip Size: {}, Max Duty {}, Current Duty: {}", 309 + detector.blip_threshold, detector.blip_size, detector.max_duty, detector.duty 310 + )) 311 + .block( 254 312 Block::bordered() 255 313 .padding(Padding::horizontal(2)) 256 314 .border_style(Color::LightGreen), 257 - ); 315 + ) 316 + .white(); 258 317 259 - frame.render_widget(name, left); 260 - frame.render_widget(timestamp, center); 261 - frame.render_widget(warn_level, right); 262 - frame.render_widget(chart, mid); 318 + frame.render_widget(name, top_left); 319 + frame.render_widget(timestamp, top_mid_left); 320 + frame.render_widget(warn_level, top_mid_right); 321 + frame.render_widget(connection, top_right); 322 + frame.render_widget(chart, chart_block); 323 + frame.render_widget(detector_info, detector_block); 263 324 frame.render_widget(help, bottom); 264 325 })?; 265 326