MQTT 3.1 and 5 in OCaml using Eio
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Refactor Packet modules to use submodules and tidy code

- Convert V3.Packet and V5.Packet inline records to proper submodules
(Connect, Connack, Publish, Puback, etc. each with type t)
- Extract pp_semi formatter to Shared module to reduce duplication
- Simplify parser many1 to use many, eliminating duplicate loop logic
- Update client.ml and tests to use new submodule syntax

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+1096 -681
+245 -92
lib/core/mqtte.mli
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - (** MQTTE Protocol Library 6 + (** MQTT Protocol Library 7 7 8 - Types and codecs for MQTTE v3.1.1 and v5.0 protocols. 8 + Types and codecs for MQTT v3.1.1 and v5.0 protocols. 9 9 10 10 @see <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html> 11 - MQTTE v3.1.1 Specification 11 + MQTT v3.1.1 Specification 12 12 @see <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html> 13 - MQTTE v5.0 Specification *) 13 + MQTT v5.0 Specification *) 14 14 15 15 (** {1 Quality of Service} 16 16 ··· 104 104 end 105 105 end 106 106 107 - (** {1 MQTTE v3.1.1} 107 + (** {1 MQTT v3.1.1} 108 108 109 109 @see <http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html> 110 110 OASIS Standard *) ··· 154 154 Section 2.2 *) 155 155 156 156 module Packet : sig 157 + (** {3 Connect} *) 158 + 159 + module Connect : sig 160 + type t = { 161 + clean_session : bool; 162 + keep_alive : int; 163 + client_id : string; 164 + credentials : Credentials.t option; 165 + will : Will.t option; 166 + } 167 + 168 + val pp : Format.formatter -> t -> unit 169 + end 170 + 171 + (** {3 Connack} *) 172 + 173 + module Connack : sig 174 + type t = { session_present : bool; return_code : Return_code.t } 175 + 176 + val pp : Format.formatter -> t -> unit 177 + end 178 + 179 + (** {3 Publish} *) 180 + 181 + module Publish : sig 182 + type t = { 183 + dup : bool; 184 + qos : Qos.t; 185 + retain : bool; 186 + topic : Topic.Name.t; 187 + packet_id : Packet_id.t option; 188 + payload : string; 189 + } 190 + 191 + val pp : Format.formatter -> t -> unit 192 + end 193 + 194 + (** {3 Subscribe} *) 195 + 196 + module Subscribe : sig 197 + type t = { packet_id : Packet_id.t; topics : Subscription.t list } 198 + 199 + val pp : Format.formatter -> t -> unit 200 + end 201 + 202 + (** {3 Suback} *) 203 + 204 + module Suback : sig 205 + type t = { packet_id : Packet_id.t; return_codes : Suback_code.t list } 206 + 207 + val pp : Format.formatter -> t -> unit 208 + end 209 + 210 + (** {3 Unsubscribe} *) 211 + 212 + module Unsubscribe : sig 213 + type t = { packet_id : Packet_id.t; topics : Topic.Filter.t list } 214 + 215 + val pp : Format.formatter -> t -> unit 216 + end 217 + 218 + (** {3 Packet Type} *) 219 + 157 220 type t = 158 - | Connect of { 159 - clean_session : bool; 160 - keep_alive : int; 161 - client_id : string; 162 - credentials : Credentials.t option; 163 - will : Will.t option; 164 - } 165 - | Connack of { session_present : bool; return_code : Return_code.t } 166 - | Publish of { 167 - dup : bool; 168 - qos : Qos.t; 169 - retain : bool; 170 - topic : Topic.Name.t; 171 - packet_id : Packet_id.t option; 172 - payload : string; 173 - } 221 + | Connect of Connect.t 222 + | Connack of Connack.t 223 + | Publish of Publish.t 174 224 | Puback of Packet_id.t 175 225 | Pubrec of Packet_id.t 176 226 | Pubrel of Packet_id.t 177 227 | Pubcomp of Packet_id.t 178 - | Subscribe of { packet_id : Packet_id.t; topics : Subscription.t list } 179 - | Suback of { packet_id : Packet_id.t; return_codes : Suback_code.t list } 180 - | Unsubscribe of { packet_id : Packet_id.t; topics : Topic.Filter.t list } 228 + | Subscribe of Subscribe.t 229 + | Suback of Suback.t 230 + | Unsubscribe of Unsubscribe.t 181 231 | Unsuback of Packet_id.t 182 232 | Pingreq 183 233 | Pingresp ··· 189 239 end 190 240 end 191 241 192 - (** {1 MQTTE v5.0} 242 + (** {1 MQTT v5.0} 193 243 194 244 @see <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html> 195 245 OASIS Standard *) ··· 333 383 Section 2.1 *) 334 384 335 385 module Packet : sig 386 + (** {3 Connect} *) 387 + 388 + module Connect : sig 389 + type t = { 390 + clean_start : bool; 391 + keep_alive : int; 392 + client_id : string; 393 + credentials : Credentials.t option; 394 + will : Will_properties.t option; 395 + properties : Property.t list; 396 + } 397 + 398 + val pp : Format.formatter -> t -> unit 399 + end 400 + 401 + (** {3 Connack} *) 402 + 403 + module Connack : sig 404 + type t = { 405 + session_present : bool; 406 + reason_code : Reason_code.t; 407 + properties : Property.t list; 408 + } 409 + 410 + val pp : Format.formatter -> t -> unit 411 + end 412 + 413 + (** {3 Publish} *) 414 + 415 + module Publish : sig 416 + type t = { 417 + dup : bool; 418 + qos : Qos.t; 419 + retain : bool; 420 + topic : Topic.Name.t; 421 + packet_id : Packet_id.t option; 422 + payload : string; 423 + properties : Property.t list; 424 + } 425 + 426 + val pp : Format.formatter -> t -> unit 427 + end 428 + 429 + (** {3 Puback} *) 430 + 431 + module Puback : sig 432 + type t = { 433 + packet_id : Packet_id.t; 434 + reason_code : Reason_code.t; 435 + properties : Property.t list; 436 + } 437 + 438 + val pp : Format.formatter -> t -> unit 439 + end 440 + 441 + (** {3 Pubrec} *) 442 + 443 + module Pubrec : sig 444 + type t = { 445 + packet_id : Packet_id.t; 446 + reason_code : Reason_code.t; 447 + properties : Property.t list; 448 + } 449 + 450 + val pp : Format.formatter -> t -> unit 451 + end 452 + 453 + (** {3 Pubrel} *) 454 + 455 + module Pubrel : sig 456 + type t = { 457 + packet_id : Packet_id.t; 458 + reason_code : Reason_code.t; 459 + properties : Property.t list; 460 + } 461 + 462 + val pp : Format.formatter -> t -> unit 463 + end 464 + 465 + (** {3 Pubcomp} *) 466 + 467 + module Pubcomp : sig 468 + type t = { 469 + packet_id : Packet_id.t; 470 + reason_code : Reason_code.t; 471 + properties : Property.t list; 472 + } 473 + 474 + val pp : Format.formatter -> t -> unit 475 + end 476 + 477 + (** {3 Subscribe} *) 478 + 479 + module Subscribe : sig 480 + type t = { 481 + packet_id : Packet_id.t; 482 + properties : Property.t list; 483 + topics : Subscription.t list; 484 + } 485 + 486 + val pp : Format.formatter -> t -> unit 487 + end 488 + 489 + (** {3 Suback} *) 490 + 491 + module Suback : sig 492 + type t = { 493 + packet_id : Packet_id.t; 494 + properties : Property.t list; 495 + reason_codes : Reason_code.t list; 496 + } 497 + 498 + val pp : Format.formatter -> t -> unit 499 + end 500 + 501 + (** {3 Unsubscribe} *) 502 + 503 + module Unsubscribe : sig 504 + type t = { 505 + packet_id : Packet_id.t; 506 + properties : Property.t list; 507 + topics : Topic.Filter.t list; 508 + } 509 + 510 + val pp : Format.formatter -> t -> unit 511 + end 512 + 513 + (** {3 Unsuback} *) 514 + 515 + module Unsuback : sig 516 + type t = { 517 + packet_id : Packet_id.t; 518 + properties : Property.t list; 519 + reason_codes : Reason_code.t list; 520 + } 521 + 522 + val pp : Format.formatter -> t -> unit 523 + end 524 + 525 + (** {3 Disconnect} *) 526 + 527 + module Disconnect : sig 528 + type t = { reason_code : Reason_code.t; properties : Property.t list } 529 + 530 + val pp : Format.formatter -> t -> unit 531 + end 532 + 533 + (** {3 Auth} *) 534 + 535 + module Auth : sig 536 + type t = { reason_code : Reason_code.t; properties : Property.t list } 537 + 538 + val pp : Format.formatter -> t -> unit 539 + end 540 + 541 + (** {3 Packet Type} *) 542 + 336 543 type t = 337 - | Connect of { 338 - clean_start : bool; 339 - keep_alive : int; 340 - client_id : string; 341 - credentials : Credentials.t option; 342 - will : Will_properties.t option; 343 - properties : Property.t list; 344 - } 345 - | Connack of { 346 - session_present : bool; 347 - reason_code : Reason_code.t; 348 - properties : Property.t list; 349 - } 350 - | Publish of { 351 - dup : bool; 352 - qos : Qos.t; 353 - retain : bool; 354 - topic : Topic.Name.t; 355 - packet_id : Packet_id.t option; 356 - payload : string; 357 - properties : Property.t list; 358 - } 359 - | Puback of { 360 - packet_id : Packet_id.t; 361 - reason_code : Reason_code.t; 362 - properties : Property.t list; 363 - } 364 - | Pubrec of { 365 - packet_id : Packet_id.t; 366 - reason_code : Reason_code.t; 367 - properties : Property.t list; 368 - } 369 - | Pubrel of { 370 - packet_id : Packet_id.t; 371 - reason_code : Reason_code.t; 372 - properties : Property.t list; 373 - } 374 - | Pubcomp of { 375 - packet_id : Packet_id.t; 376 - reason_code : Reason_code.t; 377 - properties : Property.t list; 378 - } 379 - | Subscribe of { 380 - packet_id : Packet_id.t; 381 - properties : Property.t list; 382 - topics : Subscription.t list; 383 - } 384 - | Suback of { 385 - packet_id : Packet_id.t; 386 - properties : Property.t list; 387 - reason_codes : Reason_code.t list; 388 - } 389 - | Unsubscribe of { 390 - packet_id : Packet_id.t; 391 - properties : Property.t list; 392 - topics : Topic.Filter.t list; 393 - } 394 - | Unsuback of { 395 - packet_id : Packet_id.t; 396 - properties : Property.t list; 397 - reason_codes : Reason_code.t list; 398 - } 544 + | Connect of Connect.t 545 + | Connack of Connack.t 546 + | Publish of Publish.t 547 + | Puback of Puback.t 548 + | Pubrec of Pubrec.t 549 + | Pubrel of Pubrel.t 550 + | Pubcomp of Pubcomp.t 551 + | Subscribe of Subscribe.t 552 + | Suback of Suback.t 553 + | Unsubscribe of Unsubscribe.t 554 + | Unsuback of Unsuback.t 399 555 | Pingreq 400 556 | Pingresp 401 - | Disconnect of { 402 - reason_code : Reason_code.t; 403 - properties : Property.t list; 404 - } 405 - | Auth of { reason_code : Reason_code.t; properties : Property.t list } 557 + | Disconnect of Disconnect.t 558 + | Auth of Auth.t 406 559 407 560 val pp : Format.formatter -> t -> unit 408 561 val write : Bytesrw.Bytes.Writer.t -> t -> unit
+7 -9
lib/core/parser.ml
··· 165 165 166 166 (** {1 Combinators} *) 167 167 168 + let rec many f reader = 169 + if is_eod reader then [] 170 + else 171 + let v = f reader in 172 + v :: many f reader 173 + 168 174 let many1 f reader = 169 175 let first = f reader in 170 - let rec loop acc = 171 - if is_eod reader then List.rev acc 172 - else 173 - let v = f reader in 174 - loop (v :: acc) 175 - in 176 - first :: loop [] 177 - 178 - let many f reader = if is_eod reader then [] else many1 f reader 176 + first :: many f reader 179 177 180 178 let count n f reader = 181 179 let rec loop acc n =
+4
lib/core/shared.ml
··· 117 117 end 118 118 end 119 119 120 + (** {1 Formatting Helpers} *) 121 + 122 + let pp_semi ppf () = Format.fprintf ppf "; " 123 + 120 124 module Packet_type = struct 121 125 type t = 122 126 [ `RESERVED
+125 -105
lib/core/v3.ml
··· 79 79 module Packet = struct 80 80 module P = Parser 81 81 82 + module Connect = struct 83 + type t = { 84 + clean_session : bool; 85 + keep_alive : int; 86 + client_id : string; 87 + credentials : Shared.Credentials.t option; 88 + will : Shared.Will.t option; 89 + } 90 + 91 + let pp ppf t = 92 + Format.fprintf ppf 93 + "Connect{client_id=%s; clean_session=%b; keep_alive=%d}" t.client_id 94 + t.clean_session t.keep_alive 95 + end 96 + 97 + module Connack = struct 98 + type t = { session_present : bool; return_code : Return_code.t } 99 + 100 + let pp ppf t = 101 + Format.fprintf ppf "Connack{session_present=%b; return_code=%a}" 102 + t.session_present Return_code.pp t.return_code 103 + end 104 + 105 + module Publish = struct 106 + type t = { 107 + dup : bool; 108 + qos : Shared.Qos.t; 109 + retain : bool; 110 + topic : Shared.Topic.Name.t; 111 + packet_id : Shared.Packet_id.t option; 112 + payload : string; 113 + } 114 + 115 + let pp ppf t = 116 + Format.fprintf ppf 117 + "Publish{topic=%s; qos=%a; dup=%b; retain=%b; payload=<%d bytes>}" 118 + t.topic Shared.Qos.pp t.qos t.dup t.retain (String.length t.payload) 119 + end 120 + 121 + module Subscribe = struct 122 + type t = { packet_id : Shared.Packet_id.t; topics : Subscription.t list } 123 + 124 + let pp ppf t = 125 + Format.fprintf ppf "Subscribe{packet_id=%d; topics=[%a]}" t.packet_id 126 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Subscription.pp) 127 + t.topics 128 + end 129 + 130 + module Suback = struct 131 + type t = { 132 + packet_id : Shared.Packet_id.t; 133 + return_codes : Suback_code.t list; 134 + } 135 + 136 + let pp ppf t = 137 + Format.fprintf ppf "Suback{packet_id=%d; return_codes=[%a]}" t.packet_id 138 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Suback_code.pp) 139 + t.return_codes 140 + end 141 + 142 + module Unsubscribe = struct 143 + type t = { 144 + packet_id : Shared.Packet_id.t; 145 + topics : Shared.Topic.Filter.t list; 146 + } 147 + 148 + let pp ppf t = 149 + Format.fprintf ppf "Unsubscribe{packet_id=%d; topics=[%a]}" t.packet_id 150 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Shared.Topic.Filter.pp) 151 + t.topics 152 + end 153 + 82 154 type t = 83 - | Connect of { 84 - clean_session : bool; 85 - keep_alive : int; 86 - client_id : string; 87 - credentials : Shared.Credentials.t option; 88 - will : Shared.Will.t option; 89 - } 90 - | Connack of { session_present : bool; return_code : Return_code.t } 91 - | Publish of { 92 - dup : bool; 93 - qos : Shared.Qos.t; 94 - retain : bool; 95 - topic : Shared.Topic.Name.t; 96 - packet_id : Shared.Packet_id.t option; 97 - payload : string; 98 - } 155 + | Connect of Connect.t 156 + | Connack of Connack.t 157 + | Publish of Publish.t 99 158 | Puback of Shared.Packet_id.t 100 159 | Pubrec of Shared.Packet_id.t 101 160 | Pubrel of Shared.Packet_id.t 102 161 | Pubcomp of Shared.Packet_id.t 103 - | Subscribe of { 104 - packet_id : Shared.Packet_id.t; 105 - topics : Subscription.t list; 106 - } 107 - | Suback of { 108 - packet_id : Shared.Packet_id.t; 109 - return_codes : Suback_code.t list; 110 - } 111 - | Unsubscribe of { 112 - packet_id : Shared.Packet_id.t; 113 - topics : Shared.Topic.Filter.t list; 114 - } 162 + | Subscribe of Subscribe.t 163 + | Suback of Suback.t 164 + | Unsubscribe of Unsubscribe.t 115 165 | Unsuback of Shared.Packet_id.t 116 166 | Pingreq 117 167 | Pingresp 118 168 | Disconnect 119 169 120 170 let pp ppf = function 121 - | Connect c -> 122 - Format.fprintf ppf 123 - "Connect{client_id=%s; clean_session=%b; keep_alive=%d}" c.client_id 124 - c.clean_session c.keep_alive 125 - | Connack c -> 126 - Format.fprintf ppf "Connack{session_present=%b; return_code=%a}" 127 - c.session_present Return_code.pp c.return_code 128 - | Publish p -> 129 - Format.fprintf ppf 130 - "Publish{topic=%s; qos=%a; dup=%b; retain=%b; payload=<%d bytes>}" 131 - p.topic Shared.Qos.pp p.qos p.dup p.retain (String.length p.payload) 171 + | Connect c -> Connect.pp ppf c 172 + | Connack c -> Connack.pp ppf c 173 + | Publish p -> Publish.pp ppf p 132 174 | Puback id -> Format.fprintf ppf "Puback(%d)" id 133 175 | Pubrec id -> Format.fprintf ppf "Pubrec(%d)" id 134 176 | Pubrel id -> Format.fprintf ppf "Pubrel(%d)" id 135 177 | Pubcomp id -> Format.fprintf ppf "Pubcomp(%d)" id 136 - | Subscribe s -> 137 - Format.fprintf ppf "Subscribe{packet_id=%d; topics=[%a]}" s.packet_id 138 - (Format.pp_print_list 139 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 140 - Subscription.pp) 141 - s.topics 142 - | Suback s -> 143 - Format.fprintf ppf "Suback{packet_id=%d; return_codes=[%a]}" s.packet_id 144 - (Format.pp_print_list 145 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 146 - Suback_code.pp) 147 - s.return_codes 148 - | Unsubscribe u -> 149 - Format.fprintf ppf "Unsubscribe{packet_id=%d; topics=[%a]}" u.packet_id 150 - (Format.pp_print_list 151 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 152 - Shared.Topic.Filter.pp) 153 - u.topics 178 + | Subscribe s -> Subscribe.pp ppf s 179 + | Suback s -> Suback.pp ppf s 180 + | Unsubscribe u -> Unsubscribe.pp ppf u 154 181 | Unsuback id -> Format.fprintf ppf "Unsuback(%d)" id 155 182 | Pingreq -> Format.fprintf ppf "Pingreq" 156 183 | Pingresp -> Format.fprintf ppf "Pingresp" ··· 158 185 159 186 (** {1 Encoding} *) 160 187 161 - let write_connect writer ~clean_session ~keep_alive ~client_id ~credentials 162 - ~will = 188 + let write_connect writer (c : Connect.t) = 163 189 let payload = 164 190 P.to_string (fun w -> 165 191 P.write_mqtt_string w "MQTT"; 166 192 P.write_uint8 w 4; 167 193 let flags = ref 0 in 168 - if clean_session then flags := !flags lor 0x02; 194 + if c.clean_session then flags := !flags lor 0x02; 169 195 Option.iter 170 196 (fun w' -> 171 197 flags := !flags lor 0x04; 172 198 flags := !flags lor (Shared.Qos.to_int (Shared.Will.qos w') lsl 3); 173 199 if Shared.Will.retain w' then flags := !flags lor 0x20) 174 - will; 175 - (match credentials with 200 + c.will; 201 + (match c.credentials with 176 202 | Some (`Username _) -> flags := !flags lor 0x80 177 203 | Some (`Username_password _) -> flags := !flags lor 0xC0 178 204 | None -> ()); 179 205 P.write_uint8 w !flags; 180 - P.write_uint16_be w keep_alive; 181 - P.write_mqtt_string w client_id; 206 + P.write_uint16_be w c.keep_alive; 207 + P.write_mqtt_string w c.client_id; 182 208 Option.iter 183 209 (fun w' -> 184 210 P.write_mqtt_string w (Shared.Will.topic w'); 185 211 P.write_mqtt_string w (Shared.Will.payload w')) 186 - will; 187 - match credentials with 212 + c.will; 213 + match c.credentials with 188 214 | Some (`Username u) -> P.write_mqtt_string w u 189 215 | Some (`Username_password (username, password)) -> 190 216 P.write_mqtt_string w username; ··· 194 220 P.write_fixed_header writer `CONNECT 0 (String.length payload); 195 221 P.write_string writer payload 196 222 197 - let write_connack writer ~session_present ~return_code = 223 + let write_connack writer (c : Connack.t) = 198 224 P.write_fixed_header writer `CONNACK 0 2; 199 - P.write_uint8 writer (if session_present then 0x01 else 0x00); 200 - P.write_uint8 writer (Return_code.to_int return_code) 225 + P.write_uint8 writer (if c.session_present then 0x01 else 0x00); 226 + P.write_uint8 writer (Return_code.to_int c.return_code) 201 227 202 - let write_publish writer ~dup ~qos ~retain ~topic ~packet_id ~payload:msg = 228 + let write_publish writer (p : Publish.t) = 203 229 let body = 204 230 P.to_string (fun w -> 205 - P.write_mqtt_string w topic; 206 - (match packet_id with 207 - | Some id when qos <> `At_most_once -> P.write_uint16_be w id 231 + P.write_mqtt_string w p.topic; 232 + (match p.packet_id with 233 + | Some id when p.qos <> `At_most_once -> P.write_uint16_be w id 208 234 | _ -> ()); 209 - P.write_string w msg) 235 + P.write_string w p.payload) 210 236 in 211 237 let flags = 212 - (if dup then 0x08 else 0) 213 - lor (Shared.Qos.to_int qos lsl 1) 214 - lor if retain then 0x01 else 0 238 + (if p.dup then 0x08 else 0) 239 + lor (Shared.Qos.to_int p.qos lsl 1) 240 + lor if p.retain then 0x01 else 0 215 241 in 216 242 P.write_fixed_header writer `PUBLISH flags (String.length body); 217 243 P.write_string writer body ··· 225 251 let write_pubrel writer id = write_pubx writer `PUBREL ~flags:0x02 id 226 252 let write_pubcomp writer id = write_pubx writer `PUBCOMP id 227 253 228 - let write_subscribe writer ~packet_id ~topics = 254 + let write_subscribe writer (s : Subscribe.t) = 229 255 let payload = 230 256 P.to_string (fun w -> 231 - P.write_uint16_be w packet_id; 257 + P.write_uint16_be w s.packet_id; 232 258 List.iter 233 259 (fun (t : Subscription.t) -> 234 260 P.write_mqtt_string w t.filter; 235 261 P.write_uint8 w (Shared.Qos.to_int t.qos)) 236 - topics) 262 + s.topics) 237 263 in 238 264 P.write_fixed_header writer `SUBSCRIBE 0x02 (String.length payload); 239 265 P.write_string writer payload 240 266 241 - let write_suback writer ~packet_id ~return_codes = 267 + let write_suback writer (s : Suback.t) = 242 268 let payload = 243 269 P.to_string (fun w -> 244 - P.write_uint16_be w packet_id; 270 + P.write_uint16_be w s.packet_id; 245 271 List.iter 246 272 (fun rc -> P.write_uint8 w (Suback_code.to_int rc)) 247 - return_codes) 273 + s.return_codes) 248 274 in 249 275 P.write_fixed_header writer `SUBACK 0 (String.length payload); 250 276 P.write_string writer payload 251 277 252 - let write_unsubscribe writer ~packet_id ~topics = 278 + let write_unsubscribe writer (u : Unsubscribe.t) = 253 279 let payload = 254 280 P.to_string (fun w -> 255 - P.write_uint16_be w packet_id; 256 - List.iter (fun topic -> P.write_mqtt_string w topic) topics) 281 + P.write_uint16_be w u.packet_id; 282 + List.iter (fun topic -> P.write_mqtt_string w topic) u.topics) 257 283 in 258 284 P.write_fixed_header writer `UNSUBSCRIBE 0x02 (String.length payload); 259 285 P.write_string writer payload ··· 267 293 let write_disconnect writer = P.write_fixed_header writer `DISCONNECT 0 0 268 294 269 295 let write writer = function 270 - | Connect { clean_session; keep_alive; client_id; credentials; will } -> 271 - write_connect writer ~clean_session ~keep_alive ~client_id ~credentials 272 - ~will 273 - | Connack { session_present; return_code } -> 274 - write_connack writer ~session_present ~return_code 275 - | Publish { dup; qos; retain; topic; packet_id; payload } -> 276 - write_publish writer ~dup ~qos ~retain ~topic ~packet_id ~payload 296 + | Connect c -> write_connect writer c 297 + | Connack c -> write_connack writer c 298 + | Publish p -> write_publish writer p 277 299 | Puback id -> write_puback writer id 278 300 | Pubrec id -> write_pubrec writer id 279 301 | Pubrel id -> write_pubrel writer id 280 302 | Pubcomp id -> write_pubcomp writer id 281 - | Subscribe { packet_id; topics } -> 282 - write_subscribe writer ~packet_id ~topics 283 - | Suback { packet_id; return_codes } -> 284 - write_suback writer ~packet_id ~return_codes 285 - | Unsubscribe { packet_id; topics } -> 286 - write_unsubscribe writer ~packet_id ~topics 303 + | Subscribe s -> write_subscribe writer s 304 + | Suback s -> write_suback writer s 305 + | Unsubscribe u -> write_unsubscribe writer u 287 306 | Unsuback id -> write_unsuback writer id 288 307 | Pingreq -> write_pingreq writer 289 308 | Pingresp -> write_pingresp writer ··· 322 341 else Some (`Username username) 323 342 else None 324 343 in 325 - Connect { client_id; clean_session; keep_alive; credentials; will } 344 + Connect Connect.{ client_id; clean_session; keep_alive; credentials; will } 326 345 327 346 let read_connack reader = 328 347 let flags = P.uint8 reader in 329 348 let session_present = flags land 0x01 <> 0 in 330 349 let return_code = P.uint8 reader in 331 - Connack { session_present; return_code = Return_code.of_int return_code } 350 + Connack 351 + Connack.{ session_present; return_code = Return_code.of_int return_code } 332 352 333 353 let read_publish ~flags reader = 334 354 let dup = flags land 0x08 <> 0 in ··· 339 359 if qos <> `At_most_once then Some (P.uint16_be reader) else None 340 360 in 341 361 let payload = P.take_rest reader in 342 - Publish { dup; qos; retain; topic; packet_id; payload } 362 + Publish Publish.{ dup; qos; retain; topic; packet_id; payload } 343 363 344 364 let read_puback reader = Puback (P.uint16_be reader) 345 365 let read_pubrec reader = Pubrec (P.uint16_be reader) ··· 355 375 Subscription.{ filter; qos } 356 376 in 357 377 let topics = P.many1 read_topic reader in 358 - Subscribe { packet_id; topics } 378 + Subscribe Subscribe.{ packet_id; topics } 359 379 360 380 let read_suback ~remaining_length reader = 361 381 let packet_id = P.uint16_be reader in 362 382 let num_codes = remaining_length - 2 in 363 383 let read_code reader = Suback_code.of_int (P.uint8 reader) in 364 384 let return_codes = P.count num_codes read_code reader in 365 - Suback { packet_id; return_codes } 385 + Suback Suback.{ packet_id; return_codes } 366 386 367 387 let read_unsubscribe reader = 368 388 let packet_id = P.uint16_be reader in 369 389 let topics = P.many1 P.mqtt_string reader in 370 - Unsubscribe { packet_id; topics } 390 + Unsubscribe Unsubscribe.{ packet_id; topics } 371 391 372 392 let read_unsuback reader = Unsuback (P.uint16_be reader) 373 393
+75 -28
lib/core/v3.mli
··· 52 52 Section 2.2 *) 53 53 54 54 module Packet : sig 55 + (** {2 Connect} *) 56 + 57 + module Connect : sig 58 + type t = { 59 + clean_session : bool; 60 + keep_alive : int; 61 + client_id : string; 62 + credentials : Shared.Credentials.t option; 63 + will : Shared.Will.t option; 64 + } 65 + 66 + val pp : Format.formatter -> t -> unit 67 + end 68 + 69 + (** {2 Connack} *) 70 + 71 + module Connack : sig 72 + type t = { session_present : bool; return_code : Return_code.t } 73 + 74 + val pp : Format.formatter -> t -> unit 75 + end 76 + 77 + (** {2 Publish} *) 78 + 79 + module Publish : sig 80 + type t = { 81 + dup : bool; 82 + qos : Shared.Qos.t; 83 + retain : bool; 84 + topic : Shared.Topic.Name.t; 85 + packet_id : Shared.Packet_id.t option; 86 + payload : string; 87 + } 88 + 89 + val pp : Format.formatter -> t -> unit 90 + end 91 + 92 + (** {2 Subscribe} *) 93 + 94 + module Subscribe : sig 95 + type t = { packet_id : Shared.Packet_id.t; topics : Subscription.t list } 96 + 97 + val pp : Format.formatter -> t -> unit 98 + end 99 + 100 + (** {2 Suback} *) 101 + 102 + module Suback : sig 103 + type t = { 104 + packet_id : Shared.Packet_id.t; 105 + return_codes : Suback_code.t list; 106 + } 107 + 108 + val pp : Format.formatter -> t -> unit 109 + end 110 + 111 + (** {2 Unsubscribe} *) 112 + 113 + module Unsubscribe : sig 114 + type t = { 115 + packet_id : Shared.Packet_id.t; 116 + topics : Shared.Topic.Filter.t list; 117 + } 118 + 119 + val pp : Format.formatter -> t -> unit 120 + end 121 + 122 + (** {2 Packet Type} *) 123 + 55 124 type t = 56 - | Connect of { 57 - clean_session : bool; 58 - keep_alive : int; 59 - client_id : string; 60 - credentials : Shared.Credentials.t option; 61 - will : Shared.Will.t option; 62 - } 63 - | Connack of { session_present : bool; return_code : Return_code.t } 64 - | Publish of { 65 - dup : bool; 66 - qos : Shared.Qos.t; 67 - retain : bool; 68 - topic : Shared.Topic.Name.t; 69 - packet_id : Shared.Packet_id.t option; 70 - payload : string; 71 - } 125 + | Connect of Connect.t 126 + | Connack of Connack.t 127 + | Publish of Publish.t 72 128 | Puback of Shared.Packet_id.t 73 129 | Pubrec of Shared.Packet_id.t 74 130 | Pubrel of Shared.Packet_id.t 75 131 | Pubcomp of Shared.Packet_id.t 76 - | Subscribe of { 77 - packet_id : Shared.Packet_id.t; 78 - topics : Subscription.t list; 79 - } 80 - | Suback of { 81 - packet_id : Shared.Packet_id.t; 82 - return_codes : Suback_code.t list; 83 - } 84 - | Unsubscribe of { 85 - packet_id : Shared.Packet_id.t; 86 - topics : Shared.Topic.Filter.t list; 87 - } 132 + | Subscribe of Subscribe.t 133 + | Suback of Suback.t 134 + | Unsubscribe of Unsubscribe.t 88 135 | Unsuback of Shared.Packet_id.t 89 136 | Pingreq 90 137 | Pingresp
+279 -218
lib/core/v5.ml
··· 531 531 module Packet = struct 532 532 module P = Parser 533 533 534 + module Connect = struct 535 + type t = { 536 + clean_start : bool; 537 + keep_alive : int; 538 + client_id : string; 539 + credentials : Shared.Credentials.t option; 540 + will : Will_properties.t option; 541 + properties : Property.t list; 542 + } 543 + 544 + let pp ppf t = 545 + Format.fprintf ppf "Connect{client_id=%s; clean_start=%b; keep_alive=%d}" 546 + t.client_id t.clean_start t.keep_alive 547 + end 548 + 549 + module Connack = struct 550 + type t = { 551 + session_present : bool; 552 + reason_code : Reason_code.t; 553 + properties : Property.t list; 554 + } 555 + 556 + let pp ppf t = 557 + Format.fprintf ppf "Connack{session_present=%b; reason_code=%a}" 558 + t.session_present Reason_code.pp t.reason_code 559 + end 560 + 561 + module Publish = struct 562 + type t = { 563 + dup : bool; 564 + qos : Shared.Qos.t; 565 + retain : bool; 566 + topic : Shared.Topic.Name.t; 567 + packet_id : Shared.Packet_id.t option; 568 + payload : string; 569 + properties : Property.t list; 570 + } 571 + 572 + let pp ppf t = 573 + Format.fprintf ppf 574 + "Publish{topic=%s; qos=%a; dup=%b; retain=%b; payload=<%d bytes>}" 575 + t.topic Shared.Qos.pp t.qos t.dup t.retain (String.length t.payload) 576 + end 577 + 578 + module Puback = struct 579 + type t = { 580 + packet_id : Shared.Packet_id.t; 581 + reason_code : Reason_code.t; 582 + properties : Property.t list; 583 + } 584 + 585 + let pp ppf t = 586 + Format.fprintf ppf "Puback{packet_id=%d; reason_code=%a}" t.packet_id 587 + Reason_code.pp t.reason_code 588 + end 589 + 590 + module Pubrec = struct 591 + type t = { 592 + packet_id : Shared.Packet_id.t; 593 + reason_code : Reason_code.t; 594 + properties : Property.t list; 595 + } 596 + 597 + let pp ppf t = 598 + Format.fprintf ppf "Pubrec{packet_id=%d; reason_code=%a}" t.packet_id 599 + Reason_code.pp t.reason_code 600 + end 601 + 602 + module Pubrel = struct 603 + type t = { 604 + packet_id : Shared.Packet_id.t; 605 + reason_code : Reason_code.t; 606 + properties : Property.t list; 607 + } 608 + 609 + let pp ppf t = 610 + Format.fprintf ppf "Pubrel{packet_id=%d; reason_code=%a}" t.packet_id 611 + Reason_code.pp t.reason_code 612 + end 613 + 614 + module Pubcomp = struct 615 + type t = { 616 + packet_id : Shared.Packet_id.t; 617 + reason_code : Reason_code.t; 618 + properties : Property.t list; 619 + } 620 + 621 + let pp ppf t = 622 + Format.fprintf ppf "Pubcomp{packet_id=%d; reason_code=%a}" t.packet_id 623 + Reason_code.pp t.reason_code 624 + end 625 + 626 + module Subscribe = struct 627 + type t = { 628 + packet_id : Shared.Packet_id.t; 629 + properties : Property.t list; 630 + topics : Subscription.t list; 631 + } 632 + 633 + let pp ppf t = 634 + Format.fprintf ppf "Subscribe{packet_id=%d; topics=[%a]}" t.packet_id 635 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Subscription.pp) 636 + t.topics 637 + end 638 + 639 + module Suback = struct 640 + type t = { 641 + packet_id : Shared.Packet_id.t; 642 + properties : Property.t list; 643 + reason_codes : Reason_code.t list; 644 + } 645 + 646 + let pp ppf t = 647 + Format.fprintf ppf "Suback{packet_id=%d; reason_codes=[%a]}" t.packet_id 648 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Reason_code.pp) 649 + t.reason_codes 650 + end 651 + 652 + module Unsubscribe = struct 653 + type t = { 654 + packet_id : Shared.Packet_id.t; 655 + properties : Property.t list; 656 + topics : Shared.Topic.Filter.t list; 657 + } 658 + 659 + let pp ppf t = 660 + Format.fprintf ppf "Unsubscribe{packet_id=%d; topics=[%a]}" t.packet_id 661 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Shared.Topic.Filter.pp) 662 + t.topics 663 + end 664 + 665 + module Unsuback = struct 666 + type t = { 667 + packet_id : Shared.Packet_id.t; 668 + properties : Property.t list; 669 + reason_codes : Reason_code.t list; 670 + } 671 + 672 + let pp ppf t = 673 + Format.fprintf ppf "Unsuback{packet_id=%d; reason_codes=[%a]}" t.packet_id 674 + (Format.pp_print_list ~pp_sep:Shared.pp_semi Reason_code.pp) 675 + t.reason_codes 676 + end 677 + 678 + module Disconnect = struct 679 + type t = { reason_code : Reason_code.t; properties : Property.t list } 680 + 681 + let pp ppf t = 682 + Format.fprintf ppf "Disconnect{reason_code=%a}" Reason_code.pp 683 + t.reason_code 684 + end 685 + 686 + module Auth = struct 687 + type t = { reason_code : Reason_code.t; properties : Property.t list } 688 + 689 + let pp ppf t = 690 + Format.fprintf ppf "Auth{reason_code=%a}" Reason_code.pp t.reason_code 691 + end 692 + 534 693 type t = 535 - | Connect of { 536 - clean_start : bool; 537 - keep_alive : int; 538 - client_id : string; 539 - credentials : Shared.Credentials.t option; 540 - will : Will_properties.t option; 541 - properties : Property.t list; 542 - } 543 - | Connack of { 544 - session_present : bool; 545 - reason_code : Reason_code.t; 546 - properties : Property.t list; 547 - } 548 - | Publish of { 549 - dup : bool; 550 - qos : Shared.Qos.t; 551 - retain : bool; 552 - topic : Shared.Topic.Name.t; 553 - packet_id : Shared.Packet_id.t option; 554 - payload : string; 555 - properties : Property.t list; 556 - } 557 - | Puback of { 558 - packet_id : Shared.Packet_id.t; 559 - reason_code : Reason_code.t; 560 - properties : Property.t list; 561 - } 562 - | Pubrec of { 563 - packet_id : Shared.Packet_id.t; 564 - reason_code : Reason_code.t; 565 - properties : Property.t list; 566 - } 567 - | Pubrel of { 568 - packet_id : Shared.Packet_id.t; 569 - reason_code : Reason_code.t; 570 - properties : Property.t list; 571 - } 572 - | Pubcomp of { 573 - packet_id : Shared.Packet_id.t; 574 - reason_code : Reason_code.t; 575 - properties : Property.t list; 576 - } 577 - | Subscribe of { 578 - packet_id : Shared.Packet_id.t; 579 - properties : Property.t list; 580 - topics : Subscription.t list; 581 - } 582 - | Suback of { 583 - packet_id : Shared.Packet_id.t; 584 - properties : Property.t list; 585 - reason_codes : Reason_code.t list; 586 - } 587 - | Unsubscribe of { 588 - packet_id : Shared.Packet_id.t; 589 - properties : Property.t list; 590 - topics : Shared.Topic.Filter.t list; 591 - } 592 - | Unsuback of { 593 - packet_id : Shared.Packet_id.t; 594 - properties : Property.t list; 595 - reason_codes : Reason_code.t list; 596 - } 694 + | Connect of Connect.t 695 + | Connack of Connack.t 696 + | Publish of Publish.t 697 + | Puback of Puback.t 698 + | Pubrec of Pubrec.t 699 + | Pubrel of Pubrel.t 700 + | Pubcomp of Pubcomp.t 701 + | Subscribe of Subscribe.t 702 + | Suback of Suback.t 703 + | Unsubscribe of Unsubscribe.t 704 + | Unsuback of Unsuback.t 597 705 | Pingreq 598 706 | Pingresp 599 - | Disconnect of { 600 - reason_code : Reason_code.t; 601 - properties : Property.t list; 602 - } 603 - | Auth of { reason_code : Reason_code.t; properties : Property.t list } 707 + | Disconnect of Disconnect.t 708 + | Auth of Auth.t 604 709 605 710 let pp ppf = function 606 - | Connect c -> 607 - Format.fprintf ppf 608 - "Connect{client_id=%s; clean_start=%b; keep_alive=%d}" c.client_id 609 - c.clean_start c.keep_alive 610 - | Connack c -> 611 - Format.fprintf ppf "Connack{session_present=%b; reason_code=%a}" 612 - c.session_present Reason_code.pp c.reason_code 613 - | Publish p -> 614 - Format.fprintf ppf 615 - "Publish{topic=%s; qos=%a; dup=%b; retain=%b; payload=<%d bytes>}" 616 - p.topic Shared.Qos.pp p.qos p.dup p.retain (String.length p.payload) 617 - | Puback p -> 618 - Format.fprintf ppf "Puback{packet_id=%d; reason_code=%a}" p.packet_id 619 - Reason_code.pp p.reason_code 620 - | Pubrec p -> 621 - Format.fprintf ppf "Pubrec{packet_id=%d; reason_code=%a}" p.packet_id 622 - Reason_code.pp p.reason_code 623 - | Pubrel p -> 624 - Format.fprintf ppf "Pubrel{packet_id=%d; reason_code=%a}" p.packet_id 625 - Reason_code.pp p.reason_code 626 - | Pubcomp p -> 627 - Format.fprintf ppf "Pubcomp{packet_id=%d; reason_code=%a}" p.packet_id 628 - Reason_code.pp p.reason_code 629 - | Subscribe s -> 630 - Format.fprintf ppf "Subscribe{packet_id=%d; topics=[%a]}" s.packet_id 631 - (Format.pp_print_list 632 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 633 - Subscription.pp) 634 - s.topics 635 - | Suback s -> 636 - Format.fprintf ppf "Suback{packet_id=%d; reason_codes=[%a]}" s.packet_id 637 - (Format.pp_print_list 638 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 639 - Reason_code.pp) 640 - s.reason_codes 641 - | Unsubscribe u -> 642 - Format.fprintf ppf "Unsubscribe{packet_id=%d; topics=[%a]}" u.packet_id 643 - (Format.pp_print_list 644 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 645 - Shared.Topic.Filter.pp) 646 - u.topics 647 - | Unsuback u -> 648 - Format.fprintf ppf "Unsuback{packet_id=%d; reason_codes=[%a]}" 649 - u.packet_id 650 - (Format.pp_print_list 651 - ~pp_sep:(fun ppf () -> Format.fprintf ppf "; ") 652 - Reason_code.pp) 653 - u.reason_codes 711 + | Connect c -> Connect.pp ppf c 712 + | Connack c -> Connack.pp ppf c 713 + | Publish p -> Publish.pp ppf p 714 + | Puback p -> Puback.pp ppf p 715 + | Pubrec p -> Pubrec.pp ppf p 716 + | Pubrel p -> Pubrel.pp ppf p 717 + | Pubcomp p -> Pubcomp.pp ppf p 718 + | Subscribe s -> Subscribe.pp ppf s 719 + | Suback s -> Suback.pp ppf s 720 + | Unsubscribe u -> Unsubscribe.pp ppf u 721 + | Unsuback u -> Unsuback.pp ppf u 654 722 | Pingreq -> Format.fprintf ppf "Pingreq" 655 723 | Pingresp -> Format.fprintf ppf "Pingresp" 656 - | Disconnect d -> 657 - Format.fprintf ppf "Disconnect{reason_code=%a}" Reason_code.pp 658 - d.reason_code 659 - | Auth a -> 660 - Format.fprintf ppf "Auth{reason_code=%a}" Reason_code.pp a.reason_code 724 + | Disconnect d -> Disconnect.pp ppf d 725 + | Auth a -> Auth.pp ppf a 661 726 662 727 (** {1 Encoding} *) 663 728 664 - let write_connect writer ~clean_start ~keep_alive ~client_id ~credentials 665 - ~will ~properties = 729 + let write_connect writer (c : Connect.t) = 666 730 let payload = 667 731 P.to_string (fun w -> 668 732 P.write_mqtt_string w "MQTT"; 669 733 P.write_uint8 w 5; 670 734 let flags = ref 0 in 671 - if clean_start then flags := !flags lor 0x02; 672 - (match will with 735 + if c.clean_start then flags := !flags lor 0x02; 736 + (match c.will with 673 737 | Some (will : Will_properties.t) -> 674 738 flags := !flags lor 0x04; 675 739 flags := !flags lor (Shared.Qos.to_int will.will_qos lsl 3); 676 740 if will.will_retain then flags := !flags lor 0x20 677 741 | None -> ()); 678 - (match credentials with 742 + (match c.credentials with 679 743 | Some (`Username _) -> flags := !flags lor 0x80 680 744 | Some (`Username_password _) -> flags := !flags lor 0xC0 681 745 | None -> ()); 682 746 P.write_uint8 w !flags; 683 - P.write_uint16_be w keep_alive; 684 - Property.write_properties w properties; 685 - P.write_mqtt_string w client_id; 686 - (match will with 747 + P.write_uint16_be w c.keep_alive; 748 + Property.write_properties w c.properties; 749 + P.write_mqtt_string w c.client_id; 750 + (match c.will with 687 751 | Some (will : Will_properties.t) -> 688 752 Property.write_properties w will.will_properties; 689 753 P.write_mqtt_string w will.will_topic; 690 754 P.write_mqtt_string w will.will_payload 691 755 | None -> ()); 692 - match credentials with 756 + match c.credentials with 693 757 | Some (`Username u) -> P.write_mqtt_string w u 694 758 | Some (`Username_password (username, password)) -> 695 759 P.write_mqtt_string w username; ··· 699 763 P.write_fixed_header writer `CONNECT 0 (String.length payload); 700 764 P.write_string writer payload 701 765 702 - let write_connack writer ~session_present ~reason_code ~properties = 766 + let write_connack writer (c : Connack.t) = 703 767 let payload = 704 768 P.to_string (fun w -> 705 - P.write_uint8 w (if session_present then 0x01 else 0x00); 706 - P.write_uint8 w (Reason_code.to_int reason_code); 707 - Property.write_properties w properties) 769 + P.write_uint8 w (if c.session_present then 0x01 else 0x00); 770 + P.write_uint8 w (Reason_code.to_int c.reason_code); 771 + Property.write_properties w c.properties) 708 772 in 709 773 P.write_fixed_header writer `CONNACK 0 (String.length payload); 710 774 P.write_string writer payload 711 775 712 - let write_publish writer ~dup ~qos ~retain ~topic ~packet_id ~payload:msg 713 - ~properties = 776 + let write_publish writer (p : Publish.t) = 714 777 let body = 715 778 P.to_string (fun w -> 716 - P.write_mqtt_string w topic; 717 - (match packet_id with 718 - | Some id when qos <> `At_most_once -> P.write_uint16_be w id 779 + P.write_mqtt_string w p.topic; 780 + (match p.packet_id with 781 + | Some id when p.qos <> `At_most_once -> P.write_uint16_be w id 719 782 | _ -> ()); 720 - Property.write_properties w properties; 721 - P.write_string w msg) 783 + Property.write_properties w p.properties; 784 + P.write_string w p.payload) 722 785 in 723 786 let flags = 724 - (if dup then 0x08 else 0) 725 - lor (Shared.Qos.to_int qos lsl 1) 726 - lor if retain then 0x01 else 0 787 + (if p.dup then 0x08 else 0) 788 + lor (Shared.Qos.to_int p.qos lsl 1) 789 + lor if p.retain then 0x01 else 0 727 790 in 728 791 P.write_fixed_header writer `PUBLISH flags (String.length body); 729 792 P.write_string writer body ··· 736 799 if properties <> [] then Property.write_properties w properties 737 800 end) 738 801 739 - let write_puback writer ~packet_id ~reason_code ~properties = 740 - let payload = write_pubx_payload ~packet_id ~reason_code ~properties in 802 + let write_puback writer (p : Puback.t) = 803 + let payload = 804 + write_pubx_payload ~packet_id:p.packet_id ~reason_code:p.reason_code 805 + ~properties:p.properties 806 + in 741 807 P.write_fixed_header writer `PUBACK 0 (String.length payload); 742 808 P.write_string writer payload 743 809 744 - let write_pubrec writer ~packet_id ~reason_code ~properties = 745 - let payload = write_pubx_payload ~packet_id ~reason_code ~properties in 810 + let write_pubrec writer (p : Pubrec.t) = 811 + let payload = 812 + write_pubx_payload ~packet_id:p.packet_id ~reason_code:p.reason_code 813 + ~properties:p.properties 814 + in 746 815 P.write_fixed_header writer `PUBREC 0 (String.length payload); 747 816 P.write_string writer payload 748 817 749 - let write_pubrel writer ~packet_id ~reason_code ~properties = 750 - let payload = write_pubx_payload ~packet_id ~reason_code ~properties in 818 + let write_pubrel writer (p : Pubrel.t) = 819 + let payload = 820 + write_pubx_payload ~packet_id:p.packet_id ~reason_code:p.reason_code 821 + ~properties:p.properties 822 + in 751 823 P.write_fixed_header writer `PUBREL 0x02 (String.length payload); 752 824 P.write_string writer payload 753 825 754 - let write_pubcomp writer ~packet_id ~reason_code ~properties = 755 - let payload = write_pubx_payload ~packet_id ~reason_code ~properties in 826 + let write_pubcomp writer (p : Pubcomp.t) = 827 + let payload = 828 + write_pubx_payload ~packet_id:p.packet_id ~reason_code:p.reason_code 829 + ~properties:p.properties 830 + in 756 831 P.write_fixed_header writer `PUBCOMP 0 (String.length payload); 757 832 P.write_string writer payload 758 833 759 - let write_subscribe writer ~packet_id ~properties ~topics = 834 + let write_subscribe writer (s : Subscribe.t) = 760 835 let payload = 761 836 P.to_string (fun w -> 762 - P.write_uint16_be w packet_id; 763 - Property.write_properties w properties; 837 + P.write_uint16_be w s.packet_id; 838 + Property.write_properties w s.properties; 764 839 List.iter 765 840 (fun (t : Subscription.t) -> 766 841 P.write_mqtt_string w t.filter; ··· 771 846 lor (t.options.retain_handling lsl 4) 772 847 in 773 848 P.write_uint8 w opts) 774 - topics) 849 + s.topics) 775 850 in 776 851 P.write_fixed_header writer `SUBSCRIBE 0x02 (String.length payload); 777 852 P.write_string writer payload 778 853 779 - let write_suback writer ~packet_id ~properties ~reason_codes = 854 + let write_suback writer (s : Suback.t) = 780 855 let payload = 781 856 P.to_string (fun w -> 782 - P.write_uint16_be w packet_id; 783 - Property.write_properties w properties; 857 + P.write_uint16_be w s.packet_id; 858 + Property.write_properties w s.properties; 784 859 List.iter 785 860 (fun rc -> P.write_uint8 w (Reason_code.to_int rc)) 786 - reason_codes) 861 + s.reason_codes) 787 862 in 788 863 P.write_fixed_header writer `SUBACK 0 (String.length payload); 789 864 P.write_string writer payload 790 865 791 - let write_unsubscribe writer ~packet_id ~properties ~topics = 866 + let write_unsubscribe writer (u : Unsubscribe.t) = 792 867 let payload = 793 868 P.to_string (fun w -> 794 - P.write_uint16_be w packet_id; 795 - Property.write_properties w properties; 796 - List.iter (fun topic -> P.write_mqtt_string w topic) topics) 869 + P.write_uint16_be w u.packet_id; 870 + Property.write_properties w u.properties; 871 + List.iter (fun topic -> P.write_mqtt_string w topic) u.topics) 797 872 in 798 873 P.write_fixed_header writer `UNSUBSCRIBE 0x02 (String.length payload); 799 874 P.write_string writer payload 800 875 801 - let write_unsuback writer ~packet_id ~properties ~reason_codes = 876 + let write_unsuback writer (u : Unsuback.t) = 802 877 let payload = 803 878 P.to_string (fun w -> 804 - P.write_uint16_be w packet_id; 805 - Property.write_properties w properties; 879 + P.write_uint16_be w u.packet_id; 880 + Property.write_properties w u.properties; 806 881 List.iter 807 882 (fun rc -> P.write_uint8 w (Reason_code.to_int rc)) 808 - reason_codes) 883 + u.reason_codes) 809 884 in 810 885 P.write_fixed_header writer `UNSUBACK 0 (String.length payload); 811 886 P.write_string writer payload ··· 813 888 let write_pingreq writer = P.write_fixed_header writer `PINGREQ 0 0 814 889 let write_pingresp writer = P.write_fixed_header writer `PINGRESP 0 0 815 890 816 - let write_disconnect writer ~reason_code ~properties = 891 + let write_disconnect writer (d : Disconnect.t) = 817 892 let payload = 818 893 P.to_string (fun w -> 819 - if reason_code <> `Normal_disconnection || properties <> [] then begin 820 - P.write_uint8 w (Reason_code.to_int reason_code); 821 - if properties <> [] then Property.write_properties w properties 894 + if d.reason_code <> `Normal_disconnection || d.properties <> [] then begin 895 + P.write_uint8 w (Reason_code.to_int d.reason_code); 896 + if d.properties <> [] then Property.write_properties w d.properties 822 897 end) 823 898 in 824 899 P.write_fixed_header writer `DISCONNECT 0 (String.length payload); 825 900 P.write_string writer payload 826 901 827 - let write_auth writer ~reason_code ~properties = 902 + let write_auth writer (a : Auth.t) = 828 903 let payload = 829 904 P.to_string (fun w -> 830 - P.write_uint8 w (Reason_code.to_int reason_code); 831 - Property.write_properties w properties) 905 + P.write_uint8 w (Reason_code.to_int a.reason_code); 906 + Property.write_properties w a.properties) 832 907 in 833 908 P.write_fixed_header writer `AUTH 0 (String.length payload); 834 909 P.write_string writer payload 835 910 836 911 let write writer = function 837 - | Connect 838 - { clean_start; keep_alive; client_id; credentials; will; properties } -> 839 - write_connect writer ~clean_start ~keep_alive ~client_id ~credentials 840 - ~will ~properties 841 - | Connack { session_present; reason_code; properties } -> 842 - write_connack writer ~session_present ~reason_code ~properties 843 - | Publish { dup; qos; retain; topic; packet_id; payload; properties } -> 844 - write_publish writer ~dup ~qos ~retain ~topic ~packet_id ~payload 845 - ~properties 846 - | Puback { packet_id; reason_code; properties } -> 847 - write_puback writer ~packet_id ~reason_code ~properties 848 - | Pubrec { packet_id; reason_code; properties } -> 849 - write_pubrec writer ~packet_id ~reason_code ~properties 850 - | Pubrel { packet_id; reason_code; properties } -> 851 - write_pubrel writer ~packet_id ~reason_code ~properties 852 - | Pubcomp { packet_id; reason_code; properties } -> 853 - write_pubcomp writer ~packet_id ~reason_code ~properties 854 - | Subscribe { packet_id; properties; topics } -> 855 - write_subscribe writer ~packet_id ~properties ~topics 856 - | Suback { packet_id; properties; reason_codes } -> 857 - write_suback writer ~packet_id ~properties ~reason_codes 858 - | Unsubscribe { packet_id; properties; topics } -> 859 - write_unsubscribe writer ~packet_id ~properties ~topics 860 - | Unsuback { packet_id; properties; reason_codes } -> 861 - write_unsuback writer ~packet_id ~properties ~reason_codes 912 + | Connect c -> write_connect writer c 913 + | Connack c -> write_connack writer c 914 + | Publish p -> write_publish writer p 915 + | Puback p -> write_puback writer p 916 + | Pubrec p -> write_pubrec writer p 917 + | Pubrel p -> write_pubrel writer p 918 + | Pubcomp p -> write_pubcomp writer p 919 + | Subscribe s -> write_subscribe writer s 920 + | Suback s -> write_suback writer s 921 + | Unsubscribe u -> write_unsubscribe writer u 922 + | Unsuback u -> write_unsuback writer u 862 923 | Pingreq -> write_pingreq writer 863 924 | Pingresp -> write_pingresp writer 864 - | Disconnect { reason_code; properties } -> 865 - write_disconnect writer ~reason_code ~properties 866 - | Auth { reason_code; properties } -> 867 - write_auth writer ~reason_code ~properties 925 + | Disconnect d -> write_disconnect writer d 926 + | Auth a -> write_auth writer a 868 927 869 928 (** {1 Decoding} *) 870 929 ··· 903 962 else None 904 963 in 905 964 Connect 906 - { clean_start; keep_alive; client_id; credentials; will; properties } 965 + Connect. 966 + { clean_start; keep_alive; client_id; credentials; will; properties } 907 967 908 968 let read_connack ~remaining_length reader = 909 969 let flags = P.uint8 reader in ··· 913 973 let properties = 914 974 if remaining_length > 2 then Property.read_properties reader else [] 915 975 in 916 - Connack { session_present; reason_code; properties } 976 + Connack Connack.{ session_present; reason_code; properties } 917 977 918 978 let read_publish ~flags reader = 919 979 let dup = flags land 0x08 <> 0 in ··· 925 985 in 926 986 let properties = Property.read_properties reader in 927 987 let payload = P.take_rest reader in 928 - Publish { dup; qos; retain; topic; packet_id; payload; properties } 988 + Publish Publish.{ dup; qos; retain; topic; packet_id; payload; properties } 929 989 930 990 let read_pubx_common ~remaining_length reader = 931 991 let packet_id = P.uint16_be reader in ··· 942 1002 let packet_id, reason_code, properties = 943 1003 read_pubx_common ~remaining_length reader 944 1004 in 945 - Puback { packet_id; reason_code; properties } 1005 + Puback Puback.{ packet_id; reason_code; properties } 946 1006 947 1007 let read_pubrec ~remaining_length reader = 948 1008 let packet_id, reason_code, properties = 949 1009 read_pubx_common ~remaining_length reader 950 1010 in 951 - Pubrec { packet_id; reason_code; properties } 1011 + Pubrec Pubrec.{ packet_id; reason_code; properties } 952 1012 953 1013 let read_pubrel ~remaining_length reader = 954 1014 let packet_id, reason_code, properties = 955 1015 read_pubx_common ~remaining_length reader 956 1016 in 957 - Pubrel { packet_id; reason_code; properties } 1017 + Pubrel Pubrel.{ packet_id; reason_code; properties } 958 1018 959 1019 let read_pubcomp ~remaining_length reader = 960 1020 let packet_id, reason_code, properties = 961 1021 read_pubx_common ~remaining_length reader 962 1022 in 963 - Pubcomp { packet_id; reason_code; properties } 1023 + Pubcomp Pubcomp.{ packet_id; reason_code; properties } 964 1024 965 1025 let read_subscribe reader = 966 1026 let packet_id = P.uint16_be reader in ··· 980 1040 Subscription.{ filter; options } 981 1041 in 982 1042 let topics = P.many1 read_topic reader in 983 - Subscribe { packet_id; properties; topics } 1043 + Subscribe Subscribe.{ packet_id; properties; topics } 984 1044 985 1045 let read_suback reader = 986 1046 let packet_id = P.uint16_be reader in 987 1047 let properties = Property.read_properties reader in 988 1048 let read_reason_code reader = Reason_code.of_int (P.uint8 reader) in 989 1049 let reason_codes = P.many read_reason_code reader in 990 - Suback { packet_id; properties; reason_codes } 1050 + Suback Suback.{ packet_id; properties; reason_codes } 991 1051 992 1052 let read_unsubscribe reader = 993 1053 let packet_id = P.uint16_be reader in 994 1054 let properties = Property.read_properties reader in 995 1055 let topics = P.many1 P.mqtt_string reader in 996 - Unsubscribe { packet_id; properties; topics } 1056 + Unsubscribe Unsubscribe.{ packet_id; properties; topics } 997 1057 998 1058 let read_unsuback reader = 999 1059 let packet_id = P.uint16_be reader in 1000 1060 let properties = Property.read_properties reader in 1001 1061 let read_reason_code reader = Reason_code.of_int (P.uint8 reader) in 1002 1062 let reason_codes = P.many read_reason_code reader in 1003 - Unsuback { packet_id; properties; reason_codes } 1063 + Unsuback Unsuback.{ packet_id; properties; reason_codes } 1004 1064 1005 1065 let read_disconnect ~remaining_length reader = 1006 1066 if remaining_length = 0 then 1007 - Disconnect { reason_code = `Normal_disconnection; properties = [] } 1067 + Disconnect 1068 + Disconnect.{ reason_code = `Normal_disconnection; properties = [] } 1008 1069 else 1009 1070 let reason_code_byte = P.uint8 reader in 1010 1071 let reason_code = Reason_code.of_int reason_code_byte in 1011 1072 let properties = 1012 1073 if remaining_length > 1 then Property.read_properties reader else [] 1013 1074 in 1014 - Disconnect { reason_code; properties } 1075 + Disconnect Disconnect.{ reason_code; properties } 1015 1076 1016 1077 let read_auth reader = 1017 1078 let reason_code_byte = P.uint8 reader in 1018 1079 let reason_code = Reason_code.of_int reason_code_byte in 1019 1080 let properties = Property.read_properties reader in 1020 - Auth { reason_code; properties } 1081 + Auth Auth.{ reason_code; properties } 1021 1082 1022 1083 let read reader = 1023 1084 let first_byte = P.uint8 reader in
+170 -67
lib/core/v5.mli
··· 146 146 Section 2.1 *) 147 147 148 148 module Packet : sig 149 + (** {2 Connect} *) 150 + 151 + module Connect : sig 152 + type t = { 153 + clean_start : bool; 154 + keep_alive : int; 155 + client_id : string; 156 + credentials : Shared.Credentials.t option; 157 + will : Will_properties.t option; 158 + properties : Property.t list; 159 + } 160 + 161 + val pp : Format.formatter -> t -> unit 162 + end 163 + 164 + (** {2 Connack} *) 165 + 166 + module Connack : sig 167 + type t = { 168 + session_present : bool; 169 + reason_code : Reason_code.t; 170 + properties : Property.t list; 171 + } 172 + 173 + val pp : Format.formatter -> t -> unit 174 + end 175 + 176 + (** {2 Publish} *) 177 + 178 + module Publish : sig 179 + type t = { 180 + dup : bool; 181 + qos : Shared.Qos.t; 182 + retain : bool; 183 + topic : Shared.Topic.Name.t; 184 + packet_id : Shared.Packet_id.t option; 185 + payload : string; 186 + properties : Property.t list; 187 + } 188 + 189 + val pp : Format.formatter -> t -> unit 190 + end 191 + 192 + (** {2 Puback} *) 193 + 194 + module Puback : sig 195 + type t = { 196 + packet_id : Shared.Packet_id.t; 197 + reason_code : Reason_code.t; 198 + properties : Property.t list; 199 + } 200 + 201 + val pp : Format.formatter -> t -> unit 202 + end 203 + 204 + (** {2 Pubrec} *) 205 + 206 + module Pubrec : sig 207 + type t = { 208 + packet_id : Shared.Packet_id.t; 209 + reason_code : Reason_code.t; 210 + properties : Property.t list; 211 + } 212 + 213 + val pp : Format.formatter -> t -> unit 214 + end 215 + 216 + (** {2 Pubrel} *) 217 + 218 + module Pubrel : sig 219 + type t = { 220 + packet_id : Shared.Packet_id.t; 221 + reason_code : Reason_code.t; 222 + properties : Property.t list; 223 + } 224 + 225 + val pp : Format.formatter -> t -> unit 226 + end 227 + 228 + (** {2 Pubcomp} *) 229 + 230 + module Pubcomp : sig 231 + type t = { 232 + packet_id : Shared.Packet_id.t; 233 + reason_code : Reason_code.t; 234 + properties : Property.t list; 235 + } 236 + 237 + val pp : Format.formatter -> t -> unit 238 + end 239 + 240 + (** {2 Subscribe} *) 241 + 242 + module Subscribe : sig 243 + type t = { 244 + packet_id : Shared.Packet_id.t; 245 + properties : Property.t list; 246 + topics : Subscription.t list; 247 + } 248 + 249 + val pp : Format.formatter -> t -> unit 250 + end 251 + 252 + (** {2 Suback} *) 253 + 254 + module Suback : sig 255 + type t = { 256 + packet_id : Shared.Packet_id.t; 257 + properties : Property.t list; 258 + reason_codes : Reason_code.t list; 259 + } 260 + 261 + val pp : Format.formatter -> t -> unit 262 + end 263 + 264 + (** {2 Unsubscribe} *) 265 + 266 + module Unsubscribe : sig 267 + type t = { 268 + packet_id : Shared.Packet_id.t; 269 + properties : Property.t list; 270 + topics : Shared.Topic.Filter.t list; 271 + } 272 + 273 + val pp : Format.formatter -> t -> unit 274 + end 275 + 276 + (** {2 Unsuback} *) 277 + 278 + module Unsuback : sig 279 + type t = { 280 + packet_id : Shared.Packet_id.t; 281 + properties : Property.t list; 282 + reason_codes : Reason_code.t list; 283 + } 284 + 285 + val pp : Format.formatter -> t -> unit 286 + end 287 + 288 + (** {2 Disconnect} *) 289 + 290 + module Disconnect : sig 291 + type t = { reason_code : Reason_code.t; properties : Property.t list } 292 + 293 + val pp : Format.formatter -> t -> unit 294 + end 295 + 296 + (** {2 Auth} *) 297 + 298 + module Auth : sig 299 + type t = { reason_code : Reason_code.t; properties : Property.t list } 300 + 301 + val pp : Format.formatter -> t -> unit 302 + end 303 + 304 + (** {2 Packet Type} *) 305 + 149 306 type t = 150 - | Connect of { 151 - clean_start : bool; 152 - keep_alive : int; 153 - client_id : string; 154 - credentials : Shared.Credentials.t option; 155 - will : Will_properties.t option; 156 - properties : Property.t list; 157 - } 158 - | Connack of { 159 - session_present : bool; 160 - reason_code : Reason_code.t; 161 - properties : Property.t list; 162 - } 163 - | Publish of { 164 - dup : bool; 165 - qos : Shared.Qos.t; 166 - retain : bool; 167 - topic : Shared.Topic.Name.t; 168 - packet_id : Shared.Packet_id.t option; 169 - payload : string; 170 - properties : Property.t list; 171 - } 172 - | Puback of { 173 - packet_id : Shared.Packet_id.t; 174 - reason_code : Reason_code.t; 175 - properties : Property.t list; 176 - } 177 - | Pubrec of { 178 - packet_id : Shared.Packet_id.t; 179 - reason_code : Reason_code.t; 180 - properties : Property.t list; 181 - } 182 - | Pubrel of { 183 - packet_id : Shared.Packet_id.t; 184 - reason_code : Reason_code.t; 185 - properties : Property.t list; 186 - } 187 - | Pubcomp of { 188 - packet_id : Shared.Packet_id.t; 189 - reason_code : Reason_code.t; 190 - properties : Property.t list; 191 - } 192 - | Subscribe of { 193 - packet_id : Shared.Packet_id.t; 194 - properties : Property.t list; 195 - topics : Subscription.t list; 196 - } 197 - | Suback of { 198 - packet_id : Shared.Packet_id.t; 199 - properties : Property.t list; 200 - reason_codes : Reason_code.t list; 201 - } 202 - | Unsubscribe of { 203 - packet_id : Shared.Packet_id.t; 204 - properties : Property.t list; 205 - topics : Shared.Topic.Filter.t list; 206 - } 207 - | Unsuback of { 208 - packet_id : Shared.Packet_id.t; 209 - properties : Property.t list; 210 - reason_codes : Reason_code.t list; 211 - } 307 + | Connect of Connect.t 308 + | Connack of Connack.t 309 + | Publish of Publish.t 310 + | Puback of Puback.t 311 + | Pubrec of Pubrec.t 312 + | Pubrel of Pubrel.t 313 + | Pubcomp of Pubcomp.t 314 + | Subscribe of Subscribe.t 315 + | Suback of Suback.t 316 + | Unsubscribe of Unsubscribe.t 317 + | Unsuback of Unsuback.t 212 318 | Pingreq 213 319 | Pingresp 214 - | Disconnect of { 215 - reason_code : Reason_code.t; 216 - properties : Property.t list; 217 - } 218 - | Auth of { reason_code : Reason_code.t; properties : Property.t list } 320 + | Disconnect of Disconnect.t 321 + | Auth of Auth.t 219 322 220 323 val pp : Format.formatter -> t -> unit 221 324 val write : Bytesrw.Bytes.Writer.t -> t -> unit
+70 -53
lib/eio/client.ml
··· 100 100 match version with 101 101 | `V3_1_1 -> 102 102 Transport.V3 103 - (Mqtte.V3.Packet.Publish { dup; qos; retain; topic; packet_id; payload }) 103 + (Mqtte.V3.Packet.Publish 104 + Mqtte.V3.Packet.Publish. 105 + { dup; qos; retain; topic; packet_id; payload }) 104 106 | `V5_0 -> 105 107 Transport.V5 106 108 (Mqtte.V5.Packet.Publish 107 - { dup; qos; retain; topic; packet_id; properties = []; payload }) 109 + Mqtte.V5.Packet.Publish. 110 + { dup; qos; retain; topic; packet_id; properties = []; payload }) 108 111 109 112 let handle_incoming_packet t packet = 110 113 match packet with ··· 175 178 send_packet t 176 179 (Transport.V5 177 180 (Mqtte.V5.Packet.Puback 178 - { 179 - packet_id = id; 180 - reason_code = `Success; 181 - properties = []; 182 - })) 181 + Mqtte.V5.Packet.Puback. 182 + { 183 + packet_id = id; 184 + reason_code = `Success; 185 + properties = []; 186 + })) 183 187 | `Exactly_once, Some id -> 184 188 send_packet t 185 189 (Transport.V5 186 190 (Mqtte.V5.Packet.Pubrec 187 - { 188 - packet_id = id; 189 - reason_code = `Success; 190 - properties = []; 191 - })); 191 + Mqtte.V5.Packet.Pubrec. 192 + { 193 + packet_id = id; 194 + reason_code = `Success; 195 + properties = []; 196 + })); 192 197 let state = Protocol.Qos2_receiver.create id in 193 198 Protocol.Inflight.add t.session.qos2_recv_inflight id state 194 199 | _ -> ()); ··· 201 206 send_packet t 202 207 (Transport.V5 203 208 (Mqtte.V5.Packet.Pubrel 204 - { 205 - packet_id = p.packet_id; 206 - reason_code = `Success; 207 - properties = []; 208 - })) 209 + Mqtte.V5.Packet.Pubrel. 210 + { 211 + packet_id = p.packet_id; 212 + reason_code = `Success; 213 + properties = []; 214 + })) 209 215 | Mqtte.V5.Packet.Pubrel p -> 210 216 Log.debug (fun m -> m "Received PUBREL for %d" p.packet_id); 211 217 send_packet t 212 218 (Transport.V5 213 219 (Mqtte.V5.Packet.Pubcomp 214 - { 215 - packet_id = p.packet_id; 216 - reason_code = `Success; 217 - properties = []; 218 - })); 220 + Mqtte.V5.Packet.Pubcomp. 221 + { 222 + packet_id = p.packet_id; 223 + reason_code = `Success; 224 + properties = []; 225 + })); 219 226 Protocol.Inflight.remove t.session.qos2_recv_inflight p.packet_id 220 227 |> ignore 221 228 | Mqtte.V5.Packet.Pubcomp p -> ··· 319 326 | `V3_1_1 -> 320 327 Transport.V3 321 328 (Mqtte.V3.Packet.Connect 322 - { 323 - client_id = config.client_id; 324 - clean_session = config.clean_session; 325 - keep_alive = config.keep_alive; 326 - credentials = config.credentials; 327 - will = config.will; 328 - }) 329 + Mqtte.V3.Packet.Connect. 330 + { 331 + client_id = config.client_id; 332 + clean_session = config.clean_session; 333 + keep_alive = config.keep_alive; 334 + credentials = config.credentials; 335 + will = config.will; 336 + }) 329 337 | `V5_0 -> 330 338 Transport.V5 331 339 (Mqtte.V5.Packet.Connect 332 - { 333 - client_id = config.client_id; 334 - clean_start = config.clean_session; 335 - keep_alive = config.keep_alive; 336 - properties = []; 337 - credentials = config.credentials; 338 - will = 339 - Option.map 340 - (fun w -> 341 - Mqtte.V5.Will_properties. 342 - { 343 - will_properties = []; 344 - will_topic = Mqtte.Will.topic w; 345 - will_payload = Mqtte.Will.payload w; 346 - will_qos = Mqtte.Will.qos w; 347 - will_retain = Mqtte.Will.retain w; 348 - }) 349 - config.will; 350 - }) 340 + Mqtte.V5.Packet.Connect. 341 + { 342 + client_id = config.client_id; 343 + clean_start = config.clean_session; 344 + keep_alive = config.keep_alive; 345 + properties = []; 346 + credentials = config.credentials; 347 + will = 348 + Option.map 349 + (fun w -> 350 + Mqtte.V5.Will_properties. 351 + { 352 + will_properties = []; 353 + will_topic = Mqtte.Will.topic w; 354 + will_payload = Mqtte.Will.payload w; 355 + will_qos = Mqtte.Will.qos w; 356 + will_retain = Mqtte.Will.retain w; 357 + }) 358 + config.will; 359 + }) 351 360 in 352 361 353 362 (* Send CONNECT packet *) ··· 419 428 Option.value reason_code ~default:`Normal_disconnection 420 429 in 421 430 Transport.V5 422 - (Mqtte.V5.Packet.Disconnect { reason_code; properties = [] }) 431 + (Mqtte.V5.Packet.Disconnect 432 + Mqtte.V5.Packet.Disconnect.{ reason_code; properties = [] }) 423 433 in 424 434 send_packet t disconnect_packet; 425 435 t.sleep 0.1; ··· 479 489 let topics = 480 490 List.map (fun filter -> Mqtte.V3.Subscription.{ filter; qos }) topics 481 491 in 482 - Transport.V3 (Mqtte.V3.Packet.Subscribe { packet_id = id; topics }) 492 + Transport.V3 493 + (Mqtte.V3.Packet.Subscribe 494 + Mqtte.V3.Packet.Subscribe.{ packet_id = id; topics }) 483 495 | `V5_0 -> 484 496 let topics = 485 497 List.map ··· 489 501 topics 490 502 in 491 503 Transport.V5 492 - (Mqtte.V5.Packet.Subscribe { packet_id = id; properties = []; topics }) 504 + (Mqtte.V5.Packet.Subscribe 505 + Mqtte.V5.Packet.Subscribe. 506 + { packet_id = id; properties = []; topics }) 493 507 in 494 508 send_packet t packet; 495 509 Eio.Promise.await promise ··· 503 517 let packet = 504 518 match t.config.version with 505 519 | `V3_1_1 -> 506 - Transport.V3 (Mqtte.V3.Packet.Unsubscribe { packet_id = id; topics }) 520 + Transport.V3 521 + (Mqtte.V3.Packet.Unsubscribe 522 + Mqtte.V3.Packet.Unsubscribe.{ packet_id = id; topics }) 507 523 | `V5_0 -> 508 524 Transport.V5 509 525 (Mqtte.V5.Packet.Unsubscribe 510 - { packet_id = id; properties = []; topics }) 526 + Mqtte.V5.Packet.Unsubscribe. 527 + { packet_id = id; properties = []; topics }) 511 528 in 512 529 send_packet t packet
+121 -109
test/test_mqtt.ml
··· 89 89 let test_v3_connect () = 90 90 let packet = 91 91 V3.Connect 92 - { 93 - clean_session = true; 94 - keep_alive = 60; 95 - client_id = "test-client"; 96 - will = None; 97 - credentials = Some (`Username_password ("user", "pass")); 98 - } 92 + V3.Connect. 93 + { 94 + clean_session = true; 95 + keep_alive = 60; 96 + client_id = "test-client"; 97 + will = None; 98 + credentials = Some (`Username_password ("user", "pass")); 99 + } 99 100 in 100 101 let decoded = roundtrip_v3 packet in 101 102 match decoded with ··· 111 112 | _ -> Alcotest.fail "Expected Connect packet" 112 113 113 114 let test_v3_connack () = 114 - let packet = V3.Connack { session_present = true; return_code = `Accepted } in 115 + let packet = 116 + V3.Connack V3.Connack.{ session_present = true; return_code = `Accepted } 117 + in 115 118 let decoded = roundtrip_v3 packet in 116 119 match decoded with 117 120 | V3.Connack c -> ··· 124 127 let test_v3_publish_qos0 () = 125 128 let packet = 126 129 V3.Publish 127 - { 128 - dup = false; 129 - qos = `At_most_once; 130 - retain = false; 131 - topic = "test/topic"; 132 - packet_id = None; 133 - payload = "Hello, MQTT!"; 134 - } 130 + V3.Publish. 131 + { 132 + dup = false; 133 + qos = `At_most_once; 134 + retain = false; 135 + topic = "test/topic"; 136 + packet_id = None; 137 + payload = "Hello, MQTT!"; 138 + } 135 139 in 136 140 let decoded = roundtrip_v3 packet in 137 141 match decoded with ··· 145 149 let test_v3_publish_qos1 () = 146 150 let packet = 147 151 V3.Publish 148 - { 149 - dup = false; 150 - qos = `At_least_once; 151 - retain = true; 152 - topic = "test/retained"; 153 - packet_id = Some 1234; 154 - payload = "Retained message"; 155 - } 152 + V3.Publish. 153 + { 154 + dup = false; 155 + qos = `At_least_once; 156 + retain = true; 157 + topic = "test/retained"; 158 + packet_id = Some 1234; 159 + payload = "Retained message"; 160 + } 156 161 in 157 162 let decoded = roundtrip_v3 packet in 158 163 match decoded with ··· 165 170 let test_v3_subscribe () = 166 171 let packet = 167 172 V3.Subscribe 168 - { 169 - packet_id = 42; 170 - topics = 171 - [ 172 - Mqtte.V3.Subscription.{ filter = "topic/a"; qos = `At_most_once }; 173 - Mqtte.V3.Subscription.{ filter = "topic/b"; qos = `At_least_once }; 174 - Mqtte.V3.Subscription.{ filter = "topic/c"; qos = `Exactly_once }; 175 - ]; 176 - } 173 + V3.Subscribe. 174 + { 175 + packet_id = 42; 176 + topics = 177 + [ 178 + Mqtte.V3.Subscription.{ filter = "topic/a"; qos = `At_most_once }; 179 + Mqtte.V3.Subscription.{ filter = "topic/b"; qos = `At_least_once }; 180 + Mqtte.V3.Subscription.{ filter = "topic/c"; qos = `Exactly_once }; 181 + ]; 182 + } 177 183 in 178 184 let decoded = roundtrip_v3 packet in 179 185 match decoded with ··· 185 191 let test_v3_suback () = 186 192 let packet = 187 193 V3.Suback 188 - { 189 - packet_id = 42; 190 - return_codes = 191 - [ `Granted_qos_0; `Granted_qos_1; `Granted_qos_2; `Failure ]; 192 - } 194 + V3.Suback. 195 + { 196 + packet_id = 42; 197 + return_codes = 198 + [ `Granted_qos_0; `Granted_qos_1; `Granted_qos_2; `Failure ]; 199 + } 193 200 in 194 201 let decoded = roundtrip_v3 packet in 195 202 match decoded with ··· 224 231 let test_v5_connect () = 225 232 let packet = 226 233 V5.Connect 227 - { 228 - clean_start = true; 229 - keep_alive = 120; 230 - client_id = "v5-client"; 231 - will = None; 232 - credentials = Some (`Username_password ("admin", "secret")); 233 - properties = 234 - [ 235 - Mqtte.V5.Property.Session_expiry_interval 3600l; 236 - Mqtte.V5.Property.Receive_maximum 100; 237 - ]; 238 - } 234 + V5.Connect. 235 + { 236 + clean_start = true; 237 + keep_alive = 120; 238 + client_id = "v5-client"; 239 + will = None; 240 + credentials = Some (`Username_password ("admin", "secret")); 241 + properties = 242 + [ 243 + Mqtte.V5.Property.Session_expiry_interval 3600l; 244 + Mqtte.V5.Property.Receive_maximum 100; 245 + ]; 246 + } 239 247 in 240 248 let decoded = roundtrip_v5 packet in 241 249 match decoded with ··· 249 257 let test_v5_connack () = 250 258 let packet = 251 259 V5.Connack 252 - { 253 - session_present = false; 254 - reason_code = `Success; 255 - properties = 256 - [ 257 - Mqtte.V5.Property.Topic_alias_maximum 10; 258 - Mqtte.V5.Property.Maximum_packet_size 65536l; 259 - ]; 260 - } 260 + V5.Connack. 261 + { 262 + session_present = false; 263 + reason_code = `Success; 264 + properties = 265 + [ 266 + Mqtte.V5.Property.Topic_alias_maximum 10; 267 + Mqtte.V5.Property.Maximum_packet_size 65536l; 268 + ]; 269 + } 261 270 in 262 271 let decoded = roundtrip_v5 packet in 263 272 match decoded with ··· 268 277 let test_v5_publish_with_properties () = 269 278 let packet = 270 279 V5.Publish 271 - { 272 - dup = false; 273 - qos = `At_least_once; 274 - retain = false; 275 - topic = "sensor/data"; 276 - packet_id = Some 100; 277 - payload = "{\"temp\": 22.5}"; 278 - properties = 279 - [ 280 - Mqtte.V5.Property.Content_type "application/json"; 281 - Mqtte.V5.Property.Message_expiry_interval 3600l; 282 - Mqtte.V5.Property.Topic_alias 5; 283 - ]; 284 - } 280 + V5.Publish. 281 + { 282 + dup = false; 283 + qos = `At_least_once; 284 + retain = false; 285 + topic = "sensor/data"; 286 + packet_id = Some 100; 287 + payload = "{\"temp\": 22.5}"; 288 + properties = 289 + [ 290 + Mqtte.V5.Property.Content_type "application/json"; 291 + Mqtte.V5.Property.Message_expiry_interval 3600l; 292 + Mqtte.V5.Property.Topic_alias 5; 293 + ]; 294 + } 285 295 in 286 296 let decoded = roundtrip_v5 packet in 287 297 match decoded with ··· 294 304 let test_v5_subscribe () = 295 305 let packet = 296 306 V5.Subscribe 297 - { 298 - packet_id = 55; 299 - properties = []; 300 - topics = 301 - [ 302 - Mqtte.V5.Subscription. 303 - { 304 - filter = "home/+/temperature"; 305 - options = 306 - Mqtte.V5.Subscription_options. 307 - { 308 - qos = `At_least_once; 309 - no_local = true; 310 - retain_as_published = false; 311 - retain_handling = 0; 312 - }; 313 - }; 314 - Mqtte.V5.Subscription. 315 - { 316 - filter = "home/#"; 317 - options = 318 - Mqtte.V5.Subscription_options. 319 - { 320 - qos = `Exactly_once; 321 - no_local = false; 322 - retain_as_published = true; 323 - retain_handling = 1; 324 - }; 325 - }; 326 - ]; 327 - } 307 + V5.Subscribe. 308 + { 309 + packet_id = 55; 310 + properties = []; 311 + topics = 312 + [ 313 + Mqtte.V5.Subscription. 314 + { 315 + filter = "home/+/temperature"; 316 + options = 317 + Mqtte.V5.Subscription_options. 318 + { 319 + qos = `At_least_once; 320 + no_local = true; 321 + retain_as_published = false; 322 + retain_handling = 0; 323 + }; 324 + }; 325 + Mqtte.V5.Subscription. 326 + { 327 + filter = "home/#"; 328 + options = 329 + Mqtte.V5.Subscription_options. 330 + { 331 + qos = `Exactly_once; 332 + no_local = false; 333 + retain_as_published = true; 334 + retain_handling = 1; 335 + }; 336 + }; 337 + ]; 338 + } 328 339 in 329 340 let decoded = roundtrip_v5 packet in 330 341 match decoded with ··· 336 347 let test_v5_disconnect_with_reason () = 337 348 let packet = 338 349 V5.Disconnect 339 - { 340 - reason_code = `Session_taken_over; 341 - properties = 342 - [ Mqtte.V5.Property.Reason_string "Another client connected" ]; 343 - } 350 + V5.Disconnect. 351 + { 352 + reason_code = `Session_taken_over; 353 + properties = 354 + [ Mqtte.V5.Property.Reason_string "Another client connected" ]; 355 + } 344 356 in 345 357 let decoded = roundtrip_v5 packet in 346 358 match decoded with