DTN controller and policy language for satellite networks
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove broken ocaml-monitor subtree

+607 -213
+4 -4
README.md
··· 53 53 54 54 ```bash 55 55 # Show daemon status 56 - borctl status 56 + borealis status 57 57 58 58 # Deploy a policy 59 - borctl policy deploy policy.bp 59 + borealis policy deploy policy.bp 60 60 61 61 # Update contact plan 62 - borctl contacts update contacts.json 62 + borealis contacts update contacts.json 63 63 64 64 # List stored bundles 65 - borctl bundles list 65 + borealis bundles list 66 66 ``` 67 67 68 68 ## Policy DSL
+3 -2
adapters/tcpcl_adapter.ml
··· 8 8 type state = Connecting | Connected | Closed 9 9 10 10 type t = { 11 - socket : [ `Generic ] Eio.Net.stream_socket_ty Eio.Resource.t; 11 + socket : [ `Generic | `Unix ] Eio.Net.stream_socket_ty Eio.Resource.t; 12 12 local_eid : Bundle.eid; 13 13 mutable peer_eid : string option; 14 14 mutable state : state; ··· 167 167 Log.err (fun m -> m "Handshake error: %s" e); 168 168 t.state <- Closed 169 169 170 - let connect ~sw ~net ~local_eid ~addr = 170 + let connect ~sw ~(net : [ `Generic | `Unix ] Eio.Net.ty Eio.Net.t) ~local_eid 171 + ~addr = 171 172 let socket = Eio.Net.connect ~sw net addr in 172 173 let t = 173 174 {
+2 -2
adapters/tcpcl_adapter.mli
··· 15 15 16 16 val connect : 17 17 sw:Eio.Switch.t -> 18 - net:[> [ `Generic ] Eio.Net.ty ] Eio.Net.t -> 18 + net:[ `Generic | `Unix ] Eio.Net.ty Eio.Net.t -> 19 19 local_eid:Bundle.eid -> 20 20 addr:Eio.Net.Sockaddr.stream -> 21 21 t ··· 25 25 val accept : 26 26 sw:Eio.Switch.t -> 27 27 local_eid:Bundle.eid -> 28 - [ `Generic ] Eio.Net.stream_socket_ty Eio.Resource.t -> 28 + [ `Generic | `Unix ] Eio.Net.stream_socket_ty Eio.Resource.t -> 29 29 t 30 30 (** [accept ~sw ~local_eid socket] accepts a TCPCL connection. *) 31 31
-168
bin/borctl.ml
··· 1 - (*--------------------------------------------------------------------------- 2 - Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. 3 - SPDX-License-Identifier: ISC 4 - ---------------------------------------------------------------------------*) 5 - 6 - (** Borealis control CLI. *) 7 - 8 - open Cmdliner 9 - 10 - let log_src = Logs.Src.create "borctl" 11 - 12 - module Log = (val Logs.src_log log_src) 13 - 14 - (* Common options *) 15 - let node_id = 16 - let doc = "Target node ID (IPN scheme node number)." in 17 - Arg.(value & opt int64 1L & info [ "n"; "node" ] ~docv:"NODE" ~doc) 18 - 19 - let source_node = 20 - let doc = "Source node ID for admin bundles." in 21 - Arg.(value & opt int64 0L & info [ "s"; "source" ] ~docv:"SOURCE" ~doc) 22 - 23 - let output_file = 24 - let doc = "Output file for admin bundle (default: stdout)." in 25 - Arg.( 26 - value & opt (some string) None & info [ "o"; "output" ] ~docv:"FILE" ~doc) 27 - 28 - (* Create admin bundle *) 29 - let make_admin_bundle ~source ~dest record = 30 - let timestamp : Bundle.timestamp = 31 - { time = Int64.of_float (Unix.gettimeofday () *. 1000.); seq = 0L } 32 - in 33 - Admin.make_bundle ~source ~destination:dest ~timestamp record 34 - 35 - let write_bundle bundle output = 36 - let data = Bundle.encode bundle in 37 - match output with 38 - | None -> print_string data 39 - | Some file -> 40 - let oc = open_out_bin file in 41 - output_string oc data; 42 - close_out oc; 43 - Log.info (fun m -> m "Wrote bundle to %s" file) 44 - 45 - (* Status command *) 46 - let status source dest output () = 47 - let record = Admin.Query Admin.Query_status in 48 - let bundle = make_admin_bundle ~source ~dest record in 49 - write_bundle bundle output; 50 - Log.info (fun m -> m "Generated status query bundle") 51 - 52 - let status_cmd = 53 - let doc = "Query node status." in 54 - let info = Cmd.info "status" ~doc in 55 - let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ node_id) in 56 - let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ source_node) in 57 - Cmd.v info 58 - Term.(const status $ source $ dest $ output_file $ Vlog.setup "borctl") 59 - 60 - (* Contacts command *) 61 - let contacts source dest output () = 62 - let record = Admin.Query Admin.Query_contacts in 63 - let bundle = make_admin_bundle ~source ~dest record in 64 - write_bundle bundle output; 65 - Log.info (fun m -> m "Generated contacts query bundle") 66 - 67 - let contacts_cmd = 68 - let doc = "Query contact plan." in 69 - let info = Cmd.info "contacts" ~doc in 70 - let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ node_id) in 71 - let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ source_node) in 72 - Cmd.v info 73 - Term.(const contacts $ source $ dest $ output_file $ Vlog.setup "borctl") 74 - 75 - (* Policy command *) 76 - let policy_cmd = 77 - let doc = "Manage routing policies." in 78 - let info = Cmd.info "policy" ~doc in 79 - Cmd.v info Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 80 - 81 - (* Bundles command *) 82 - let bundles source dest filter output () = 83 - let record = Admin.Query (Admin.Query_bundles { filter }) in 84 - let bundle = make_admin_bundle ~source ~dest record in 85 - write_bundle bundle output; 86 - Log.info (fun m -> m "Generated bundles query bundle") 87 - 88 - let filter_arg = 89 - let doc = "Filter expression for bundle listing." in 90 - Arg.( 91 - value & opt (some string) None & info [ "f"; "filter" ] ~docv:"FILTER" ~doc) 92 - 93 - let bundles_cmd = 94 - let doc = "List stored bundles." in 95 - let info = Cmd.info "bundles" ~doc in 96 - let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ node_id) in 97 - let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ source_node) in 98 - Cmd.v info 99 - Term.( 100 - const bundles $ source $ dest $ filter_arg $ output_file 101 - $ Vlog.setup "borctl") 102 - 103 - (* Decode command - decode and display an admin bundle *) 104 - let decode file () = 105 - let data = 106 - match file with 107 - | None -> 108 - let buf = Buffer.create 4096 in 109 - (try 110 - while true do 111 - Buffer.add_channel buf stdin 4096 112 - done 113 - with End_of_file -> ()); 114 - Buffer.contents buf 115 - | Some f -> 116 - let ic = open_in_bin f in 117 - let len = in_channel_length ic in 118 - let data = really_input_string ic len in 119 - close_in ic; 120 - data 121 - in 122 - match Bundle.decode data with 123 - | Error e -> 124 - Log.err (fun m -> m "Failed to decode bundle: %a" Bundle.pp_error e); 125 - exit 1 126 - | Ok bundle -> ( 127 - Fmt.pr "@[<v>Bundle:@, %a@]@." Bundle.pp bundle; 128 - match Admin.extract bundle with 129 - | Error e -> Log.err (fun m -> m "Not an admin bundle: %s" e) 130 - | Ok record -> Fmt.pr "@[<v>Admin record:@, %a@]@." Admin.pp record) 131 - 132 - let input_file = 133 - let doc = "Input file (default: stdin)." in 134 - Arg.(value & opt (some string) None & info [ "i"; "input" ] ~docv:"FILE" ~doc) 135 - 136 - let decode_cmd = 137 - let doc = "Decode and display an admin bundle." in 138 - let info = Cmd.info "decode" ~doc in 139 - Cmd.v info Term.(const decode $ input_file $ Vlog.setup "borctl") 140 - 141 - (* Main *) 142 - let info = 143 - let doc = "Control CLI for borealis DTN daemon" in 144 - let man = 145 - [ 146 - `S Manpage.s_description; 147 - `P 148 - "borctl generates admin bundles for controlling borealis nodes. These \ 149 - bundles can be injected into the network for delivery."; 150 - `S Manpage.s_commands; 151 - `P "$(b,borctl status) - Query node status"; 152 - `P "$(b,borctl contacts) - Query contact plan"; 153 - `P "$(b,borctl bundles) - List stored bundles"; 154 - `P "$(b,borctl decode) - Decode admin bundle"; 155 - `S Manpage.s_see_also; 156 - `P "$(b,borealis)(1) - DTN daemon."; 157 - ] 158 - in 159 - Cmd.info "borctl" ~version:"0.1.0" ~doc ~man 160 - 161 - let default = Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 162 - 163 - let () = 164 - let cmd = 165 - Cmd.group info ~default 166 - [ status_cmd; contacts_cmd; policy_cmd; bundles_cmd; decode_cmd ] 167 - in 168 - exit (Cmd.eval cmd)
+288 -32
bin/borealis.ml
··· 3 3 SPDX-License-Identifier: ISC 4 4 ---------------------------------------------------------------------------*) 5 5 6 - (** Borealis DTN daemon. *) 6 + (** Borealis DTN daemon and control CLI. *) 7 7 8 8 open Cmdliner 9 9 ··· 11 11 12 12 module Log = (val Logs.src_log log_src) 13 13 14 + (* ============================================================================ 15 + Daemon (run) 16 + ============================================================================ *) 17 + 14 18 (* Default policy: forward via CGR if route exists, otherwise store *) 15 19 let default_policy = 16 20 Policy.if_pred Predicate.is_admin ··· 19 23 (Temporal.route_exists (Bundle.Ipn (0L, 0L))) 20 24 Policy.forward_route Policy.store_until_route) 21 25 22 - let run node_id listen_port () = 26 + let run node_id listen_port config_file () = 23 27 Eio_main.run @@ fun env -> 24 - let net = Eio.Stdenv.net env in 25 - let sw = Eio.Stdenv.fs env in 26 - ignore (net, sw); 27 - let config = Engine.make_config ~node_id in 28 - let contact_plan = Cgr.Contact_plan.empty in 28 + Eio.Switch.run @@ fun sw -> 29 + let clock = Eio.Stdenv.clock env in 30 + let config, contact_plan, peers = 31 + match config_file with 32 + | Some file -> ( 33 + match Config.load file with 34 + | Ok c -> 35 + let plan = Config.to_contact_plan c in 36 + (Engine.make_config ~node_id:c.node_id, plan, c.peers) 37 + | Error e -> 38 + Log.err (fun m -> m "Failed to load config: %s" e); 39 + (Engine.make_config ~node_id, Cgr.Contact_plan.empty, [])) 40 + | None -> (Engine.make_config ~node_id, Cgr.Contact_plan.empty, []) 41 + in 29 42 let engine = Engine.create ~config ~policy:default_policy ~contact_plan in 30 - Log.info (fun m -> 31 - m "Borealis starting on node %Ld, listening on port %d" node_id 32 - listen_port); 43 + let daemon = Daemon.create ~engine in 44 + List.iter 45 + (fun (p : Config.peer) -> 46 + let node = Cgr.Node.v (Int64.to_string p.node_id) in 47 + let address = Config.peer_address p in 48 + Daemon.add_peer daemon ~node ~address) 49 + peers; 50 + let net = Eio.Stdenv.net env in 51 + Log.app (fun m -> 52 + m "Borealis starting: node=%Ld port=%d peers=%d" node_id listen_port 53 + (List.length peers)); 33 54 Log.info (fun m -> m "Local EID: %a" Bundle.pp_eid config.local_eid); 34 - Log.info (fun m -> m "Admin EID: %a" Bundle.pp_eid config.admin_eid); 35 - 36 - (* Main loop - for now just log stats periodically *) 37 - let rec loop () = 38 - Eio.Time.sleep (Eio.Stdenv.clock env) 10.0; 39 - let stats = Engine.stats engine in 40 - Log.info (fun m -> m "Stats: %a" Engine.pp_stats stats); 41 - let expired = 42 - Engine.cleanup_expired engine ~current_time:(Unix.gettimeofday ()) 43 - in 44 - if expired > 0 then 45 - Log.info (fun m -> m "Cleaned up %d expired bundles" expired); 46 - loop () 47 - in 48 - loop () 55 + Daemon.run daemon ~sw ~net ~clock ~port:listen_port 49 56 50 - (* CLI *) 51 - let node_id = 57 + let node_id_opt = 52 58 let doc = "Node ID (IPN scheme node number)." in 53 59 Arg.(value & opt int64 1L & info [ "n"; "node-id" ] ~docv:"NODE" ~doc) 54 60 ··· 56 62 let doc = "TCP port to listen on for TCPCL connections." in 57 63 Arg.(value & opt int 4556 & info [ "p"; "port" ] ~docv:"PORT" ~doc) 58 64 65 + let config_file = 66 + let doc = "Configuration file." in 67 + Arg.( 68 + value & opt (some string) None & info [ "c"; "config" ] ~docv:"FILE" ~doc) 69 + 59 70 let run_cmd = 60 71 let doc = "Run the borealis DTN daemon." in 61 72 let info = Cmd.info "run" ~doc in 62 - Cmd.v info Term.(const run $ node_id $ listen_port $ Vlog.setup "borealis") 73 + Cmd.v info 74 + Term.( 75 + const run $ node_id_opt $ listen_port $ config_file 76 + $ Vlog.setup "borealis") 77 + 78 + (* ============================================================================ 79 + Inject (send a test bundle) 80 + ============================================================================ *) 81 + 82 + let inject source_node dest_node payload dest_host dest_port () = 83 + Eio_main.run @@ fun env -> 84 + Eio.Switch.run @@ fun sw -> 85 + let net = Eio.Stdenv.net env in 86 + let clock = Eio.Stdenv.clock env in 87 + let source = Bundle.Ipn (source_node, 1L) in 88 + let destination = Bundle.Ipn (dest_node, 1L) in 89 + let now = Eio.Time.now clock in 90 + let timestamp : Bundle.timestamp = 91 + { time = Int64.of_float (now *. 1000.); seq = 0L } 92 + in 93 + let primary : Bundle.primary_block = 94 + { 95 + version = 7; 96 + flags = Bundle.bundle_flags_default; 97 + crc_type = Bundle.Crc_none; 98 + destination; 99 + source; 100 + report_to = source; 101 + creation_timestamp = timestamp; 102 + lifetime = 86400L; 103 + fragment_offset = None; 104 + total_adu_length = None; 105 + } 106 + in 107 + let payload_block : Bundle.canonical_block = 108 + { 109 + block_type = Payload; 110 + block_number = 1; 111 + flags = Bundle.block_flags_default; 112 + crc_type = Bundle.Crc_none; 113 + data = Payload_data payload; 114 + } 115 + in 116 + let bundle : Bundle.t = { primary; blocks = [ payload_block ] } in 117 + let addr = `Tcp (Eio.Net.Ipaddr.of_raw dest_host, dest_port) in 118 + Log.info (fun m -> m "Connecting to %s:%d..." dest_host dest_port); 119 + let conn = Tcpcl_adapter.connect ~sw ~net ~local_eid:source ~addr in 120 + if Tcpcl_adapter.is_connected conn then ( 121 + match Tcpcl_adapter.send_bundle conn bundle with 122 + | Ok () -> 123 + Fmt.pr "✓ Bundle injected: %Ld → %Ld (%d bytes)@." source_node dest_node 124 + (String.length payload); 125 + Tcpcl_adapter.close_connection conn 126 + | Error e -> 127 + Fmt.pr "✗ Failed to send: %s@." e; 128 + exit 1) 129 + else ( 130 + Fmt.pr "✗ Failed to connect@."; 131 + exit 1) 132 + 133 + let source_node = 134 + let doc = "Source node ID." in 135 + Arg.( 136 + required & opt (some int64) None & info [ "s"; "source" ] ~docv:"NODE" ~doc) 137 + 138 + let dest_node = 139 + let doc = "Destination node ID." in 140 + Arg.( 141 + required & opt (some int64) None & info [ "d"; "dest" ] ~docv:"NODE" ~doc) 142 + 143 + let payload_arg = 144 + let doc = "Bundle payload." in 145 + Arg.( 146 + value & opt string "Hello DTN!" & info [ "m"; "message" ] ~docv:"MSG" ~doc) 147 + 148 + let dest_host = 149 + let doc = "Destination host." in 150 + Arg.(value & opt string "127.0.0.1" & info [ "H"; "host" ] ~docv:"HOST" ~doc) 151 + 152 + let dest_port_arg = 153 + let doc = "Destination port." in 154 + Arg.(value & opt int 4556 & info [ "P"; "dest-port" ] ~docv:"PORT" ~doc) 155 + 156 + let inject_cmd = 157 + let doc = "Inject a test bundle into the network." in 158 + let info = Cmd.info "inject" ~doc in 159 + Cmd.v info 160 + Term.( 161 + const inject $ source_node $ dest_node $ payload_arg $ dest_host 162 + $ dest_port_arg $ Vlog.setup "borealis") 163 + 164 + (* ============================================================================ 165 + Control CLI 166 + ============================================================================ *) 167 + 168 + let target_node = 169 + let doc = "Target node ID (IPN scheme node number)." in 170 + Arg.(value & opt int64 1L & info [ "n"; "node" ] ~docv:"NODE" ~doc) 171 + 172 + let admin_source_node = 173 + let doc = "Source node ID for admin bundles." in 174 + Arg.(value & opt int64 0L & info [ "s"; "source" ] ~docv:"SOURCE" ~doc) 175 + 176 + let output_file = 177 + let doc = "Output file for admin bundle (default: stdout)." in 178 + Arg.( 179 + value & opt (some string) None & info [ "o"; "output" ] ~docv:"FILE" ~doc) 180 + 181 + let make_admin_bundle ~source ~dest record = 182 + Eio_main.run @@ fun env -> 183 + let clock = Eio.Stdenv.clock env in 184 + let now = Eio.Time.now clock in 185 + let timestamp : Bundle.timestamp = 186 + { time = Int64.of_float (now *. 1000.); seq = 0L } 187 + in 188 + Admin.make_bundle ~source ~destination:dest ~timestamp record 189 + 190 + let write_bundle bundle output = 191 + let data = Bundle.encode bundle in 192 + match output with 193 + | None -> print_string data 194 + | Some file -> 195 + let oc = open_out_bin file in 196 + output_string oc data; 197 + close_out oc; 198 + Fmt.pr "✓ Wrote bundle to %s@." file 199 + 200 + let status source dest output () = 201 + let record = Admin.Query Admin.Query_status in 202 + let bundle = make_admin_bundle ~source ~dest record in 203 + write_bundle bundle output 204 + 205 + let status_cmd = 206 + let doc = "Query node status." in 207 + let info = Cmd.info "status" ~doc in 208 + let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ target_node) in 209 + let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ admin_source_node) in 210 + Cmd.v info 211 + Term.(const status $ source $ dest $ output_file $ Vlog.setup "borealis") 212 + 213 + let contacts source dest output () = 214 + let record = Admin.Query Admin.Query_contacts in 215 + let bundle = make_admin_bundle ~source ~dest record in 216 + write_bundle bundle output 217 + 218 + let contacts_cmd = 219 + let doc = "Query contact plan." in 220 + let info = Cmd.info "contacts" ~doc in 221 + let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ target_node) in 222 + let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ admin_source_node) in 223 + Cmd.v info 224 + Term.(const contacts $ source $ dest $ output_file $ Vlog.setup "borealis") 225 + 226 + let policy_deploy_cmd = 227 + let doc = "Deploy a policy file." in 228 + let info = Cmd.info "deploy" ~doc in 229 + Cmd.v info Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 230 + 231 + let policy_cmd = 232 + let doc = "Manage routing policies." in 233 + let info = Cmd.info "policy" ~doc in 234 + let default = 235 + Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 236 + in 237 + Cmd.group info ~default [ policy_deploy_cmd ] 238 + 239 + let bundles source dest filter output () = 240 + let record = Admin.Query (Admin.Query_bundles { filter }) in 241 + let bundle = make_admin_bundle ~source ~dest record in 242 + write_bundle bundle output 243 + 244 + let filter_arg = 245 + let doc = "Filter expression for bundle listing." in 246 + Arg.( 247 + value & opt (some string) None & info [ "f"; "filter" ] ~docv:"FILTER" ~doc) 248 + 249 + let bundles_list_cmd = 250 + let doc = "List stored bundles." in 251 + let info = Cmd.info "list" ~doc in 252 + let dest = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ target_node) in 253 + let source = Term.(const (fun n -> Bundle.Ipn (n, 0L)) $ admin_source_node) in 254 + Cmd.v info 255 + Term.( 256 + const bundles $ source $ dest $ filter_arg $ output_file 257 + $ Vlog.setup "borealis") 258 + 259 + let bundles_cmd = 260 + let doc = "Manage stored bundles." in 261 + let info = Cmd.info "bundles" ~doc in 262 + let default = 263 + Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 264 + in 265 + Cmd.group info ~default [ bundles_list_cmd ] 266 + 267 + let decode file () = 268 + Eio_main.run @@ fun env -> 269 + let fs = Eio.Stdenv.fs env in 270 + let data = 271 + match file with 272 + | None -> 273 + let buf = Buffer.create 4096 in 274 + (try 275 + while true do 276 + Buffer.add_channel buf stdin 4096 277 + done 278 + with End_of_file -> ()); 279 + Buffer.contents buf 280 + | Some f -> Eio.Path.load Eio.Path.(fs / f) 281 + in 282 + match Bundle.decode data with 283 + | Error e -> 284 + Fmt.pr "✗ Failed to decode bundle: %a@." Bundle.pp_error e; 285 + exit 1 286 + | Ok bundle -> ( 287 + Fmt.pr "@[<v>Bundle:@, %a@]@." Bundle.pp bundle; 288 + match Admin.extract bundle with 289 + | Error e -> Log.warn (fun m -> m "Not an admin bundle: %s" e) 290 + | Ok record -> Fmt.pr "@[<v>Admin record:@, %a@]@." Admin.pp record) 291 + 292 + let input_file = 293 + let doc = "Input file (default: stdin)." in 294 + Arg.(value & opt (some string) None & info [ "i"; "input" ] ~docv:"FILE" ~doc) 295 + 296 + let decode_cmd = 297 + let doc = "Decode and display an admin bundle." in 298 + let info = Cmd.info "decode" ~doc in 299 + Cmd.v info Term.(const decode $ input_file $ Vlog.setup "borealis") 300 + 301 + (* ============================================================================ 302 + Main 303 + ============================================================================ *) 63 304 64 305 let info = 65 306 let doc = ··· 72 313 "Borealis is a Delay-Tolerant Networking daemon with an embedded \ 73 314 policy language for software-defined satellite networking."; 74 315 `S Manpage.s_commands; 75 - `P "Use $(b,borealis run) to start the daemon."; 76 - `S Manpage.s_see_also; 77 - `P "$(b,borctl)(1) - Control CLI for borealis."; 316 + `P "$(b,borealis run) - Start the daemon"; 317 + `P "$(b,borealis inject) - Inject a test bundle"; 318 + `P "$(b,borealis status) - Query node status"; 319 + `P "$(b,borealis contacts) - Query contact plan"; 320 + `P "$(b,borealis policy) - Manage routing policies"; 321 + `P "$(b,borealis bundles) - Manage stored bundles"; 322 + `P "$(b,borealis decode) - Decode admin bundles"; 78 323 ] 79 324 in 80 325 Cmd.info "borealis" ~version:"0.1.0" ~doc ~man ··· 82 327 let default = Term.(ret (const (fun () -> `Help (`Pager, None)) $ const ())) 83 328 84 329 let () = 85 - let cmd = Cmd.group info ~default [ run_cmd ] in 330 + let cmd = 331 + Cmd.group info ~default 332 + [ 333 + run_cmd; 334 + inject_cmd; 335 + status_cmd; 336 + contacts_cmd; 337 + policy_cmd; 338 + bundles_cmd; 339 + decode_cmd; 340 + ] 341 + in 86 342 exit (Cmd.eval cmd)
+4 -5
bin/dune
··· 7 7 borealis.store 8 8 borealis.forwarding 9 9 borealis.adapters 10 + borealis.daemon 10 11 bundle 12 + cbort 11 13 cgr 14 + eio 12 15 eio_main 13 16 cmdliner 14 17 fmt 15 18 logs 19 + tty 16 20 vlog)) 17 - 18 - (executable 19 - (name borctl) 20 - (public_name borctl) 21 - (libraries borealis.admin bundle cbort eio_main cmdliner fmt logs vlog tty))
+93
lib/daemon/config.ml
··· 1 + (*--------------------------------------------------------------------------- 2 + Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. 3 + SPDX-License-Identifier: ISC 4 + ---------------------------------------------------------------------------*) 5 + 6 + (** Borealis configuration using YAML. *) 7 + 8 + module Log = (val Logs.src_log (Logs.Src.create "borealis.config")) 9 + 10 + type peer = { node_id : int64; host : string; port : int } 11 + 12 + type contact = { 13 + from_node : int64; 14 + to_node : int64; 15 + start : float; 16 + stop : float; 17 + rate : float; 18 + } 19 + 20 + type t = { 21 + node_id : int64; 22 + port : int; 23 + peers : peer list; 24 + contacts : contact list; 25 + } 26 + 27 + (* Jsont codecs *) 28 + let peer_jsont : peer Jsont.t = 29 + let make node_id host port : peer = { node_id; host; port } in 30 + Jsont.Object.map ~kind:"peer" make 31 + |> Jsont.Object.mem "node_id" Jsont.int64 ~enc:(fun (p : peer) -> p.node_id) 32 + |> Jsont.Object.mem "host" Jsont.string ~enc:(fun (p : peer) -> p.host) 33 + |> Jsont.Object.mem "port" Jsont.int ~enc:(fun (p : peer) -> p.port) 34 + |> Jsont.Object.finish 35 + 36 + let contact_jsont : contact Jsont.t = 37 + let make from_node to_node start stop rate : contact = 38 + { from_node; to_node; start; stop; rate } 39 + in 40 + Jsont.Object.map ~kind:"contact" make 41 + |> Jsont.Object.mem "from" Jsont.int64 ~enc:(fun (c : contact) -> c.from_node) 42 + |> Jsont.Object.mem "to" Jsont.int64 ~enc:(fun (c : contact) -> c.to_node) 43 + |> Jsont.Object.mem "start" Jsont.number ~enc:(fun (c : contact) -> c.start) 44 + |> Jsont.Object.mem "stop" Jsont.number ~enc:(fun (c : contact) -> c.stop) 45 + |> Jsont.Object.mem "rate" Jsont.number ~enc:(fun (c : contact) -> c.rate) 46 + |> Jsont.Object.finish 47 + 48 + let jsont : t Jsont.t = 49 + let make node_id port peers contacts : t = 50 + { node_id; port; peers; contacts } 51 + in 52 + Jsont.Object.map ~kind:"config" make 53 + |> Jsont.Object.mem "node_id" Jsont.int64 ~dec_absent:1L ~enc:(fun (c : t) -> 54 + c.node_id) 55 + |> Jsont.Object.mem "port" Jsont.int ~dec_absent:4556 ~enc:(fun (c : t) -> 56 + c.port) 57 + |> Jsont.Object.mem "peers" (Jsont.list peer_jsont) ~dec_absent:[] 58 + ~enc:(fun (c : t) -> c.peers) 59 + |> Jsont.Object.mem "contacts" (Jsont.list contact_jsont) ~dec_absent:[] 60 + ~enc:(fun (c : t) -> c.contacts) 61 + |> Jsont.Object.finish 62 + 63 + let default = { node_id = 1L; port = 4556; peers = []; contacts = [] } 64 + 65 + let load file = 66 + let ic = open_in file in 67 + let n = in_channel_length ic in 68 + let content = really_input_string ic n in 69 + close_in ic; 70 + match Yamlt.decode_string jsont content with 71 + | Ok config -> 72 + Log.info (fun m -> 73 + m "Loaded: node=%Ld port=%d peers=%d contacts=%d" config.node_id 74 + config.port (List.length config.peers) 75 + (List.length config.contacts)); 76 + Ok config 77 + | Error e -> 78 + Log.err (fun m -> m "Config error: %s" e); 79 + Error e 80 + 81 + let to_contact_plan config = 82 + List.fold_left 83 + (fun plan c -> 84 + let from = Cgr.Node.v (Int64.to_string c.from_node) in 85 + let to_ = Cgr.Node.v (Int64.to_string c.to_node) in 86 + let contact = 87 + Cgr.Contact.v ~from ~to_ ~start:c.start ~stop:c.stop ~rate:c.rate () 88 + in 89 + Cgr.Contact_plan.add contact plan) 90 + Cgr.Contact_plan.empty config.contacts 91 + 92 + let peer_address (p : peer) : Eio.Net.Sockaddr.stream = 93 + `Tcp (Eio.Net.Ipaddr.V4.loopback, p.port)
+198
lib/daemon/daemon.ml
··· 1 + (*--------------------------------------------------------------------------- 2 + Copyright (c) 2025 Thomas Gazagnaire. All rights reserved. 3 + SPDX-License-Identifier: ISC 4 + ---------------------------------------------------------------------------*) 5 + 6 + (** Borealis daemon - network coordination for DTN node. *) 7 + 8 + module Log = (val Logs.src_log (Logs.Src.create "borealis.daemon")) 9 + 10 + type peer = { 11 + node : Cgr.Node.t; 12 + address : Eio.Net.Sockaddr.stream; 13 + mutable connection : Tcpcl_adapter.t option; 14 + } 15 + 16 + type t = { 17 + engine : Engine.t; 18 + peers : (string, peer) Hashtbl.t; 19 + mutable running : bool; 20 + } 21 + 22 + let create ~engine = { engine; peers = Hashtbl.create 16; running = false } 23 + 24 + let add_peer t ~node ~address = 25 + let name = Cgr.Node.name node in 26 + if not (Hashtbl.mem t.peers name) then ( 27 + Hashtbl.add t.peers name { node; address; connection = None }; 28 + Log.info (fun m -> m "Added peer %s at %a" name Eio.Net.Sockaddr.pp address)) 29 + 30 + let node_of_eid = function 31 + | Bundle.Dtn_none -> None 32 + | Bundle.Dtn path -> Some (Cgr.Node.v path) 33 + | Bundle.Ipn (node, _) -> Some (Cgr.Node.v (Int64.to_string node)) 34 + 35 + let find_peer t eid = 36 + match node_of_eid eid with 37 + | None -> None 38 + | Some node -> 39 + let name = Cgr.Node.name node in 40 + Hashtbl.find_opt t.peers name 41 + 42 + let connect_peer t ~sw ~net peer = 43 + match peer.connection with 44 + | Some conn when Tcpcl_adapter.is_connected conn -> () 45 + | _ -> ( 46 + let config = Engine.config t.engine in 47 + Log.info (fun m -> 48 + m "Connecting to %s at %a" (Cgr.Node.name peer.node) 49 + Eio.Net.Sockaddr.pp peer.address); 50 + try 51 + let conn = 52 + Tcpcl_adapter.connect ~sw ~net ~local_eid:config.local_eid 53 + ~addr:peer.address 54 + in 55 + if Tcpcl_adapter.is_connected conn then ( 56 + peer.connection <- Some conn; 57 + Log.info (fun m -> m "Connected to %s" (Cgr.Node.name peer.node))) 58 + else 59 + Log.warn (fun m -> 60 + m "Failed to connect to %s" (Cgr.Node.name peer.node)) 61 + with exn -> 62 + Log.warn (fun m -> 63 + m "Connection to %s failed: %s" (Cgr.Node.name peer.node) 64 + (Printexc.to_string exn))) 65 + 66 + let send_bundle t ~sw ~net bundle next_hop = 67 + match find_peer t next_hop with 68 + | None -> 69 + Log.warn (fun m -> m "No peer for %a" Bundle.pp_eid next_hop); 70 + Error "no peer" 71 + | Some peer -> ( 72 + connect_peer t ~sw ~net peer; 73 + match peer.connection with 74 + | None -> Error "not connected" 75 + | Some conn -> ( 76 + match Tcpcl_adapter.send_bundle conn bundle with 77 + | Ok () -> 78 + Log.debug (fun m -> m "Sent bundle to %a" Bundle.pp_eid next_hop); 79 + Ok () 80 + | Error e -> 81 + Log.warn (fun m -> 82 + m "Failed to send to %a: %s" Bundle.pp_eid next_hop e); 83 + Error e)) 84 + 85 + let process_bundle t ~sw ~net bundle = 86 + let result = Engine.process t.engine bundle ~tenant:None in 87 + match result with 88 + | Engine.Forward reqs -> 89 + List.iter 90 + (fun (req : Engine.forward_request) -> 91 + ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) 92 + reqs 93 + | Engine.Stored -> Log.debug (fun m -> m "Bundle stored") 94 + | Engine.Delivered -> Log.info (fun m -> m "Bundle delivered locally") 95 + | Engine.Dropped reason -> Log.info (fun m -> m "Bundle dropped: %s" reason) 96 + | Engine.Admin_handled -> ( 97 + match Admin.extract bundle with 98 + | Error _ -> () 99 + | Ok record -> ( 100 + match Engine.process_admin t.engine record with 101 + | None -> () 102 + | Some response -> 103 + (* Send response back *) 104 + let source = bundle.Bundle.primary.source in 105 + let dest = (Engine.config t.engine).admin_eid in 106 + let timestamp : Bundle.timestamp = 107 + { 108 + time = Int64.of_float (Unix.gettimeofday () *. 1000.); 109 + seq = 0L; 110 + } 111 + in 112 + let resp_bundle = 113 + Admin.make_bundle ~source:dest ~destination:source ~timestamp 114 + response 115 + in 116 + ignore (send_bundle t ~sw ~net resp_bundle source))) 117 + 118 + let handle_connection t ~sw ~net socket = 119 + let config = Engine.config t.engine in 120 + let conn = Tcpcl_adapter.accept ~sw ~local_eid:config.local_eid socket in 121 + if Tcpcl_adapter.is_connected conn then ( 122 + Log.info (fun m -> 123 + m "Accepted connection from %s" 124 + (Option.value ~default:"unknown" (Tcpcl_adapter.peer_eid conn))); 125 + (* Receive loop *) 126 + let rec loop () = 127 + match Tcpcl_adapter.receive conn with 128 + | Ok bundle -> 129 + process_bundle t ~sw ~net bundle; 130 + loop () 131 + | Error e -> 132 + Log.debug (fun m -> m "Connection closed: %s" e); 133 + Tcpcl_adapter.close_connection conn 134 + in 135 + loop ()) 136 + 137 + let listen t ~sw ~net ~port = 138 + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in 139 + let socket = Eio.Net.listen ~sw ~backlog:10 ~reuse_addr:true net addr in 140 + Log.info (fun m -> m "Listening on port %d" port); 141 + let rec accept_loop () = 142 + let conn, _addr = Eio.Net.accept ~sw socket in 143 + Eio.Fiber.fork ~sw (fun () -> handle_connection t ~sw ~net conn); 144 + accept_loop () 145 + in 146 + accept_loop () 147 + 148 + let check_contacts t ~sw ~net = 149 + let plan = Engine.contact_plan t.engine in 150 + let now = Unix.gettimeofday () in 151 + let active = Cgr.Contact_plan.active_at plan ~time:now in 152 + List.iter 153 + (fun contact -> 154 + let to_node = Cgr.Contact.to_ contact in 155 + let name = Cgr.Node.name to_node in 156 + match Hashtbl.find_opt t.peers name with 157 + | None -> () 158 + | Some peer -> 159 + (* Connect if contact is active and we're not connected *) 160 + connect_peer t ~sw ~net peer; 161 + (* Release stored bundles *) 162 + let reqs = Engine.on_contact_start t.engine contact in 163 + List.iter 164 + (fun (req : Engine.forward_request) -> 165 + ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) 166 + reqs) 167 + active 168 + 169 + let check_routes t ~sw ~net = 170 + let reqs = Engine.check_routes t.engine in 171 + List.iter 172 + (fun (req : Engine.forward_request) -> 173 + ignore (send_bundle t ~sw ~net req.bundle req.next_hop)) 174 + reqs 175 + 176 + let periodic_tasks t ~sw ~net ~clock ~interval = 177 + let rec loop () = 178 + Eio.Time.sleep clock interval; 179 + if t.running then ( 180 + check_contacts t ~sw ~net; 181 + check_routes t ~sw ~net; 182 + let now = Eio.Time.now clock in 183 + let _ = Engine.cleanup_expired t.engine ~current_time:now in 184 + let stats = Engine.stats t.engine in 185 + Log.debug (fun m -> m "Stats: %a" Engine.pp_stats stats); 186 + loop ()) 187 + in 188 + loop () 189 + 190 + let run t ~sw ~net ~clock ~port = 191 + t.running <- true; 192 + Eio.Fiber.all 193 + [ 194 + (fun () -> listen t ~sw ~net ~port); 195 + (fun () -> periodic_tasks t ~sw ~net ~clock ~interval:5.0); 196 + ] 197 + 198 + let stop t = t.running <- false
+15
lib/daemon/dune
··· 1 + (library 2 + (name daemon) 3 + (public_name borealis.daemon) 4 + (wrapped false) 5 + (libraries 6 + borealis.forwarding 7 + borealis.admin 8 + borealis.adapters 9 + bundle 10 + cgr 11 + eio 12 + jsont 13 + yamlt 14 + logs 15 + fmt))