My aggregated monorepo of OCaml code, auto-maintained
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add DAG-based build executor for --fork mode

Instead of forking per-target (where each child independently walks
the full dependency chain), build a global DAG of unique build layers
across all solutions and execute in topological order. This ensures
shared dependencies (e.g., the compiler) are built once before their
dependents are launched, maximizing actual parallelism at each tier.

The DAG executor:
- Flattens all solutions into deduplicated build nodes keyed by hash
- Topologically sorts the global DAG
- Maintains a ready-queue of layers whose deps are all built
- Launches up to N concurrent runc containers
- Handles cache hits (pre-existing layers) without forking
- Propagates dependency failures without building

The sequential (--fork 1 / no --fork) path is unchanged.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+239 -18
+239 -18
day10/bin/main.ml
(** A node in the global build DAG. Represents a single build layer. *)
type build_node = {
  pkg : OpamPackage.t;
  build_hash : string;                (* "build-{hash}" *)
  ordered_deps : OpamPackage.t list;  (* packages this depends on, in topo order *)
  dep_build_hashes : string list;     (* build hashes of deps, in order *)
}

(** [build_global_dag ~config solutions] flattens every solution into one
    global DAG of build layers, deduplicated by build hash, and returns the
    [build_node]s in topological order (dependencies first).

    Improvements over the previous version: the [all_ordered] accumulator was
    built but never read (dead code) and has been removed, and
    [try Hashtbl.find … with Not_found -> []] has been replaced by the
    idiomatic [Hashtbl.find_opt] + [Option.value]. *)
