Real-time index of opencode sessions
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add mmap-first plan/resolve/hydrate tree APIs for zero-copy flow

Implement the Option C direction by introducing explicit staged tree methods that separate planning, reference resolution, and hydration:
- plan_session_tree -> StageResult<SessionTreePlan>
- resolve_session_tree -> StageResult<SessionRefTree>
- hydrate_session_tree -> StageResult<SessionHydratedTree>

This adds reference-tree node types (SessionRefNode/MessageRefNode/PartRefNode) and plan types keyed by canonical key structs. The resolve stage now maps files into MappedSpan leaves instead of eagerly deserializing payloads.

In the storage layer, add MappedSpan and span helpers (read_span/read_session_span/read_message_span/read_part_span/read_diff_span + parse_span). read_json now routes through span parsing, so the zero-copy substrate is used consistently.

Also wire new public exports for the staged/ref-tree API surface and mapped span type.

rektide d723cc6a 63eb2b2b

+322 -7
+4 -3
src/lib.rs
··· 19 19 pub use key::{EntityKey, MessageKey, PartKey, SessionKey}; 20 20 pub use loader::{LoadedSession, MessageWithParts, SessionLoader, SessionTree}; 21 21 pub use materializer::{ 22 - MessageFlowScope, SessionFlowOptions, SessionFlowResult, SessionFlowScope, SessionMaterializer, 23 - Stats as MaterializerStats, 22 + MessageFlowScope, MessageRefNode, MessageTreePlan, PartRefNode, SessionFlowOptions, 23 + SessionFlowResult, SessionFlowScope, SessionHydratedTree, SessionMaterializer, SessionRefNode, 24 + SessionRefTree, SessionTreePlan, Stats as MaterializerStats, 24 25 }; 25 26 pub use stage::{IssueKind, StageIssue, StageName, StageReport, StageResult}; 26 - pub use storage::{FileReader, MappedFile, MappedFileCache, StoragePaths}; 27 + pub use storage::{FileReader, MappedFile, MappedFileCache, MappedSpan, StoragePaths}; 27 28 pub use types::{ 28 29 message::{AssistantMessage, FileDiff, Message, UserMessage}, 29 30 part::Part,
+265 -1
src/materializer.rs
··· 1 1 use crate::id::{MessageId, PartId, SessionId}; 2 2 use crate::index::{MessageMeta, PartRef, SessionIndex, SessionMeta}; 3 + use crate::key::{MessageKey, PartKey, SessionKey}; 3 4 use crate::loader::{LoadedSession, MessageWithParts, SessionTree}; 4 - use crate::storage::{FileReader, MappedFile, StoragePaths}; 5 + use crate::stage::{IssueKind, StageName, StageReport, StageResult}; 6 + use crate::storage::{FileReader, MappedFile, MappedSpan, StoragePaths}; 5 7 use crate::types::{Message, Part, SessionInfo}; 6 8 use crate::{Error, Result}; 7 9 use bon::Builder; ··· 44 46 pub struct SessionFlowResult { 45 47 pub scope: SessionFlowScope, 46 48 pub message_scopes: Vec<MessageFlowScope>, 49 + pub session: LoadedSession, 50 + pub messages: Vec<MessageWithParts>, 51 + } 52 + 53 + #[derive(Debug, Clone)] 54 + pub struct SessionTreePlan { 55 + pub session: SessionKey, 56 + pub messages: Vec<MessageTreePlan>, 57 + pub include_diff: bool, 58 + } 59 + 60 + #[derive(Debug, Clone)] 61 + pub struct MessageTreePlan { 62 + pub key: MessageKey, 63 + pub parts: Vec<PartKey>, 64 + } 65 + 66 + #[derive(Debug, Clone)] 67 + pub struct PartRefNode { 68 + pub key: PartKey, 69 + pub kind: crate::index::PartKind, 70 + pub span: MappedSpan, 71 + } 72 + 73 + #[derive(Debug, Clone)] 74 + pub struct MessageRefNode { 75 + pub key: MessageKey, 76 + pub span: MappedSpan, 77 + pub parts: Vec<PartRefNode>, 78 + } 79 + 80 + #[derive(Debug, Clone)] 81 + pub struct SessionRefNode { 82 + pub key: SessionKey, 83 + pub span: MappedSpan, 84 + pub diff: Option<MappedSpan>, 85 + pub messages: Vec<MessageRefNode>, 86 + } 87 + 88 + #[derive(Debug, Clone)] 89 + pub struct SessionRefTree { 90 + pub session: SessionRefNode, 91 + } 92 + 93 + #[derive(Debug, Clone)] 94 + pub struct SessionHydratedTree { 47 95 pub session: LoadedSession, 48 96 pub messages: Vec<MessageWithParts>, 49 97 } ··· 292 340 session, 293 341 messages, 294 342 }) 343 + } 344 + 345 + pub fn plan_session_tree( 346 + &self, 347 + options: &SessionFlowOptions, 348 + ) -> Result<StageResult<SessionTreePlan>> { 349 + let mut report = StageReport::new(StageName::Plan); 350 + 351 + let meta = self.require_session_meta(&options.session_id)?; 352 + let mut message_ids = self.message_ids_for_session(&options.session_id); 353 + if let Some(limit) = options.message_limit { 354 + message_ids.truncate(limit); 355 + } 356 + 357 + let mut messages = Vec::with_capacity(message_ids.len()); 358 + for message_id in message_ids { 359 + report.attempted += 1; 360 + if self.message_meta(&message_id).is_none() { 361 + report.push_issue( 362 + Some(crate::EntityKey::Message(MessageKey::new( 363 + options.session_id.clone(), 364 + message_id.clone(), 365 + ))), 366 + None, 367 + IssueKind::NotFound, 368 + "message metadata missing during tree planning", 369 + ); 370 + continue; 371 + } 372 + 373 + let mut part_ids = if options.include_parts { 374 + self.part_ids_for_message(&message_id) 375 + } else { 376 + Vec::new() 377 + }; 378 + 379 + if let Some(limit) = options.part_limit_per_message { 380 + part_ids.truncate(limit); 381 + } 382 + 383 + let parts = part_ids 384 + .into_iter() 385 + .map(|part_id| PartKey::new(message_id.clone(), part_id)) 386 + .collect(); 387 + 388 + messages.push(MessageTreePlan { 389 + key: MessageKey::new(options.session_id.clone(), message_id), 390 + parts, 391 + }); 392 + report.succeeded += 1; 393 + } 394 + 395 + let plan = SessionTreePlan { 396 + session: SessionKey::new(meta.project_id, options.session_id.clone()), 397 + messages, 398 + include_diff: options.include_diff, 399 + }; 400 + 401 + Ok(StageResult::new(plan, report)) 402 + } 403 + 404 + pub fn resolve_session_tree( 405 + &self, 406 + plan: &SessionTreePlan, 407 + ) -> Result<StageResult<SessionRefTree>> { 408 + let mut report = StageReport::new(StageName::Resolve); 409 + let session_span = self 410 + .reader 411 + .read_session_span(&plan.session.project_id, &plan.session.session_id)?; 412 + let diff = if plan.include_diff { 413 + self.reader.read_diff_span(&plan.session.session_id)? 414 + } else { 415 + None 416 + }; 417 + 418 + let mut message_nodes = Vec::with_capacity(plan.messages.len()); 419 + for message_plan in &plan.messages { 420 + report.attempted += 1; 421 + let message_span = match self 422 + .reader 423 + .read_message_span(&message_plan.key.session_id, &message_plan.key.message_id) 424 + { 425 + Ok(span) => span, 426 + Err(err) => { 427 + report.push_issue( 428 + Some(crate::EntityKey::Message(message_plan.key.clone())), 429 + Some(self.reader.paths().message_file( 430 + &message_plan.key.session_id, 431 + &message_plan.key.message_id, 432 + )), 433 + IssueKind::Io, 434 + format!("unable to map message file: {err}"), 435 + ); 436 + continue; 437 + } 438 + }; 439 + 440 + let mut part_nodes = Vec::with_capacity(message_plan.parts.len()); 441 + for part_plan in &message_plan.parts { 442 + let span = match self 443 + .reader 444 + .read_part_span(&part_plan.message_id, &part_plan.part_id) 445 + { 446 + Ok(span) => span, 447 + Err(err) => { 448 + report.push_issue( 449 + Some(crate::EntityKey::Part(part_plan.clone())), 450 + Some( 451 + self.reader 452 + .paths() 453 + .part_file(&part_plan.message_id, &part_plan.part_id), 454 + ), 455 + IssueKind::Io, 456 + format!("unable to map part file: {err}"), 457 + ); 458 + continue; 459 + } 460 + }; 461 + 462 + let Some(part_ref) = self.part_ref(&part_plan.part_id) else { 463 + report.push_issue( 464 + Some(crate::EntityKey::Part(part_plan.clone())), 465 + None, 466 + IssueKind::InvalidReference, 467 + "part metadata missing during tree resolve", 468 + ); 469 + continue; 470 + }; 471 + 472 + part_nodes.push(PartRefNode { 473 + key: part_plan.clone(), 474 + kind: part_ref.kind, 475 + span, 476 + }); 477 + } 478 + 479 + message_nodes.push(MessageRefNode { 480 + key: message_plan.key.clone(), 481 + span: message_span, 482 + parts: part_nodes, 483 + }); 484 + report.succeeded += 1; 485 + } 486 + 487 + Ok(StageResult::new( 488 + SessionRefTree { 489 + session: SessionRefNode { 490 + key: plan.session.clone(), 491 + span: session_span, 492 + diff, 493 + messages: message_nodes, 494 + }, 495 + }, 496 + report, 497 + )) 498 + } 499 + 500 + pub fn hydrate_session_tree( 501 + &self, 502 + ref_tree: &SessionRefTree, 503 + ) -> Result<StageResult<SessionHydratedTree>> { 504 + let mut report = StageReport::new(StageName::Hydrate); 505 + let info: SessionInfo = self.reader.parse_span(&ref_tree.session.span)?; 506 + let diff = match &ref_tree.session.diff { 507 + Some(span) => Some(self.reader.parse_span::<Vec<crate::FileDiff>>(span)?), 508 + None => None, 509 + }; 510 + 511 + let session = LoadedSession { info, diff }; 512 + let mut messages = Vec::with_capacity(ref_tree.session.messages.len()); 513 + 514 + for message_node in &ref_tree.session.messages { 515 + report.attempted += 1; 516 + let message = match self.reader.parse_span::<Message>(&message_node.span) { 517 + Ok(message) => message, 518 + Err(err) => { 519 + report.push_issue( 520 + Some(crate::EntityKey::Message(message_node.key.clone())), 521 + Some(self.reader.paths().message_file( 522 + &message_node.key.session_id, 523 + &message_node.key.message_id, 524 + )), 525 + IssueKind::ParseError, 526 + format!("unable to parse message JSON: {err}"), 527 + ); 528 + continue; 529 + } 530 + }; 531 + 532 + let mut parts = Vec::with_capacity(message_node.parts.len()); 533 + for part_node in &message_node.parts { 534 + match self.reader.parse_span::<Part>(&part_node.span) { 535 + Ok(part) => parts.push(part), 536 + Err(err) => { 537 + report.push_issue( 538 + Some(crate::EntityKey::Part(part_node.key.clone())), 539 + Some( 540 + self.reader 541 + .paths() 542 + .part_file(&part_node.key.message_id, &part_node.key.part_id), 543 + ), 544 + IssueKind::ParseError, 545 + format!("unable to parse part JSON: {err}"), 546 + ); 547 + } 548 + } 549 + } 550 + 551 + messages.push(MessageWithParts { message, parts }); 552 + report.succeeded += 1; 553 + } 554 + 555 + Ok(StageResult::new( 556 + SessionHydratedTree { session, messages }, 557 + report, 558 + )) 295 559 } 296 560 297 561 pub fn session_metas_created_since(&self, since: i64) -> Vec<SessionMeta> {
+1 -1
src/storage/mod.rs
··· 4 4 5 5 pub use mmap::{MappedFile, MappedFileCache}; 6 6 pub use paths::StoragePaths; 7 - pub use reader::FileReader; 7 + pub use reader::{FileReader, MappedSpan};
+52 -2
src/storage/reader.rs
··· 8 8 use std::path::Path; 9 9 use std::sync::Arc; 10 10 11 + #[derive(Debug, Clone)] 12 + pub struct MappedSpan { 13 + pub file: Arc<MappedFile>, 14 + pub offset: usize, 15 + pub len: usize, 16 + } 17 + 18 + impl MappedSpan { 19 + pub fn as_bytes(&self) -> &[u8] { 20 + &self.file.as_bytes()[self.offset..self.offset + self.len] 21 + } 22 + } 23 + 11 24 pub struct FileReader { 12 25 paths: StoragePaths, 13 26 cache: MappedFileCache, ··· 36 49 37 50 pub fn read_mapped(&self, path: &Path) -> Result<Arc<MappedFile>> { 38 51 self.cache.get(path) 52 + } 53 + 54 + pub fn read_span(&self, path: &Path) -> Result<MappedSpan> { 55 + let file = self.read_mapped(path)?; 56 + let len = file.len(); 57 + Ok(MappedSpan { 58 + file, 59 + offset: 0, 60 + len, 61 + }) 62 + } 63 + 64 + pub fn parse_span<T: DeserializeOwned>(&self, span: &MappedSpan) -> Result<T> { 65 + serde_json::from_slice(span.as_bytes()).map_err(Error::Json) 39 66 } 40 67 41 68 pub fn read_json<T: DeserializeOwned>(&self, path: &Path) -> Result<T> { 42 - let mapped = self.read_mapped(path)?; 43 - serde_json::from_slice(mapped.as_bytes()).map_err(Error::Json) 69 + let span = self.read_span(path)?; 70 + self.parse_span(&span) 71 + } 72 + 73 + pub fn read_session_span(&self, project_id: &str, id: &SessionId) -> Result<MappedSpan> { 74 + let path = self.paths.session_file(project_id, id); 75 + self.read_span(&path) 76 + } 77 + 78 + pub fn read_message_span(&self, session_id: &SessionId, id: &MessageId) -> Result<MappedSpan> { 79 + let path = self.paths.message_file(session_id, id); 80 + self.read_span(&path) 81 + } 82 + 83 + pub fn read_part_span(&self, message_id: &MessageId, id: &PartId) -> Result<MappedSpan> { 84 + let path = self.paths.part_file(message_id, id); 85 + self.read_span(&path) 86 + } 87 + 88 + pub fn read_diff_span(&self, session_id: &SessionId) -> Result<Option<MappedSpan>> { 89 + let path = self.paths.diff_file(session_id); 90 + if !path.exists() { 91 + return Ok(None); 92 + } 93 + self.read_span(&path).map(Some) 44 94 } 45 95 46 96 pub fn read_session(&self, project_id: &str, id: &SessionId) -> Result<SessionInfo> {