MQTT 3.1 and 5 in OCaml using Eio
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fmt

+97 -56
+1 -1
.ocamlformat
··· 1 - version = 0.28.1 1 + version = 0.29.0 2 2 profile = default
+2 -2
dune-project
··· 19 19 (bytesrw (>= 0.1)) 20 20 (bytesrw-eio (>= 0.1)) 21 21 conpool 22 - ca-certs 22 + nox-ca-certs 23 23 (cmdliner (>= 1.2)) 24 - tls 24 + nox-tls 25 25 xdge 26 26 tomlt 27 27 (logs (>= 0.7))
+13 -1
lib/cmd/dune
··· 1 1 (library 2 2 (name mqtte_cmd) 3 3 (public_name mqtte.cmd) 4 - (libraries mqtte mqtte_eio xdge tomlt tomlt.eio eio conpool ca-certs tls cmdliner logs fmt)) 4 + (libraries 5 + mqtte 6 + mqtte_eio 7 + xdge 8 + tomlt 9 + tomlt.eio 10 + eio 11 + conpool 12 + nox-ca-certs 13 + nox-tls 14 + cmdliner 15 + logs 16 + fmt))
+52 -30
lib/cmd/mqtte_cmd.ml
··· 62 62 Tomlt.( 63 63 Table.( 64 64 obj 65 - (fun host port tls insecure client_id clean_session keep_alive 66 - username password protocol_version -> 65 + (fun 66 + host 67 + port 68 + tls 69 + insecure 70 + client_id 71 + clean_session 72 + keep_alive 73 + username 74 + password 75 + protocol_version 76 + -> 67 77 { 68 78 host; 69 79 port; ··· 106 116 mqtt = Option.value ~default:empty_mqtt_config mqtt; 107 117 pool = Option.value ~default:empty_pool_config pool; 108 118 }) 109 - |> opt_mem "mqtt" mqtt_codec 110 - ~enc:(fun c -> 111 - if c.mqtt = empty_mqtt_config then None else Some c.mqtt) 112 - |> opt_mem "pool" pool_codec 113 - ~enc:(fun c -> 114 - if c.pool = empty_pool_config then None else Some c.pool) 119 + |> opt_mem "mqtt" mqtt_codec ~enc:(fun c -> 120 + if c.mqtt = empty_mqtt_config then None else Some c.mqtt) 121 + |> opt_mem "pool" pool_codec ~enc:(fun c -> 122 + if c.pool = empty_pool_config then None else Some c.pool) 115 123 |> finish)) 116 124 117 125 let load_from_path path = ··· 193 201 "Skip TLS certificate verification (allows expired/self-signed \ 194 202 certificates)." 195 203 in 196 - Term.(const (fun flag -> flag || config_default) $ Arg.(value & flag & info [ "insecure" ] ~doc)) 204 + Term.( 205 + const (fun flag -> flag || config_default) 206 + $ Arg.(value & flag & info [ "insecure" ] ~doc)) 197 207 198 208 let client_id_term ~file_config = 199 209 let default = file_config.Config_file.mqtt.client_id in ··· 201 211 "MQTT client identifier. If not specified, a random ID is generated." 202 212 in 203 213 let env = Cmd.Env.info "MQTT_CLIENT_ID" ~doc in 204 - Arg.(value & opt (some string) default & info [ "client-id" ] ~docv:"ID" ~doc ~env) 214 + Arg.( 215 + value 216 + & opt (some string) default 217 + & info [ "client-id" ] ~docv:"ID" ~doc ~env) 205 218 206 219 let clean_session_term ~file_config = 207 - let config_default = or_else file_config.Config_file.mqtt.clean_session false in 220 + let config_default = 221 + or_else file_config.Config_file.mqtt.clean_session false 222 + in 208 223 let doc = 209 224 "Start with a clean session (discard any previous session state)." 210 225 in 211 - Term.(const (fun flag -> flag || config_default) $ Arg.(value & flag & info [ "clean-session" ] ~doc)) 226 + Term.( 227 + const (fun flag -> flag || config_default) 228 + $ Arg.(value & flag & info [ "clean-session" ] ~doc)) 212 229 213 230 let keep_alive_term ~file_config = 214 231 let default = or_else file_config.Config_file.mqtt.keep_alive 60 in ··· 223 240 in 224 241 let doc = "MQTT protocol version to use." in 225 242 let versions = [ ("3.1.1", `V3_1_1); ("5", `V5_0); ("5.0", `V5_0) ] in 226 - Arg.(value & opt (enum versions) default & info [ "mqtt-version" ] ~docv:"VERSION" ~doc) 243 + Arg.( 244 + value 245 + & opt (enum versions) default 246 + & info [ "mqtt-version" ] ~docv:"VERSION" ~doc) 227 247 228 248 let username_term ~file_config = 229 249 let default = file_config.Config_file.mqtt.username in 230 250 let doc = "MQTT username for authentication." in 231 251 let env = Cmd.Env.info "MQTT_USER" ~doc in 232 - Arg.(value & opt (some string) default & info [ "u"; "username" ] ~docv:"USER" ~doc ~env) 252 + Arg.( 253 + value 254 + & opt (some string) default 255 + & info [ "u"; "username" ] ~docv:"USER" ~doc ~env) 233 256 234 257 let password_term ~file_config = 235 258 let default = file_config.Config_file.mqtt.password in 236 259 let doc = "MQTT password for authentication." in 237 260 let env = Cmd.Env.info "MQTT_PASSWORD" ~doc in 238 - Arg.(value & opt (some string) default & info [ "password" ] ~docv:"PASS" ~doc ~env) 261 + Arg.( 262 + value 263 + & opt (some string) default 264 + & info [ "password" ] ~docv:"PASS" ~doc ~env) 239 265 240 266 (** {1 Combined Terms} *) 241 267 ··· 245 271 { host; port; tls; insecure } 246 272 in 247 273 Term.( 248 - const make 249 - $ host_term ~file_config 250 - $ port_term ~file_config 251 - $ tls_term ~file_config 252 - $ insecure_term ~file_config) 274 + const make $ host_term ~file_config $ port_term ~file_config 275 + $ tls_term ~file_config $ insecure_term ~file_config) 253 276 254 277 let config_term ~file_config = 255 278 let make client_id clean_session keep_alive version username password = ··· 275 298 $ clean_session_term ~file_config 276 299 $ keep_alive_term ~file_config 277 300 $ protocol_version_term ~file_config 278 - $ username_term ~file_config 279 - $ password_term ~file_config) 301 + $ username_term ~file_config $ password_term ~file_config) 280 302 281 303 let pool_config_term ~file_config = 282 304 let default_max = ··· 301 323 in 302 324 Term.( 303 325 const (fun max_connections_per_endpoint max_idle_time -> 304 - Conpool.Config.make ~max_connections_per_endpoint ~max_idle_time ()) 326 + Conpool.Config.v ~max_connections_per_endpoint ~max_idle_time ()) 305 327 $ max_size $ idle_timeout) 306 328 307 329 (** {1 Main Term} *) ··· 346 368 | Error (`Msg msg) -> failwith ("Failed to create TLS config: " ^ msg) 347 369 else None 348 370 in 349 - Conpool.create_basic ~sw ~net ~clock ?tls:tls_config ~config:pool_config () 371 + Conpool.basic ~sw ~net ~clock ?tls:tls_config ~config:pool_config () 350 372 351 - let endpoint conn = Conpool.Endpoint.make ~host:conn.host ~port:conn.port 373 + let endpoint conn = Conpool.Endpoint.v ~host:conn.host ~port:conn.port 352 374 353 375 (** {1 XDG Access} *) 354 376 ··· 394 416 `P "The following environment variables affect the program:"; 395 417 `I 396 418 ( "MQTT_HOST", 397 - "MQTT broker hostname or IP address. Overrides config file, \ 398 - overridden by --host." ); 419 + "MQTT broker hostname or IP address. Overrides config file, overridden \ 420 + by --host." ); 399 421 `I 400 - ("MQTT_PORT", "MQTT broker port. Overrides config file, overridden by --port."); 422 + ( "MQTT_PORT", 423 + "MQTT broker port. Overrides config file, overridden by --port." ); 401 424 `I ("MQTT_TLS", "Enable TLS if set to any value. Overridden by --tls."); 402 425 `I 403 426 ( "MQTT_CLIENT_ID", ··· 510 533 "Creates a default configuration file at ~/.config/%s/config.toml \ 511 534 with commented examples of all available options." 512 535 app_name); 513 - `P 514 - "If a configuration file already exists, use --force to overwrite it."; 536 + `P "If a configuration file already exists, use --force to overwrite it."; 515 537 ] 516 538 in 517 539 let info = Cmd.info "init-config" ~doc ~man in
+21 -14
lib/cmd/mqtte_cmd.mli
··· 45 45 (** {1 Connection Options} *) 46 46 47 47 type connection = { host : string; port : int; tls : bool; insecure : bool } 48 - (** Connection parameters parsed from command line, environment, or config file. *) 48 + (** Connection parameters parsed from command line, environment, or config file. 49 + *) 49 50 50 51 (** {1 Combined Configuration} *) 51 52 ··· 63 64 XDG context for accessing application directories. *) 64 65 65 66 val term : 66 - app_name:string -> fs:Eio.Fs.dir_ty Eio.Path.t -> unit -> parsed Cmdliner.Term.t 67 + app_name:string -> 68 + fs:Eio.Fs.dir_ty Eio.Path.t -> 69 + unit -> 70 + parsed Cmdliner.Term.t 67 71 (** [term ~app_name ~fs ()] creates a Cmdliner term that parses MQTT 68 72 configuration from command line arguments, environment variables, and TOML 69 73 config files. ··· 176 180 (** Empty configuration. *) 177 181 178 182 val codec : t Tomlt.t 179 - (** Tomlt codec for the full config file (both [[mqtt]] and [[pool]] sections). 180 - Use this when loading a standalone config.toml, or use {!mqtt_codec} and 181 - {!pool_codec} separately to compose with application-specific sections. *) 183 + (** Tomlt codec for the full config file (both [[mqtt]] and [[pool]] 184 + sections). Use this when loading a standalone config.toml, or use 185 + {!mqtt_codec} and {!pool_codec} separately to compose with 186 + application-specific sections. *) 182 187 183 188 val load : Xdge.t -> t option 184 189 (** [load xdg] attempts to load [config.toml] from the XDG config directory. ··· 271 276 272 277 Example usage: 273 278 {[ 274 - let main_cmd = Cmd.group info [ 275 - your_main_term; 276 - Mqtte_cmd.init_config_cmd ~app_name ~fs; 277 - Mqtte_cmd.show_config_cmd ~app_name ~fs; 278 - ] 279 + let main_cmd = 280 + Cmd.group info 281 + [ 282 + your_main_term; 283 + Mqtte_cmd.init_config_cmd ~app_name ~fs; 284 + Mqtte_cmd.show_config_cmd ~app_name ~fs; 285 + ] 279 286 ]} *) 280 287 281 288 val init_config_cmd : ··· 283 290 (** [init_config_cmd ~app_name ~fs] creates an [init-config] subcommand that 284 291 writes a default configuration file to the XDG config directory. 285 292 286 - Supports [--force] flag to overwrite existing configuration. 287 - Returns exit code 0 on success, 1 if file exists and --force not given. *) 293 + Supports [--force] flag to overwrite existing configuration. Returns exit 294 + code 0 on success, 1 if file exists and --force not given. *) 288 295 289 296 val show_config_cmd : 290 297 app_name:string -> fs:Eio.Fs.dir_ty Eio.Path.t -> int Cmdliner.Cmd.t 291 298 (** [show_config_cmd ~app_name ~fs] creates a [show-config] subcommand that 292 - displays the configuration file path and its contents if it exists. 293 - Returns exit code 0. *) 299 + displays the configuration file path and its contents if it exists. Returns 300 + exit code 0. *)
+6 -6
lib/eio/client.ml
··· 111 111 112 112 let handle_incoming_packet t packet = 113 113 match packet with 114 - | Transport.V3 pkt -> begin 115 - match pkt with 114 + | Transport.V3 pkt -> 115 + begin match pkt with 116 116 | Mqtte.V3.Packet.Publish p -> 117 117 let msg = 118 118 { ··· 161 161 t.connected <- false; 162 162 t.should_stop <- true 163 163 | _ -> Log.warn (fun m -> m "Unexpected packet type") 164 - end 165 - | Transport.V5 pkt -> begin 166 - match pkt with 164 + end 165 + | Transport.V5 pkt -> 166 + begin match pkt with 167 167 | Mqtte.V5.Packet.Publish p -> 168 168 let msg = 169 169 { ··· 247 247 t.connected <- false; 248 248 t.should_stop <- true 249 249 | _ -> Log.warn (fun m -> m "Unexpected packet type") 250 - end 250 + end 251 251 252 252 let reader_fiber t reader = 253 253 Log.debug (fun m -> m "Starting reader fiber");
+2 -2
mqtte.opam
··· 15 15 "bytesrw" {>= "0.1"} 16 16 "bytesrw-eio" {>= "0.1"} 17 17 "conpool" 18 - "ca-certs" 18 + "nox-ca-certs" 19 19 "cmdliner" {>= "1.2"} 20 - "tls" 20 + "nox-tls" 21 21 "xdge" 22 22 "tomlt" 23 23 "logs" {>= "0.7"}