this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add bidirectional channel abstraction for push messages

This adds the Channel module which extends the RPC model to support:
- Push messages from server to client (one-way notifications)
- Event messages from client to server (widget interactions)

Message types:
- Request: client → server (expects response, like RPC)
- Response: server → client (matches request ID)
- Push: server → client (stdout/stderr streaming, widget updates, progress)
- Event: client → server (widget events)

Push kinds:
- Output: streaming stdout/stderr
- Widget_update: widget state changes
- Progress: task progress notifications
- Custom_push: extensible custom push types

Event kinds:
- Widget_event: user interactions (clicks, input changes)
- Custom_event: extensible custom event types

All messages use CBOR encoding for efficient binary transport.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+373 -1
+166
idl/channel.ml
··· 1 + (** Bidirectional message channel for worker communication. *) 2 + 3 + type push_kind = 4 + | Output of { stream : [ `Stdout | `Stderr ]; data : string } 5 + | Widget_update of { widget_id : string; state : Rpc.t } 6 + | Progress of { task_id : string; percent : int; message : string option } 7 + | Custom_push of { kind : string; data : Rpc.t } 8 + 9 + type event_kind = 10 + | Widget_event of { widget_id : string; event_type : string; data : Rpc.t } 11 + | Custom_event of { kind : string; data : Rpc.t } 12 + 13 + type message = 14 + | Request of { id : int64; call : Rpc.call } 15 + | Response of { id : int64; response : Rpc.response } 16 + | Push of push_kind 17 + | Event of event_kind 18 + 19 + (* CBOR tags for message discrimination *) 20 + let tag_request = 0 21 + let tag_response = 1 22 + let tag_push = 2 23 + let tag_event = 3 24 + 25 + (* CBOR tags for push kinds *) 26 + let push_tag_output = 0 27 + let push_tag_widget_update = 1 28 + let push_tag_progress = 2 29 + let push_tag_custom = 3 30 + 31 + (* CBOR tags for event kinds *) 32 + let event_tag_widget_event = 0 33 + let event_tag_custom = 1 34 + 35 + (* Stream tags *) 36 + let stream_stdout = 0 37 + let stream_stderr = 1 38 + 39 + (* Codecs for push_kind *) 40 + let push_kind_codec : push_kind Cbort.t = 41 + let open Cbort in 42 + let case_output = 43 + Variant.case push_tag_output 44 + (tuple2 int string) 45 + (fun (stream_int, data) -> 46 + let stream = if stream_int = stream_stdout then `Stdout else `Stderr in 47 + Output { stream; data }) 48 + (function 49 + | Output { stream; data } -> 50 + let stream_int = match stream with `Stdout -> stream_stdout | `Stderr -> stream_stderr in 51 + Some (stream_int, data) 52 + | _ -> None) 53 + in 54 + let case_widget_update = 55 + Variant.case push_tag_widget_update 56 + (tuple2 string Rpc_cbor.codec) 57 + (fun (widget_id, state) -> Widget_update { widget_id; state }) 58 + (function 59 + | Widget_update { widget_id; state } -> Some (widget_id, state) 60 + | _ -> None) 61 + in 62 + let case_progress = 63 + Variant.case push_tag_progress 64 + (tuple3 string int (nullable string)) 65 + (fun (task_id, percent, message) -> Progress { task_id; percent; message }) 66 + (function 67 + | Progress { task_id; percent; message } -> Some (task_id, percent, message) 68 + | _ -> None) 69 + in 70 + let case_custom = 71 + Variant.case push_tag_custom 72 + (tuple2 string Rpc_cbor.codec) 73 + (fun (kind, data) -> Custom_push { kind; data }) 74 + (function 75 + | Custom_push { kind; data } -> Some (kind, data) 76 + | _ -> None) 77 + in 78 + Variant.variant [ case_output; case_widget_update; case_progress; case_custom ] 79 + 80 + (* Codecs for event_kind *) 81 + let event_kind_codec : event_kind Cbort.t = 82 + let open Cbort in 83 + let case_widget_event = 84 + Variant.case event_tag_widget_event 85 + (tuple3 string string Rpc_cbor.codec) 86 + (fun (widget_id, event_type, data) -> Widget_event { widget_id; event_type; data }) 87 + (function 88 + | Widget_event { widget_id; event_type; data } -> Some (widget_id, event_type, data) 89 + | _ -> None) 90 + in 91 + let case_custom = 92 + Variant.case event_tag_custom 93 + (tuple2 string Rpc_cbor.codec) 94 + (fun (kind, data) -> Custom_event { kind; data }) 95 + (function 96 + | Custom_event { kind; data } -> Some (kind, data) 97 + | _ -> None) 98 + in 99 + Variant.variant [ case_widget_event; case_custom ] 100 + 101 + (* Main message codec *) 102 + let message_codec : message Cbort.t = 103 + let open Cbort in 104 + let case_request = 105 + Variant.case tag_request 106 + (tuple2 int64 Rpc_cbor.call_codec) 107 + (fun (id, call) -> Request { id; call }) 108 + (function 109 + | Request { id; call } -> Some (id, call) 110 + | _ -> None) 111 + in 112 + let case_response = 113 + Variant.case tag_response 114 + (tuple2 int64 Rpc_cbor.response_codec) 115 + (fun (id, response) -> Response { id; response }) 116 + (function 117 + | Response { id; response } -> Some (id, response) 118 + | _ -> None) 119 + in 120 + let case_push = 121 + Variant.case tag_push 122 + push_kind_codec 123 + (fun kind -> Push kind) 124 + (function 125 + | Push kind -> Some kind 126 + | _ -> None) 127 + in 128 + let case_event = 129 + Variant.case tag_event 130 + event_kind_codec 131 + (fun kind -> Event kind) 132 + (function 133 + | Event kind -> Some kind 134 + | _ -> None) 135 + in 136 + Variant.variant [ case_request; case_response; case_push; case_event ] 137 + 138 + let encode msg = Cbort.encode_string message_codec msg 139 + 140 + let decode s = 141 + match Cbort.decode_string message_codec s with 142 + | Ok msg -> Ok msg 143 + | Error e -> Error (Cbort.Error.to_string e) 144 + 145 + let decode_exn s = 146 + match decode s with 147 + | Ok msg -> msg 148 + | Error e -> failwith e 149 + 150 + let encode_request id call = encode (Request { id; call }) 151 + 152 + let encode_response id response = encode (Response { id; response }) 153 + 154 + let encode_push kind = encode (Push kind) 155 + 156 + let encode_event kind = encode (Event kind) 157 + 158 + let push_stdout data = encode_push (Output { stream = `Stdout; data }) 159 + 160 + let push_stderr data = encode_push (Output { stream = `Stderr; data }) 161 + 162 + let push_widget_update ~widget_id state = 163 + encode_push (Widget_update { widget_id; state }) 164 + 165 + let push_progress ~task_id ~percent ?message () = 166 + encode_push (Progress { task_id; percent; message })
+74
idl/channel.mli
··· 1 + (** Bidirectional message channel for worker communication. 2 + 3 + This module extends the RPC model to support push messages from 4 + server to client, enabling: 5 + - Streaming output (stdout/stderr) 6 + - Widget state updates 7 + - Progress notifications 8 + 9 + Message types: 10 + - Request: client → server (expects response) 11 + - Response: server → client (matches request ID) 12 + - Push: server → client (one-way notification) 13 + - Event: client → server (widget interactions, no response) 14 + *) 15 + 16 + (** {1 Message Types} *) 17 + 18 + type push_kind = 19 + | Output of { stream : [ `Stdout | `Stderr ]; data : string } 20 + | Widget_update of { widget_id : string; state : Rpc.t } 21 + | Progress of { task_id : string; percent : int; message : string option } 22 + | Custom_push of { kind : string; data : Rpc.t } 23 + (** Types of push messages from server to client. *) 24 + 25 + type event_kind = 26 + | Widget_event of { widget_id : string; event_type : string; data : Rpc.t } 27 + | Custom_event of { kind : string; data : Rpc.t } 28 + (** Types of event messages from client to server. *) 29 + 30 + type message = 31 + | Request of { id : int64; call : Rpc.call } 32 + | Response of { id : int64; response : Rpc.response } 33 + | Push of push_kind 34 + | Event of event_kind 35 + (** A message in the channel protocol. *) 36 + 37 + (** {1 Encoding/Decoding} *) 38 + 39 + val encode : message -> string 40 + (** [encode msg] encodes a message to CBOR. *) 41 + 42 + val decode : string -> (message, string) result 43 + (** [decode s] decodes a CBOR message. *) 44 + 45 + val decode_exn : string -> message 46 + (** [decode_exn s] decodes a CBOR message, raising on error. *) 47 + 48 + (** {1 Convenience Functions} *) 49 + 50 + val encode_request : int64 -> Rpc.call -> string 51 + (** [encode_request id call] encodes an RPC request. *) 52 + 53 + val encode_response : int64 -> Rpc.response -> string 54 + (** [encode_response id response] encodes an RPC response. *) 55 + 56 + val encode_push : push_kind -> string 57 + (** [encode_push kind] encodes a push notification. *) 58 + 59 + val encode_event : event_kind -> string 60 + (** [encode_event kind] encodes a client event. *) 61 + 62 + (** {1 Push Message Helpers} *) 63 + 64 + val push_stdout : string -> string 65 + (** [push_stdout data] creates an encoded stdout push message. *) 66 + 67 + val push_stderr : string -> string 68 + (** [push_stderr data] creates an encoded stderr push message. *) 69 + 70 + val push_widget_update : widget_id:string -> Rpc.t -> string 71 + (** [push_widget_update ~widget_id state] creates a widget update message. *) 72 + 73 + val push_progress : task_id:string -> percent:int -> ?message:string -> unit -> string 74 + (** [push_progress ~task_id ~percent ?message ()] creates a progress message. *)
+1 -1
idl/dune
··· 1 1 (library 2 2 (name js_top_worker_rpc) 3 3 (public_name js_top_worker-rpc) 4 - (modules toplevel_api_gen rpc_cbor transport) 4 + (modules toplevel_api_gen rpc_cbor transport channel) 5 5 (libraries rresult mime_printer merlin-lib.query_protocol rpclib rpclib.json cbort)) 6 6 7 7 (library
+6
idl/rpc_cbor.mli
··· 21 21 22 22 Convenience functions for encoding/decoding RPC calls and responses. *) 23 23 24 + val call_codec : Rpc.call Cbort.t 25 + (** Codec for RPC calls. *) 26 + 27 + val response_codec : Rpc.response Cbort.t 28 + (** Codec for RPC responses. *) 29 + 24 30 val encode_call : Rpc.call -> string 25 31 (** [encode_call c] encodes an RPC call to CBOR. *) 26 32
+123
test/channel/channel_test.ml
··· 1 + (** Tests for the Channel module (push message support). *) 2 + 3 + open Js_top_worker_rpc 4 + 5 + let test_request_roundtrip () = 6 + let id = 42L in 7 + let call = Rpc.{ name = "test_method"; params = [Rpc.String "arg1"]; is_notification = false } in 8 + let encoded = Channel.encode_request id call in 9 + match Channel.decode encoded with 10 + | Ok (Channel.Request { id = id'; call = call' }) -> 11 + assert (id = id'); 12 + assert (call.name = call'.name); 13 + print_endline "Request roundtrip: OK" 14 + | Ok _ -> failwith "Wrong message type" 15 + | Error e -> failwith ("Decode error: " ^ e) 16 + 17 + let test_response_roundtrip () = 18 + let id = 123L in 19 + let response = Rpc.{ success = true; contents = Rpc.String "result"; is_notification = false } in 20 + let encoded = Channel.encode_response id response in 21 + match Channel.decode encoded with 22 + | Ok (Channel.Response { id = id'; response = response' }) -> 23 + assert (id = id'); 24 + assert (response.success = response'.success); 25 + print_endline "Response roundtrip: OK" 26 + | Ok _ -> failwith "Wrong message type" 27 + | Error e -> failwith ("Decode error: " ^ e) 28 + 29 + let test_push_stdout () = 30 + let data = "Hello, world!" in 31 + let encoded = Channel.push_stdout data in 32 + match Channel.decode encoded with 33 + | Ok (Channel.Push (Channel.Output { stream = `Stdout; data = data' })) -> 34 + assert (data = data'); 35 + print_endline "Push stdout: OK" 36 + | Ok _ -> failwith "Wrong message type" 37 + | Error e -> failwith ("Decode error: " ^ e) 38 + 39 + let test_push_stderr () = 40 + let data = "Error message" in 41 + let encoded = Channel.push_stderr data in 42 + match Channel.decode encoded with 43 + | Ok (Channel.Push (Channel.Output { stream = `Stderr; data = data' })) -> 44 + assert (data = data'); 45 + print_endline "Push stderr: OK" 46 + | Ok _ -> failwith "Wrong message type" 47 + | Error e -> failwith ("Decode error: " ^ e) 48 + 49 + let test_push_widget_update () = 50 + let widget_id = "widget_1" in 51 + let state = Rpc.Dict [("value", Rpc.Int 42L)] in 52 + let encoded = Channel.push_widget_update ~widget_id state in 53 + match Channel.decode encoded with 54 + | Ok (Channel.Push (Channel.Widget_update { widget_id = id'; state = state' })) -> 55 + assert (widget_id = id'); 56 + assert (state = state'); 57 + print_endline "Push widget_update: OK" 58 + | Ok _ -> failwith "Wrong message type" 59 + | Error e -> failwith ("Decode error: " ^ e) 60 + 61 + let test_push_progress () = 62 + let task_id = "task_1" in 63 + let percent = 50 in 64 + let message = Some "Processing..." in 65 + let encoded = Channel.push_progress ~task_id ~percent ?message () in 66 + match Channel.decode encoded with 67 + | Ok (Channel.Push (Channel.Progress { task_id = id'; percent = p'; message = m' })) -> 68 + assert (task_id = id'); 69 + assert (percent = p'); 70 + assert (message = m'); 71 + print_endline "Push progress: OK" 72 + | Ok _ -> failwith "Wrong message type" 73 + | Error e -> failwith ("Decode error: " ^ e) 74 + 75 + let test_event_widget () = 76 + let widget_id = "widget_1" in 77 + let event_type = "click" in 78 + let data = Rpc.Dict [("x", Rpc.Int 100L); ("y", Rpc.Int 200L)] in 79 + let event = Channel.Widget_event { widget_id; event_type; data } in 80 + let encoded = Channel.encode_event event in 81 + match Channel.decode encoded with 82 + | Ok (Channel.Event (Channel.Widget_event { widget_id = id'; event_type = et'; data = d' })) -> 83 + assert (widget_id = id'); 84 + assert (event_type = et'); 85 + assert (data = d'); 86 + print_endline "Event widget: OK" 87 + | Ok _ -> failwith "Wrong message type" 88 + | Error e -> failwith ("Decode error: " ^ e) 89 + 90 + let test_custom_push () = 91 + let kind = "my_custom_push" in 92 + let data = Rpc.Enum [Rpc.String "a"; Rpc.String "b"] in 93 + let push = Channel.Custom_push { kind; data } in 94 + let encoded = Channel.encode_push push in 95 + match Channel.decode encoded with 96 + | Ok (Channel.Push (Channel.Custom_push { kind = k'; data = d' })) -> 97 + assert (kind = k'); 98 + assert (data = d'); 99 + print_endline "Custom push: OK" 100 + | Ok _ -> failwith "Wrong message type" 101 + | Error e -> failwith ("Decode error: " ^ e) 102 + 103 + let () = 104 + print_endline "=== Channel Tests ==="; 105 + print_newline (); 106 + 107 + test_request_roundtrip (); 108 + test_response_roundtrip (); 109 + 110 + print_newline (); 111 + print_endline "=== Push Message Tests ==="; 112 + test_push_stdout (); 113 + test_push_stderr (); 114 + test_push_widget_update (); 115 + test_push_progress (); 116 + test_custom_push (); 117 + 118 + print_newline (); 119 + print_endline "=== Event Tests ==="; 120 + test_event_widget (); 121 + 122 + print_newline (); 123 + print_endline "All channel tests passed!"
+3
test/channel/dune
··· 1 + (test 2 + (name channel_test) 3 + (libraries js_top_worker-rpc))