A human-friendly DSL for ATProto Lexicons
27
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add mlf-plugin-host + mlf-dns-cloudflare

Create the subprocess plugin protocol and the runtime that drives
plugins from the host side. Line-delimited JSON over stdin/stdout:
hello handshake (with options_schema, capabilities), init (for
host-assembled credentials), typed ops per plugin kind, and a
multi-turn ask/answer/ack mechanism for flows that can't reduce to a
simple schema (OAuth device codes, magic-link confirmations). The host
does capability gating before sending ops and refuses ask messages
under a DenyInteractiveUi for --non-interactive callers.

Plugin-side scaffolding lives in plugin.rs as a small Server that
binds to stdio and wraps the per-message bookkeeping, so individual
plugins implement only their op dispatch.

Ship mlf-dns-cloudflare as the first real plugin — a binary that
translates the five DNS ops (login/resolve_zone/list_txt/upsert_txt/
delete_txt) into Cloudflare API calls via reqwest. Options schema is
one secret field, api_token. Zone lookup walks parent domains so
_lexicon.forum.example.com finds the example.com zone automatically.

End-to-end tests use tokio::io::duplex pipes to wire a Server and a
PluginHandle to each other in-process and cover the handshake,
capability gating, happy-path round-trips, multi-turn flows,
non-interactive refusal, and early-exit handling.

authored by stavola.xyz and committed by

Tangled afee6ed9 df1ea344