let build_global_dag ~(config : Config.t) solutions =
  let t = Container.init ~config in
  (* build_hash -> node, deduplicated across all solutions *)
  let nodes : (string, build_node) Hashtbl.t = Hashtbl.create 1024 in
  (* build_hash -> dep build hashes: the edges of the global DAG *)
  let edges : (string, string list) Hashtbl.t = Hashtbl.create 1024 in
  List.iter (fun (_target, solution) ->
    let ordered = topological_sort solution in
    let dependencies = pkg_deps solution ordered in
    (* pkg name -> build_hash, scoped to this solution *)
    let pkg_to_hash : (string, string) Hashtbl.t = Hashtbl.create 64 in
    List.iter (fun pkg ->
      let ordered_deps =
        extract_dag dependencies pkg |> topological_sort |> List.rev |> List.tl
      in
      let hash = Container.layer_hash ~t (pkg :: ordered_deps) in
      let build_hash = "build-" ^ hash in
      let dep_build_hashes =
        List.filter_map (fun dep ->
            Hashtbl.find_opt pkg_to_hash (OpamPackage.to_string dep))
          ordered_deps
      in
      Hashtbl.replace pkg_to_hash (OpamPackage.to_string pkg) build_hash;
      if not (Hashtbl.mem nodes build_hash) then begin
        Hashtbl.replace nodes build_hash
          { pkg; build_hash; ordered_deps; dep_build_hashes };
        Hashtbl.replace edges build_hash dep_build_hashes
      end
    ) ordered
  ) solutions;
  Container.deinit ~t;
  (* Kahn's algorithm over the deduplicated build hashes. *)
  let remaining : (string, int) Hashtbl.t =
    Hashtbl.create (Hashtbl.length nodes) in
  let rdeps : (string, string list) Hashtbl.t =
    Hashtbl.create (Hashtbl.length nodes) in
  Hashtbl.iter (fun hash deps ->
    let dep_count =
      List.fold_left (fun acc d ->
        (* Only count deps that are themselves nodes of this DAG. *)
        if Hashtbl.mem nodes d then begin
          Hashtbl.replace rdeps d
            (hash :: Option.value ~default:[] (Hashtbl.find_opt rdeps d));
          acc + 1
        end else acc
      ) 0 deps
    in
    Hashtbl.replace remaining hash dep_count
  ) edges;
  let result = ref [] in
  let ready = Queue.create () in
  Hashtbl.iter (fun hash count -> if count = 0 then Queue.push hash ready)
    remaining;
  while not (Queue.is_empty ready) do
    let hash = Queue.pop ready in
    result := hash :: !result;
    List.iter (fun dependent ->
      let new_count = Hashtbl.find remaining dependent - 1 in
      Hashtbl.replace remaining dependent new_count;
      if new_count = 0 then Queue.push dependent ready
    ) (Option.value ~default:[] (Hashtbl.find_opt rdeps hash))
  done;
  (* [result] was accumulated in reverse topological order. *)
  List.rev !result |> List.filter_map (Hashtbl.find_opt nodes)
(** [execute_dag ~np ~on_complete ~cache_dir ~os_key nodes build_one] executes
    build layers in dependency order with up to [np] concurrent runc
    containers.

    [build_one node] performs one build and returns [true] on success; it runs
    in a forked child process, so its exit code carries the result back.
    [on_complete] is called in the parent each time a layer settles (built,
    cache hit, or failed via a dependency).

    BUGFIX: previously a *failed* cached layer decremented its dependents'
    counters without enqueueing or failing them; the comment claimed the next
    reap cycle would handle it, but reaping only happens when a child exits, so
    with nothing running those nodes were only force-failed by the deadlock
    fallback (printing a spurious "deadlock detected"). Completion and
    propagation are now factored into [settle] / [promote_dependents], shared
    by the reap path and the cache-hit path. *)
let execute_dag ~np ~on_complete ~cache_dir ~os_key (nodes : build_node list) (build_one : build_node -> bool) =
  (* build_hash -> node *)
  let node_of_hash : (string, build_node) Hashtbl.t =
    Hashtbl.create (List.length nodes) in
  List.iter (fun n -> Hashtbl.replace node_of_hash n.build_hash n) nodes;
  (* build_hash -> success flag, once settled *)
  let completed : (string, bool) Hashtbl.t =
    Hashtbl.create (List.length nodes) in
  (* Remaining unsettled-dep count per node *)
  let remaining_deps : (string, int) Hashtbl.t =
    Hashtbl.create (List.length nodes) in
  (* build_hash -> hashes that depend on it *)
  let rdeps : (string, string list) Hashtbl.t =
    Hashtbl.create (List.length nodes) in
  List.iter (fun node ->
    let count =
      List.fold_left (fun acc dep_hash ->
        if Hashtbl.mem node_of_hash dep_hash then begin
          let existing =
            Option.value ~default:[] (Hashtbl.find_opt rdeps dep_hash) in
          Hashtbl.replace rdeps dep_hash (node.build_hash :: existing);
          acc + 1
        end else
          (* Dep not in our DAG — already built by a previous run. *)
          acc
      ) 0 node.dep_build_hashes
    in
    Hashtbl.replace remaining_deps node.build_hash count
  ) nodes;
  let ready = Queue.create () in
  List.iter (fun node ->
    if Hashtbl.find remaining_deps node.build_hash = 0 then
      Queue.push node ready
  ) nodes;
  (* pid of running child -> build_hash it is building *)
  let running : (int, string) Hashtbl.t = Hashtbl.create np in
  let total = List.length nodes in
  let completed_count = ref 0 in
  let failed_count = ref 0 in
  (* Record the final status of [hash] and notify the caller. *)
  let settle hash success =
    Hashtbl.replace completed hash success;
    incr completed_count;
    if not success then incr failed_count;
    on_complete ~total ~completed:!completed_count ~failed:!failed_count
      hash success
  in
  (* Decrement the dep counters of everything depending on [hash]; enqueue
     dependents whose deps all succeeded, and recursively fail those with a
     failed dependency (without building them). *)
  let rec promote_dependents hash =
    List.iter (fun dep_hash ->
      let count = Hashtbl.find remaining_deps dep_hash - 1 in
      Hashtbl.replace remaining_deps dep_hash count;
      if count = 0 then begin
        let all_deps_ok =
          match Hashtbl.find_opt node_of_hash dep_hash with
          | Some n ->
            List.for_all (fun dh ->
              match Hashtbl.find_opt completed dh with
              | Some true -> true
              | Some false -> false
              (* Unsettled but not in the DAG = pre-existing layer. *)
              | None -> not (Hashtbl.mem node_of_hash dh)
            ) n.dep_build_hashes
          | None -> false
        in
        if all_deps_ok then
          Queue.push (Hashtbl.find node_of_hash dep_hash) ready
        else begin
          settle dep_hash false;
          promote_dependents dep_hash
        end
      end
    ) (Option.value ~default:[] (Hashtbl.find_opt rdeps hash))
  in
  (* Block until one child exits, then settle its layer and promote. *)
  let reap_one () =
    let pid, status = Unix.waitpid [] (-1) in
    let exit_code = match status with
      | Unix.WEXITED c -> c
      | _ -> 1  (* killed/stopped counts as failure *)
    in
    match Hashtbl.find_opt running pid with
    | Some hash ->
      Hashtbl.remove running pid;
      settle hash (exit_code = 0);
      promote_dependents hash
    | None -> ()
  in
  while !completed_count < total do
    (* Launch ready nodes up to the concurrency limit. *)
    while Hashtbl.length running < np && not (Queue.is_empty ready) do
      let node = Queue.pop ready in
      (* Cache hit: the layer already exists from a previous run. *)
      let layer_json = Path.(cache_dir / os_key / node.build_hash / "layer.json") in
      if Sys.file_exists layer_json then begin
        let success = Util.load_layer_info_exit_status layer_json = 0 in
        settle node.build_hash success;
        (* Propagate through the shared path regardless of success, so a
           failed cached layer fails its dependents immediately. *)
        promote_dependents node.build_hash
      end else begin
        match Unix.fork () with
        | 0 ->
          (* Re-seed PRNG in the child so siblings don't share a stream. *)
          Random.init
            (Unix.getpid () lxor
             int_of_float (Unix.gettimeofday () *. 1000000.));
          let success = (try build_one node with _ -> false) in
          exit (if success then 0 else 1)
        | child_pid ->
          Hashtbl.replace running child_pid node.build_hash
      end
    done;
    if Hashtbl.length running > 0 then
      reap_one ()
    else if Queue.is_empty ready && !completed_count < total then begin
      (* Nothing running, nothing ready, not done: the DAG is wedged. *)
      Printf.eprintf
        "DAG executor: deadlock detected (%d/%d completed, %d running)\n%!"
        !completed_count total (Hashtbl.length running);
      (* Force-complete all unsettled nodes as failures so the loop exits. *)
      List.iter (fun node ->
        if not (Hashtbl.mem completed node.build_hash) then begin
          Hashtbl.replace completed node.build_hash false;
          incr completed_count;
          incr failed_count
        end
      ) nodes
    end
  done;
  (* Drain any children still running after everything settled. *)
  while Hashtbl.length running > 0 do
    reap_one ()
  done
!completed total_targets !failed 1980 + let on_complete ~total ~completed ~failed hash _success = 1981 + ignore hash; 1982 + if completed - !last_reported >= 25 || completed = total then begin 1983 + Printf.printf "\r%-60s\r" ""; 1984 + if failed > 0 then 1985 + Printf.printf "[Phase 3] %d/%d layers completed (%d failed)%!" completed total failed 1772 1986 else 1773 - Printf.printf "[Phase 3] %d/%d targets completed%!" !completed total_targets; 1774 - last_reported := !completed 1987 + Printf.printf "[Phase 3] %d/%d layers completed%!" completed total; 1988 + last_reported := completed 1775 1989 end 1776 1990 in 1777 - Os.fork_with_progress ~np:n ~on_complete run_with_target items; 1991 + let build_one (node : build_node) = 1992 + let r = build_layer t node.pkg node.build_hash node.ordered_deps node.dep_build_hashes in 1993 + match r with 1994 + | Success _ -> true 1995 + | _ -> false 1996 + in 1997 + execute_dag ~np:n ~on_complete ~cache_dir:config.dir ~os_key dag_nodes build_one; 1998 + Container.deinit ~t; 1778 1999 Printf.printf "\n%!"; 1779 2000 (* Run global deferred doc link pass for x-extra-doc-deps *) 1780 2001 run_global_deferred_doc_link config;