Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

client/daemon: successfully joined both lobby and private channels

+103 -152
+1
Cargo.lock
··· 1263 1263 name = "sower" 1264 1264 version = "0.2.0-dev" 1265 1265 dependencies = [ 1266 + "anyhow", 1266 1267 "clap", 1267 1268 "hmac", 1268 1269 "jwt",
+1
client/Cargo.toml
··· 6 6 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html 7 7 8 8 [dependencies] 9 + anyhow = "1.0.86" 9 10 clap = { version = "4.5.1", features = ["derive", "color"] } 10 11 hmac = "0.12.1" 11 12 jwt = "0.16.0"
+2
client/justfile
··· 1 + start: 2 + cargo watch -x 'run -- daemon --url=http://localhost:4000 -b=../.bootstrap.token'
+88 -150
client/src/sower/daemon.rs
··· 3 3 use std::sync::Arc; 4 4 use std::time::Duration; 5 5 6 + use anyhow::{Context, Result}; 6 7 use hmac::{Hmac, Mac}; 7 8 use jwt::SignWithKey; 8 9 use phoenix_channels_client::url::Url; 9 - use phoenix_channels_client::{ 10 - Channel, Event, EventPayload, EventsError, Payload, Socket, Topic, JSON, 11 - }; 10 + use phoenix_channels_client::{Event, EventPayload, EventsError, Payload, Socket, Topic, JSON}; 12 11 use serde_derive::{Deserialize, Serialize}; 13 12 use sha2::Sha256; 13 + use tokio::signal; 14 14 use tokio::sync::mpsc; 15 - use tokio::{signal, time}; 16 - use tracing::{debug, error, info}; 15 + use tracing::{debug, error, info, warn}; 17 16 18 17 pub struct Daemon { 19 18 tree: Tree, 20 19 socket: Arc<Socket>, 21 - lobby_channel: Arc<Channel>, 22 - lobby_topic: Arc<Topic>, 23 20 } 24 21 25 22 #[derive(Deserialize, Serialize)] ··· 48 45 Err(e) => panic!("Failed to connect, {}", e), 49 46 } 50 47 51 - let topic = Topic::from_string("client:all".to_string()); 52 - info!("Joining lobby {}", topic); 53 - let channel = socket.channel(topic.clone(), None).await.unwrap(); 54 - channel.join(Duration::from_secs(15)).await.unwrap(); 55 - 56 - Self { 57 - lobby_channel: channel, 58 - lobby_topic: topic, 59 - socket, 60 - tree, 61 - } 48 + Self { socket, tree } 62 49 } 63 50 64 51 fn sign_login_jwt(key: String, tree: &Tree) -> Result<String, jwt::Error> { ··· 73 60 Ok(jwt) 74 61 } 75 62 76 - //async fn login(&mut self) { 77 - // info!("Registering with sower"); 78 - // let Payload::JSONPayload { json } = self 79 - // .lobby_channel 80 - // .call( 81 - // Event::from_string("register".to_string()), 82 - // Payload::json_from_serialized( 83 - // json!({ "name": &self.tree.name, "type": &self.tree.seed_type}).to_string(), 84 - // ) 85 - // .unwrap(), 86 - // Duration::from_secs(5), 87 - // ) 88 - // .await 89 - // .unwrap() 90 - // else { 91 - // panic!("unable to register") 92 - // }; 93 - // 94 - // self.tree.id = if let JSON::Str { string, .. } = &json { 95 - // info!("Received tree id {}", string); 96 - // Some(string.to_string()) 97 - // } else { 98 - // panic!("unable to parse registration response") 99 - // }; 100 - //} 101 - 102 - pub async fn run(&mut self) -> Result<(), std::io::Error> { 63 + pub async fn run(&mut self) -> Result<(), anyhow::Error> { 103 64 //self.login().await; 104 65 let (private_channel_tx, mut private_channel_rx) = mpsc::channel(1); 105 - let (shutdown_send, shutdown_recv) = mpsc::unbounded_channel(); 66 + let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel(); 106 67 107 68 tokio::select! { 108 69 _ = signal::ctrl_c() => { 70 + shutdown_send.send(true).unwrap() 71 + }, 72 + 73 + _ = shutdown_recv.recv() => { 109 74 info!("Received shutdown"); 110 - shutdown_send.send(true).unwrap() 111 75 }, 112 76 113 - _ = self.run_lobby(shutdown_recv, private_channel_tx) => {}, 77 + _ = Self::run_lobby(self.socket.clone(), private_channel_tx) => {}, 78 + 79 + _ = async { 80 + let tree_id = match private_channel_rx.recv().await { 81 + Some(tree_id) => { 82 + debug!("Setting server's tree_id to {}", tree_id); 83 + Some(tree_id) 84 + }, 85 + None => { 86 + error!("Failed to discover tree:id"); 87 + shutdown_send.send(true).unwrap(); 88 + None 89 + } 90 + }; 114 91 115 - //_ = async { 116 - // match private_channel_rx.recv().await { 117 - // Some(tree_id) => { 118 - // debug!("Setting server's tree_id to {}", tree_id); 119 - // self.tree.server_id = Some(tree_id) 120 - // }, 121 - // None => shutdown_send.send(true).unwrap() 122 - // }; 123 - // //let events = lobby_channel.events(); 124 - // //info!("Joining lobby {}", self.lobby_topic); 125 - // //lobby_channel.join(Duration::from_secs(15)).await.unwrap(); 126 - // // 127 - // //info!("Listening for lobby events"); 128 - // //loop { 129 - // // match events.event().await { 130 - // // event => debug!("{:?}", event) 131 - // // } 132 - // //} 133 - //} => {}, 92 + self.tree.server_id = tree_id.clone(); 134 93 135 - //_ = async { 136 - // let until = match statuses.status().await { 137 - // Ok(ChannelStatus::WaitingToRejoin { until }) => until, 138 - // other => panic!("Didn't wait to rejoin after being unauthorized instead {:?}", other) 139 - // }; 140 - //} => {}, 94 + Self::run_private_channel(self.socket.clone(), tree_id.unwrap()).await 141 95 142 - //_ = async { 143 - // info!("Starting submit loop"); 144 - // let mut interval = time::interval(time::Duration::from_secs(5)); 145 - // loop { 146 - // interval.tick().await; 147 - // let seeds = self.tree.seeds.clone().unwrap(); 148 - // let reply_payload = lobby_channel.call( 149 - // Event::from_string("seed:sync".to_string()), 150 - // Payload::json_from_serialized(json!({ "booted_seed": seeds.booted, "current_seed": seeds.current, "profile_seed": seeds.profile }).to_string()).unwrap(), 151 - // Duration::from_secs(5) 152 - // ).await; 153 - // 154 - // match reply_payload { 155 - // Ok(payload) => info!("got reply: {}", payload), 156 - // Err(err) => error!("error waiting for reply: {}", err) 157 - // } 158 - // } 159 - //} => {} 96 + } => {}, 160 97 } 161 98 162 99 info!("Closing socket"); ··· 166 103 Ok(()) 167 104 } 168 105 169 - async fn run_lobby( 170 - &mut self, 171 - mut shutdown_rx: mpsc::UnboundedReceiver<bool>, 172 - private_channel_tx: mpsc::Sender<String>, 173 - ) { 174 - let events = self.lobby_channel.events(); 175 - let lobby_topic = self.lobby_topic.clone(); 176 - info!("Joining lobby {}", self.lobby_topic); 177 - self.lobby_channel 106 + async fn run_lobby(socket: Arc<Socket>, private_channel_tx: mpsc::Sender<String>) { 107 + let topic = Topic::from_string("client:all".to_string()); 108 + debug!("Joining channel {}", topic); 109 + let channel = socket.channel(topic.clone(), None).await.unwrap(); 110 + channel.join(Duration::from_secs(15)).await.unwrap(); 111 + info!("Joined channel {}", topic); 112 + 113 + let events = channel.events(); 114 + let topic = topic.clone(); 115 + 116 + loop { 117 + match events.event().await { 118 + Ok(EventPayload { event, payload }) => match event { 119 + Event::User { .. } => { 120 + match payload { 121 + Payload::JSONPayload { 122 + json: JSON::Object { object }, 123 + } => match object.get("tree_id") { 124 + Some(JSON::Str { string: tree_id }) => { 125 + let _ = private_channel_tx.send(tree_id.to_string()).await; 126 + continue; 127 + } 128 + Some(unknown) => error!("Unknown tree_id: {}", unknown), 129 + None => error!("No tree_id received from server"), 130 + }, 131 + Payload::JSONPayload { json } => { 132 + debug!("[{}] unknown event {:?}", topic, json) 133 + } 134 + Payload::Binary { bytes } => { 135 + debug!("[{}] unknown event {:?}", topic, bytes) 136 + } 137 + }; 138 + } 139 + Event::Phoenix { phoenix } => { 140 + debug!("[{}] unknown phoenix event {}", topic, phoenix) 141 + } 142 + }, 143 + Err(events_error) => match events_error { 144 + EventsError::NoMoreEvents => break, 145 + EventsError::MissedEvents { missed_event_count } => { 146 + warn!("[{}] events missed: {}", topic, missed_event_count); 147 + } 148 + }, 149 + } 150 + } 151 + } 152 + 153 + async fn run_private_channel(socket: Arc<Socket>, tree_id: String) -> Result<()> { 154 + let topic = Topic::from_string(format!("client:{}", tree_id)); 155 + debug!("Joining channel {}", topic); 156 + let channel = socket.channel(topic.clone(), None).await?; 157 + channel 178 158 .join(Duration::from_secs(15)) 179 159 .await 160 + .with_context(|| format!("Failed to join channel {}", topic)) 180 161 .unwrap(); 162 + info!("Joined channel {}", topic); 181 163 182 - tokio::select! { 183 - _ = shutdown_rx.recv() => { 184 - debug!("Received shutdown to run_lobby") 185 - }, 186 - _ = async move { 187 - info!("Listening for lobby events"); 188 - loop { 189 - match events.event().await { 190 - Ok(EventPayload { event, payload }) => match event { 191 - Event::User { 192 - user: user_event_name, 193 - } => { 194 - let payload = match payload { 195 - Payload::JSONPayload { 196 - json: JSON::Object { object }, 197 - } => { 198 - match object.get("tree_id") { 199 - Some(JSON::Str { string: tree_id }) => { 200 - let _ = private_channel_tx.send(tree_id.to_string()).await; 201 - } 202 - Some(unknown) => error!("Unknown tree_id: {}", unknown), 203 - None => error!("No tree_id received from server"), 204 - } 205 - } 206 - Payload::JSONPayload { json } => debug!("{:?}", json), 207 - Payload::Binary { bytes } => debug!("{:?}", bytes), 208 - }; 209 - println!( 210 - "channel {} event {} sent with payload {:#?}", 211 - lobby_topic, user_event_name, payload 212 - ); 213 - } 214 - Event::Phoenix { phoenix } => { 215 - println!("channel {} {}", lobby_topic, phoenix) 216 - } 217 - }, 218 - Err(events_error) => match events_error { 219 - EventsError::NoMoreEvents => break, 220 - EventsError::MissedEvents { missed_event_count } => { 221 - eprintln!( 222 - "{} events missed on channel {}", 223 - missed_event_count, lobby_topic 224 - ); 225 - } 226 - }, 227 - } 228 - } 229 - } => { 230 - info!("Leaving the lobby"); 231 - self.lobby_channel.leave().await.unwrap(); 164 + let events = channel.events(); 165 + 166 + info!("Listening for private events"); 167 + loop { 168 + match events.event().await { 169 + event => debug!("{:?}", event), 232 170 } 233 171 } 234 172 }
+1
flake.nix
··· 66 66 67 67 # rust 68 68 pkgs.cargo 69 + pkgs.cargo-watch 69 70 pkgs.clippy 70 71 pkgs.rust-analyzer 71 72 pkgs.rustc
+9 -1
lib/sower_web/client_channel.ex
··· 8 8 {:ok, socket} 9 9 end 10 10 11 - def join("client:" <> client_name, _params, _socket) do 11 + def join("client:" <> client_name, _params, socket = %{assigns: %{tree_id: tree_id}}) do 12 + if tree_id == client_name do 13 + {:ok, socket} 14 + else 15 + {:error, %{reason: "unauthorized"}} 16 + end 17 + end 18 + 19 + def join(_topic, _params, _socket) do 12 20 {:error, %{reason: "unauthorized"}} 13 21 end 14 22
+1 -1
lib/sower_web/live/tree_live/index.html.heex
··· 9 9 row_click={fn {_id, tree} -> JS.navigate(~p"/trees/#{tree}") end} 10 10 > 11 11 <:col :let={{_id, tree}} label="name"><%= tree.name %></:col> 12 - <:col :let={{_id, tree}} label="type"><%= tree.type %></:col> 12 + <:col :let={{_id, tree}} label="seed type"><%= tree.seed_type %></:col> 13 13 <:action :let={{_id, tree}}> 14 14 <div class="sr-only"> 15 15 <.link navigate={~p"/trees/#{tree}"}>Show</.link>