+2238 -1
+56 -1
Cargo.lock
··· 490 490 491 491 [[package]] 492 492 name = "console" 493 + version = "0.15.11" 494 + source = "registry+https://github.com/rust-lang/crates.io-index" 495 + checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" 496 + dependencies = [ 497 + "encode_unicode", 498 + "libc", 499 + "once_cell", 500 + "unicode-width 0.2.1", 501 + "windows-sys 0.59.0", 502 + ] 503 + 504 + [[package]] 505 + name = "console" 493 506 version = "0.16.3" 494 507 source = "registry+https://github.com/rust-lang/crates.io-index" 495 508 checksum = "d64e8af5551369d19cf50138de61f1c42074ab970f74e99be916646777f8fc87" ··· 646 659 checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" 647 660 dependencies = [ 648 661 "powerfmt", 662 + ] 663 + 664 + [[package]] 665 + name = "dialoguer" 666 + version = "0.11.0" 667 + source = "registry+https://github.com/rust-lang/crates.io-index" 668 + checksum = "658bce805d770f407bc62102fca7c2c64ceef2fbcb2b8bd19d2765ce093980de" 669 + dependencies = [ 670 + "console 0.15.11", 671 + "shell-words", 672 + "thiserror 1.0.69", 673 + "zeroize", 649 674 ] 650 675 651 676 [[package]] ··· 1348 1373 source = "registry+https://github.com/rust-lang/crates.io-index" 1349 1374 checksum = "7b4a6248eb93a4401ed2f37dfe8ea592d3cf05b7cf4f8efa867b6895af7e094e" 1350 1375 dependencies = [ 1351 - "console", 1376 + "console 0.16.3", 1352 1377 "once_cell", 1353 1378 "serde", 1354 1379 "similar", ··· 1738 1763 ] 1739 1764 1740 1765 [[package]] 1766 + name = "mlf-dns-cloudflare" 1767 + version = "0.1.0" 1768 + dependencies = [ 1769 + "mlf-plugin-host", 1770 + "reqwest", 1771 + "serde", 1772 + "serde_json", 1773 + "thiserror 2.0.17", 1774 + "tokio", 1775 + ] 1776 + 1777 + [[package]] 1741 1778 name = "mlf-integration-tests" 1742 1779 version = "0.1.0" 1743 1780 dependencies = [ ··· 1806 1843 "mlf-codegen-typescript", 1807 1844 "mlf-wasm", 1808 1845 "wasm-bindgen", 1846 + ] 1847 + 1848 + [[package]] 1849 + name = "mlf-plugin-host" 1850 + version = "0.1.0" 1851 + dependencies = [ 1852 + "async-trait", 1853 + "dialoguer", 1854 + "serde", 1855 + "serde_json", 1856 + "thiserror 2.0.17", 1857 + "tokio", 1809 1858 ] 1810 1859 1811 1860 [[package]] ··· 2630 2679 dependencies = [ 2631 2680 "lazy_static", 2632 2681 ] 2682 + 2683 + [[package]] 2684 + name = "shell-words" 2685 + version = "1.1.1" 2686 + source = "registry+https://github.com/rust-lang/crates.io-index" 2687 + checksum = "dc6fe69c597f9c37bfeeeeeb33da3530379845f10be461a66d16d03eca2ded77" 2633 2688 2634 2689 [[package]] 2635 2690 name = "shlex"
+2
Cargo.toml
··· 4 4 "codegen-plugins/mlf-codegen-go", 5 5 "codegen-plugins/mlf-codegen-rust", 6 6 "codegen-plugins/mlf-codegen-typescript", 7 + "dns-plugins/mlf-dns-cloudflare", 7 8 "mlf-atproto", 8 9 "mlf-cli", 10 + "mlf-plugin-host", 9 11 "mlf-codegen", 10 12 "mlf-diagnostics", 11 13 "mlf-lexicon-fetcher",
+18
dns-plugins/mlf-dns-cloudflare/Cargo.toml
··· 1 + [package] 2 + name = "mlf-dns-cloudflare" 3 + version = "0.1.0" 4 + edition = "2024" 5 + license = "MIT" 6 + description = "Official MLF DNS provider plugin for Cloudflare" 7 + 8 + [[bin]] 9 + name = "mlf-dns-cloudflare" 10 + path = "src/main.rs" 11 + 12 + [dependencies] 13 + mlf-plugin-host = { path = "../../mlf-plugin-host" } 14 + reqwest = { version = "0.12", features = ["json"] } 15 + serde = { version = "1", features = ["derive"] } 16 + serde_json = "1" 17 + thiserror = "2" 18 + tokio = { version = "1", features = ["io-util", "macros", "rt"] }
+366
dns-plugins/mlf-dns-cloudflare/src/api.rs
··· 1 + //! Thin Cloudflare API wrapper for the plugin's needs: zone lookup 2 + //! by DNS name, TXT record list/upsert/delete. Pure HTTPS calls with 3 + //! `Bearer <token>` auth. 4 + 5 + use serde::Deserialize; 6 + use thiserror::Error; 7 + 8 + const API_BASE: &str = "https://api.cloudflare.com/client/v4"; 9 + 10 + #[derive(Error, Debug)] 11 + pub enum CloudflareError { 12 + #[error("Cloudflare HTTP error: {0}")] 13 + Http(String), 14 + #[error("Cloudflare API error: {0}")] 15 + Api(String), 16 + #[error("JSON decode error: {0}")] 17 + Decode(String), 18 + } 19 + 20 + impl From<reqwest::Error> for CloudflareError { 21 + fn from(e: reqwest::Error) -> Self { 22 + CloudflareError::Http(e.to_string()) 23 + } 24 + } 25 + 26 + pub struct CloudflareClient { 27 + client: reqwest::Client, 28 + token: String, 29 + } 30 + 31 + impl CloudflareClient { 32 + pub fn new(token: &str) -> Self { 33 + Self { 34 + client: reqwest::Client::new(), 35 + token: token.to_string(), 36 + } 37 + } 38 + 39 + /// Validate the API token. Returns the primary email address 40 + /// on success (as a display name), or `None` if the endpoint 41 + /// doesn't expose one. 42 + pub async fn verify(&self) -> Result<Option<String>, CloudflareError> { 43 + #[derive(Deserialize)] 44 + struct Resp { 45 + success: bool, 46 + #[serde(default)] 47 + errors: Vec<ApiErr>, 48 + #[serde(default)] 49 + result: Option<VerifyResult>, 50 + } 51 + #[derive(Deserialize)] 52 + struct VerifyResult { 53 + #[serde(default)] 54 + status: Option<String>, 55 + } 56 + let resp: Resp = self 57 + .client 58 + .get(format!("{API_BASE}/user/tokens/verify")) 59 + .bearer_auth(&self.token) 60 + .send() 61 + .await? 62 + .json() 63 + .await 64 + .map_err(|e| CloudflareError::Decode(e.to_string()))?; 65 + if !resp.success { 66 + return Err(CloudflareError::Api(format_errors(&resp.errors))); 67 + } 68 + Ok(resp.result.and_then(|r| r.status)) 69 + } 70 + 71 + /// Find the zone that covers the given DNS name by walking up 72 + /// parent domains until a match is found. Returns `None` if no 73 + /// zone the token has access to covers it. 74 + pub async fn find_zone_for(&self, dns_name: &str) -> Result<Option<Zone>, CloudflareError> { 75 + let stripped = dns_name.strip_prefix("_lexicon.").unwrap_or(dns_name); 76 + for candidate in parent_domains(stripped) { 77 + if let Some(zone) = self.get_zone_by_name(&candidate).await? { 78 + return Ok(Some(zone)); 79 + } 80 + } 81 + Ok(None) 82 + } 83 + 84 + async fn get_zone_by_name(&self, name: &str) -> Result<Option<Zone>, CloudflareError> { 85 + #[derive(Deserialize)] 86 + struct Resp { 87 + success: bool, 88 + #[serde(default)] 89 + errors: Vec<ApiErr>, 90 + #[serde(default)] 91 + result: Vec<Zone>, 92 + } 93 + let resp: Resp = self 94 + .client 95 + .get(format!("{API_BASE}/zones")) 96 + .bearer_auth(&self.token) 97 + .query(&[("name", name)]) 98 + .send() 99 + .await? 100 + .json() 101 + .await 102 + .map_err(|e| CloudflareError::Decode(e.to_string()))?; 103 + if !resp.success { 104 + return Err(CloudflareError::Api(format_errors(&resp.errors))); 105 + } 106 + Ok(resp.result.into_iter().next()) 107 + } 108 + 109 + pub async fn list_txt( 110 + &self, 111 + zone_id: &str, 112 + name: &str, 113 + ) -> Result<Vec<DnsRecord>, CloudflareError> { 114 + #[derive(Deserialize)] 115 + struct Resp { 116 + success: bool, 117 + #[serde(default)] 118 + errors: Vec<ApiErr>, 119 + #[serde(default)] 120 + result: Vec<DnsRecord>, 121 + } 122 + let resp: Resp = self 123 + .client 124 + .get(format!("{API_BASE}/zones/{zone_id}/dns_records")) 125 + .bearer_auth(&self.token) 126 + .query(&[("type", "TXT"), ("name", name)]) 127 + .send() 128 + .await? 129 + .json() 130 + .await 131 + .map_err(|e| CloudflareError::Decode(e.to_string()))?; 132 + if !resp.success { 133 + return Err(CloudflareError::Api(format_errors(&resp.errors))); 134 + } 135 + // Cloudflare returns TXT content as an RFC 1035 quoted string 136 + // (e.g. `"did=..."`). Normalise to the raw payload so callers 137 + // don't need to know about the wire format. 138 + Ok(resp 139 + .result 140 + .into_iter() 141 + .map(|mut r| { 142 + r.content = unquote_txt(&r.content); 143 + r 144 + }) 145 + .collect()) 146 + } 147 + 148 + /// Create the TXT if absent, otherwise update the first matching 149 + /// record in place. Cloudflare allows multiple TXT records with the 150 + /// same name; we normalise to "there is exactly one". 151 + pub async fn upsert_txt( 152 + &self, 153 + zone_id: &str, 154 + name: &str, 155 + value: &str, 156 + ttl: u32, 157 + ) -> Result<String, CloudflareError> { 158 + let quoted = format!("\"{}\"", escape_for_txt(value)); 159 + let existing = self.list_txt(zone_id, name).await?; 160 + if let Some(first) = existing.first() { 161 + // Update 162 + #[derive(Deserialize)] 163 + struct Resp { 164 + success: bool, 165 + #[serde(default)] 166 + errors: Vec<ApiErr>, 167 + #[serde(default)] 168 + result: Option<DnsRecord>, 169 + } 170 + let body = serde_json::json!({ 171 + "type": "TXT", 172 + "name": name, 173 + "content": quoted, 174 + "ttl": ttl, 175 + "comment": "Automatically added by mlf publish", 176 + }); 177 + let resp: Resp = self 178 + .client 179 + .put(format!( 180 + "{API_BASE}/zones/{zone_id}/dns_records/{}", 181 + first.id 182 + )) 183 + .bearer_auth(&self.token) 184 + .json(&body) 185 + .send() 186 + .await? 187 + .json() 188 + .await 189 + .map_err(|e| CloudflareError::Decode(e.to_string()))?; 190 + if !resp.success { 191 + return Err(CloudflareError::Api(format_errors(&resp.errors))); 192 + } 193 + return Ok(resp 194 + .result 195 + .map(|r| r.id) 196 + .unwrap_or_else(|| first.id.clone())); 197 + } 198 + // Create 199 + #[derive(Deserialize)] 200 + struct Resp { 201 + success: bool, 202 + #[serde(default)] 203 + errors: Vec<ApiErr>, 204 + #[serde(default)] 205 + result: Option<DnsRecord>, 206 + } 207 + let body = serde_json::json!({ 208 + "type": "TXT", 209 + "name": name, 210 + "content": quoted, 211 + "ttl": ttl, 212 + }); 213 + let resp: Resp = self 214 + .client 215 + .post(format!("{API_BASE}/zones/{zone_id}/dns_records")) 216 + .bearer_auth(&self.token) 217 + .json(&body) 218 + .send() 219 + .await? 220 + .json() 221 + .await 222 + .map_err(|e| CloudflareError::Decode(e.to_string()))?; 223 + if !resp.success { 224 + return Err(CloudflareError::Api(format_errors(&resp.errors))); 225 + } 226 + resp.result 227 + .map(|r| r.id) 228 + .ok_or_else(|| CloudflareError::Api("create returned no result".into())) 229 + } 230 + 231 + pub async fn delete_txt(&self, zone_id: &str, record_id: &str) -> Result<(), CloudflareError> { 232 + #[derive(Deserialize)] 233 + struct Resp { 234 + success: bool, 235 + #[serde(default)] 236 + errors: Vec<ApiErr>, 237 + } 238 + let resp: Resp = self 239 + .client 240 + .delete(format!( 241 + "{API_BASE}/zones/{zone_id}/dns_records/{record_id}" 242 + )) 243 + .bearer_auth(&self.token) 244 + .send() 245 + .await? 246 + .json() 247 + .await 248 + .map_err(|e| CloudflareError::Decode(e.to_string()))?; 249 + if !resp.success { 250 + return Err(CloudflareError::Api(format_errors(&resp.errors))); 251 + } 252 + Ok(()) 253 + } 254 + } 255 + 256 + #[derive(Debug, Clone, Deserialize)] 257 + pub struct Zone { 258 + pub id: String, 259 + #[allow(dead_code)] 260 + pub name: String, 261 + } 262 + 263 + #[derive(Debug, Clone, Deserialize)] 264 + pub struct DnsRecord { 265 + pub id: String, 266 + #[allow(dead_code)] 267 + pub name: String, 268 + pub content: String, 269 + } 270 + 271 + #[derive(Debug, Clone, Deserialize)] 272 + struct ApiErr { 273 + #[allow(dead_code)] 274 + code: i64, 275 + message: String, 276 + } 277 + 278 + fn format_errors(errs: &[ApiErr]) -> String { 279 + if errs.is_empty() { 280 + return "unknown error".into(); 281 + } 282 + errs.iter() 283 + .map(|e| e.message.clone()) 284 + .collect::<Vec<_>>() 285 + .join("; ") 286 + } 287 + 288 + /// Wrap a TXT payload per RFC 1035 `character-string` quoting: 289 + /// double-quote delimited, backslash + double-quote escaped. 290 + fn escape_for_txt(s: &str) -> String { 291 + s.replace('\\', "\\\\").replace('"', "\\\"") 292 + } 293 + 294 + /// Reverse of [`escape_for_txt`]: strip one pair of outer quotes and 295 + /// unescape `\\` / `\"`. Non-quoted input is returned untouched so the 296 + /// function is safe to apply defensively. 297 + fn unquote_txt(s: &str) -> String { 298 + let inner = s 299 + .strip_prefix('"') 300 + .and_then(|t| t.strip_suffix('"')) 301 + .unwrap_or(s); 302 + let mut out = String::with_capacity(inner.len()); 303 + let mut chars = inner.chars(); 304 + while let Some(c) = chars.next() { 305 + if c == '\\' { 306 + match chars.next() { 307 + Some('\\') => out.push('\\'), 308 + Some('"') => out.push('"'), 309 + Some(other) => { 310 + out.push('\\'); 311 + out.push(other); 312 + } 313 + None => out.push('\\'), 314 + } 315 + } else { 316 + out.push(c); 317 + } 318 + } 319 + out 320 + } 321 + 322 + /// Yield each parent domain walking up: `a.b.c.d` → `a.b.c.d`, `b.c.d`, 323 + /// `c.d`, `d`. Useful for finding the zone that covers a subdomain. 324 + fn parent_domains(name: &str) -> Vec<String> { 325 + let mut out = Vec::new(); 326 + let parts: Vec<&str> = name.split('.').collect(); 327 + for i in 0..parts.len() { 328 + out.push(parts[i..].join(".")); 329 + } 330 + out 331 + } 332 + 333 + #[cfg(test)] 334 + mod tests { 335 + use super::*; 336 + 337 + #[test] 338 + fn txt_quote_round_trip() { 339 + // Plain DID-style payload: no specials, quotes wrap cleanly. 340 + let raw = "did=did:plc:xl243nyru4tbbqjkuf2uvmna"; 341 + let wrapped = format!("\"{}\"", escape_for_txt(raw)); 342 + assert_eq!(wrapped, format!("\"{raw}\"")); 343 + assert_eq!(unquote_txt(&wrapped), raw); 344 + 345 + // Payload with embedded `"` and `\` — round-trips via escapes. 346 + let raw = r#"a"b\c"#; 347 + let wrapped = format!("\"{}\"", escape_for_txt(raw)); 348 + assert_eq!(wrapped, r#""a\"b\\c""#); 349 + assert_eq!(unquote_txt(&wrapped), raw); 350 + 351 + // Unquoted input passes through unharmed — defensive. 352 + assert_eq!(unquote_txt("did=plain"), "did=plain"); 353 + } 354 + 355 + #[test] 356 + fn parent_domains_walks_up() { 357 + assert_eq!( 358 + parent_domains("foo.example.com"), 359 + vec!["foo.example.com", "example.com", "com"] 360 + ); 361 + assert_eq!( 362 + parent_domains("a.b.c.d"), 363 + vec!["a.b.c.d", "b.c.d", "c.d", "d"] 364 + ); 365 + } 366 + }
+277
dns-plugins/mlf-dns-cloudflare/src/main.rs
··· 1 + //! Official MLF DNS provider plugin for Cloudflare. 2 + //! 3 + //! Speaks the `mlf-plugin-host` subprocess protocol over stdin/stdout 4 + //! and translates each op into a Cloudflare API call via HTTPS. 5 + //! 6 + //! Options schema: 7 + //! - `api_token` (secret, required) — a Cloudflare API token scoped 8 + //! with at least `Zone:Read` + `Zone.DNS:Edit` for the zones the 9 + //! user wants to publish under. 10 + 11 + mod api; 12 + 13 + use api::{CloudflareClient, CloudflareError}; 14 + use mlf_plugin_host::plugin::{Server, empty_data, params_as}; 15 + use mlf_plugin_host::protocol::{HelloData, OptionField, PROTOCOL_VERSION, Request}; 16 + use serde::{Deserialize, Serialize}; 17 + use serde_json::{Value, json}; 18 + 19 + #[tokio::main(flavor = "current_thread")] 20 + async fn main() -> std::io::Result<()> { 21 + let mut server = Server::stdio(); 22 + 23 + let identity = HelloData { 24 + name: "cloudflare".into(), 25 + protocol_version: PROTOCOL_VERSION, 26 + kind: Some("dns".into()), 27 + capabilities: vec![ 28 + "login".into(), 29 + "list_txt".into(), 30 + "upsert_txt".into(), 31 + "delete_txt".into(), 32 + "resolve_zone".into(), 33 + ], 34 + options_schema: vec![OptionField { 35 + name: "api_token".into(), 36 + label: "Cloudflare API token".into(), 37 + help: Some( 38 + "Create at https://dash.cloudflare.com/profile/api-tokens. \ 39 + Needs Zone:Read + Zone.DNS:Edit on the zones you publish under." 40 + .into(), 41 + ), 42 + secret: true, 43 + required: true, 44 + default: None, 45 + }], 46 + }; 47 + 48 + if server.handshake(identity).await.is_err() { 49 + return Ok(()); 50 + } 51 + 52 + let mut creds: Option<Credentials> = None; 53 + 54 + while let Ok(Some(req)) = server.next_request().await { 55 + if let Err(e) = dispatch(&mut server, &req, &mut creds).await { 56 + let _ = server.reply_err("internal", &e.to_string(), false).await; 57 + } 58 + } 59 + 60 + Ok(()) 61 + } 62 + 63 + /// Credentials the host stores for us and re-sends on every session. 64 + #[derive(Debug, Clone, Serialize, Deserialize)] 65 + struct Credentials { 66 + api_token: String, 67 + } 68 + 69 + /// Shape of `init`'s params. 70 + #[derive(Debug, Deserialize)] 71 + struct InitParams { 72 + #[serde(default)] 73 + credentials: Option<Credentials>, 74 + } 75 + 76 + #[derive(Debug, Deserialize)] 77 + struct ResolveZoneParams { 78 + domain: String, 79 + } 80 + 81 + #[derive(Debug, Deserialize)] 82 + struct ListTxtParams { 83 + name: String, 84 + } 85 + 86 + #[derive(Debug, Deserialize)] 87 + struct UpsertTxtParams { 88 + name: String, 89 + value: String, 90 + #[serde(default)] 91 + ttl: Option<u32>, 92 + } 93 + 94 + #[derive(Debug, Deserialize)] 95 + struct DeleteTxtParams { 96 + name: String, 97 + record_id: String, 98 + } 99 + 100 + #[derive(thiserror::Error, Debug)] 101 + enum DispatchError { 102 + #[error("{0}")] 103 + Plugin(#[from] mlf_plugin_host::plugin::PluginError), 104 + #[error("{0}")] 105 + Cloudflare(#[from] CloudflareError), 106 + } 107 + 108 + async fn dispatch<W, R>( 109 + server: &mut Server<W, R>, 110 + req: &Request, 111 + creds: &mut Option<Credentials>, 112 + ) -> Result<(), DispatchError> 113 + where 114 + W: tokio::io::AsyncWrite + Unpin, 115 + R: tokio::io::AsyncBufReadExt + Unpin, 116 + { 117 + match req.op.as_str() { 118 + "init" => { 119 + let InitParams { credentials } = params_as(req)?; 120 + *creds = credentials; 121 + server.reply_ok(empty_data()).await?; 122 + } 123 + "login" => { 124 + // Schema-driven plugin: credentials arrive pre-filled via 125 + // `init`. If they aren't set yet, we can't validate, so 126 + // return a clean error the host can surface. 127 + let Some(c) = creds.as_ref() else { 128 + server 129 + .reply_err( 130 + "no_credentials", 131 + "login called before init set credentials", 132 + false, 133 + ) 134 + .await?; 135 + return Ok(()); 136 + }; 137 + match CloudflareClient::new(&c.api_token).verify().await { 138 + Ok(user_email) => { 139 + server 140 + .reply_ok(json!({ 141 + "credentials": c, 142 + "display_name": user_email.unwrap_or_else(|| "cloudflare".to_string()), 143 + })) 144 + .await?; 145 + } 146 + Err(e) => { 147 + server 148 + .reply_err("invalid_token", &e.to_string(), false) 149 + .await?; 150 + } 151 + } 152 + } 153 + "logout" => { 154 + *creds = None; 155 + server.reply_ok(empty_data()).await?; 156 + } 157 + "resolve_zone" => { 158 + let ResolveZoneParams { domain } = params_as(req)?; 159 + let c = require_creds(server, creds).await?; 160 + match CloudflareClient::new(&c.api_token) 161 + .find_zone_for(&domain) 162 + .await 163 + { 164 + Ok(Some(zone)) => { 165 + server 166 + .reply_ok(json!({ 167 + "zone_id": zone.id, 168 + "covered": true, 169 + })) 170 + .await?; 171 + } 172 + Ok(None) => { 173 + server 174 + .reply_ok(json!({ 175 + "zone_id": Value::Null, 176 + "covered": false, 177 + })) 178 + .await?; 179 + } 180 + Err(e) => { 181 + server 182 + .reply_err("cloudflare_error", &e.to_string(), false) 183 + .await?; 184 + } 185 + } 186 + } 187 + "list_txt" => { 188 + let ListTxtParams { name } = params_as(req)?; 189 + let c = require_creds(server, creds).await?; 190 + let client = CloudflareClient::new(&c.api_token); 191 + let zone = match client.find_zone_for(&name).await? { 192 + Some(z) => z, 193 + None => { 194 + server 195 + .reply_err("unknown_zone", &format!("no zone covers {name}"), false) 196 + .await?; 197 + return Ok(()); 198 + } 199 + }; 200 + let records = client.list_txt(&zone.id, &name).await?; 201 + server 202 + .reply_ok(json!({ 203 + "records": records.into_iter().map(|r| json!({ 204 + "id": r.id, 205 + "value": r.content, 206 + })).collect::<Vec<_>>(), 207 + })) 208 + .await?; 209 + } 210 + "upsert_txt" => { 211 + let UpsertTxtParams { name, value, ttl } = params_as(req)?; 212 + let c = require_creds(server, creds).await?; 213 + let client = CloudflareClient::new(&c.api_token); 214 + let zone = match client.find_zone_for(&name).await? { 215 + Some(z) => z, 216 + None => { 217 + server 218 + .reply_err("unknown_zone", &format!("no zone covers {name}"), false) 219 + .await?; 220 + return Ok(()); 221 + } 222 + }; 223 + // Cloudflare treats TTL=1 as "Auto" (let CF pick). Default 224 + // to it when the host doesn't specify; we don't know or care 225 + // better than Cloudflare about propagation. 226 + let rec = client 227 + .upsert_txt(&zone.id, &name, &value, ttl.unwrap_or(1)) 228 + .await?; 229 + server.reply_ok(json!({ "record_id": rec })).await?; 230 + } 231 + "delete_txt" => { 232 + let DeleteTxtParams { name, record_id } = params_as(req)?; 233 + let c = require_creds(server, creds).await?; 234 + let client = CloudflareClient::new(&c.api_token); 235 + let zone = match client.find_zone_for(&name).await? { 236 + Some(z) => z, 237 + None => { 238 + server 239 + .reply_err("unknown_zone", &format!("no zone covers {name}"), false) 240 + .await?; 241 + return Ok(()); 242 + } 243 + }; 244 + client.delete_txt(&zone.id, &record_id).await?; 245 + server.reply_ok(empty_data()).await?; 246 + } 247 + other => { 248 + server 249 + .reply_err("unknown_op", &format!("unsupported op `{other}`"), false) 250 + .await?; 251 + } 252 + } 253 + Ok(()) 254 + } 255 + 256 + async fn require_creds<W, R>( 257 + server: &mut Server<W, R>, 258 + creds: &Option<Credentials>, 259 + ) -> Result<Credentials, DispatchError> 260 + where 261 + W: tokio::io::AsyncWrite + Unpin, 262 + R: tokio::io::AsyncBufReadExt + Unpin, 263 + { 264 + if let Some(c) = creds.clone() { 265 + return Ok(c); 266 + } 267 + server 268 + .reply_err( 269 + "no_credentials", 270 + "host hasn't called init with credentials yet", 271 + false, 272 + ) 273 + .await?; 274 + Err(DispatchError::Plugin( 275 + mlf_plugin_host::plugin::PluginError::Unexpected("missing credentials".into()), 276 + )) 277 + }
+17
mlf-plugin-host/Cargo.toml
··· 1 + [package] 2 + name = "mlf-plugin-host" 3 + version = "0.1.0" 4 + edition = "2024" 5 + license = "MIT" 6 + description = "Subprocess plugin protocol + host runtime for MLF plugins" 7 + 8 + [dependencies] 9 + async-trait = "0.1" 10 + dialoguer = { version = "0.11", default-features = false, features = ["password"] } 11 + serde = { version = "1", features = ["derive"] } 12 + serde_json = "1" 13 + thiserror = "2" 14 + tokio = { version = "1", features = ["io-std", "io-util", "macros", "process", "rt"] } 15 + 16 + [dev-dependencies] 17 + tokio = { version = "1", features = ["full"] }
+59
mlf-plugin-host/src/discovery.rs
··· 1 + //! Locate a plugin binary by name. 2 + //! 3 + //! Resolution order (first match wins): 4 + //! 1. `$PATH` — the usual Unix behaviour. 5 + //! 2. `~/.config/mlf/plugins/` — MLF's user-local plugin directory. 6 + //! 7 + //! The auto-fetch step (downloading an official plugin binary on first 8 + //! use) is a stretch goal and not implemented in R4. 9 + 10 + use std::path::{Path, PathBuf}; 11 + 12 + /// Try to find a plugin named `mlf-<kind>-<name>` (e.g. `mlf-dns-cloudflare`). 13 + pub fn find(kind: &str, name: &str) -> Option<PathBuf> { 14 + let bin_name = format!("mlf-{kind}-{name}"); 15 + if let Some(p) = find_on_path(&bin_name) { 16 + return Some(p); 17 + } 18 + if let Some(home) = home_dir() 19 + && let p = home 20 + .join(".config") 21 + .join("mlf") 22 + .join("plugins") 23 + .join(&bin_name) 24 + && p.is_file() 25 + { 26 + return Some(p); 27 + } 28 + None 29 + } 30 + 31 + fn find_on_path(bin: &str) -> Option<PathBuf> { 32 + let path_var = std::env::var_os("PATH")?; 33 + for dir in std::env::split_paths(&path_var) { 34 + let candidate = dir.join(bin); 35 + if is_executable(&candidate) { 36 + return Some(candidate); 37 + } 38 + } 39 + None 40 + } 41 + 42 + #[cfg(unix)] 43 + fn is_executable(p: &Path) -> bool { 44 + use std::os::unix::fs::PermissionsExt; 45 + std::fs::metadata(p) 46 + .map(|m| m.is_file() && (m.permissions().mode() & 0o111) != 0) 47 + .unwrap_or(false) 48 + } 49 + 50 + #[cfg(not(unix))] 51 + fn is_executable(p: &Path) -> bool { 52 + p.is_file() 53 + } 54 + 55 + fn home_dir() -> Option<PathBuf> { 56 + std::env::var_os("HOME") 57 + .or_else(|| std::env::var_os("USERPROFILE")) 58 + .map(PathBuf::from) 59 + }
+361
mlf-plugin-host/src/host.rs
··· 1 + //! Host-side runtime: spawn a plugin binary, do the handshake, drive 2 + //! it through ops while servicing the non-terminal `ask` messages with 3 + //! a [`UiHandler`]. 4 + //! 5 + //! [`PluginHandle`] is generic over the I/O streams so tests can drive 6 + //! it with `tokio::io::duplex` pipes instead of real subprocesses. 7 + 8 + use crate::protocol::{ 9 + Ack, Answer, Ask, AskKind, CapabilityMissing, ErrorBody, Hello, HelloData, PROTOCOL_VERSION, 10 + Request, Response, Terminal, 11 + }; 12 + use crate::ui::{UiHandler, UiMessage}; 13 + use serde::de::DeserializeOwned; 14 + use serde_json::{Value, json}; 15 + use thiserror::Error; 16 + use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; 17 + use tokio::process::{Child, ChildStdin, ChildStdout}; 18 + 19 + #[derive(Error, Debug)] 20 + pub enum HostError { 21 + #[error("Failed to spawn plugin `{program}`: {source}")] 22 + Spawn { 23 + program: String, 24 + #[source] 25 + source: std::io::Error, 26 + }, 27 + 28 + #[error("Plugin exited before completing the exchange (exit status: {status:?})")] 29 + EarlyExit { status: Option<i32> }, 30 + 31 + #[error("I/O error talking to plugin: {0}")] 32 + Io(#[from] std::io::Error), 33 + 34 + #[error("Invalid JSON from plugin: {0}")] 35 + Decode(String), 36 + 37 + #[error( 38 + "Plugin advertised protocol_version {plugin}, host speaks {host}. This host can't drive it." 39 + )] 40 + IncompatibleProtocol { host: u32, plugin: u32 }, 41 + 42 + #[error("Plugin does not advertise capability `{0}`")] 43 + MissingCapability(String), 44 + 45 + #[error("Plugin returned error {code}: {message}")] 46 + PluginError { 47 + code: String, 48 + message: String, 49 + retryable: bool, 50 + }, 51 + 52 + #[error(transparent)] 53 + Ui(#[from] crate::ui::UiError), 54 + } 55 + 56 + /// A running plugin subprocess with the handshake already complete. 57 + /// 58 + /// Generic over I/O so tests can drive it with in-memory pipes. 59 + pub struct PluginHandle<W, R> 60 + where 61 + W: AsyncWrite + Unpin + Send, 62 + R: AsyncBufReadExt + Unpin + Send, 63 + { 64 + writer: W, 65 + reader: R, 66 + hello: HelloData, 67 + /// Optional underlying child — owned so we can await its exit on drop. 68 + child: Option<Child>, 69 + } 70 + 71 + impl PluginHandle<ChildStdin, BufReader<ChildStdout>> { 72 + /// Spawn `program` as a subprocess and perform the `hello` handshake. 73 + pub async fn spawn(program: &str, args: &[&str]) -> Result<Self, HostError> { 74 + let mut cmd = tokio::process::Command::new(program); 75 + cmd.args(args); 76 + cmd.stdin(std::process::Stdio::piped()); 77 + cmd.stdout(std::process::Stdio::piped()); 78 + cmd.stderr(std::process::Stdio::inherit()); 79 + let mut child = cmd.spawn().map_err(|e| HostError::Spawn { 80 + program: program.to_string(), 81 + source: e, 82 + })?; 83 + let stdin = child.stdin.take().ok_or_else(|| HostError::Spawn { 84 + program: program.to_string(), 85 + source: std::io::Error::other("child stdin unavailable"), 86 + })?; 87 + let stdout = child.stdout.take().ok_or_else(|| HostError::Spawn { 88 + program: program.to_string(), 89 + source: std::io::Error::other("child stdout unavailable"), 90 + })?; 91 + let reader = BufReader::new(stdout); 92 + PluginHandle::from_streams(stdin, reader, Some(child)).await 93 + } 94 + } 95 + 96 + impl<W, R> PluginHandle<W, R> 97 + where 98 + W: AsyncWrite + Unpin + Send, 99 + R: AsyncBufReadExt + Unpin + Send, 100 + { 101 + /// Low-level constructor. Performs the handshake on the given streams. 102 + pub async fn from_streams( 103 + writer: W, 104 + reader: R, 105 + child: Option<Child>, 106 + ) -> Result<Self, HostError> { 107 + let mut handle = PluginHandle { 108 + writer, 109 + reader, 110 + hello: HelloData { 111 + name: String::new(), 112 + protocol_version: 0, 113 + kind: None, 114 + capabilities: Vec::new(), 115 + options_schema: Vec::new(), 116 + }, 117 + child, 118 + }; 119 + handle.handshake().await?; 120 + Ok(handle) 121 + } 122 + 123 + async fn handshake(&mut self) -> Result<(), HostError> { 124 + let hello = Hello::new(PROTOCOL_VERSION); 125 + write_json(&mut self.writer, &hello).await?; 126 + let response = self.read_terminal("hello").await?; 127 + let data: HelloData = 128 + serde_json::from_value(response).map_err(|e| HostError::Decode(e.to_string()))?; 129 + if data.protocol_version != PROTOCOL_VERSION { 130 + return Err(HostError::IncompatibleProtocol { 131 + host: PROTOCOL_VERSION, 132 + plugin: data.protocol_version, 133 + }); 134 + } 135 + self.hello = data; 136 + Ok(()) 137 + } 138 + 139 + pub fn hello(&self) -> &HelloData { 140 + &self.hello 141 + } 142 + 143 + /// Check that the plugin advertises every capability in `required`. 144 + /// Returns the first one that's missing. 145 + pub fn require_capabilities(&self, required: &[&str]) -> Result<(), CapabilityMissing> { 146 + for cap in required { 147 + if !self.hello.capabilities.iter().any(|c| c == cap) { 148 + return Err(CapabilityMissing { 149 + capability: cap.to_string(), 150 + }); 151 + } 152 + } 153 + Ok(()) 154 + } 155 + 156 + /// Send an `init` op carrying the assembled session credentials. 157 + pub async fn init(&mut self, credentials: Value) -> Result<(), HostError> { 158 + let req = Request { 159 + op: "init".to_string(), 160 + params: json!({"credentials": credentials}), 161 + }; 162 + let _: Value = self.call_raw(&req, &mut NoopUi).await?; 163 + Ok(()) 164 + } 165 + 166 + /// Invoke an op and deserialise the terminal response's `data` field. 167 + /// 168 + /// If the plugin emits `ask` messages, they're dispatched to `ui`. 169 + pub async fn call<T: DeserializeOwned>( 170 + &mut self, 171 + op: &str, 172 + params: Value, 173 + ui: &mut dyn UiHandler, 174 + ) -> Result<T, HostError> { 175 + let req = Request { 176 + op: op.to_string(), 177 + params, 178 + }; 179 + let raw: Value = self.call_raw(&req, ui).await?; 180 + serde_json::from_value(raw).map_err(|e| HostError::Decode(e.to_string())) 181 + } 182 + 183 + /// Same as [`call`] but returns the raw `data` value. 184 + pub async fn call_raw( 185 + &mut self, 186 + req: &Request, 187 + ui: &mut dyn UiHandler, 188 + ) -> Result<Value, HostError> { 189 + let required_capability = op_capability(&req.op); 190 + if let Some(cap) = required_capability 191 + && !self.hello.capabilities.iter().any(|c| c == cap) 192 + { 193 + return Err(HostError::MissingCapability(cap.to_string())); 194 + } 195 + write_json(&mut self.writer, req).await?; 196 + 197 + loop { 198 + let line = self.read_line(&req.op).await?; 199 + let response: Response = serde_json::from_str(&line) 200 + .map_err(|e| HostError::Decode(format!("parsing `{line}`: {e}")))?; 201 + match response { 202 + Response::Terminal(Terminal::Ok { data, .. }) => return Ok(data), 203 + Response::Terminal(Terminal::Err { error, .. }) => { 204 + let ErrorBody { 205 + code, 206 + message, 207 + retryable, 208 + } = error; 209 + return Err(HostError::PluginError { 210 + code, 211 + message, 212 + retryable, 213 + }); 214 + } 215 + Response::Ask(Ask { kind }) => { 216 + let message = ask_to_message(kind.clone()); 217 + let resolved = ui.handle(message).await?; 218 + self.reply_to_ask(&kind, resolved).await?; 219 + } 220 + } 221 + } 222 + } 223 + 224 + async fn reply_to_ask( 225 + &mut self, 226 + kind: &AskKind, 227 + resolved: crate::ui::UiResponse, 228 + ) -> Result<(), HostError> { 229 + use crate::ui::UiResponse; 230 + match (kind, resolved) { 231 + (AskKind::Prompt { name, .. }, UiResponse::Answer(value)) 232 + | (AskKind::Select { name, .. }, UiResponse::Answer(value)) 233 + | (AskKind::Confirm { name, .. }, UiResponse::Answer(value)) => { 234 + write_json( 235 + &mut self.writer, 236 + &Answer { 237 + answer: json!({ name.clone(): value }), 238 + }, 239 + ) 240 + .await?; 241 + } 242 + (AskKind::OpenUrl { .. } | AskKind::Info { .. }, UiResponse::Ack) => { 243 + write_json(&mut self.writer, &Ack { ack: true }).await?; 244 + } 245 + (kind, resp) => { 246 + return Err(HostError::Decode(format!( 247 + "UiHandler returned {resp:?} for ask {kind:?} — expected matching kind", 248 + ))); 249 + } 250 + } 251 + Ok(()) 252 + } 253 + 254 + async fn read_terminal(&mut self, op: &str) -> Result<Value, HostError> { 255 + let line = self.read_line(op).await?; 256 + let t: Terminal = 257 + serde_json::from_str(&line).map_err(|e| HostError::Decode(e.to_string()))?; 258 + match t { 259 + Terminal::Ok { data, .. } => Ok(data), 260 + Terminal::Err { error, .. } => Err(HostError::PluginError { 261 + code: error.code, 262 + message: error.message, 263 + retryable: error.retryable, 264 + }), 265 + } 266 + } 267 + 268 + async fn read_line(&mut self, op: &str) -> Result<String, HostError> { 269 + let mut line = String::new(); 270 + let n = self.reader.read_line(&mut line).await?; 271 + if n == 0 { 272 + let status = self.try_exit_code().await; 273 + return Err(HostError::EarlyExit { status }); 274 + } 275 + let trimmed = line.trim_end_matches(['\n', '\r']).to_string(); 276 + if trimmed.is_empty() { 277 + return Err(HostError::Decode(format!( 278 + "plugin returned an empty line for op `{op}`" 279 + ))); 280 + } 281 + Ok(trimmed) 282 + } 283 + 284 + async fn try_exit_code(&mut self) -> Option<i32> { 285 + let child = self.child.as_mut()?; 286 + child.wait().await.ok().and_then(|s| s.code()) 287 + } 288 + 289 + /// Close stdin and wait for the plugin to exit. 290 + pub async fn shutdown(mut self) -> Result<(), HostError> { 291 + drop(self.writer); 292 + if let Some(mut child) = self.child.take() { 293 + let _ = child.wait().await; 294 + } 295 + Ok(()) 296 + } 297 + } 298 + 299 + fn op_capability(op: &str) -> Option<&'static str> { 300 + match op { 301 + "init" | "login" | "logout" | "hello" => None, 302 + "resolve_zone" => Some("resolve_zone"), 303 + "list_txt" => Some("list_txt"), 304 + "upsert_txt" => Some("upsert_txt"), 305 + "delete_txt" => Some("delete_txt"), 306 + _ => None, 307 + } 308 + } 309 + 310 + fn ask_to_message(kind: AskKind) -> UiMessage { 311 + match kind { 312 + AskKind::Prompt { 313 + name, 314 + label, 315 + secret, 316 + } => UiMessage::Prompt { 317 + name, 318 + label, 319 + secret, 320 + }, 321 + AskKind::Select { 322 + name, 323 + label, 324 + options, 325 + } => UiMessage::Select { 326 + name, 327 + label, 328 + options, 329 + }, 330 + AskKind::Confirm { name, label } => UiMessage::Confirm { name, label }, 331 + AskKind::OpenUrl { url } => UiMessage::OpenUrl { url }, 332 + AskKind::Info { text } => UiMessage::Info { text }, 333 + } 334 + } 335 + 336 + async fn write_json<W, T>(writer: &mut W, value: &T) -> std::io::Result<()> 337 + where 338 + W: AsyncWrite + Unpin + Send, 339 + T: serde::Serialize, 340 + { 341 + let mut s = serde_json::to_string(value).expect("message is JSON-serialisable"); 342 + s.push('\n'); 343 + writer.write_all(s.as_bytes()).await?; 344 + writer.flush().await?; 345 + Ok(()) 346 + } 347 + 348 + /// UiHandler used on calls that don't interact (`init` on a schema-driven 349 + /// plugin); refuses any `ask` message. 350 + struct NoopUi; 351 + #[async_trait::async_trait] 352 + impl UiHandler for NoopUi { 353 + async fn handle( 354 + &mut self, 355 + msg: UiMessage, 356 + ) -> Result<crate::ui::UiResponse, crate::ui::UiError> { 357 + Err(crate::ui::UiError::Unsupported { 358 + op: format!("non-interactive op asked for {msg:?}"), 359 + }) 360 + } 361 + }
+26
mlf-plugin-host/src/lib.rs
··· 1 + //! Subprocess plugin protocol + host runtime for MLF plugins. 2 + //! 3 + //! See [`protocol`] for the message shape (line-delimited JSON over 4 + //! stdin/stdout). [`host`] spawns a plugin binary and drives it through 5 + //! ops, including multi-turn `ask`/`answer` flows. [`ui`] defines the 6 + //! handler trait the host uses to service non-terminal `ask` messages 7 + //! (terminal prompts, browser-open requests, etc.). [`plugin`] is the 8 + //! small helper plugin authors can import to read ops and reply on the 9 + //! other side of the pipe. 10 + //! 11 + //! The protocol is shared across plugin kinds (DNS providers today, 12 + //! codegen later); per-kind ops live on top of the generic 13 + //! `hello` / `init` / multi-turn RPC envelope. 14 + 15 + pub mod discovery; 16 + pub mod host; 17 + pub mod plugin; 18 + pub mod protocol; 19 + pub mod ui; 20 + 21 + pub use host::{HostError, PluginHandle}; 22 + pub use protocol::{ 23 + Ack, Answer, Ask, AskKind, CapabilityMissing, Credentials, Hello, HelloData, OptionField, 24 + Request, Response, Terminal, 25 + }; 26 + pub use ui::{DenyInteractiveUi, TerminalUi, UiHandler, UiMessage};
+223
mlf-plugin-host/src/plugin.rs
··· 1 + //! Plugin-side helper: read ops from stdin, write responses to stdout. 2 + //! 3 + //! Plugins can technically implement the protocol directly — it's just 4 + //! line-delimited JSON — but every plugin needs the same small 5 + //! scaffolding around the handshake, the `init` op, and the 6 + //! one-line-per-message write loop. This module provides that so each 7 + //! plugin can focus on its per-op logic. 8 + 9 + use crate::protocol::{ 10 + Ack, Answer, Ask, AskKind, ErrorBody, Hello, HelloData, HelloOp, OkFalse, OkTrue, 11 + PROTOCOL_VERSION, Request, Terminal, 12 + }; 13 + use serde::de::DeserializeOwned; 14 + use serde_json::{Value, json}; 15 + use thiserror::Error; 16 + use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stdin, Stdout}; 17 + 18 + #[derive(Error, Debug)] 19 + pub enum PluginError { 20 + #[error("Host closed stdin before completing the exchange")] 21 + HostClosed, 22 + 23 + #[error("Invalid JSON from host: {0}")] 24 + Decode(String), 25 + 26 + #[error("I/O error talking to host: {0}")] 27 + Io(#[from] std::io::Error), 28 + 29 + #[error("Unexpected message from host: {0}")] 30 + Unexpected(String), 31 + } 32 + 33 + /// Plugin server bound to stdin/stdout. Constructed once at startup; 34 + /// call [`Server::handshake`] to send the `hello` response, then loop 35 + /// on [`Server::next_request`] until the host closes stdin. 36 + pub struct Server<W: AsyncWrite + Unpin, R: AsyncBufReadExt + Unpin> { 37 + writer: W, 38 + reader: R, 39 + } 40 + 41 + impl Server<Stdout, BufReader<Stdin>> { 42 + /// Bind to the process's real stdin/stdout. 43 + pub fn stdio() -> Self { 44 + Self { 45 + writer: tokio::io::stdout(), 46 + reader: BufReader::new(tokio::io::stdin()), 47 + } 48 + } 49 + } 50 + 51 + impl<W, R> Server<W, R> 52 + where 53 + W: AsyncWrite + Unpin, 54 + R: AsyncBufReadExt + Unpin, 55 + { 56 + pub fn new(writer: W, reader: R) -> Self { 57 + Self { writer, reader } 58 + } 59 + 60 + /// Read the `hello` op from the host and reply with the plugin's 61 + /// [`HelloData`]. Must be the first call. 62 + pub async fn handshake(&mut self, identity: HelloData) -> Result<(), PluginError> { 63 + let line = self.read_line().await?; 64 + let hello: Hello = 65 + serde_json::from_str(&line).map_err(|e| PluginError::Decode(e.to_string()))?; 66 + if !matches!(hello.op, HelloOp::Hello) { 67 + return Err(PluginError::Unexpected(format!( 68 + "expected hello, got `{}`", 69 + line 70 + ))); 71 + } 72 + if hello.protocol_version != PROTOCOL_VERSION { 73 + return self 74 + .reply_err( 75 + "protocol_version", 76 + &format!( 77 + "host speaks v{}, plugin speaks v{PROTOCOL_VERSION}", 78 + hello.protocol_version 79 + ), 80 + false, 81 + ) 82 + .await; 83 + } 84 + self.reply_ok(serde_json::to_value(identity).expect("HelloData serialises")) 85 + .await 86 + } 87 + 88 + /// Read the next [`Request`] from the host. Returns `None` at EOF. 89 + pub async fn next_request(&mut self) -> Result<Option<Request>, PluginError> { 90 + let line = match self.read_line_opt().await? { 91 + Some(l) => l, 92 + None => return Ok(None), 93 + }; 94 + let req: Request = 95 + serde_json::from_str(&line).map_err(|e| PluginError::Decode(e.to_string()))?; 96 + Ok(Some(req)) 97 + } 98 + 99 + pub async fn reply_ok(&mut self, data: Value) -> Result<(), PluginError> { 100 + let t = Terminal::Ok { ok: OkTrue, data }; 101 + self.write_line(&t).await 102 + } 103 + 104 + pub async fn reply_err( 105 + &mut self, 106 + code: &str, 107 + message: &str, 108 + retryable: bool, 109 + ) -> Result<(), PluginError> { 110 + let t = Terminal::Err { 111 + ok: OkFalse, 112 + error: ErrorBody { 113 + code: code.to_string(), 114 + message: message.to_string(), 115 + retryable, 116 + }, 117 + }; 118 + self.write_line(&t).await 119 + } 120 + 121 + /// Send an `ask` and wait for the host's [`Answer`] or [`Ack`]. 122 + pub async fn ask(&mut self, kind: AskKind) -> Result<Value, PluginError> { 123 + let ask = Ask { kind: kind.clone() }; 124 + self.write_line(&ask).await?; 125 + let line = self.read_line().await?; 126 + // The reply is either an Answer (for prompt/select/confirm) or an 127 + // Ack (for open_url/info). The raw shape disambiguates: 128 + // { "answer": ... } vs { "ack": true }. 129 + let v: Value = 130 + serde_json::from_str(&line).map_err(|e| PluginError::Decode(e.to_string()))?; 131 + if let Ok(Answer { answer }) = serde_json::from_value::<Answer>(v.clone()) { 132 + Ok(answer) 133 + } else if let Ok(Ack { .. }) = serde_json::from_value::<Ack>(v) { 134 + Ok(Value::Null) 135 + } else { 136 + Err(PluginError::Unexpected(format!( 137 + "expected answer/ack, got `{line}`" 138 + ))) 139 + } 140 + } 141 + 142 + /// Convenience: ask a prompt and extract the single answered field. 143 + pub async fn ask_prompt( 144 + &mut self, 145 + name: &str, 146 + label: &str, 147 + secret: bool, 148 + ) -> Result<String, PluginError> { 149 + let kind = AskKind::Prompt { 150 + name: name.to_string(), 151 + label: label.to_string(), 152 + secret, 153 + }; 154 + let answer = self.ask(kind).await?; 155 + let obj = answer 156 + .as_object() 157 + .ok_or_else(|| PluginError::Unexpected("answer is not an object".into()))?; 158 + let value = obj 159 + .get(name) 160 + .and_then(|v| v.as_str()) 161 + .ok_or_else(|| PluginError::Unexpected(format!("no `{name}` in answer")))?; 162 + Ok(value.to_string()) 163 + } 164 + 165 + pub async fn ask_info(&mut self, text: &str) -> Result<(), PluginError> { 166 + let _ = self 167 + .ask(AskKind::Info { 168 + text: text.to_string(), 169 + }) 170 + .await?; 171 + Ok(()) 172 + } 173 + 174 + pub async fn ask_open_url(&mut self, url: &str) -> Result<(), PluginError> { 175 + let _ = self 176 + .ask(AskKind::OpenUrl { 177 + url: url.to_string(), 178 + }) 179 + .await?; 180 + Ok(()) 181 + } 182 + 183 + async fn write_line<T: serde::Serialize>(&mut self, value: &T) -> Result<(), PluginError> { 184 + let mut s = serde_json::to_string(value).expect("plugin message is JSON-serialisable"); 185 + s.push('\n'); 186 + self.writer.write_all(s.as_bytes()).await?; 187 + self.writer.flush().await?; 188 + Ok(()) 189 + } 190 + 191 + async fn read_line(&mut self) -> Result<String, PluginError> { 192 + self.read_line_opt().await?.ok_or(PluginError::HostClosed) 193 + } 194 + 195 + async fn read_line_opt(&mut self) -> Result<Option<String>, PluginError> { 196 + let mut line = String::new(); 197 + let n = self.reader.read_line(&mut line).await?; 198 + if n == 0 { 199 + return Ok(None); 200 + } 201 + let trimmed = line.trim_end_matches(['\n', '\r']); 202 + if trimmed.is_empty() { 203 + return Err(PluginError::Decode("empty line".into())); 204 + } 205 + Ok(Some(trimmed.to_string())) 206 + } 207 + } 208 + 209 + // --------------------------------------------------------------------------- 210 + // Convenience helpers for typed params 211 + // --------------------------------------------------------------------------- 212 + 213 + /// Deserialise a [`Request`]'s params into a typed struct. Returns 214 + /// `Err(PluginError::Decode)` if the shape is wrong for the op. 215 + pub fn params_as<T: DeserializeOwned>(req: &Request) -> Result<T, PluginError> { 216 + serde_json::from_value(req.params.clone()).map_err(|e| PluginError::Decode(e.to_string())) 217 + } 218 + 219 + /// Build the JSON object returned by ops that just want `{"ok": true}` 220 + /// semantics with no data payload. 221 + pub fn empty_data() -> Value { 222 + json!({}) 223 + }
+310
mlf-plugin-host/src/protocol.rs
··· 1 + //! JSON message envelope shared by host and plugin. 2 + //! 3 + //! Every exchange is one line of JSON per message: 4 + //! 5 + //! - Host → Plugin: a [`Request`] — `{"op": "...", ...params}`. 6 + //! - Plugin → Host (non-terminal): an [`Ask`] — `{"ask": "...", ...}` 7 + //! asking the host to render a UI element and reply with an 8 + //! [`Answer`] or [`Ack`]. 9 + //! - Plugin → Host (terminal): a [`Terminal`] — either 10 + //! `{"ok": true, "data": ...}` or `{"ok": false, "error": {...}}`. 11 + 12 + use serde::{Deserialize, Serialize}; 13 + use serde_json::Value; 14 + 15 + // --------------------------------------------------------------------------- 16 + // Handshake 17 + // --------------------------------------------------------------------------- 18 + 19 + /// The handshake request. Always the first message the host sends. 20 + #[derive(Debug, Clone, Serialize, Deserialize)] 21 + pub struct Hello { 22 + pub op: HelloOp, 23 + pub protocol_version: u32, 24 + } 25 + 26 + impl Hello { 27 + pub fn new(protocol_version: u32) -> Self { 28 + Self { 29 + op: HelloOp::Hello, 30 + protocol_version, 31 + } 32 + } 33 + } 34 + 35 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 36 + #[serde(rename_all = "snake_case")] 37 + pub enum HelloOp { 38 + Hello, 39 + } 40 + 41 + /// Plugin's response to `hello`: its identity, protocol version, what 42 + /// ops it can service, and the schema of options (credentials + 43 + /// configuration) the host should collect before calling authed ops. 44 + #[derive(Debug, Clone, Serialize, Deserialize)] 45 + pub struct HelloData { 46 + pub name: String, 47 + pub protocol_version: u32, 48 + #[serde(default)] 49 + pub kind: Option<String>, 50 + #[serde(default)] 51 + pub capabilities: Vec<String>, 52 + #[serde(default)] 53 + pub options_schema: Vec<OptionField>, 54 + } 55 + 56 + /// One field in a plugin's options schema. 57 + #[derive(Debug, Clone, Serialize, Deserialize)] 58 + pub struct OptionField { 59 + pub name: String, 60 + pub label: String, 61 + #[serde(default)] 62 + pub help: Option<String>, 63 + /// `true` → mask on input, store in credentials file. 64 + #[serde(default)] 65 + pub secret: bool, 66 + /// `true` → prompt/flag must produce a value; the host refuses to 67 + /// call authed ops with a missing required field. 68 + #[serde(default)] 69 + pub required: bool, 70 + /// Optional default value for non-secret convenience options. 71 + #[serde(default)] 72 + pub default: Option<Value>, 73 + } 74 + 75 + // --------------------------------------------------------------------------- 76 + // Host → plugin messages 77 + // --------------------------------------------------------------------------- 78 + 79 + /// A request the host sends to the plugin after the handshake. 80 + /// 81 + /// The shape is `{"op": "<name>", ...per-op params}`. Responses to 82 + /// non-terminal `ask` messages use [`Answer`] / [`Ack`] instead — 83 + /// they're not `Request`s and don't carry an `op`. 84 + #[derive(Debug, Clone, Serialize, Deserialize)] 85 + pub struct Request { 86 + pub op: String, 87 + #[serde(flatten)] 88 + pub params: Value, 89 + } 90 + 91 + /// Convenience wrapper for a host request carrying the currently- 92 + /// assembled credentials for the session. The plugin is expected to 93 + /// have consumed this via the `init` op before any authed call. 94 + #[derive(Debug, Clone, Serialize, Deserialize)] 95 + pub struct Credentials(pub Value); 96 + 97 + impl Credentials { 98 + pub fn object(obj: serde_json::Map<String, Value>) -> Self { 99 + Self(Value::Object(obj)) 100 + } 101 + pub fn empty() -> Self { 102 + Self(Value::Object(serde_json::Map::new())) 103 + } 104 + } 105 + 106 + // --------------------------------------------------------------------------- 107 + // Plugin → host messages 108 + // --------------------------------------------------------------------------- 109 + 110 + /// Anything the plugin can send back for a given op: a terminal `ok` 111 + /// / `error`, or a non-terminal `ask` that waits on the host. 112 + #[derive(Debug, Clone, Serialize, Deserialize)] 113 + #[serde(untagged)] 114 + pub enum Response { 115 + Terminal(Terminal), 116 + Ask(Ask), 117 + } 118 + 119 + /// Terminal plugin response. 120 + #[derive(Debug, Clone, Serialize, Deserialize)] 121 + #[serde(untagged)] 122 + pub enum Terminal { 123 + Ok { 124 + ok: OkTrue, 125 + #[serde(default)] 126 + data: Value, 127 + }, 128 + Err { 129 + ok: OkFalse, 130 + error: ErrorBody, 131 + }, 132 + } 133 + 134 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 135 + #[serde(try_from = "bool", into = "bool")] 136 + pub struct OkTrue; 137 + impl TryFrom<bool> for OkTrue { 138 + type Error = &'static str; 139 + fn try_from(v: bool) -> Result<Self, Self::Error> { 140 + if v { Ok(Self) } else { Err("expected true") } 141 + } 142 + } 143 + impl From<OkTrue> for bool { 144 + fn from(_: OkTrue) -> Self { 145 + true 146 + } 147 + } 148 + 149 + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 150 + #[serde(try_from = "bool", into = "bool")] 151 + pub struct OkFalse; 152 + impl TryFrom<bool> for OkFalse { 153 + type Error = &'static str; 154 + fn try_from(v: bool) -> Result<Self, Self::Error> { 155 + if !v { Ok(Self) } else { Err("expected false") } 156 + } 157 + } 158 + impl From<OkFalse> for bool { 159 + fn from(_: OkFalse) -> Self { 160 + false 161 + } 162 + } 163 + 164 + #[derive(Debug, Clone, Serialize, Deserialize)] 165 + pub struct ErrorBody { 166 + pub code: String, 167 + pub message: String, 168 + #[serde(default)] 169 + pub retryable: bool, 170 + } 171 + 172 + /// Non-terminal "I need something from the human" message. 173 + #[derive(Debug, Clone, Serialize, Deserialize)] 174 + pub struct Ask { 175 + #[serde(flatten)] 176 + pub kind: AskKind, 177 + } 178 + 179 + #[derive(Debug, Clone, Serialize, Deserialize)] 180 + #[serde(tag = "ask", rename_all = "snake_case")] 181 + pub enum AskKind { 182 + Prompt { 183 + name: String, 184 + label: String, 185 + #[serde(default)] 186 + secret: bool, 187 + }, 188 + Select { 189 + name: String, 190 + label: String, 191 + options: Vec<String>, 192 + }, 193 + Confirm { 194 + name: String, 195 + label: String, 196 + }, 197 + OpenUrl { 198 + url: String, 199 + }, 200 + Info { 201 + text: String, 202 + }, 203 + } 204 + 205 + /// Host's response to a [`AskKind::Prompt`] / [`AskKind::Select`] / 206 + /// [`AskKind::Confirm`]. Keyed by the `name` of the ask, mirroring the 207 + /// plugin's field names. 208 + #[derive(Debug, Clone, Serialize, Deserialize)] 209 + pub struct Answer { 210 + pub answer: Value, 211 + } 212 + 213 + /// Host's acknowledgement for a [`AskKind::OpenUrl`] / [`AskKind::Info`]. 214 + #[derive(Debug, Clone, Serialize, Deserialize)] 215 + pub struct Ack { 216 + pub ack: bool, 217 + } 218 + 219 + /// A capability the host needs that the plugin didn't advertise. 220 + #[derive(Debug, Clone, PartialEq, Eq)] 221 + pub struct CapabilityMissing { 222 + pub capability: String, 223 + } 224 + 225 + /// Current protocol version understood by this crate. 226 + pub const PROTOCOL_VERSION: u32 = 1; 227 + 228 + #[cfg(test)] 229 + mod tests { 230 + use super::*; 231 + use serde_json::json; 232 + 233 + #[test] 234 + fn hello_serialises_as_expected() { 235 + let m = Hello::new(1); 236 + let got = serde_json::to_value(&m).unwrap(); 237 + assert_eq!(got, json!({"op": "hello", "protocol_version": 1})); 238 + } 239 + 240 + #[test] 241 + fn ok_and_err_are_disambiguated_by_untagged_ok_bool() { 242 + let ok_json = json!({"ok": true, "data": {"x": 1}}); 243 + let err_json = json!({"ok": false, "error": {"code": "x", "message": "y"}}); 244 + let ok: Terminal = serde_json::from_value(ok_json).unwrap(); 245 + let err: Terminal = serde_json::from_value(err_json).unwrap(); 246 + assert!(matches!(ok, Terminal::Ok { .. })); 247 + assert!(matches!(err, Terminal::Err { .. })); 248 + } 249 + 250 + #[test] 251 + fn ask_prompt_round_trips() { 252 + let a = Ask { 253 + kind: AskKind::Prompt { 254 + name: "api_token".into(), 255 + label: "API token".into(), 256 + secret: true, 257 + }, 258 + }; 259 + let value = serde_json::to_value(&a).unwrap(); 260 + assert_eq!( 261 + value, 262 + json!({ 263 + "ask": "prompt", 264 + "name": "api_token", 265 + "label": "API token", 266 + "secret": true, 267 + }) 268 + ); 269 + let back: Ask = serde_json::from_value(value).unwrap(); 270 + match back.kind { 271 + AskKind::Prompt { 272 + name, 273 + label, 274 + secret, 275 + } => { 276 + assert_eq!(name, "api_token"); 277 + assert_eq!(label, "API token"); 278 + assert!(secret); 279 + } 280 + _ => panic!("expected prompt"), 281 + } 282 + } 283 + 284 + #[test] 285 + fn response_dispatches_ask_vs_terminal() { 286 + let ask: Response = serde_json::from_value(json!({"ask": "info", "text": "hi"})).unwrap(); 287 + assert!(matches!(ask, Response::Ask(_))); 288 + let ok: Response = serde_json::from_value(json!({"ok": true, "data": {}})).unwrap(); 289 + assert!(matches!(ok, Response::Terminal(Terminal::Ok { .. }))); 290 + } 291 + 292 + #[test] 293 + fn hello_data_parses_capabilities_and_schema() { 294 + let v = json!({ 295 + "name": "cloudflare", 296 + "protocol_version": 1, 297 + "kind": "dns", 298 + "capabilities": ["list_txt", "upsert_txt"], 299 + "options_schema": [ 300 + {"name": "api_token", "label": "API token", "secret": true, "required": true} 301 + ] 302 + }); 303 + let hd: HelloData = serde_json::from_value(v).unwrap(); 304 + assert_eq!(hd.name, "cloudflare"); 305 + assert_eq!(hd.capabilities, vec!["list_txt", "upsert_txt"]); 306 + assert_eq!(hd.options_schema.len(), 1); 307 + assert!(hd.options_schema[0].secret); 308 + assert!(hd.options_schema[0].required); 309 + } 310 + }
+190
mlf-plugin-host/src/ui.rs
··· 1 + //! UI handlers for multi-turn `ask` messages. 2 + //! 3 + //! The host always runs plugins through a [`UiHandler`]. Typical 4 + //! implementations: 5 + //! 6 + //! - [`TerminalUi`] renders prompts in the terminal (using dialoguer). 7 + //! - [`DenyInteractiveUi`] refuses every `ask` with a static error — 8 + //! the choice for `--non-interactive` runs. 9 + //! - Tests provide scripted handlers via inline impls of the trait. 10 + 11 + use async_trait::async_trait; 12 + use serde_json::{Value, json}; 13 + use std::fmt; 14 + use thiserror::Error; 15 + 16 + #[derive(Error, Debug)] 17 + pub enum UiError { 18 + #[error("Non-interactive mode: plugin asked for `{op}` but prompts are disabled")] 19 + NonInteractive { op: String }, 20 + 21 + #[error("Unsupported UI operation: {op}")] 22 + Unsupported { op: String }, 23 + 24 + #[error("UI I/O error: {0}")] 25 + Io(String), 26 + } 27 + 28 + /// A host-side view of a `Ask` message. Mirrors [`crate::protocol::AskKind`]. 29 + #[derive(Debug, Clone)] 30 + pub enum UiMessage { 31 + Prompt { 32 + name: String, 33 + label: String, 34 + secret: bool, 35 + }, 36 + Select { 37 + name: String, 38 + label: String, 39 + options: Vec<String>, 40 + }, 41 + Confirm { 42 + name: String, 43 + label: String, 44 + }, 45 + OpenUrl { 46 + url: String, 47 + }, 48 + Info { 49 + text: String, 50 + }, 51 + } 52 + 53 + /// Response the handler produces for the host to forward on. 54 + #[derive(Debug, Clone)] 55 + pub enum UiResponse { 56 + /// Reply to Prompt/Select/Confirm — the value keyed by the ask's `name`. 57 + Answer(Value), 58 + /// Acknowledgement for OpenUrl/Info. 59 + Ack, 60 + } 61 + 62 + impl fmt::Display for UiResponse { 63 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 64 + match self { 65 + UiResponse::Answer(v) => write!(f, "Answer({v})"), 66 + UiResponse::Ack => write!(f, "Ack"), 67 + } 68 + } 69 + } 70 + 71 + #[async_trait] 72 + pub trait UiHandler: Send { 73 + async fn handle(&mut self, msg: UiMessage) -> Result<UiResponse, UiError>; 74 + } 75 + 76 + // --------------------------------------------------------------------------- 77 + // Implementations 78 + // --------------------------------------------------------------------------- 79 + 80 + /// Handler for `--non-interactive`: fails fast if the plugin tries to 81 + /// prompt. The op in the error points at which `ask` couldn't be serviced. 82 + pub struct DenyInteractiveUi; 83 + 84 + #[async_trait] 85 + impl UiHandler for DenyInteractiveUi { 86 + async fn handle(&mut self, msg: UiMessage) -> Result<UiResponse, UiError> { 87 + Err(UiError::NonInteractive { op: describe(&msg) }) 88 + } 89 + } 90 + 91 + /// Terminal-interactive handler: masked password prompts for `secret` 92 + /// prompts, plain text prompts otherwise, URL-open + message-info 93 + /// rendered as printed lines. 94 + pub struct TerminalUi; 95 + 96 + #[async_trait] 97 + impl UiHandler for TerminalUi { 98 + async fn handle(&mut self, msg: UiMessage) -> Result<UiResponse, UiError> { 99 + // dialoguer is blocking; trampoline through spawn_blocking so we 100 + // don't stall the runtime on stdin reads. 101 + let msg = msg.clone(); 102 + let resp = tokio::task::spawn_blocking(move || blocking_handle(msg)) 103 + .await 104 + .map_err(|e| UiError::Io(e.to_string()))??; 105 + Ok(resp) 106 + } 107 + } 108 + 109 + fn blocking_handle(msg: UiMessage) -> Result<UiResponse, UiError> { 110 + use dialoguer::{Input, Password}; 111 + use std::io::Write; 112 + let theme = dialoguer::theme::ColorfulTheme::default(); 113 + match msg { 114 + UiMessage::Prompt { 115 + label, 116 + secret: true, 117 + .. 118 + } => { 119 + let value: String = Password::with_theme(&theme) 120 + .with_prompt(label) 121 + .interact() 122 + .map_err(|e| UiError::Io(e.to_string()))?; 123 + Ok(UiResponse::Answer(Value::String(value))) 124 + } 125 + UiMessage::Prompt { 126 + label, 127 + secret: false, 128 + .. 129 + } => { 130 + let value: String = Input::with_theme(&theme) 131 + .with_prompt(label) 132 + .interact_text() 133 + .map_err(|e| UiError::Io(e.to_string()))?; 134 + Ok(UiResponse::Answer(Value::String(value))) 135 + } 136 + UiMessage::Select { label, options, .. } => { 137 + // Keep it dependency-light: render numbered options, read stdin. 138 + println!("{label}"); 139 + for (i, opt) in options.iter().enumerate() { 140 + println!(" [{i}] {opt}"); 141 + } 142 + print!("Choice: "); 143 + std::io::stdout() 144 + .flush() 145 + .map_err(|e| UiError::Io(e.to_string()))?; 146 + let mut buf = String::new(); 147 + std::io::stdin() 148 + .read_line(&mut buf) 149 + .map_err(|e| UiError::Io(e.to_string()))?; 150 + let idx: usize = buf 151 + .trim() 152 + .parse() 153 + .map_err(|_| UiError::Io("invalid selection".into()))?; 154 + let choice = options 155 + .get(idx) 156 + .ok_or_else(|| UiError::Io("selection out of range".into()))?; 157 + Ok(UiResponse::Answer(Value::String(choice.clone()))) 158 + } 159 + UiMessage::Confirm { label, .. } => { 160 + print!("{label} [y/N]: "); 161 + std::io::stdout() 162 + .flush() 163 + .map_err(|e| UiError::Io(e.to_string()))?; 164 + let mut buf = String::new(); 165 + std::io::stdin() 166 + .read_line(&mut buf) 167 + .map_err(|e| UiError::Io(e.to_string()))?; 168 + let yes = matches!(buf.trim().to_lowercase().as_str(), "y" | "yes"); 169 + Ok(UiResponse::Answer(json!(yes))) 170 + } 171 + UiMessage::OpenUrl { url } => { 172 + println!("Open in browser: {url}"); 173 + Ok(UiResponse::Ack) 174 + } 175 + UiMessage::Info { text } => { 176 + println!("{text}"); 177 + Ok(UiResponse::Ack) 178 + } 179 + } 180 + } 181 + 182 + fn describe(msg: &UiMessage) -> String { 183 + match msg { 184 + UiMessage::Prompt { name, .. } => format!("prompt:{name}"), 185 + UiMessage::Select { name, .. } => format!("select:{name}"), 186 + UiMessage::Confirm { name, .. } => format!("confirm:{name}"), 187 + UiMessage::OpenUrl { url } => format!("open_url:{url}"), 188 + UiMessage::Info { text } => format!("info:{text}"), 189 + } 190 + }
+333
mlf-plugin-host/tests/protocol_e2e.rs
··· 1 + //! End-to-end protocol tests. 2 + //! 3 + //! Each test wires a plugin's [`Server`] and the host's [`PluginHandle`] 4 + //! directly to each other over two `tokio::io::duplex` pipes — no 5 + //! subprocess, no real I/O. The plugin side is a small async task that 6 + //! implements whatever behaviour the test needs. 7 + 8 + use mlf_plugin_host::host::PluginHandle; 9 + use mlf_plugin_host::plugin::{Server, empty_data, params_as}; 10 + use mlf_plugin_host::protocol::{AskKind, HelloData, OptionField, PROTOCOL_VERSION}; 11 + use mlf_plugin_host::ui::{DenyInteractiveUi, UiHandler, UiMessage, UiResponse}; 12 + use serde_json::{Value, json}; 13 + use std::sync::{Arc, Mutex}; 14 + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 15 + 16 + fn identity() -> HelloData { 17 + HelloData { 18 + name: "testplugin".into(), 19 + protocol_version: PROTOCOL_VERSION, 20 + kind: Some("dns".into()), 21 + capabilities: vec!["list_txt".into(), "upsert_txt".into(), "delete_txt".into()], 22 + options_schema: vec![OptionField { 23 + name: "api_token".into(), 24 + label: "API token".into(), 25 + help: None, 26 + secret: true, 27 + required: true, 28 + default: None, 29 + }], 30 + } 31 + } 32 + 33 + /// Spawn a plugin task, return the host-side handle with the handshake complete. 34 + async fn spawn_paired<F, Fut>( 35 + run_plugin: F, 36 + ) -> PluginHandle< 37 + tokio::io::WriteHalf<tokio::io::DuplexStream>, 38 + BufReader<tokio::io::ReadHalf<tokio::io::DuplexStream>>, 39 + > 40 + where 41 + F: FnOnce( 42 + Server< 43 + tokio::io::WriteHalf<tokio::io::DuplexStream>, 44 + BufReader<tokio::io::ReadHalf<tokio::io::DuplexStream>>, 45 + >, 46 + ) -> Fut 47 + + Send 48 + + 'static, 49 + Fut: std::future::Future<Output = ()> + Send + 'static, 50 + { 51 + // Host↔plugin pipes, each direction one duplex. 52 + let (host_to_plugin, plugin_from_host) = tokio::io::duplex(8192); 53 + let (plugin_to_host, host_from_plugin) = tokio::io::duplex(8192); 54 + 55 + // Plugin side. 56 + let (plugin_in_r, _plugin_in_w) = tokio::io::split(plugin_from_host); 57 + let (_plugin_out_r, plugin_out_w) = tokio::io::split(plugin_to_host); 58 + let plugin_server = Server::new(plugin_out_w, BufReader::new(plugin_in_r)); 59 + tokio::spawn(run_plugin(plugin_server)); 60 + 61 + // Host side. 62 + let (_host_in_r, host_in_w) = tokio::io::split(host_to_plugin); 63 + let (host_out_r, _host_out_w) = tokio::io::split(host_from_plugin); 64 + PluginHandle::from_streams(host_in_w, BufReader::new(host_out_r), None) 65 + .await 66 + .expect("handshake") 67 + } 68 + 69 + // --------------------------------------------------------------------------- 70 + // Happy path 71 + // --------------------------------------------------------------------------- 72 + 73 + #[tokio::test] 74 + async fn handshake_exposes_plugin_identity() { 75 + let handle = spawn_paired(|mut server| async move { 76 + server.handshake(identity()).await.unwrap(); 77 + }) 78 + .await; 79 + assert_eq!(handle.hello().name, "testplugin"); 80 + assert!( 81 + handle 82 + .hello() 83 + .capabilities 84 + .contains(&"list_txt".to_string()) 85 + ); 86 + assert_eq!(handle.hello().options_schema.len(), 1); 87 + assert!(handle.hello().options_schema[0].secret); 88 + } 89 + 90 + #[tokio::test] 91 + async fn list_txt_roundtrips() { 92 + let mut handle = spawn_paired(|mut server| async move { 93 + server.handshake(identity()).await.unwrap(); 94 + // Read the list_txt op and reply with a canned set of records. 95 + let req = server.next_request().await.unwrap().unwrap(); 96 + assert_eq!(req.op, "list_txt"); 97 + assert_eq!(req.params["name"], "_lexicon.foo.example.com"); 98 + server 99 + .reply_ok(json!({ 100 + "records": [ 101 + {"id": "r1", "value": "did=did:plc:abc"} 102 + ] 103 + })) 104 + .await 105 + .unwrap(); 106 + }) 107 + .await; 108 + 109 + #[derive(serde::Deserialize)] 110 + struct Resp { 111 + records: Vec<R>, 112 + } 113 + #[derive(serde::Deserialize)] 114 + struct R { 115 + id: String, 116 + value: String, 117 + } 118 + let resp: Resp = handle 119 + .call( 120 + "list_txt", 121 + json!({"name": "_lexicon.foo.example.com"}), 122 + &mut DenyInteractiveUi, 123 + ) 124 + .await 125 + .unwrap(); 126 + assert_eq!(resp.records.len(), 1); 127 + assert_eq!(resp.records[0].id, "r1"); 128 + assert_eq!(resp.records[0].value, "did=did:plc:abc"); 129 + } 130 + 131 + // --------------------------------------------------------------------------- 132 + // Capability gating 133 + // --------------------------------------------------------------------------- 134 + 135 + #[tokio::test] 136 + async fn call_fails_when_capability_not_advertised() { 137 + let mut handle = spawn_paired(|mut server| async move { 138 + server.handshake(identity()).await.unwrap(); 139 + // Shouldn't get any request — host refuses before sending. 140 + }) 141 + .await; 142 + 143 + let err = handle 144 + .call::<Value>( 145 + "resolve_zone", 146 + json!({"domain": "example.com"}), 147 + &mut DenyInteractiveUi, 148 + ) 149 + .await 150 + .unwrap_err(); 151 + assert!(matches!( 152 + err, 153 + mlf_plugin_host::HostError::MissingCapability(ref cap) if cap == "resolve_zone" 154 + )); 155 + } 156 + 157 + // --------------------------------------------------------------------------- 158 + // Multi-turn RPC 159 + // --------------------------------------------------------------------------- 160 + 161 + #[tokio::test] 162 + async fn multi_turn_prompt_then_terminal_ok() { 163 + let mut handle = spawn_paired(|mut server| async move { 164 + server.handshake(identity()).await.unwrap(); 165 + let req = server.next_request().await.unwrap().unwrap(); 166 + assert_eq!(req.op, "login"); 167 + 168 + // One prompt, one info, then terminal. 169 + let token = server.ask_prompt("api_token", "Token", true).await.unwrap(); 170 + assert_eq!(token, "secret-abc"); 171 + server.ask_info("Validating...").await.unwrap(); 172 + server 173 + .reply_ok(json!({ 174 + "credentials": {"api_token": token}, 175 + "display_name": "scripted-user", 176 + })) 177 + .await 178 + .unwrap(); 179 + }) 180 + .await; 181 + 182 + // Scripted UI handler that answers the prompt and acks info messages. 183 + struct ScriptedUi { 184 + got_prompt: bool, 185 + got_info: bool, 186 + } 187 + #[async_trait::async_trait] 188 + impl UiHandler for ScriptedUi { 189 + async fn handle( 190 + &mut self, 191 + msg: UiMessage, 192 + ) -> Result<UiResponse, mlf_plugin_host::ui::UiError> { 193 + match msg { 194 + UiMessage::Prompt { name, .. } => { 195 + assert_eq!(name, "api_token"); 196 + self.got_prompt = true; 197 + Ok(UiResponse::Answer(json!("secret-abc"))) 198 + } 199 + UiMessage::Info { text } => { 200 + assert_eq!(text, "Validating..."); 201 + self.got_info = true; 202 + Ok(UiResponse::Ack) 203 + } 204 + other => panic!("unexpected ui: {other:?}"), 205 + } 206 + } 207 + } 208 + let mut ui = ScriptedUi { 209 + got_prompt: false, 210 + got_info: false, 211 + }; 212 + 213 + let resp: Value = handle.call("login", json!({}), &mut ui).await.unwrap(); 214 + assert_eq!(resp["display_name"], "scripted-user"); 215 + assert_eq!(resp["credentials"]["api_token"], "secret-abc"); 216 + assert!(ui.got_prompt); 217 + assert!(ui.got_info); 218 + } 219 + 220 + #[tokio::test] 221 + async fn non_interactive_refuses_ask() { 222 + let mut handle = spawn_paired(|mut server| async move { 223 + server.handshake(identity()).await.unwrap(); 224 + let _ = server.next_request().await.unwrap().unwrap(); 225 + // Send an ask; the host's DenyInteractiveUi will reject, so we 226 + // won't actually get back an answer/ack. Just send and exit. 227 + let _ = server 228 + .ask(AskKind::Prompt { 229 + name: "x".into(), 230 + label: "x".into(), 231 + secret: false, 232 + }) 233 + .await; 234 + }) 235 + .await; 236 + 237 + let err = handle 238 + .call::<Value>("login", json!({}), &mut DenyInteractiveUi) 239 + .await 240 + .unwrap_err(); 241 + match err { 242 + mlf_plugin_host::HostError::Ui(mlf_plugin_host::ui::UiError::NonInteractive { op }) => { 243 + assert!(op.starts_with("prompt:"), "got {op}"); 244 + } 245 + other => panic!("expected NonInteractive, got {other:?}"), 246 + } 247 + } 248 + 249 + // --------------------------------------------------------------------------- 250 + // Error paths 251 + // --------------------------------------------------------------------------- 252 + 253 + #[tokio::test] 254 + async fn terminal_err_is_plugin_error() { 255 + let mut handle = spawn_paired(|mut server| async move { 256 + server.handshake(identity()).await.unwrap(); 257 + let _ = server.next_request().await.unwrap().unwrap(); 258 + server 259 + .reply_err("auth_required", "no token", false) 260 + .await 261 + .unwrap(); 262 + }) 263 + .await; 264 + 265 + let err = handle 266 + .call::<Value>("upsert_txt", json!({}), &mut DenyInteractiveUi) 267 + .await 268 + .unwrap_err(); 269 + match err { 270 + mlf_plugin_host::HostError::PluginError { 271 + code, 272 + message, 273 + retryable, 274 + } => { 275 + assert_eq!(code, "auth_required"); 276 + assert_eq!(message, "no token"); 277 + assert!(!retryable); 278 + } 279 + other => panic!("expected PluginError, got {other:?}"), 280 + } 281 + } 282 + 283 + #[tokio::test] 284 + async fn early_plugin_exit_surfaces_error() { 285 + let handle_fut = spawn_paired(|mut server| async move { 286 + server.handshake(identity()).await.unwrap(); 287 + // Exit without responding to any op. 288 + drop(server); 289 + }); 290 + let mut handle = handle_fut.await; 291 + let err = handle 292 + .call::<Value>("list_txt", json!({"name": "x"}), &mut DenyInteractiveUi) 293 + .await 294 + .unwrap_err(); 295 + // Either the read sees EOF (EarlyExit) or the write fails first 296 + // (Io BrokenPipe) — both mean "plugin went away without finishing 297 + // this op." The host surfaces whichever happens first. 298 + match err { 299 + mlf_plugin_host::HostError::EarlyExit { .. } => {} 300 + mlf_plugin_host::HostError::Io(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => {} 301 + other => panic!("expected EarlyExit/BrokenPipe, got {other:?}"), 302 + } 303 + } 304 + 305 + // --------------------------------------------------------------------------- 306 + // Init 307 + // --------------------------------------------------------------------------- 308 + 309 + #[tokio::test] 310 + async fn init_forwards_credentials_to_plugin() { 311 + let seen = Arc::new(Mutex::new(None::<Value>)); 312 + let seen_plugin = seen.clone(); 313 + let mut handle = spawn_paired(move |mut server| async move { 314 + server.handshake(identity()).await.unwrap(); 315 + let req = server.next_request().await.unwrap().unwrap(); 316 + assert_eq!(req.op, "init"); 317 + let params: Value = params_as(&req).unwrap(); 318 + *seen_plugin.lock().unwrap() = Some(params.clone()); 319 + server.reply_ok(empty_data()).await.unwrap(); 320 + }) 321 + .await; 322 + 323 + handle 324 + .init(json!({"api_token": "TKN"})) 325 + .await 326 + .expect("init"); 327 + let got = seen.lock().unwrap().clone().unwrap(); 328 + assert_eq!(got["credentials"]["api_token"], "TKN"); 329 + } 330 + 331 + // Suppress unused import warnings when features change. 332 + #[allow(dead_code)] 333 + async fn _unused_use_keepers(_r: impl AsyncBufReadExt + Unpin, _w: impl AsyncWriteExt + Unpin) {}