DTN controller and policy language for satellite networks
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

borealis: Fix build errors and simplify module structure

- Add (wrapped false) to all library dune files for direct module access
- Rewrite admin.ml to use Cbort codecs instead of manual CBOR encoding
- Fix Eio API: stream_socket_ty type parameter, Cstruct for read_exact
- Split mutable_stats from immutable stats type in engine
- Rename config constructor to make_config to avoid shadowing
- Fix existential type pattern in CLA interface
- Fix && operator shadowing in predicate.ml with if-then-else
- Fix route test to use current time for contact window
- Update ocamlformat to 0.28.1

All 18 tests pass.

+526 -559
+1 -1
.ocamlformat
··· 1 - version=0.27.0 1 + version=0.28.1
+3
adapters/cla.ml
··· 15 15 16 16 type t = T : (module S with type t = 'a) * 'a -> t 17 17 18 + let make (type a) (m : (module S with type t = a)) (instance : a) : t = 19 + T (m, instance) 20 + 18 21 let send (T ((module M), t)) bundle next_hop = M.send t bundle next_hop 19 22 let close (T ((module M), t)) = M.close t 20 23 let name (T ((module M), _)) = M.name
+7 -5
adapters/cla.mli
··· 5 5 6 6 (** Convergence Layer Adapter interface. 7 7 8 - CLAs provide the transport layer for bundle transfer. Different CLAs 9 - support different transports (TCP, UDP, serial, etc.) *) 8 + CLAs provide the transport layer for bundle transfer. Different CLAs support 9 + different transports (TCP, UDP, serial, etc.) *) 10 10 11 11 (** {1 CLA Interface} *) 12 12 13 - type send_result = Ok | Error of string 14 - (** Result of sending a bundle. *) 13 + type send_result = Ok | Error of string (** Result of sending a bundle. *) 15 14 16 15 module type S = sig 17 16 type t ··· 27 26 (** [close t] closes the CLA. *) 28 27 end 29 28 30 - type t = (module S with type t = 'a) * 'a 29 + type t 31 30 (** Existential CLA wrapper. *) 31 + 32 + val make : (module S with type t = 'a) -> 'a -> t 33 + (** [make (module M) instance] creates a CLA wrapper. *) 32 34 33 35 val send : t -> Bundle.t -> Bundle.eid -> send_result Eio.Promise.t 34 36 (** [send cla bundle next_hop] sends using the CLA. *)
+2 -1
adapters/dune
··· 1 1 (library 2 2 (name adapters) 3 3 (public_name borealis.adapters) 4 - (libraries borealis.forwarding bundle tcpcl eio fmt logs)) 4 + (wrapped false) 5 + (libraries borealis.forwarding bundle tcpcl eio cstruct fmt logs))
+44 -38
adapters/tcpcl_adapter.ml
··· 8 8 type state = Connecting | Connected | Closed 9 9 10 10 type t = { 11 - socket : Eio.Net.stream_socket_ty Eio.Resource.t; 11 + socket : [ `Generic ] Eio.Net.stream_socket_ty Eio.Resource.t; 12 12 local_eid : Bundle.eid; 13 13 mutable peer_eid : string option; 14 14 mutable state : state; ··· 24 24 | Bundle.Dtn s -> "dtn://" ^ s 25 25 | Bundle.Ipn (node, service) -> Printf.sprintf "ipn:%Ld.%Ld" node service 26 26 27 - let send_bytes t data = 28 - Eio.Flow.copy_string data (t.socket :> _ Eio.Flow.sink) 27 + let send_bytes t data = Eio.Flow.copy_string data (t.socket :> _ Eio.Flow.sink) 29 28 30 29 let recv_bytes t n = 31 - let buf = Bytes.create n in 32 - Eio.Flow.read_exact (t.socket :> _ Eio.Flow.source) buf; 33 - Bytes.to_string buf 30 + let cs = Cstruct.create n in 31 + Eio.Flow.read_exact (t.socket :> _ Eio.Flow.source) cs; 32 + Cstruct.to_string cs 34 33 35 34 let send_contact_header t = 36 35 let header = ··· 67 66 (* Read SESS_INIT: 2+8+8 = 18 bytes header + variable *) 68 67 let fixed = recv_bytes t 18 in 69 68 let keepalive = (Char.code fixed.[0] lsl 8) lor Char.code fixed.[1] in 70 - let node_id_len = 71 - (Char.code fixed.[16] lsl 8) lor Char.code fixed.[17] 72 - in 69 + let node_id_len = (Char.code fixed.[16] lsl 8) lor Char.code fixed.[17] in 73 70 let node_id = recv_bytes t node_id_len in 74 71 let _ext_len = recv_bytes t 4 in 75 72 Ok ··· 94 91 let transfer_id = 95 92 let open Int64 in 96 93 let b i = of_int (Char.code header.[i + 1]) in 97 - logor (shift_left (b 0) 56) 98 - (logor (shift_left (b 1) 48) 99 - (logor (shift_left (b 2) 40) 100 - (logor (shift_left (b 3) 32) 101 - (logor (shift_left (b 4) 24) 102 - (logor (shift_left (b 5) 16) 94 + logor 95 + (shift_left (b 0) 56) 96 + (logor 97 + (shift_left (b 1) 48) 98 + (logor 99 + (shift_left (b 2) 40) 100 + (logor 101 + (shift_left (b 3) 32) 102 + (logor 103 + (shift_left (b 4) 24) 104 + (logor 105 + (shift_left (b 5) 16) 103 106 (logor (shift_left (b 6) 8) (b 7))))))) 104 107 in 105 108 let len_bytes = recv_bytes t 8 in ··· 107 110 let open Int64 in 108 111 let b i = of_int (Char.code len_bytes.[i]) in 109 112 to_int 110 - (logor (shift_left (b 0) 56) 111 - (logor (shift_left (b 1) 48) 112 - (logor (shift_left (b 2) 40) 113 - (logor (shift_left (b 3) 32) 114 - (logor (shift_left (b 4) 24) 115 - (logor (shift_left (b 5) 16) 113 + (logor 114 + (shift_left (b 0) 56) 115 + (logor 116 + (shift_left (b 1) 48) 117 + (logor 118 + (shift_left (b 2) 40) 119 + (logor 120 + (shift_left (b 3) 32) 121 + (logor 122 + (shift_left (b 4) 24) 123 + (logor 124 + (shift_left (b 5) 16) 116 125 (logor (shift_left (b 6) 8) (b 7)))))))) 117 126 in 118 127 let data = recv_bytes t data_len in ··· 126 135 is_end = flags_byte land 0x01 <> 0; 127 136 } 128 137 in 129 - Ok 130 - (Tcpcl.Xfer_ack 131 - { flags; transfer_id = 0L; ack_length = 0L }) 138 + Ok (Tcpcl.Xfer_ack { flags; transfer_id = 0L; ack_length = 0L }) 132 139 | Some Tcpcl.Keepalive -> Ok Tcpcl.Keepalive 133 140 | Some Tcpcl.Sess_term -> 134 141 Ok ··· 192 199 let peer_eid t = t.peer_eid 193 200 let is_connected t = t.state = Connected 194 201 195 - let send t bundle = 202 + let send_bundle t bundle = 196 203 if t.state <> Connected then Error "not connected" 197 204 else 198 205 try ··· 201 208 t.next_transfer_id <- Int64.succ t.next_transfer_id; 202 209 let segment = 203 210 Tcpcl.Xfer_segment 204 - { 205 - flags = { is_start = true; is_end = true }; 206 - transfer_id; 207 - data; 208 - } 211 + { flags = { is_start = true; is_end = true }; transfer_id; data } 209 212 in 210 213 send_bytes t (Tcpcl.encode_message segment); 211 214 (* Wait for ACK *) 212 - (match recv_message t with 215 + match recv_message t with 213 216 | Ok (Tcpcl.Xfer_ack _) -> Ok () 214 217 | Ok msg -> 215 218 Log.warn (fun m -> m "Expected ACK, got %a" Tcpcl.pp_message msg); 216 219 Ok () 217 - | Error e -> Error e) 220 + | Error e -> Error e 218 221 with exn -> Error (Printexc.to_string exn) 219 222 220 223 let receive t = ··· 232 235 } 233 236 in 234 237 send_bytes t (Tcpcl.encode_message ack); 235 - Bundle.decode seg.data |> Result.map_error Bundle.pp_error |> fun r -> 236 - Result.map_error (fun pp -> Fmt.str "%a" pp ()) r 237 - | Ok msg -> Error (Fmt.str "expected XFER_SEGMENT, got %a" Tcpcl.pp_message msg) 238 + Bundle.decode seg.data 239 + |> Result.map_error (fun e -> Fmt.str "%a" Bundle.pp_error e) 240 + | Ok msg -> 241 + Error (Fmt.str "expected XFER_SEGMENT, got %a" Tcpcl.pp_message msg) 238 242 | Error e -> Error e 239 243 240 - let close t = 244 + let close_connection t = 241 245 if t.state = Connected then ( 242 246 let term = 243 247 Tcpcl.Sess_term ··· 247 251 t.state <- Closed) 248 252 249 253 (* CLA interface *) 250 - let send_cla t bundle _next_hop = 254 + let send t bundle _next_hop = 251 255 let p, r = Eio.Promise.create () in 252 - (match send t bundle with 256 + (match send_bundle t bundle with 253 257 | Ok () -> Eio.Promise.resolve r Cla.Ok 254 258 | Error e -> Eio.Promise.resolve r (Cla.Error e)); 255 259 p 260 + 261 + let close t = close_connection t
+6 -6
adapters/tcpcl_adapter.mli
··· 15 15 16 16 val connect : 17 17 sw:Eio.Switch.t -> 18 - net:_ Eio.Net.t -> 18 + net:[> [ `Generic ] Eio.Net.ty ] Eio.Net.t -> 19 19 local_eid:Bundle.eid -> 20 20 addr:Eio.Net.Sockaddr.stream -> 21 21 t ··· 25 25 val accept : 26 26 sw:Eio.Switch.t -> 27 27 local_eid:Bundle.eid -> 28 - _ Eio.Net.stream_socket -> 28 + [ `Generic ] Eio.Net.stream_socket_ty Eio.Resource.t -> 29 29 t 30 30 (** [accept ~sw ~local_eid socket] accepts a TCPCL connection. *) 31 31 ··· 34 34 35 35 (** {1 Transfer} *) 36 36 37 - val send : t -> Bundle.t -> (unit, string) result 38 - (** [send t bundle] sends a bundle over the connection. *) 37 + val send_bundle : t -> Bundle.t -> (unit, string) result 38 + (** [send_bundle t bundle] sends a bundle over the connection. *) 39 39 40 40 val receive : t -> (Bundle.t, string) result 41 41 (** [receive t] receives a bundle from the connection. *) 42 42 43 43 (** {1 Session Management} *) 44 44 45 - val close : t -> unit 46 - (** [close t] gracefully terminates the session. *) 45 + val close_connection : t -> unit 46 + (** [close_connection t] gracefully terminates the session. *) 47 47 48 48 val is_connected : t -> bool 49 49 (** [is_connected t] returns true if the connection is active. *)
+20 -46
bin/borctl.ml
··· 22 22 23 23 let output_file = 24 24 let doc = "Output file for admin bundle (default: stdout)." in 25 - Arg.(value & opt (some string) None & info [ "o"; "output" ] ~docv:"FILE" ~doc) 25 + Arg.( 26 + value & opt (some string) None & info [ "o"; "output" ] ~docv:"FILE" ~doc) 26 27 27 28 (* Create admin bundle *) 28 29 let make_admin_bundle ~source ~dest record = 29 30 let timestamp : Bundle.timestamp = 30 - { 31 - time = Int64.of_float (Unix.gettimeofday () *. 1000.); 32 - seq = 0L; 33 - } 31 + { time = Int64.of_float (Unix.gettimeofday () *. 1000.); seq = 0L } 34 32 in 35 - Admin.Admin.make_bundle ~source ~destination:dest ~timestamp record 33 + Admin.make_bundle ~source ~destination:dest ~timestamp record 36 34 37 35 let write_bundle bundle output = 38 36 let data = Bundle.encode bundle in ··· 46 44 47 45 (* Status command *) 48 46 let status source dest output () = 49 - let record = Admin.Admin.Query Admin.Admin.Query_status in 47 + let record = Admin.Query Admin.Query_status in 50 48 let bundle = make_admin_bundle ~source ~dest record in 51 49 write_bundle bundle output; 52 50 Log.info (fun m -> m "Generated status query bundle") ··· 54 52 let status_cmd = 55 53 let doc = "Query node status." in 56 54 let info = Cmd.info "status" ~doc in 57 - let dest = 58 - Term.( 59 - const (fun n -> Bundle.Ipn (n, 0L)) 60 - $ node_id) 61 - in 62 - let source = 63 - Term.( 64 - const (fun n -> Bundle.Ipn (n, 0L)) 65 - $ source_node) 66 - in 55 + let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ node_id) in 56 + let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ source_node) in 67 57 Cmd.v info 68 58 Term.(const status $ source $ dest $ output_file $ Vlog.setup "borctl") 69 59 70 60 (* Contacts command *) 71 61 let contacts source dest output () = 72 - let record = Admin.Admin.Query Admin.Admin.Query_contacts in 62 + let record = Admin.Query Admin.Query_contacts in 73 63 let bundle = make_admin_bundle ~source ~dest record in 74 64 write_bundle bundle output; 75 65 Log.info (fun m -> m "Generated contacts query bundle") ··· 77 67 let contacts_cmd = 78 68 let doc = "Query contact plan." in 79 69 let info = Cmd.info "contacts" ~doc in 80 - let dest = 81 - Term.( 82 - const (fun n -> Bundle.Ipn (n, 0L)) 83 - $ node_id) 84 - in 85 - let source = 86 - Term.( 87 - const (fun n -> Bundle.Ipn (n, 0L)) 88 - $ source_node) 89 - in 70 + let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ node_id) in 71 + let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ source_node) in 90 72 Cmd.v info 91 73 Term.(const contacts $ source $ dest $ output_file $ Vlog.setup "borctl") 92 74 ··· 98 80 99 81 (* Bundles command *) 100 82 let bundles source dest filter output () = 101 - let record = Admin.Admin.Query (Admin.Admin.Query_bundles { filter }) in 83 + let record = Admin.Query (Admin.Query_bundles { filter }) in 102 84 let bundle = make_admin_bundle ~source ~dest record in 103 85 write_bundle bundle output; 104 86 Log.info (fun m -> m "Generated bundles query bundle") 105 87 106 88 let filter_arg = 107 89 let doc = "Filter expression for bundle listing." in 108 - Arg.(value & opt (some string) None & info [ "f"; "filter" ] ~docv:"FILTER" ~doc) 90 + Arg.( 91 + value & opt (some string) None & info [ "f"; "filter" ] ~docv:"FILTER" ~doc) 109 92 110 93 let bundles_cmd = 111 94 let doc = "List stored bundles." in 112 95 let info = Cmd.info "bundles" ~doc in 113 - let dest = 114 - Term.( 115 - const (fun n -> Bundle.Ipn (n, 0L)) 116 - $ node_id) 117 - in 118 - let source = 119 - Term.( 120 - const (fun n -> Bundle.Ipn (n, 0L)) 121 - $ source_node) 122 - in 96 + let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ node_id) in 97 + let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ source_node) in 123 98 Cmd.v info 124 99 Term.( 125 100 const bundles $ source $ dest $ filter_arg $ output_file ··· 150 125 exit 1 151 126 | Ok bundle -> ( 152 127 Fmt.pr "@[<v>Bundle:@, %a@]@." Bundle.pp bundle; 153 - match Admin.Admin.extract bundle with 128 + match Admin.extract bundle with 154 129 | Error e -> Log.err (fun m -> m "Not an admin bundle: %s" e) 155 - | Ok record -> Fmt.pr "@[<v>Admin record:@, %a@]@." Admin.Admin.pp record) 130 + | Ok record -> Fmt.pr "@[<v>Admin record:@, %a@]@." Admin.pp record) 156 131 157 132 let input_file = 158 133 let doc = "Input file (default: stdin)." in ··· 170 145 [ 171 146 `S Manpage.s_description; 172 147 `P 173 - "borctl generates admin bundles for controlling borealis nodes. \ 174 - These bundles can be injected into the network for delivery."; 148 + "borctl generates admin bundles for controlling borealis nodes. These \ 149 + bundles can be injected into the network for delivery."; 175 150 `S Manpage.s_commands; 176 151 `P "$(b,borctl status) - Query node status"; 177 152 `P "$(b,borctl contacts) - Query contact plan"; ··· 183 158 in 184 159 Cmd.info "borctl" ~version:"0.1.0" ~doc ~man 185 160 186 - let default = 187 - Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 161 + let default = Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 188 162 189 163 let () = 190 164 let cmd =
+15 -11
bin/borealis.ml
··· 13 13 14 14 (* Default policy: forward via CGR if route exists, otherwise store *) 15 15 let default_policy = 16 - let open Policy in 17 - let open Policy.Policy in 18 - if_pred Predicate.is_admin (action Action.accept_custody) 19 - (when_temporal (Temporal.route_exists (Bundle.Ipn (0L, 0L))) forward_route 20 - store_until_route) 16 + Policy.if_pred Predicate.is_admin 17 + (Policy.action Action.accept_custody) 18 + (Policy.when_temporal 19 + (Temporal.route_exists (Bundle.Ipn (0L, 0L))) 20 + Policy.forward_route Policy.store_until_route) 21 21 22 22 let run node_id listen_port () = 23 23 Eio_main.run @@ fun env -> 24 24 let net = Eio.Stdenv.net env in 25 25 let sw = Eio.Stdenv.fs env in 26 26 ignore (net, sw); 27 - let config = Engine.config ~node_id in 27 + let config = Engine.make_config ~node_id in 28 28 let contact_plan = Cgr.Contact_plan.empty in 29 29 let engine = Engine.create ~config ~policy:default_policy ~contact_plan in 30 30 Log.info (fun m -> ··· 38 38 Eio.Time.sleep (Eio.Stdenv.clock env) 10.0; 39 39 let stats = Engine.stats engine in 40 40 Log.info (fun m -> m "Stats: %a" Engine.pp_stats stats); 41 - let expired = Engine.cleanup_expired engine ~current_time:(Unix.gettimeofday ()) in 42 - if expired > 0 then Log.info (fun m -> m "Cleaned up %d expired bundles" expired); 41 + let expired = 42 + Engine.cleanup_expired engine ~current_time:(Unix.gettimeofday ()) 43 + in 44 + if expired > 0 then 45 + Log.info (fun m -> m "Cleaned up %d expired bundles" expired); 43 46 loop () 44 47 in 45 48 loop () ··· 59 62 Cmd.v info Term.(const run $ node_id $ listen_port $ Vlog.setup "borealis") 60 63 61 64 let info = 62 - let doc = "DTN daemon with policy DSL for software-defined satellite networking" in 65 + let doc = 66 + "DTN daemon with policy DSL for software-defined satellite networking" 67 + in 63 68 let man = 64 69 [ 65 70 `S Manpage.s_description; ··· 74 79 in 75 80 Cmd.info "borealis" ~version:"0.1.0" ~doc ~man 76 81 77 - let default = 78 - Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 82 + let default = Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 79 83 80 84 let () = 81 85 let cmd = Cmd.group info ~default [ run_cmd ] in
+17 -17
fuzz/fuzz_admin.ml
··· 15 15 16 16 let gen_status = 17 17 map 18 - [ gen_eid; float; range 1000; range 1000; range 1000; range 1000; range 100 ] 18 + [ 19 + gen_eid; float; range 1000; range 1000; range 1000; range 1000; range 100; 20 + ] 19 21 (fun node_id uptime_secs stored fwd del drop contacts -> 20 - Admin.Admin.Status_report 22 + Admin.Status_report 21 23 { 22 24 node_id; 23 25 uptime_secs; ··· 31 33 let gen_query = 32 34 choose 33 35 [ 34 - const (Admin.Admin.Query Admin.Admin.Query_status); 35 - const (Admin.Admin.Query Admin.Admin.Query_contacts); 36 - const (Admin.Admin.Query Admin.Admin.Query_policy); 37 - map [ option bytes ] (fun filter -> 38 - Admin.Admin.Query (Admin.Admin.Query_bundles { filter })); 36 + const (Admin.Query Admin.Query_status); 37 + const (Admin.Query Admin.Query_contacts); 38 + const (Admin.Query Admin.Query_policy); 39 + map 40 + [ option bytes ] 41 + (fun filter -> Admin.Query (Admin.Query_bundles { filter })); 39 42 ] 40 43 41 44 let gen_contact = 42 - map 43 - [ bytes; bytes; float; float; float; float ] 45 + map [ bytes; bytes; float; float; float; float ] 44 46 (fun from_node to_node start_time stop_time rate_bps owlt_secs -> 45 - { Admin.Admin.from_node; to_node; start_time; stop_time; rate_bps; owlt_secs }) 47 + { Admin.from_node; to_node; start_time; stop_time; rate_bps; owlt_secs }) 46 48 47 49 let gen_contact_plan = 48 - map [ list gen_contact ] (fun contacts -> 49 - Admin.Admin.Contact_update { contacts }) 50 + map [ list gen_contact ] (fun contacts -> Admin.Contact_update { contacts }) 50 51 51 52 let gen_record = choose [ gen_status; gen_query; gen_contact_plan ] 52 53 53 54 let test_roundtrip record = 54 - let encoded = Admin.Admin.encode record in 55 - match Admin.Admin.decode encoded with 55 + let encoded = Admin.encode record in 56 + match Admin.decode encoded with 56 57 | Error _ -> () 57 58 | Ok decoded -> 58 - let re_encoded = Admin.Admin.encode decoded in 59 + let re_encoded = Admin.encode decoded in 59 60 check_eq ~pp:Fmt.string encoded re_encoded 60 61 61 - let () = 62 - add_test ~name:"admin roundtrip" [ gen_record ] test_roundtrip 62 + let () = add_test ~name:"admin roundtrip" [ gen_record ] test_roundtrip
+187 -188
lib/admin/admin.ml
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 + (** Admin record CBOR schema. 7 + 8 + Uses Cbort codecs for type-safe encoding/decoding. Wire format uses 9 + integer-keyed maps for compactness (following COSE/CWT conventions). *) 10 + 6 11 type status = { 7 12 node_id : Bundle.eid; 8 13 uptime_secs : float; ··· 23 28 } 24 29 25 30 type contact_plan = { contacts : contact list } 26 - 27 31 type config_delta = { key : string; value : string option } 28 32 29 33 type query = ··· 47 51 | Query of query 48 52 | Response of response 49 53 50 - module C = Cbort.Cbor 54 + (** {1 CBOR Schema Definitions} 51 55 52 - let eid_to_cbor = Bundle.eid_to_cbor 56 + All records use integer keys for wire compactness. *) 53 57 54 - let eid_of_cbor cbor = 55 - match Bundle.eid_of_cbor cbor with 56 - | Ok eid -> eid 57 - | Error msg -> failwith msg 58 + (** EID codec - delegates to Bundle module's CBOR representation. *) 59 + let eid_codec : Bundle.eid Cbort.t = 60 + Cbort.conv 61 + (fun cbor -> 62 + match Bundle.eid_of_cbor cbor with 63 + | Ok eid -> Ok eid 64 + | Error msg -> Error msg) 65 + Bundle.eid_to_cbor Cbort.any 58 66 59 - let status_to_cbor s = 60 - C.Map 61 - [ 62 - (C.Text "node_id", eid_to_cbor s.node_id); 63 - (C.Text "uptime", C.Float s.uptime_secs); 64 - (C.Text "stored", C.Int (Z.of_int s.bundles_stored)); 65 - (C.Text "forwarded", C.Int (Z.of_int s.bundles_forwarded)); 66 - (C.Text "delivered", C.Int (Z.of_int s.bundles_delivered)); 67 - (C.Text "dropped", C.Int (Z.of_int s.bundles_dropped)); 68 - (C.Text "active_contacts", C.Int (Z.of_int s.active_contacts)); 69 - ] 67 + (** Status record codec. 70 68 71 - let status_of_cbor = function 72 - | C.Map pairs -> 73 - let find key = 74 - List.find_map 75 - (function C.Text k, v when k = key -> Some v | _ -> None) 76 - pairs 77 - in 78 - let get_int key = 79 - match find key with 80 - | Some (C.Int n) -> Z.to_int n 81 - | _ -> failwith ("missing " ^ key) 82 - in 83 - let get_float key = 84 - match find key with 85 - | Some (C.Float f) -> f 86 - | Some (C.Int n) -> Z.to_float n 87 - | _ -> failwith ("missing " ^ key) 88 - in 89 - { 90 - node_id = 91 - (match find "node_id" with 92 - | Some c -> eid_of_cbor c 93 - | None -> failwith "missing node_id"); 94 - uptime_secs = get_float "uptime"; 95 - bundles_stored = get_int "stored"; 96 - bundles_forwarded = get_int "forwarded"; 97 - bundles_delivered = get_int "delivered"; 98 - bundles_dropped = get_int "dropped"; 99 - active_contacts = get_int "active_contacts"; 100 - } 101 - | _ -> failwith "expected map for status" 69 + Wire format (integer-keyed map): 70 + {v 71 + { 72 + 1: eid, // node_id 73 + 2: float, // uptime_secs 74 + 3: int, // bundles_stored 75 + 4: int, // bundles_forwarded 76 + 5: int, // bundles_delivered 77 + 6: int, // bundles_dropped 78 + 7: int // active_contacts 79 + } 80 + v} *) 81 + let status_codec : status Cbort.t = 82 + let open Cbort.Obj_int in 83 + finish 84 + (let* node_id = mem 1 (fun s -> s.node_id) eid_codec in 85 + let* uptime_secs = mem 2 (fun s -> s.uptime_secs) Cbort.float in 86 + let* bundles_stored = mem 3 (fun s -> s.bundles_stored) Cbort.int in 87 + let* bundles_forwarded = mem 4 (fun s -> s.bundles_forwarded) Cbort.int in 88 + let* bundles_delivered = mem 5 (fun s -> s.bundles_delivered) Cbort.int in 89 + let* bundles_dropped = mem 6 (fun s -> s.bundles_dropped) Cbort.int in 90 + let* active_contacts = mem 7 (fun s -> s.active_contacts) Cbort.int in 91 + return 92 + { 93 + node_id; 94 + uptime_secs; 95 + bundles_stored; 96 + bundles_forwarded; 97 + bundles_delivered; 98 + bundles_dropped; 99 + active_contacts; 100 + }) 102 101 103 - let contact_to_cbor c = 104 - C.Array 105 - [ 106 - C.Text c.from_node; 107 - C.Text c.to_node; 108 - C.Float c.start_time; 109 - C.Float c.stop_time; 110 - C.Float c.rate_bps; 111 - C.Float c.owlt_secs; 112 - ] 102 + (** Contact record codec. 113 103 114 - let contact_of_cbor = function 115 - | C.Array [ C.Text from_node; C.Text to_node; start; stop; rate; owlt ] -> 116 - let to_float = function 117 - | C.Float f -> f 118 - | C.Int n -> Z.to_float n 119 - | _ -> failwith "expected number" 120 - in 121 - { 122 - from_node; 123 - to_node; 124 - start_time = to_float start; 125 - stop_time = to_float stop; 126 - rate_bps = to_float rate; 127 - owlt_secs = to_float owlt; 128 - } 129 - | _ -> failwith "expected array for contact" 104 + Wire format (array for compactness): 105 + {v [from_node, to_node, start_time, stop_time, rate_bps, owlt_secs] v} *) 106 + let contact_codec : contact Cbort.t = 107 + let open Cbort in 108 + let tuple6 a b c d e f = 109 + conv 110 + (fun (a, (b, (c, (d, (e, f))))) -> 111 + Ok 112 + { 113 + from_node = a; 114 + to_node = b; 115 + start_time = c; 116 + stop_time = d; 117 + rate_bps = e; 118 + owlt_secs = f; 119 + }) 120 + (fun r -> 121 + ( r.from_node, 122 + (r.to_node, (r.start_time, (r.stop_time, (r.rate_bps, r.owlt_secs)))) 123 + )) 124 + (tuple2 a (tuple2 b (tuple2 c (tuple2 d (tuple2 e f))))) 125 + in 126 + tuple6 string string float float float float 130 127 131 - let contact_plan_to_cbor cp = 132 - C.Array (List.map contact_to_cbor cp.contacts) 128 + (** Contact plan codec. *) 129 + let contact_plan_codec : contact_plan Cbort.t = 130 + Cbort.map 131 + (fun contacts -> { contacts }) 132 + (fun cp -> cp.contacts) 133 + (Cbort.array contact_codec) 133 134 134 - let contact_plan_of_cbor = function 135 - | C.Array items -> { contacts = List.map contact_of_cbor items } 136 - | _ -> failwith "expected array for contact_plan" 135 + (** Config delta codec. 136 + 137 + Wire format: [key, value] where value is text or null. *) 138 + let config_delta_codec : config_delta Cbort.t = 139 + Cbort.map 140 + (fun (key, value) -> { key; value }) 141 + (fun d -> (d.key, d.value)) 142 + (Cbort.tuple2 Cbort.string (Cbort.nullable Cbort.string)) 143 + 144 + (** Query codec using tag-based variants. 137 145 138 - let config_delta_to_cbor d = 139 - C.Array 146 + Wire format: 147 + - Tag 0: Query_status (null payload) 148 + - Tag 1: Query_contacts (null payload) 149 + - Tag 2: Query_policy (null payload) 150 + - Tag 3: Query_bundles (filter: text?) *) 151 + let query_codec : query Cbort.t = 152 + let open Cbort.Variant in 153 + variant 140 154 [ 141 - C.Text d.key; 142 - (match d.value with Some v -> C.Text v | None -> C.Simple 22); 143 - (* null *) 155 + case0 0 Query_status (function Query_status -> true | _ -> false); 156 + case0 1 Query_contacts (function Query_contacts -> true | _ -> false); 157 + case0 2 Query_policy (function Query_policy -> true | _ -> false); 158 + case 3 159 + (Cbort.nullable Cbort.string) 160 + (fun filter -> Query_bundles { filter }) 161 + (function Query_bundles { filter } -> Some filter | _ -> None); 144 162 ] 145 163 146 - let config_delta_of_cbor = function 147 - | C.Array [ C.Text key; C.Text v ] -> { key; value = Some v } 148 - | C.Array [ C.Text key; C.Simple 22 ] -> { key; value = None } 149 - | _ -> failwith "expected array for config_delta" 164 + (** Response codec using tag-based variants. 150 165 151 - let query_to_cbor = function 152 - | Query_status -> C.Array [ C.Int Z.zero ] 153 - | Query_contacts -> C.Array [ C.Int Z.one ] 154 - | Query_policy -> C.Array [ C.Int (Z.of_int 2) ] 155 - | Query_bundles { filter } -> 156 - C.Array 157 - [ 158 - C.Int (Z.of_int 3); 159 - (match filter with Some f -> C.Text f | None -> C.Simple 22); 160 - ] 166 + Wire format: 167 + - Tag 0: Response_status (status) 168 + - Tag 1: Response_contacts (contact_plan) 169 + - Tag 2: Response_policy (source: text) 170 + - Tag 3: Response_bundles (count: int, bundle_ids: text list) 171 + - Tag 255: Response_error (code: int, message: text) *) 172 + let response_codec : response Cbort.t = 173 + let open Cbort.Variant in 174 + variant 175 + [ 176 + case 0 status_codec 177 + (fun s -> Response_status s) 178 + (function Response_status s -> Some s | _ -> None); 179 + case 1 contact_plan_codec 180 + (fun cp -> Response_contacts cp) 181 + (function Response_contacts cp -> Some cp | _ -> None); 182 + case 2 Cbort.string 183 + (fun source -> Response_policy { source }) 184 + (function Response_policy { source } -> Some source | _ -> None); 185 + case 3 186 + (Cbort.tuple2 Cbort.int (Cbort.array Cbort.string)) 187 + (fun (count, bundle_ids) -> Response_bundles { count; bundle_ids }) 188 + (function 189 + | Response_bundles { count; bundle_ids } -> Some (count, bundle_ids) 190 + | _ -> None); 191 + case 255 192 + (Cbort.tuple2 Cbort.int Cbort.string) 193 + (fun (code, message) -> Response_error { code; message }) 194 + (function 195 + | Response_error { code; message } -> Some (code, message) | _ -> None); 196 + ] 161 197 162 - let z0 = Z.zero 163 - let z1 = Z.one 164 - let z2 = Z.of_int 2 165 - let z3 = Z.of_int 3 166 - let z255 = Z.of_int 255 198 + (** Admin record codec using tag-based variants. 167 199 168 - let query_of_cbor = function 169 - | C.Array [ C.Int n ] when Z.equal n z0 -> Query_status 170 - | C.Array [ C.Int n ] when Z.equal n z1 -> Query_contacts 171 - | C.Array [ C.Int n ] when Z.equal n z2 -> Query_policy 172 - | C.Array [ C.Int n; C.Text f ] when Z.equal n z3 -> 173 - Query_bundles { filter = Some f } 174 - | C.Array [ C.Int n; C.Simple 22 ] when Z.equal n z3 -> 175 - Query_bundles { filter = None } 176 - | _ -> failwith "invalid query" 200 + Wire format: 201 + - Tag 1: Status_report (status) 202 + - Tag 2: Policy_update (compiled: bytes, source: text) 203 + - Tag 3: Contact_update (contact_plan) 204 + - Tag 4: Config_update (config_delta list) 205 + - Tag 5: Query (query) 206 + - Tag 6: Response (response) *) 207 + let codec : t Cbort.t = 208 + let open Cbort.Variant in 209 + variant 210 + [ 211 + case 1 status_codec 212 + (fun s -> Status_report s) 213 + (function Status_report s -> Some s | _ -> None); 214 + case 2 215 + (Cbort.tuple2 Cbort.bytes Cbort.string) 216 + (fun (compiled, source) -> Policy_update { compiled; source }) 217 + (function 218 + | Policy_update { compiled; source } -> Some (compiled, source) 219 + | _ -> None); 220 + case 3 contact_plan_codec 221 + (fun cp -> Contact_update cp) 222 + (function Contact_update cp -> Some cp | _ -> None); 223 + case 4 224 + (Cbort.array config_delta_codec) 225 + (fun deltas -> Config_update deltas) 226 + (function Config_update deltas -> Some deltas | _ -> None); 227 + case 5 query_codec 228 + (fun q -> Query q) 229 + (function Query q -> Some q | _ -> None); 230 + case 6 response_codec 231 + (fun r -> Response r) 232 + (function Response r -> Some r | _ -> None); 233 + ] 177 234 178 - let response_to_cbor = function 179 - | Response_status s -> C.Array [ C.Int z0; status_to_cbor s ] 180 - | Response_contacts cp -> C.Array [ C.Int z1; contact_plan_to_cbor cp ] 181 - | Response_policy { source } -> C.Array [ C.Int z2; C.Text source ] 182 - | Response_bundles { count; bundle_ids } -> 183 - C.Array 184 - [ 185 - C.Int z3; 186 - C.Int (Z.of_int count); 187 - C.Array (List.map (fun s -> C.Text s) bundle_ids); 188 - ] 189 - | Response_error { code; message } -> 190 - C.Array [ C.Int z255; C.Int (Z.of_int code); C.Text message ] 235 + (** {1 Encoding/Decoding} *) 191 236 192 - let response_of_cbor = function 193 - | C.Array [ C.Int n; s ] when Z.equal n z0 -> 194 - Response_status (status_of_cbor s) 195 - | C.Array [ C.Int n; cp ] when Z.equal n z1 -> 196 - Response_contacts (contact_plan_of_cbor cp) 197 - | C.Array [ C.Int n; C.Text source ] when Z.equal n z2 -> 198 - Response_policy { source } 199 - | C.Array [ C.Int n; C.Int count; C.Array ids ] when Z.equal n z3 -> 200 - Response_bundles 201 - { 202 - count = Z.to_int count; 203 - bundle_ids = 204 - List.map 205 - (function C.Text s -> s | _ -> failwith "expected text") 206 - ids; 207 - } 208 - | C.Array [ C.Int n; C.Int code; C.Text message ] when Z.equal n z255 -> 209 - Response_error { code = Z.to_int code; message } 210 - | _ -> failwith "invalid response" 237 + let encode t = Cbort.encode_string codec t 211 238 212 - let to_cbor = function 213 - | Status_report s -> C.Array [ C.Int z1; status_to_cbor s ] 214 - | Policy_update { compiled; source } -> 215 - C.Array [ C.Int z2; C.Bytes compiled; C.Text source ] 216 - | Contact_update cp -> C.Array [ C.Int z3; contact_plan_to_cbor cp ] 217 - | Config_update deltas -> 218 - C.Array [ C.Int (Z.of_int 4); C.Array (List.map config_delta_to_cbor deltas) ] 219 - | Query q -> C.Array [ C.Int (Z.of_int 5); query_to_cbor q ] 220 - | Response r -> C.Array [ C.Int (Z.of_int 6); response_to_cbor r ] 239 + let decode bytes = 240 + match Cbort.decode_string codec bytes with 241 + | Ok v -> Ok v 242 + | Error e -> Error (Cbort.Error.to_string e) 221 243 222 - let z4 = Z.of_int 4 223 - let z5 = Z.of_int 5 224 - let z6 = Z.of_int 6 225 - 226 - let of_cbor cbor = 227 - try 228 - Ok 229 - (match cbor with 230 - | C.Array [ C.Int n; s ] when Z.equal n z1 -> 231 - Status_report (status_of_cbor s) 232 - | C.Array [ C.Int n; C.Bytes compiled; C.Text source ] when Z.equal n z2 -> 233 - Policy_update { compiled; source } 234 - | C.Array [ C.Int n; cp ] when Z.equal n z3 -> 235 - Contact_update (contact_plan_of_cbor cp) 236 - | C.Array [ C.Int n; C.Array deltas ] when Z.equal n z4 -> 237 - Config_update (List.map config_delta_of_cbor deltas) 238 - | C.Array [ C.Int n; q ] when Z.equal n z5 -> Query (query_of_cbor q) 239 - | C.Array [ C.Int n; r ] when Z.equal n z6 -> Response (response_of_cbor r) 240 - | _ -> failwith "invalid admin record") 241 - with Failure msg -> Error msg 242 - 243 - let encode t = Cbort.Cbor.to_string (to_cbor t) 244 - 245 - let decode bytes = 246 - match Cbort.Cbor.of_string bytes with 247 - | Ok cbor -> of_cbor cbor 248 - | Error msg -> Error msg 244 + (** {1 Bundle Helpers} *) 249 245 250 246 let make_bundle ~source ~destination ~timestamp record = 251 247 let payload = encode record in 252 248 let flags = { Bundle.bundle_flags_default with is_admin_record = true } in 253 - Bundle.v ~flags ~source ~destination ~creation_timestamp:timestamp 254 - ~payload () 249 + Bundle.v ~flags ~source ~destination ~creation_timestamp:timestamp ~payload () 255 250 256 251 let extract bundle = 257 252 if not bundle.Bundle.primary.flags.is_admin_record then ··· 263 258 264 259 let is_admin_bundle bundle = bundle.Bundle.primary.flags.is_admin_record 265 260 261 + (** {1 Pretty Printers} *) 262 + 266 263 let pp_status ppf s = 267 264 Fmt.pf ppf 268 265 "@[<v>node: %a@,\ ··· 271 268 forwarded: %d@,\ 272 269 delivered: %d@,\ 273 270 dropped: %d@,\ 274 - contacts: %d@]" Bundle.pp_eid s.node_id s.uptime_secs s.bundles_stored 275 - s.bundles_forwarded s.bundles_delivered s.bundles_dropped s.active_contacts 271 + contacts: %d@]" 272 + Bundle.pp_eid s.node_id s.uptime_secs s.bundles_stored s.bundles_forwarded 273 + s.bundles_delivered s.bundles_dropped s.active_contacts 276 274 277 275 let pp_contact ppf c = 278 276 Fmt.pf ppf "%s->%s [%.0f-%.0f] @%.0f bps (owlt: %.3fs)" c.from_node c.to_node ··· 291 289 Fmt.pf ppf "contacts: %a" Fmt.(list pp_contact) cp.contacts 292 290 | Response_policy { source } -> Fmt.pf ppf "policy: %s" source 293 291 | Response_bundles { count; bundle_ids } -> 294 - Fmt.pf ppf "bundles: %d [%a]" count Fmt.(list ~sep:comma string) bundle_ids 295 - | Response_error { code; message } -> 296 - Fmt.pf ppf "error(%d): %s" code message 292 + Fmt.pf ppf "bundles: %d [%a]" count 293 + Fmt.(list ~sep:comma string) 294 + bundle_ids 295 + | Response_error { code; message } -> Fmt.pf ppf "error(%d): %s" code message 297 296 298 297 let pp ppf = function 299 298 | Status_report s -> Fmt.pf ppf "status_report(%a)" pp_status s
+8 -14
lib/admin/admin.mli
··· 50 50 | Query_status 51 51 | Query_contacts 52 52 | Query_policy 53 - | Query_bundles of { filter : string option } 54 - (** Query types. *) 53 + | Query_bundles of { filter : string option } (** Query types. *) 55 54 56 55 type response = 57 56 | Response_status of status 58 57 | Response_contacts of contact_plan 59 58 | Response_policy of { source : string } 60 59 | Response_bundles of { count : int; bundle_ids : string list } 61 - | Response_error of { code : int; message : string } 62 - (** Query responses. *) 60 + | Response_error of { code : int; message : string } (** Query responses. *) 63 61 64 62 type t = 65 63 | Status_report of status ··· 67 65 | Contact_update of contact_plan 68 66 | Config_update of config_delta list 69 67 | Query of query 70 - | Response of response 71 - (** Administrative record types. *) 68 + | Response of response (** Administrative record types. *) 72 69 73 - (** {1 Encoding} *) 70 + (** {1 CBOR Schema} *) 74 71 75 - val to_cbor : t -> Cbort.Cbor.t 76 - (** [to_cbor record] encodes the admin record as CBOR. *) 72 + val codec : t Cbort.t 73 + (** CBOR codec for admin records. *) 74 + 75 + (** {1 Encoding/Decoding} *) 77 76 78 77 val encode : t -> string 79 78 (** [encode record] encodes to CBOR bytes. *) 80 - 81 - (** {1 Decoding} *) 82 - 83 - val of_cbor : Cbort.Cbor.t -> (t, string) result 84 - (** [of_cbor cbor] decodes an admin record. *) 85 79 86 80 val decode : string -> (t, string) result 87 81 (** [decode bytes] decodes from CBOR bytes. *)
+1
lib/admin/dune
··· 1 1 (library 2 2 (name admin) 3 3 (public_name borealis.admin) 4 + (wrapped false) 4 5 (libraries borealis.policy bundle cgr cbort fmt logs))
+3 -1
lib/forwarding/dune
··· 1 1 (library 2 2 (name forwarding) 3 3 (public_name borealis.forwarding) 4 + (wrapped false) 4 5 (libraries 5 6 borealis.policy 6 7 borealis.store ··· 8 9 bundle 9 10 cgr 10 11 fmt 11 - logs)) 12 + logs 13 + unix))
+31 -30
lib/forwarding/engine.ml
··· 11 11 admin_eid : Bundle.eid; 12 12 } 13 13 14 - let config ~node_id = 14 + let make_config ~node_id = 15 15 { 16 16 local_node = Cgr.Node.v (Int64.to_string node_id); 17 17 local_eid = Bundle.Ipn (node_id, 1L); 18 18 admin_eid = Bundle.Ipn (node_id, 0L); 19 19 } 20 20 21 - type stats = { 21 + type mutable_stats = { 22 22 mutable bundles_received : int; 23 23 mutable bundles_forwarded : int; 24 24 mutable bundles_delivered : int; ··· 31 31 store : Store.t; 32 32 mutable contact_plan : Cgr.Contact_plan.t; 33 33 mutable policy : Policy.t; 34 - stats : stats; 34 + mstats : mutable_stats; 35 35 start_time : float; 36 36 } 37 37 ··· 41 41 store = Store.create (); 42 42 contact_plan; 43 43 policy; 44 - stats = 44 + mstats = 45 45 { 46 46 bundles_received = 0; 47 47 bundles_forwarded = 0; ··· 98 98 let to_node = Cgr.Contact.to_ first_contact in 99 99 Some (Bundle.Dtn (Cgr.Node.name to_node)) 100 100 101 - let execute_action t bundle = function 101 + let rec execute_action t bundle = function 102 102 | Action.Forward { next_hop; custody = _; via } -> 103 - t.stats.bundles_forwarded <- t.stats.bundles_forwarded + 1; 103 + t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 104 104 Forward [ { bundle; next_hop; via } ] 105 105 | Action.Forward_route -> ( 106 106 match find_route t bundle.Bundle.primary.destination with ··· 113 113 match route_to_next_hop route with 114 114 | None -> Dropped "empty route" 115 115 | Some next_hop -> 116 - t.stats.bundles_forwarded <- t.stats.bundles_forwarded + 1; 116 + t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 117 117 Forward [ { bundle; next_hop; via = None } ])) 118 118 | Action.Store { until } -> 119 119 Store.store t.store ~bundle ~condition:until ~custody:false; 120 120 Stored 121 121 | Action.Drop { reason } -> 122 - t.stats.bundles_dropped <- t.stats.bundles_dropped + 1; 122 + t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1; 123 123 Dropped reason 124 - | Action.Accept_custody -> 124 + | Action.Accept_custody -> ( 125 125 (* Accept custody and forward via CGR *) 126 - (match find_route t bundle.Bundle.primary.destination with 126 + match find_route t bundle.Bundle.primary.destination with 127 127 | None -> 128 128 Store.store t.store ~bundle ~condition:Action.Until_route 129 129 ~custody:true; ··· 135 135 ~custody:true; 136 136 Stored 137 137 | Some next_hop -> 138 - t.stats.bundles_forwarded <- t.stats.bundles_forwarded + 1; 138 + t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 139 139 Forward [ { bundle; next_hop; via = None } ])) 140 140 | Action.Refuse_custody { reason } -> Dropped ("custody refused: " ^ reason) 141 141 | Action.Rate_limit _ -> ··· 143 143 execute_action t bundle Action.Forward_route 144 144 145 145 let process t bundle ~tenant = 146 - t.stats.bundles_received <- t.stats.bundles_received + 1; 146 + t.mstats.bundles_received <- t.mstats.bundles_received + 1; 147 147 let dest = bundle.Bundle.primary.destination in 148 148 149 149 (* Check if this is for us *) 150 150 if is_local t dest then ( 151 - t.stats.bundles_delivered <- t.stats.bundles_delivered + 1; 151 + t.mstats.bundles_delivered <- t.mstats.bundles_delivered + 1; 152 152 if bundle.Bundle.primary.flags.is_admin_record then Admin_handled 153 153 else Delivered) 154 154 else ··· 165 165 match Policy.eval ctx t.policy with 166 166 | Policy.No_match -> 167 167 Log.warn (fun m -> m "No policy match, dropping bundle"); 168 - t.stats.bundles_dropped <- t.stats.bundles_dropped + 1; 168 + t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1; 169 169 Dropped "no policy match" 170 170 | Policy.Actions [] -> Dropped "empty action list" 171 171 | Policy.Actions (action :: _) -> ··· 173 173 execute_action t bundle action 174 174 175 175 let process_admin t record = 176 - t.stats.admin_handled <- t.stats.admin_handled + 1; 176 + t.mstats.admin_handled <- t.mstats.admin_handled + 1; 177 177 match record with 178 178 | Admin.Query Admin.Query_status -> 179 179 let status : Admin.status = ··· 181 181 node_id = t.config.local_eid; 182 182 uptime_secs = Unix.gettimeofday () -. t.start_time; 183 183 bundles_stored = Store.count t.store; 184 - bundles_forwarded = t.stats.bundles_forwarded; 185 - bundles_delivered = t.stats.bundles_delivered; 186 - bundles_dropped = t.stats.bundles_dropped; 184 + bundles_forwarded = t.mstats.bundles_forwarded; 185 + bundles_delivered = t.mstats.bundles_delivered; 186 + bundles_dropped = t.mstats.bundles_dropped; 187 187 active_contacts = 188 188 List.length 189 189 (Cgr.Contact_plan.active_at t.contact_plan ··· 216 216 contacts 217 217 in 218 218 t.contact_plan <- Cgr.Contact_plan.of_list new_contacts; 219 - Log.info (fun m -> m "Contact plan updated with %d contacts" 220 - (List.length contacts)); 219 + Log.info (fun m -> 220 + m "Contact plan updated with %d contacts" (List.length contacts)); 221 221 None 222 222 | Admin.Policy_update { source; _ } -> 223 223 Log.info (fun m -> m "Policy update received: %s" source); ··· 232 232 List.map 233 233 (fun entry -> 234 234 Store.remove t.store entry.Store.id; 235 - t.stats.bundles_forwarded <- t.stats.bundles_forwarded + 1; 235 + t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 236 236 { bundle = entry.bundle; next_hop = to_eid; via = None }) 237 237 ready 238 238 ··· 243 243 List.iter 244 244 (fun entry -> 245 245 Store.remove t.store entry.Store.id; 246 - t.stats.bundles_dropped <- t.stats.bundles_dropped + 1) 246 + t.mstats.bundles_dropped <- t.mstats.bundles_dropped + 1) 247 247 expired; 248 248 List.length expired 249 249 ··· 258 258 | None -> None 259 259 | Some next_hop -> 260 260 Store.remove t.store entry.id; 261 - t.stats.bundles_forwarded <- t.stats.bundles_forwarded + 1; 261 + t.mstats.bundles_forwarded <- t.mstats.bundles_forwarded + 1; 262 262 Some { bundle = entry.bundle; next_hop; via = None })) 263 263 waiting 264 264 265 - type nonrec stats = stats = { 265 + type stats = { 266 266 bundles_received : int; 267 267 bundles_forwarded : int; 268 268 bundles_delivered : int; ··· 272 272 273 273 let stats t = 274 274 { 275 - bundles_received = t.stats.bundles_received; 276 - bundles_forwarded = t.stats.bundles_forwarded; 277 - bundles_delivered = t.stats.bundles_delivered; 278 - bundles_dropped = t.stats.bundles_dropped; 279 - admin_handled = t.stats.admin_handled; 275 + bundles_received = t.mstats.bundles_received; 276 + bundles_forwarded = t.mstats.bundles_forwarded; 277 + bundles_delivered = t.mstats.bundles_delivered; 278 + bundles_dropped = t.mstats.bundles_dropped; 279 + admin_handled = t.mstats.admin_handled; 280 280 } 281 281 282 282 let pp_stats ppf s = 283 - Fmt.pf ppf "@[<v>received: %d@,forwarded: %d@,delivered: %d@,dropped: %d@,admin: %d@]" 283 + Fmt.pf ppf 284 + "@[<v>received: %d@,forwarded: %d@,delivered: %d@,dropped: %d@,admin: %d@]" 284 285 s.bundles_received s.bundles_forwarded s.bundles_delivered s.bundles_dropped 285 286 s.admin_handled
+4 -8
lib/forwarding/engine.mli
··· 18 18 } 19 19 (** Engine configuration. *) 20 20 21 - val config : node_id:int64 -> config 22 - (** [config ~node_id] creates a config for node [ipn:node_id.*]. *) 21 + val make_config : node_id:int64 -> config 22 + (** [make_config ~node_id] creates a config for node [ipn:node_id.*]. *) 23 23 24 24 (** {1 Engine State} *) 25 25 ··· 27 27 (** The forwarding engine. *) 28 28 29 29 val create : 30 - config:config -> 31 - policy:Policy.t -> 32 - contact_plan:Cgr.Contact_plan.t -> 33 - t 30 + config:config -> policy:Policy.t -> contact_plan:Cgr.Contact_plan.t -> t 34 31 (** [create ~config ~policy ~contact_plan] creates a new engine. *) 35 32 36 33 val config : t -> config ··· 65 62 | Stored 66 63 | Delivered 67 64 | Dropped of string 68 - | Admin_handled 69 - (** Result of processing a bundle. *) 65 + | Admin_handled (** Result of processing a bundle. *) 70 66 71 67 val process : t -> Bundle.t -> tenant:Delegation.Name.t option -> process_result 72 68 (** [process t bundle ~tenant] processes an incoming bundle according to policy.
+3 -1
lib/policy/action.ml
··· 20 20 | Refuse_custody of { reason : string } 21 21 | Rate_limit of { tenant : Delegation.Name.t; max_rate : int } 22 22 23 - let forward ?(custody = false) ?via next_hop = Forward { next_hop; custody; via } 23 + let forward ?(custody = false) ?via next_hop = 24 + Forward { next_hop; custody; via } 25 + 24 26 let forward_route = Forward_route 25 27 let store until = Store { until } 26 28 let store_until_contact eid = Store { until = Until_contact eid }
+9 -25
lib/policy/action.mli
··· 10 10 11 11 (** {1 Convergence Layer Adapter} *) 12 12 13 - type cla = 14 - | Tcpcl 15 - | Ltp 16 - | Udp 17 - | Any_cla 18 - (** Convergence layer adapter type. *) 13 + type cla = Tcpcl | Ltp | Udp | Any_cla (** Convergence layer adapter type. *) 19 14 20 15 (** {1 Store Conditions} *) 21 16 17 + (** Conditions for releasing a stored bundle. *) 22 18 type store_condition = 23 19 | Until_contact of Bundle.eid (** Store until contact with given node. *) 24 20 | Until_route (** Store until any route to destination becomes available. *) 25 21 | Until_time of float (** Store until absolute time (epoch seconds). *) 26 22 | Until_expiry (** Store until bundle expires. *) 27 - (** Conditions for releasing a stored bundle. *) 28 23 29 24 (** {1 Action Type} *) 30 25 26 + (** A policy action. *) 31 27 type t = 32 - | Forward of { 33 - next_hop : Bundle.eid; 34 - custody : bool; 35 - via : cla option; 36 - } 28 + | Forward of { next_hop : Bundle.eid; custody : bool; via : cla option } 37 29 (** Forward to next hop, optionally taking custody. *) 38 - | Forward_route 39 - (** Forward using CGR-computed route. *) 30 + | Forward_route (** Forward using CGR-computed route. *) 40 31 | Store of { until : store_condition } 41 32 (** Store bundle until condition is met. *) 42 - | Drop of { reason : string } 43 - (** Drop bundle with given reason. *) 44 - | Accept_custody 45 - (** Accept custody transfer for this bundle. *) 46 - | Refuse_custody of { reason : string } 47 - (** Refuse custody transfer. *) 48 - | Rate_limit of { 49 - tenant : Delegation.Name.t; 50 - max_rate : int; 51 - } 33 + | Drop of { reason : string } (** Drop bundle with given reason. *) 34 + | Accept_custody (** Accept custody transfer for this bundle. *) 35 + | Refuse_custody of { reason : string } (** Refuse custody transfer. *) 36 + | Rate_limit of { tenant : Delegation.Name.t; max_rate : int } 52 37 (** Apply rate limiting for tenant. *) 53 - (** A policy action. *) 54 38 55 39 (** {1 Constructors} *) 56 40
+1
lib/policy/dune
··· 1 1 (library 2 2 (name policy) 3 3 (public_name borealis.policy) 4 + (wrapped false) 4 5 (libraries bundle cgr delegation fmt logs))
+6 -6
lib/policy/policy.ml
··· 60 60 let rec find = function 61 61 | [] -> No_match 62 62 | p :: rest -> ( 63 - match eval ctx p with 64 - | Actions _ as r -> r 65 - | No_match -> find rest) 63 + match eval ctx p with Actions _ as r -> r | No_match -> find rest) 66 64 in 67 65 find policies 68 66 69 67 let rec pp ppf = function 70 68 | Action a -> Fmt.pf ppf "action(%a)" Action.pp a 71 69 | Match (pred, then_, else_) -> 72 - Fmt.pf ppf "@[<hv2>match %a@,then %a@,else %a@]" Predicate.pp pred pp then_ 73 - pp else_ 70 + Fmt.pf ppf "@[<hv2>match %a@,then %a@,else %a@]" Predicate.pp pred pp 71 + then_ pp else_ 74 72 | When_temporal (temp, then_, else_) -> 75 73 Fmt.pf ppf "@[<hv2>when %a@,then %a@,else %a@]" Temporal.pp temp pp then_ 76 74 pp else_ 77 75 | Seq (p1, p2) -> Fmt.pf ppf "@[<hv2>%a >>@,%a@]" pp p1 pp p2 78 76 | First_match policies -> 79 - Fmt.pf ppf "@[<hv2>first_match[@,%a@,]@]" Fmt.(list ~sep:comma pp) policies 77 + Fmt.pf ppf "@[<hv2>first_match[@,%a@,]@]" 78 + Fmt.(list ~sep:comma pp) 79 + policies 80 80 81 81 let rec equal a b = 82 82 match (a, b) with
+9 -14
lib/policy/policy.mli
··· 30 30 31 31 (** {1 Policy Type} *) 32 32 33 + (** A policy tree. *) 33 34 type t = 34 - | Action of Action.t 35 - (** Terminal: execute an action. *) 35 + | Action of Action.t (** Terminal: execute an action. *) 36 36 | Match of Predicate.t * t * t 37 37 (** Conditional: if predicate then policy else policy. *) 38 38 | When_temporal of Temporal.t * t * t 39 - (** Temporal conditional: if temporal predicate then policy else policy. *) 40 - | Seq of t * t 41 - (** Sequential: execute first policy, then second. *) 42 - | First_match of t list 43 - (** First matching policy in list. *) 44 - (** A policy tree. *) 39 + (** Temporal conditional: if temporal predicate then policy else policy. 40 + *) 41 + | Seq of t * t (** Sequential: execute first policy, then second. *) 42 + | First_match of t list (** First matching policy in list. *) 45 43 46 44 (** {1 Constructors} *) 47 45 ··· 55 53 (** Alias for {!match_pred}. *) 56 54 57 55 val when_temporal : Temporal.t -> t -> t -> t 58 - (** [when_temporal temp then_policy else_policy] branches on temporal 59 - predicate. *) 56 + (** [when_temporal temp then_policy else_policy] branches on temporal predicate. 57 + *) 60 58 61 59 val seq : t -> t -> t 62 60 (** [seq p1 p2] executes [p1] then [p2]. *) ··· 95 93 } 96 94 (** Full evaluation context. *) 97 95 98 - type result = 99 - | Actions of Action.t list 100 - | No_match 101 - (** Evaluation result. *) 96 + type result = Actions of Action.t list | No_match (** Evaluation result. *) 102 97 103 98 val eval : context -> t -> result 104 99 (** [eval ctx policy] evaluates the policy in context [ctx]. *)
+2 -4
lib/policy/predicate.ml
··· 117 117 | Priority p1, Priority p2 -> equal_priority_pred p1 p2 118 118 | Lifetime_remaining s1, Lifetime_remaining s2 -> Float.equal s1 s2 119 119 | Tenant n1, Tenant n2 -> Delegation.Name.equal n1 n2 120 - | And (a1, b1), And (a2, b2) -> 121 - if equal a1 a2 then equal b1 b2 else false 122 - | Or (a1, b1), Or (a2, b2) -> 123 - if equal a1 a2 then equal b1 b2 else false 120 + | And (a1, b1), And (a2, b2) -> if equal a1 a2 then equal b1 b2 else false 121 + | Or (a1, b1), Or (a2, b2) -> if equal a1 a2 then equal b1 b2 else false 124 122 | Not p1, Not p2 -> equal p1 p2 125 123 | _ -> false
+2 -4
lib/policy/predicate.mli
··· 22 22 | Tenant of Delegation.Name.t 23 23 | And of t * t 24 24 | Or of t * t 25 - | Not of t 26 - (** A predicate over bundle properties. *) 25 + | Not of t (** A predicate over bundle properties. *) 27 26 28 27 and priority_pred = 29 28 | Bulk (** Normal priority. *) 30 29 | Normal (** Normal priority (alias). *) 31 30 | Expedited (** Expedited priority (report flags set). *) 32 - | Any_priority 33 - (** Priority predicates based on bundle flags. *) 31 + | Any_priority (** Priority predicates based on bundle flags. *) 34 32 35 33 (** {1 Constructors} *) 36 34
+2 -3
lib/policy/temporal.mli
··· 10 10 11 11 (** {1 Temporal Predicate Type} *) 12 12 13 + (** A temporal predicate over the contact plan. *) 13 14 type t = 14 15 | Contact_available of Bundle.eid 15 16 (** A contact to the given node is currently active. *) ··· 17 18 (** A contact to the given node will be available within N seconds. *) 18 19 | Route_exists of Bundle.eid 19 20 (** A route to the destination exists in the contact plan. *) 20 - | No_contacts 21 - (** No outbound contacts are currently active. *) 22 - (** A temporal predicate over the contact plan. *) 21 + | No_contacts (** No outbound contacts are currently active. *) 23 22 24 23 (** {1 Constructors} *) 25 24
+1 -2
lib/store/bundle_id.ml
··· 37 37 let equal a b = compare a b = 0 38 38 39 39 let hash t = 40 - Hashtbl.hash 41 - (t.source, t.creation_time, t.sequence, t.fragment_offset) 40 + Hashtbl.hash (t.source, t.creation_time, t.sequence, t.fragment_offset) 42 41 43 42 let pp ppf t = 44 43 match t.fragment_offset with
+2 -1
lib/store/dune
··· 1 1 (library 2 2 (name store) 3 3 (public_name borealis.store) 4 - (libraries borealis.policy bundle fmt logs)) 4 + (wrapped false) 5 + (libraries borealis.policy bundle fmt logs unix))
+1 -1
lib/store/store.ml
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - module Condition = Policy.Action 6 + module Condition = Action 7 7 8 8 type entry = { 9 9 bundle : Bundle.t;
+5 -4
lib/store/store.mli
··· 23 23 bundle : Bundle.t; 24 24 id : Bundle_id.t; 25 25 stored_at : float; 26 - release_condition : Policy.Action.store_condition; (** @see [Policy.Action.store_condition] *) 26 + release_condition : Action.store_condition; 27 27 custody : bool; 28 28 } 29 - (** Note: The [Action] module is from [borealis.policy] library. *) 30 - (** A stored bundle with metadata. *) 29 + (** A stored bundle with metadata. 30 + 31 + Note: [Action.store_condition] is from [borealis.policy] library. *) 31 32 32 33 (** {1 Operations} *) 33 34 34 35 val store : 35 36 t -> 36 37 bundle:Bundle.t -> 37 - condition:Policy.Action.store_condition -> 38 + condition:Action.store_condition -> 38 39 custody:bool -> 39 40 unit 40 41 (** [store t ~bundle ~condition ~custody] stores the bundle with the given
+29 -26
test/test_admin.ml
··· 4 4 ---------------------------------------------------------------------------*) 5 5 6 6 let test_roundtrip_status_report () = 7 - let status : Admin.Admin.status = 7 + let status : Admin.status = 8 8 { 9 9 node_id = Bundle.Ipn (1L, 0L); 10 10 uptime_secs = 3600.0; ··· 15 15 active_contacts = 2; 16 16 } 17 17 in 18 - let record = Admin.Admin.Status_report status in 19 - let encoded = Admin.Admin.encode record in 20 - match Admin.Admin.decode encoded with 18 + let record = Admin.Status_report status in 19 + let encoded = Admin.encode record in 20 + match Admin.decode encoded with 21 21 | Error e -> Alcotest.fail ("decode failed: " ^ e) 22 - | Ok (Admin.Admin.Status_report s) -> 23 - Alcotest.(check int64) "node" 1L 22 + | Ok (Admin.Status_report s) -> 23 + Alcotest.(check int64) 24 + "node" 1L 24 25 (match s.node_id with Bundle.Ipn (n, _) -> n | _ -> -1L); 25 26 Alcotest.(check int) "stored" 10 s.bundles_stored; 26 27 Alcotest.(check int) "forwarded" 100 s.bundles_forwarded 27 28 | Ok _ -> Alcotest.fail "wrong record type" 28 29 29 30 let test_roundtrip_query () = 30 - let record = Admin.Admin.Query Admin.Admin.Query_status in 31 - let encoded = Admin.Admin.encode record in 32 - match Admin.Admin.decode encoded with 31 + let record = Admin.Query Admin.Query_status in 32 + let encoded = Admin.encode record in 33 + match Admin.decode encoded with 33 34 | Error e -> Alcotest.fail ("decode failed: " ^ e) 34 - | Ok (Admin.Admin.Query Admin.Admin.Query_status) -> () 35 + | Ok (Admin.Query Admin.Query_status) -> () 35 36 | Ok _ -> Alcotest.fail "wrong record type" 36 37 37 38 let test_roundtrip_contact_plan () = 38 39 let contacts = 39 40 [ 40 41 { 41 - Admin.Admin.from_node = "SAT-A"; 42 + Admin.from_node = "SAT-A"; 42 43 to_node = "SAT-B"; 43 44 start_time = 1000.0; 44 45 stop_time = 2000.0; ··· 46 47 owlt_secs = 0.001; 47 48 }; 48 49 { 49 - Admin.Admin.from_node = "SAT-B"; 50 + Admin.from_node = "SAT-B"; 50 51 to_node = "GROUND"; 51 52 start_time = 1500.0; 52 53 stop_time = 1600.0; ··· 55 56 }; 56 57 ] 57 58 in 58 - let record = Admin.Admin.Contact_update { contacts } in 59 - let encoded = Admin.Admin.encode record in 60 - match Admin.Admin.decode encoded with 59 + let record = Admin.Contact_update { contacts } in 60 + let encoded = Admin.encode record in 61 + match Admin.decode encoded with 61 62 | Error e -> Alcotest.fail ("decode failed: " ^ e) 62 - | Ok (Admin.Admin.Contact_update { contacts = decoded }) -> 63 + | Ok (Admin.Contact_update { contacts = decoded }) -> 63 64 Alcotest.(check int) "contact count" 2 (List.length decoded); 64 65 let c1 = List.hd decoded in 65 66 Alcotest.(check string) "from_node" "SAT-A" c1.from_node; ··· 67 68 | Ok _ -> Alcotest.fail "wrong record type" 68 69 69 70 let test_make_bundle () = 70 - let record = Admin.Admin.Query Admin.Admin.Query_status in 71 + let record = Admin.Query Admin.Query_status in 71 72 let bundle = 72 - Admin.Admin.make_bundle ~source:(Bundle.Ipn (0L, 0L)) 73 + Admin.make_bundle 74 + ~source:(Bundle.Ipn (0L, 0L)) 73 75 ~destination:(Bundle.Ipn (1L, 0L)) 74 - ~timestamp:{ time = 0L; seq = 0L } 75 - record 76 + ~timestamp:{ time = 0L; seq = 0L } record 76 77 in 77 - Alcotest.(check bool) "is admin" true 78 - bundle.Bundle.primary.flags.is_admin_record; 79 - match Admin.Admin.extract bundle with 78 + Alcotest.(check bool) 79 + "is admin" true bundle.Bundle.primary.flags.is_admin_record; 80 + match Admin.extract bundle with 80 81 | Error e -> Alcotest.fail ("extract failed: " ^ e) 81 - | Ok (Admin.Admin.Query Admin.Admin.Query_status) -> () 82 + | Ok (Admin.Query Admin.Query_status) -> () 82 83 | Ok _ -> Alcotest.fail "wrong record type" 83 84 84 85 let suite = 85 86 [ 86 - Alcotest.test_case "roundtrip status report" `Quick test_roundtrip_status_report; 87 + Alcotest.test_case "roundtrip status report" `Quick 88 + test_roundtrip_status_report; 87 89 Alcotest.test_case "roundtrip query" `Quick test_roundtrip_query; 88 - Alcotest.test_case "roundtrip contact plan" `Quick test_roundtrip_contact_plan; 90 + Alcotest.test_case "roundtrip contact plan" `Quick 91 + test_roundtrip_contact_plan; 89 92 Alcotest.test_case "make bundle" `Quick test_make_bundle; 90 93 ]
+42 -44
test/test_engine.ml
··· 5 5 6 6 let make_bundle ?(source = Bundle.Ipn (10L, 1L)) ?(dest = Bundle.Ipn (20L, 1L)) 7 7 payload = 8 - Bundle.v ~source ~destination:dest 9 - ~creation_timestamp:{ time = 0L; seq = 0L } 8 + Bundle.v ~source ~destination:dest ~creation_timestamp:{ time = 0L; seq = 0L } 10 9 ~payload () 11 10 12 11 let default_policy = 13 12 (* Forward all bundles via CGR, store if no route *) 14 - Policy.Policy.forward_route 13 + Policy.forward_route 15 14 16 15 let test_process_local_delivery () = 17 - let config = Engine.Engine.config ~node_id:1L in 16 + let config = Engine.make_config ~node_id:1L in 18 17 let engine = 19 - Engine.Engine.create ~config ~policy:default_policy 18 + Engine.create ~config ~policy:default_policy 20 19 ~contact_plan:Cgr.Contact_plan.empty 21 20 in 22 21 (* Bundle destined for local node *) 23 22 let bundle = make_bundle ~dest:(Bundle.Ipn (1L, 1L)) "local payload" in 24 - match Engine.Engine.process engine bundle ~tenant:None with 25 - | Engine.Engine.Delivered -> () 23 + match Engine.process engine bundle ~tenant:None with 24 + | Engine.Delivered -> () 26 25 | result -> 27 26 Alcotest.fail 28 27 (Fmt.str "expected Delivered, got %s" 29 28 (match result with 30 - | Engine.Engine.Forward _ -> "Forward" 31 - | Engine.Engine.Stored -> "Stored" 32 - | Engine.Engine.Dropped r -> "Dropped: " ^ r 33 - | Engine.Engine.Admin_handled -> "Admin_handled" 34 - | Engine.Engine.Delivered -> "Delivered")) 29 + | Engine.Forward _ -> "Forward" 30 + | Engine.Stored -> "Stored" 31 + | Engine.Dropped r -> "Dropped: " ^ r 32 + | Engine.Admin_handled -> "Admin_handled" 33 + | Engine.Delivered -> "Delivered")) 35 34 36 35 let test_process_no_route_stores () = 37 - let config = Engine.Engine.config ~node_id:1L in 36 + let config = Engine.make_config ~node_id:1L in 38 37 let engine = 39 - Engine.Engine.create ~config ~policy:default_policy 38 + Engine.create ~config ~policy:default_policy 40 39 ~contact_plan:Cgr.Contact_plan.empty 41 40 in 42 41 (* Bundle to remote node, no contacts defined *) 43 42 let bundle = make_bundle ~dest:(Bundle.Ipn (99L, 1L)) "remote payload" in 44 - match Engine.Engine.process engine bundle ~tenant:None with 45 - | Engine.Engine.Stored -> 46 - Alcotest.(check int) "stored count" 1 47 - (Store.Store.count (Engine.Engine.store engine)) 43 + match Engine.process engine bundle ~tenant:None with 44 + | Engine.Stored -> 45 + Alcotest.(check int) "stored count" 1 (Store.count (Engine.store engine)) 48 46 | result -> 49 47 Alcotest.fail 50 48 (Fmt.str "expected Stored, got %s" 51 49 (match result with 52 - | Engine.Engine.Forward _ -> "Forward" 53 - | Engine.Engine.Stored -> "Stored" 54 - | Engine.Engine.Dropped r -> "Dropped: " ^ r 55 - | Engine.Engine.Admin_handled -> "Admin_handled" 56 - | Engine.Engine.Delivered -> "Delivered")) 50 + | Engine.Forward _ -> "Forward" 51 + | Engine.Stored -> "Stored" 52 + | Engine.Dropped r -> "Dropped: " ^ r 53 + | Engine.Admin_handled -> "Admin_handled" 54 + | Engine.Delivered -> "Delivered")) 57 55 58 56 let test_process_with_route_forwards () = 59 - let config = Engine.Engine.config ~node_id:1L in 60 - (* Create a contact from node 1 to node 99 *) 57 + let config = Engine.make_config ~node_id:1L in 58 + (* Create a contact from node 1 to node 99, starting now *) 59 + let now = Unix.gettimeofday () in 61 60 let contact = 62 - Cgr.Contact.v ~from:(Cgr.Node.v "1") ~to_:(Cgr.Node.v "99") ~start:0. 63 - ~stop:1000. ~rate:1_000_000. () 61 + Cgr.Contact.v ~from:(Cgr.Node.v "1") ~to_:(Cgr.Node.v "99") ~start:now 62 + ~stop:(now +. 1000.) ~rate:1_000_000. () 64 63 in 65 64 let contact_plan = Cgr.Contact_plan.of_list [ contact ] in 66 - let engine = 67 - Engine.Engine.create ~config ~policy:default_policy ~contact_plan 68 - in 65 + let engine = Engine.create ~config ~policy:default_policy ~contact_plan in 69 66 let bundle = make_bundle ~dest:(Bundle.Ipn (99L, 1L)) "remote payload" in 70 - match Engine.Engine.process engine bundle ~tenant:None with 71 - | Engine.Engine.Forward reqs -> 67 + match Engine.process engine bundle ~tenant:None with 68 + | Engine.Forward reqs -> 72 69 Alcotest.(check int) "forward request count" 1 (List.length reqs) 73 70 | result -> 74 71 Alcotest.fail 75 72 (Fmt.str "expected Forward, got %s" 76 73 (match result with 77 - | Engine.Engine.Forward _ -> "Forward" 78 - | Engine.Engine.Stored -> "Stored" 79 - | Engine.Engine.Dropped r -> "Dropped: " ^ r 80 - | Engine.Engine.Admin_handled -> "Admin_handled" 81 - | Engine.Engine.Delivered -> "Delivered")) 74 + | Engine.Forward _ -> "Forward" 75 + | Engine.Stored -> "Stored" 76 + | Engine.Dropped r -> "Dropped: " ^ r 77 + | Engine.Admin_handled -> "Admin_handled" 78 + | Engine.Delivered -> "Delivered")) 82 79 83 80 let test_stats () = 84 - let config = Engine.Engine.config ~node_id:1L in 81 + let config = Engine.make_config ~node_id:1L in 85 82 let engine = 86 - Engine.Engine.create ~config ~policy:default_policy 83 + Engine.create ~config ~policy:default_policy 87 84 ~contact_plan:Cgr.Contact_plan.empty 88 85 in 89 - let stats = Engine.Engine.stats engine in 86 + let stats = Engine.stats engine in 90 87 Alcotest.(check int) "initial received" 0 stats.bundles_received; 91 88 (* Process some bundles *) 92 89 let bundle1 = make_bundle ~dest:(Bundle.Ipn (1L, 1L)) "local" in 93 90 let bundle2 = make_bundle ~dest:(Bundle.Ipn (99L, 1L)) "remote" in 94 - ignore (Engine.Engine.process engine bundle1 ~tenant:None); 95 - ignore (Engine.Engine.process engine bundle2 ~tenant:None); 96 - let stats = Engine.Engine.stats engine in 91 + ignore (Engine.process engine bundle1 ~tenant:None); 92 + ignore (Engine.process engine bundle2 ~tenant:None); 93 + let stats = Engine.stats engine in 97 94 Alcotest.(check int) "received" 2 stats.bundles_received; 98 95 Alcotest.(check int) "delivered" 1 stats.bundles_delivered 99 96 ··· 101 98 [ 102 99 Alcotest.test_case "local delivery" `Quick test_process_local_delivery; 103 100 Alcotest.test_case "no route stores" `Quick test_process_no_route_stores; 104 - Alcotest.test_case "with route forwards" `Quick test_process_with_route_forwards; 101 + Alcotest.test_case "with route forwards" `Quick 102 + test_process_with_route_forwards; 105 103 Alcotest.test_case "stats" `Quick test_stats; 106 104 ]
+39 -30
test/test_policy.ml
··· 10 10 11 11 let test_eid_pattern_any () = 12 12 let pat = Eid_pattern.any in 13 - Alcotest.(check bool) "any matches ipn" true 13 + Alcotest.(check bool) 14 + "any matches ipn" true 14 15 (Eid_pattern.matches pat (Bundle.Ipn (1L, 1L))); 15 - Alcotest.(check bool) "any matches dtn" true 16 + Alcotest.(check bool) 17 + "any matches dtn" true 16 18 (Eid_pattern.matches pat (Bundle.Dtn "node/app")); 17 - Alcotest.(check bool) "any matches none" true 19 + Alcotest.(check bool) 20 + "any matches none" true 18 21 (Eid_pattern.matches pat Bundle.Dtn_none) 19 22 20 23 let test_eid_pattern_ipn_node () = 21 24 let pat = Eid_pattern.ipn_node 10L in 22 - Alcotest.(check bool) "matches same node" true 25 + Alcotest.(check bool) 26 + "matches same node" true 23 27 (Eid_pattern.matches pat (Bundle.Ipn (10L, 1L))); 24 - Alcotest.(check bool) "matches same node different service" true 28 + Alcotest.(check bool) 29 + "matches same node different service" true 25 30 (Eid_pattern.matches pat (Bundle.Ipn (10L, 99L))); 26 - Alcotest.(check bool) "no match different node" false 31 + Alcotest.(check bool) 32 + "no match different node" false 27 33 (Eid_pattern.matches pat (Bundle.Ipn (11L, 1L))); 28 - Alcotest.(check bool) "no match dtn" false 34 + Alcotest.(check bool) 35 + "no match dtn" false 29 36 (Eid_pattern.matches pat (Bundle.Dtn "10")) 30 37 31 38 let test_eid_pattern_ipn_range () = 32 39 let pat = Eid_pattern.ipn_range 10L 20L in 33 - Alcotest.(check bool) "matches lower bound" true 40 + Alcotest.(check bool) 41 + "matches lower bound" true 34 42 (Eid_pattern.matches pat (Bundle.Ipn (10L, 1L))); 35 - Alcotest.(check bool) "matches upper bound" true 43 + Alcotest.(check bool) 44 + "matches upper bound" true 36 45 (Eid_pattern.matches pat (Bundle.Ipn (20L, 1L))); 37 - Alcotest.(check bool) "matches middle" true 46 + Alcotest.(check bool) 47 + "matches middle" true 38 48 (Eid_pattern.matches pat (Bundle.Ipn (15L, 1L))); 39 - Alcotest.(check bool) "no match below" false 49 + Alcotest.(check bool) 50 + "no match below" false 40 51 (Eid_pattern.matches pat (Bundle.Ipn (9L, 1L))); 41 - Alcotest.(check bool) "no match above" false 52 + Alcotest.(check bool) 53 + "no match above" false 42 54 (Eid_pattern.matches pat (Bundle.Ipn (21L, 1L))) 43 55 44 56 let test_predicate_source () = 45 57 let bundle = 46 - Bundle.v ~source:(Bundle.Ipn (10L, 1L)) 58 + Bundle.v 59 + ~source:(Bundle.Ipn (10L, 1L)) 47 60 ~destination:(Bundle.Ipn (20L, 1L)) 48 - ~creation_timestamp:{ time = 0L; seq = 0L } 49 - ~payload:"test" () 61 + ~creation_timestamp:{ time = 0L; seq = 0L } ~payload:"test" () 50 62 in 51 - let ctx : Predicate.context = 52 - { bundle; current_time = 0.; tenant = None } 53 - in 63 + let ctx : Predicate.context = { bundle; current_time = 0.; tenant = None } in 54 64 let pred = Predicate.source (Eid_pattern.ipn_node 10L) in 55 65 Alcotest.(check bool) "source matches" true (Predicate.eval ctx pred); 56 66 let pred_other = Predicate.source (Eid_pattern.ipn_node 99L) in ··· 58 68 59 69 let test_predicate_and_or () = 60 70 let bundle = 61 - Bundle.v ~source:(Bundle.Ipn (10L, 1L)) 71 + Bundle.v 72 + ~source:(Bundle.Ipn (10L, 1L)) 62 73 ~destination:(Bundle.Ipn (20L, 1L)) 63 - ~creation_timestamp:{ time = 0L; seq = 0L } 64 - ~payload:"test" () 65 - in 66 - let ctx : Predicate.context = 67 - { bundle; current_time = 0.; tenant = None } 74 + ~creation_timestamp:{ time = 0L; seq = 0L } ~payload:"test" () 68 75 in 76 + let ctx : Predicate.context = { bundle; current_time = 0.; tenant = None } in 69 77 let p1 = Predicate.source (Eid_pattern.ipn_node 10L) in 70 78 let p2 = Predicate.dest (Eid_pattern.ipn_node 20L) in 71 79 let p3 = Predicate.dest (Eid_pattern.ipn_node 99L) in 72 80 Alcotest.(check bool) "and true" true Predicate.(eval ctx (p1 && p2)); 73 81 Alcotest.(check bool) "and false" false Predicate.(eval ctx (p1 && p3)); 74 82 Alcotest.(check bool) "or true" true Predicate.(eval ctx (p2 || p3)); 75 - Alcotest.(check bool) "or false" false Predicate.(eval ctx (p3 || Predicate.false_)) 83 + Alcotest.(check bool) 84 + "or false" false 85 + Predicate.(eval ctx (p3 || Predicate.false_)) 76 86 77 87 let test_policy_eval () = 78 88 let bundle = 79 - Bundle.v ~source:(Bundle.Ipn (10L, 1L)) 89 + Bundle.v 90 + ~source:(Bundle.Ipn (10L, 1L)) 80 91 ~destination:(Bundle.Ipn (20L, 1L)) 81 - ~creation_timestamp:{ time = 0L; seq = 0L } 82 - ~payload:"test" () 92 + ~creation_timestamp:{ time = 0L; seq = 0L } ~payload:"test" () 83 93 in 84 94 let ctx : Policy.context = 85 95 { ··· 93 103 let policy = 94 104 Policy.if_pred 95 105 (Predicate.source (Eid_pattern.ipn_node 10L)) 96 - Policy.forward_route 97 - (Policy.drop "no match") 106 + Policy.forward_route (Policy.drop "no match") 98 107 in 99 108 match Policy.eval ctx policy with 100 109 | Policy.Actions [ Action.Forward_route ] -> ()
+24 -28
test/test_store.ml
··· 5 5 6 6 let make_bundle ?(source = Bundle.Ipn (1L, 1L)) ?(dest = Bundle.Ipn (2L, 1L)) 7 7 ?(seq = 0L) payload = 8 - Bundle.v ~source ~destination:dest 9 - ~creation_timestamp:{ time = 0L; seq } 8 + Bundle.v ~source ~destination:dest ~creation_timestamp:{ time = 0L; seq } 10 9 ~payload () 11 10 12 11 let test_store_and_find () = 13 - let store = Store.Store.create () in 12 + let store = Store.create () in 14 13 let bundle = make_bundle "test payload" in 15 - Store.Store.store store ~bundle ~condition:Policy.Action.Until_route 16 - ~custody:false; 17 - Alcotest.(check int) "count" 1 (Store.Store.count store); 18 - let id = Store.Bundle_id.of_bundle bundle in 19 - match Store.Store.find store id with 14 + Store.store store ~bundle ~condition:Action.Until_route ~custody:false; 15 + Alcotest.(check int) "count" 1 (Store.count store); 16 + let id = Bundle_id.of_bundle bundle in 17 + match Store.find store id with 20 18 | None -> Alcotest.fail "bundle not found" 21 19 | Some entry -> 22 20 Alcotest.(check bool) "custody" false entry.custody; 23 - Alcotest.(check string) "payload" "test payload" 21 + Alcotest.(check string) 22 + "payload" "test payload" 24 23 (Option.get (Bundle.payload entry.bundle)) 25 24 26 25 let test_remove () = 27 - let store = Store.Store.create () in 26 + let store = Store.create () in 28 27 let bundle = make_bundle "test" in 29 - let id = Store.Bundle_id.of_bundle bundle in 30 - Store.Store.store store ~bundle ~condition:Policy.Action.Until_route 31 - ~custody:false; 32 - Alcotest.(check int) "count before" 1 (Store.Store.count store); 33 - Store.Store.remove store id; 34 - Alcotest.(check int) "count after" 0 (Store.Store.count store); 35 - Alcotest.(check bool) "mem" false (Store.Store.mem store id) 28 + let id = Bundle_id.of_bundle bundle in 29 + Store.store store ~bundle ~condition:Action.Until_route ~custody:false; 30 + Alcotest.(check int) "count before" 1 (Store.count store); 31 + Store.remove store id; 32 + Alcotest.(check int) "count after" 0 (Store.count store); 33 + Alcotest.(check bool) "mem" false (Store.mem store id) 36 34 37 35 let test_ready_for_contact () = 38 - let store = Store.Store.create () in 36 + let store = Store.create () in 39 37 let target = Bundle.Ipn (10L, 0L) in 40 38 let b1 = make_bundle ~seq:1L "bundle 1" in 41 39 let b2 = make_bundle ~seq:2L "bundle 2" in 42 40 let b3 = make_bundle ~seq:3L "bundle 3" in 43 - Store.Store.store store ~bundle:b1 ~condition:(Policy.Action.Until_contact target) 44 - ~custody:false; 45 - Store.Store.store store ~bundle:b2 ~condition:Policy.Action.Until_route 41 + Store.store store ~bundle:b1 ~condition:(Action.Until_contact target) 46 42 ~custody:false; 47 - Store.Store.store store ~bundle:b3 ~condition:(Policy.Action.Until_contact target) 43 + Store.store store ~bundle:b2 ~condition:Action.Until_route ~custody:false; 44 + Store.store store ~bundle:b3 ~condition:(Action.Until_contact target) 48 45 ~custody:true; 49 - let ready = Store.Store.ready_for_contact store target in 46 + let ready = Store.ready_for_contact store target in 50 47 Alcotest.(check int) "ready count" 2 (List.length ready) 51 48 52 49 let test_ready_for_route () = 53 - let store = Store.Store.create () in 50 + let store = Store.create () in 54 51 let target = Bundle.Ipn (10L, 0L) in 55 52 let b1 = make_bundle ~seq:1L "bundle 1" in 56 53 let b2 = make_bundle ~seq:2L "bundle 2" in 57 - Store.Store.store store ~bundle:b1 ~condition:(Policy.Action.Until_contact target) 58 - ~custody:false; 59 - Store.Store.store store ~bundle:b2 ~condition:Policy.Action.Until_route 54 + Store.store store ~bundle:b1 ~condition:(Action.Until_contact target) 60 55 ~custody:false; 61 - let ready = Store.Store.ready_for_route store in 56 + Store.store store ~bundle:b2 ~condition:Action.Until_route ~custody:false; 57 + let ready = Store.ready_for_route store in 62 58 Alcotest.(check int) "ready count" 1 (List.length ready) 63 59 64 60 let suite =