My aggregated monorepo of OCaml code, automaintained
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add progress callbacks and FRP async bridge

- Zarr_v3.Store.read: optional ~on_shard:(int -> int -> unit) callback
- Tessera_zarr.fetch_region: optional ~progress:(string -> unit) callback
  that threads shard progress messages from the reader
- Tessera_zarr_jsoo.Frp_async: bridge Note signals to Lwt async ops
  with Idle/Loading/Ready/Error states; results of stale (superseded)
  requests are discarded
- Expose async fetch, sync fetch, and Frp_async from tessera-zarr-jsoo

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

+122 -24
+1 -1
tessera-zarr-jsoo/lib/dune
··· 1 1 (library 2 2 (name tessera_zarr_jsoo) 3 3 (public_name tessera-zarr-jsoo) 4 - (libraries zarr-v3 tessera-zarr js_of_ocaml js_of_ocaml-lwt lwt) 4 + (libraries zarr-v3 tessera-zarr js_of_ocaml js_of_ocaml-lwt lwt note) 5 5 (preprocess (pps js_of_ocaml-ppx)))
+52
tessera-zarr-jsoo/lib/frp_async.ml
··· 1 + (** Bridge between Note FRP signals and Lwt async operations. *) 2 + 3 + type 'a state = 4 + | Idle 5 + | Loading of string 6 + | Ready of 'a 7 + | Error of string 8 + 9 + let state_eq eq a b = match a, b with 10 + | Idle, Idle -> true 11 + | Loading a, Loading b -> String.equal a b 12 + | Ready a, Ready b -> eq a b 13 + | Error a, Error b -> String.equal a b 14 + | _ -> false 15 + 16 + let async_bind 17 + (type a b) 18 + ?(eq : b -> b -> bool = fun _ _ -> false) 19 + (signal : a option Note.signal) 20 + (f : a -> (string -> unit) -> b Lwt.t) 21 + : b state Note.signal = 22 + let (out_signal, set_out) = Note.S.create ~eq:(state_eq eq) Idle in 23 + let generation = ref 0 in 24 + (* Observe the input signal: when it changes, kick off the async work *) 25 + let logr = Note.Logr.create ( 26 + Note.Logr.app (Note.Logr.const (fun v -> 27 + incr generation; 28 + let my_gen = !generation in 29 + match v with 30 + | None -> 31 + set_out Idle 32 + | Some input -> 33 + set_out (Loading "starting..."); 34 + let progress msg = 35 + if !generation = my_gen then 36 + set_out (Loading msg) 37 + in 38 + Lwt.async (fun () -> 39 + Lwt.catch 40 + (fun () -> 41 + let open Lwt.Syntax in 42 + let+ result = f input progress in 43 + if !generation = my_gen then 44 + set_out (Ready result)) 45 + (fun exn -> 46 + if !generation = my_gen then 47 + set_out (Error (Printexc.to_string exn)); 48 + Lwt.return_unit)))) 49 + (Note.S.obs signal) 50 + ) in 51 + Note.Logr.hold logr; 52 + out_signal
+23
tessera-zarr-jsoo/lib/frp_async.mli
··· 1 + (** Bridge between Note FRP signals and Lwt async operations. *) 2 + 3 + type 'a state = 4 + | Idle (** No input / initial state *) 5 + | Loading of string (** Async operation in progress, with status message *) 6 + | Ready of 'a (** Operation completed successfully *) 7 + | Error of string (** Operation failed *) 8 + 9 + val async_bind : 10 + ?eq:('b -> 'b -> bool) -> 11 + 'a option Note.signal -> 12 + ('a -> (string -> unit) -> 'b Lwt.t) -> 13 + 'b state Note.signal 14 + (** [async_bind signal f] maps a signal through an async function. 15 + 16 + When [signal] changes to [Some v], starts [f v progress] asynchronously. 17 + [progress] can be called to update the loading status message. 18 + The output signal transitions through [Loading msg] → [Ready result]. 19 + 20 + When [signal] changes to [None], the output is [Idle]. 21 + 22 + If [signal] changes again before [f] completes, the previous 23 + computation's result is silently discarded (stale cancellation). *)
+4 -2
tessera-zarr-jsoo/lib/tessera_zarr_jsoo.ml
··· 60 60 let url = Printf.sprintf "%s/%d.zarr" base_url year in 61 61 Zarr_v3.Store.open_store ~fetch ~codecs url 62 62 63 - let fetch_region ?(base_url = "https://dl2.geotessera.org/zarr/v1") 63 + let fetch_region ?progress ?(base_url = "https://dl2.geotessera.org/zarr/v1") 64 64 ?(year = 2024) bbox = 65 65 let open Lwt.Syntax in 66 66 let* store = open_store ~base_url ~year () in 67 - Tessera_zarr.fetch_region ~store bbox 67 + Tessera_zarr.fetch_region ?progress ~store bbox 68 68 69 69 (* Synchronous version for notebook cells. 70 70 Uses synchronous XHR so all Lwt promises resolve immediately. *) ··· 80 80 match Lwt.poll result_lwt with 81 81 | Some v -> v 82 82 | None -> failwith "Tessera_zarr_jsoo: unexpected async in fetch_region" 83 + 84 + module Frp_async = Frp_async
+17 -15
tessera-zarr-jsoo/lib/tessera_zarr_jsoo.mli
··· 1 1 (** Browser backend for tessera-zarr. 2 2 3 - Provides synchronous XHR-based HTTP fetch with byte-range support 4 - and convenience wrappers for opening the GeoTessera Zarr store 5 - and fetching embeddings from the browser. *) 3 + Provides async and sync HTTP fetch with byte-range support, 4 + convenience wrappers for opening the GeoTessera Zarr store, 5 + and an FRP bridge for reactive notebooks. *) 6 6 7 7 val fetch : Zarr_v3.Store.fetch 8 - (** Synchronous HTTP fetch via [XMLHttpRequest] with [Range] header support. 9 - Returns an immediately-resolved [Lwt.t]. *) 8 + (** Async HTTP fetch via [XMLHttpRequest] with [Range] header support. *) 9 + 10 + val fetch_sync : Zarr_v3.Store.fetch 11 + (** Synchronous HTTP fetch. Returns immediately-resolved [Lwt.t]. *) 10 12 11 13 val codecs : Zarr_v3.Store.codec_registry 12 - (** Codec registry. Currently returns [None] for all codecs — 13 - the built-in Blosc memcpy handling is sufficient for 14 - GeoTessera's uncompressed data. *) 14 + (** Codec registry. Currently returns [None] for all codecs. *) 15 15 16 16 val open_store : ?base_url:string -> ?year:int -> unit -> Zarr_v3.Store.store Lwt.t 17 - (** Open the GeoTessera Zarr store. 18 - Default base URL: ["https://dl2.geotessera.org/zarr/v1"]. 19 - Default year: [2024]. *) 17 + (** Open the GeoTessera Zarr store (async fetch). *) 20 18 21 - val fetch_region : ?base_url:string -> ?year:int -> 19 + val fetch_region : ?progress:(string -> unit) -> ?base_url:string -> ?year:int -> 22 20 Geotessera.bbox -> (Linalg.mat * int * int * Geotessera.bbox) Lwt.t 23 - (** Async wrapper: opens the store and fetches embeddings for a bbox. *) 21 + (** Async: opens the store and fetches embeddings for a bbox. 22 + [progress] receives status messages during fetching. *) 24 23 25 24 val fetch_region_sync : ?base_url:string -> ?year:int -> 26 25 Geotessera.bbox -> Linalg.mat * int * int * Geotessera.bbox 27 - (** Synchronous convenience for notebook cells. 
Internally uses 28 - synchronous XHR so all Lwt promises resolve immediately. *) 26 + (** Synchronous convenience for imperative notebook cells. *) 27 + 28 + (** {1 FRP bridge} *) 29 + 30 + module Frp_async = Frp_async
+9 -3
tessera-zarr/lib/tessera_zarr.ml
··· 31 31 (Int32.shift_left (Int32.of_int (Char.code s.[off+3])) 24))) in 32 32 Int32.float_of_bits bits 33 33 34 - let fetch_region ~store bbox = 34 + let fetch_region ?(progress = fun (_:string) -> ()) ~store bbox = 35 35 let open Lwt.Syntax in 36 36 let open Geotessera in 37 37 (* 1. Determine UTM zone from bbox centre *) ··· 78 78 let* emb_arr = Zarr_v3.Store.open_array store (zone_name ^ "/embeddings") in 79 79 let* scales_arr = Zarr_v3.Store.open_array store (zone_name ^ "/scales") in 80 80 81 - let emb_fetch = Zarr_v3.Store.read emb_arr 81 + let on_emb_shard i n = 82 + progress (Printf.sprintf "Fetching embeddings: shard %d/%d" i n) in 83 + let on_scales_shard i n = 84 + progress (Printf.sprintf "Fetching scales: shard %d/%d" i n) in 85 + 86 + let emb_fetch = Zarr_v3.Store.read ~on_shard:on_emb_shard emb_arr 82 87 ~start:[| row_start; col_start; 0 |] 83 88 ~shape:[| tile_h; tile_w; n_features |] in 84 - let scales_fetch = Zarr_v3.Store.read scales_arr 89 + let scales_fetch = Zarr_v3.Store.read ~on_shard:on_scales_shard scales_arr 85 90 ~start:[| row_start; col_start |] 86 91 ~shape:[| tile_h; tile_w |] in 87 92 88 93 let* emb_data = emb_fetch 89 94 and* scales_data = scales_fetch in 90 95 96 + progress "Dequantizing..."; 91 97 (* 5. Dequantize: float32 = int8 × scale *) 92 98 let mat = Linalg.create_mat ~rows:(tile_h * tile_w) ~cols:n_features in 93 99 for i = 0 to tile_h - 1 do
+1
tessera-zarr/lib/tessera_zarr.mli
··· 51 51 (** {1 Fetching embeddings} *) 52 52 53 53 val fetch_region : 54 + ?progress:(string -> unit) -> 54 55 store:Zarr_v3.Store.store -> 55 56 Geotessera.bbox -> 56 57 (Linalg.mat * int * int * Geotessera.bbox) Lwt.t
+10 -1
zarr-v3/lib/store.ml
··· 172 172 done; 173 173 !idx 174 174 175 - let read arr ~start ~shape = 175 + let read ?on_shard arr ~start ~shape = 176 176 let open Lwt.Syntax in 177 177 let meta = arr.meta in 178 178 let ndim = Array.length meta.shape in ··· 205 205 let shard_start = Array.init ndim (fun d -> start.(d) / chunk_shape.(d)) in 206 206 let shard_stop = Array.init ndim (fun d -> (stop.(d) - 1) / chunk_shape.(d) + 1) in 207 207 208 + (* Count total shards *) 209 + let n_shards = Array.init ndim (fun d -> shard_stop.(d) - shard_start.(d)) 210 + |> Array.fold_left ( * ) 1 in 211 + let shards_done = ref 0 in 212 + 208 213 (* Iterate over all needed shards *) 209 214 let shard_tasks = ref [] in 210 215 ··· 219 224 let task = 220 225 (* Fetch the entire shard *) 221 226 let* shard_data = arr.store.fetch shard_url () in 227 + incr shards_done; 228 + (match on_shard with 229 + | Some f -> f !shards_done n_shards 230 + | None -> ()); 222 231 let shard_len = String.length shard_data in 223 232 224 233 (* Read the shard index from the end *)
+5 -2
zarr-v3/lib/store.mli
··· 103 103 104 104 (** {1 Reading data} *) 105 105 106 - val read : arr -> start:int array -> shape:int array -> string Lwt.t 107 - (** [read arr ~start ~shape] reads a rectangular region of an array. 106 + val read : ?on_shard:(int -> int -> unit) -> 107 + arr -> start:int array -> shape:int array -> string Lwt.t 108 + (** [read ?on_shard arr ~start ~shape] reads a rectangular region of an array. 108 109 109 110 [start] is the origin (inclusive) in pixel coordinates. 110 111 [shape] is the size of the region in each dimension. 111 112 Returns raw bytes in C-order. The caller must interpret the bytes 112 113 according to {!array_meta.data_type}. 114 + 115 + [on_shard i n] is called when shard [i] of [n] total has been fetched. 113 116 114 117 For sharded arrays, fetches only the shards that overlap the 115 118 requested region. Shard fetches run in parallel via [Lwt.join].