···11use dashmap::DashMap;
22use std::sync::Arc;
33+use std::collections::HashMap;
34use parking_lot::RwLock;
45use crate::{
56 NodeId, RelationshipId, Node, Relationship,
67 Result, GigabrainError
78};
88-use crate::core::{relationship::Direction, GraphSchema};
99+use crate::core::{relationship::Direction, GraphSchema, PropertyValue};
1010+use crate::index::{IndexManager, IndexType, PersistentIndexManager};
1111+use crate::storage::StorageEngine;
9121013pub struct Graph {
1114 nodes: Arc<DashMap<NodeId, Node>>,
1215 relationships: Arc<DashMap<RelationshipId, Relationship>>,
1316 node_relationships: Arc<DashMap<NodeId, Vec<RelationshipId>>>,
1417 schema: Arc<RwLock<GraphSchema>>,
1818+ index_manager: Arc<IndexManager>,
1919+ persistent_index_manager: Option<Arc<PersistentIndexManager>>,
15201621 next_node_id: Arc<RwLock<u64>>,
1722 next_relationship_id: Arc<RwLock<u64>>,
···2429 relationships: Arc::new(DashMap::new()),
2530 node_relationships: Arc::new(DashMap::new()),
2631 schema: Arc::new(RwLock::new(GraphSchema::new())),
3232+ index_manager: Arc::new(IndexManager::new()),
3333+ persistent_index_manager: None,
3434+ next_node_id: Arc::new(RwLock::new(0)),
3535+ next_relationship_id: Arc::new(RwLock::new(0)),
3636+ }
3737+ }
3838+3939+ /// Create a new graph with persistent storage for indexes
4040+ pub fn with_persistent_indexes(storage: Arc<dyn StorageEngine>) -> Self {
4141+ let memory_index_manager = Arc::new(IndexManager::new());
4242+ let persistent_index_manager = Arc::new(PersistentIndexManager::with_memory_manager(
4343+ memory_index_manager.clone(),
4444+ storage
4545+ ));
4646+4747+ Self {
4848+ nodes: Arc::new(DashMap::new()),
4949+ relationships: Arc::new(DashMap::new()),
5050+ node_relationships: Arc::new(DashMap::new()),
5151+ schema: Arc::new(RwLock::new(GraphSchema::new())),
5252+ index_manager: memory_index_manager,
5353+ persistent_index_manager: Some(persistent_index_manager),
2754 next_node_id: Arc::new(RwLock::new(0)),
2855 next_relationship_id: Arc::new(RwLock::new(0)),
2956 }
···4673 F: FnOnce(&mut Node),
4774 {
4875 if let Some(mut node) = self.nodes.get_mut(&id) {
7676+ // Store old state for index updates
7777+ let old_labels: Vec<_> = node.labels.iter().cloned().collect();
7878+ let old_properties = node.properties.clone();
7979+4980 update_fn(&mut node);
8181+8282+ // Update indexes with new node state
8383+ let new_labels: Vec<_> = node.labels.iter().cloned().collect();
8484+ let new_properties = node.properties.clone();
8585+8686+ // Ensure indexes exist for new labels
8787+ for &label_id in &new_labels {
8888+ if !old_labels.contains(&label_id) {
8989+ let index_type = IndexType::Label(label_id);
9090+ if let Err(e) = self.index_manager.create_index(index_type, None, false) {
9191+ tracing::debug!("Label index may already exist: {}", e);
9292+ }
9393+ }
9494+ }
9595+9696+ // Ensure indexes exist for new properties
9797+ for &property_key_id in new_properties.keys() {
9898+ if !old_properties.contains_key(&property_key_id) {
9999+ let index_type = IndexType::Property(property_key_id);
100100+ if let Err(e) = self.index_manager.create_index(index_type, None, false) {
101101+ tracing::debug!("Property index may already exist: {}", e);
102102+ }
103103+ }
104104+ }
105105+106106+ // Update index manager
107107+ if let Err(e) = self.index_manager.update_node_properties(id, &old_properties, &new_properties) {
108108+ tracing::warn!("Failed to update property indexes: {}", e);
109109+ }
110110+111111+ // Add node to indexes with new labels and properties
112112+ if let Err(e) = self.index_manager.add_node(id, &new_labels, &new_properties) {
113113+ tracing::warn!("Failed to add node to indexes: {}", e);
114114+ }
115115+50116 Ok(())
51117 } else {
52118 Err(GigabrainError::NodeNotFound(id))
···132198 }
133199 }
134200201201+ // Remove from indexes
202202+ if let Err(e) = self.index_manager.remove_node(id) {
203203+ tracing::warn!("Failed to remove node from indexes: {}", e);
204204+ }
205205+135206 self.nodes
136207 .remove(&id)
137208 .ok_or(GigabrainError::NodeNotFound(id))?;
···163234 &self.schema
164235 }
165236237237+ pub fn index_manager(&self) -> &Arc<IndexManager> {
238238+ &self.index_manager
239239+ }
240240+241241+ /// Get the persistent index manager if available
242242+ pub fn persistent_index_manager(&self) -> Option<&Arc<PersistentIndexManager>> {
243243+ self.persistent_index_manager.as_ref()
244244+ }
245245+246246+ /// Flush indexes to persistent storage if available
247247+ pub async fn flush_indexes(&self) -> Result<()> {
248248+ if let Some(persistent_manager) = &self.persistent_index_manager {
249249+ persistent_manager.flush().await?;
250250+ }
251251+ Ok(())
252252+ }
253253+254254+ /// Load indexes from storage if persistent manager is available
255255+ pub async fn load_indexes(&self) -> Result<()> {
256256+ if let Some(persistent_manager) = &self.persistent_index_manager {
257257+ persistent_manager.ensure_loaded().await?;
258258+ }
259259+ Ok(())
260260+ }
261261+166262 /// Get all node IDs in the graph
167263 pub fn get_all_nodes(&self) -> Vec<NodeId> {
168264 self.nodes.iter().map(|entry| *entry.key()).collect()
265265+ }
266266+267267+ /// Find nodes by label using indexes when available
268268+ pub fn find_nodes_by_label(&self, label_name: &str) -> Result<Vec<NodeId>> {
269269+ let schema = self.schema.read();
270270+ if let Some(&label_id) = schema.labels.get(label_name) {
271271+ self.index_manager.get_nodes_by_label(label_id)
272272+ } else {
273273+ Ok(Vec::new())
274274+ }
275275+ }
276276+277277+ /// Find nodes by property value using indexes when available
278278+ pub fn find_nodes_by_property(&self, property_name: &str, value: &PropertyValue) -> Result<Vec<NodeId>> {
279279+ let schema = self.schema.read();
280280+ if let Some(&property_key_id) = schema.property_keys.get(property_name) {
281281+ self.index_manager.get_nodes_by_property(property_key_id, value)
282282+ } else {
283283+ Ok(Vec::new())
284284+ }
169285 }
170286}
+2
src/core/mod.rs
···33use crate::{NodeId, RelationshipId, LabelId, PropertyKeyId, Result};
4455pub mod graph;
66+pub mod persistent_graph;
67pub mod node;
78pub mod relationship;
89pub mod property;
910pub mod schema_validation;
10111112pub use graph::Graph;
1313+pub use persistent_graph::PersistentGraph;
1214pub use node::Node;
1315pub use relationship::Relationship;
1416pub use property::{Property, PropertyValue};
+479
src/core/persistent_graph.rs
···11+use dashmap::DashMap;
22+use std::sync::Arc;
33+use parking_lot::RwLock;
44+use crate::{
55+ NodeId, RelationshipId, Node, Relationship,
66+ Result, GigabrainError
77+};
88+use crate::core::{relationship::Direction, GraphSchema};
99+use crate::persistence::{PersistentStorage, StorageBackend};
1010+1111+/// A graph implementation that persists data to storage
1212+pub struct PersistentGraph {
1313+ // In-memory cache for fast access
1414+ nodes: Arc<DashMap<NodeId, Node>>,
1515+ relationships: Arc<DashMap<RelationshipId, Relationship>>,
1616+ node_relationships: Arc<DashMap<NodeId, Vec<RelationshipId>>>,
1717+ schema: Arc<RwLock<GraphSchema>>,
1818+1919+ // ID generators
2020+ next_node_id: Arc<RwLock<u64>>,
2121+ next_relationship_id: Arc<RwLock<u64>>,
2222+2323+ // Persistent storage
2424+ storage: Arc<PersistentStorage>,
2525+2626+ // Flag to enable/disable write-through caching
2727+ write_through: bool,
2828+}
2929+3030+impl PersistentGraph {
3131+ /// Create a new persistent graph with the given storage backend
3232+ pub async fn new(storage_backend: Arc<dyn StorageBackend>) -> Result<Self> {
3333+ let storage = Arc::new(PersistentStorage::new(storage_backend));
3434+ storage.initialize().await?;
3535+3636+ let mut graph = Self {
3737+ nodes: Arc::new(DashMap::new()),
3838+ relationships: Arc::new(DashMap::new()),
3939+ node_relationships: Arc::new(DashMap::new()),
4040+ schema: Arc::new(RwLock::new(GraphSchema::new())),
4141+ next_node_id: Arc::new(RwLock::new(0)),
4242+ next_relationship_id: Arc::new(RwLock::new(0)),
4343+ storage,
4444+ write_through: true,
4545+ };
4646+4747+ // Load existing data from storage
4848+ graph.load_from_storage().await?;
4949+5050+ Ok(graph)
5151+ }
5252+5353+ /// Load data from persistent storage into memory cache
5454+ async fn load_from_storage(&mut self) -> Result<()> {
5555+ tracing::info!("Loading graph data from storage...");
5656+5757+ // Load schema
5858+ if let Some(schema) = self.storage.get_schema().await? {
5959+ *self.schema.write() = schema;
6060+ }
6161+6262+ // Load node and relationship counters
6363+ if let Some(node_counter) = self.storage.get_counter("node_counter").await? {
6464+ *self.next_node_id.write() = node_counter;
6565+ }
6666+6767+ if let Some(rel_counter) = self.storage.get_counter("relationship_counter").await? {
6868+ *self.next_relationship_id.write() = rel_counter;
6969+ }
7070+7171+ // Load all nodes
7272+ let node_ids = self.storage.get_all_node_ids().await?;
7373+ for node_id in node_ids {
7474+ if let Some(node) = self.storage.get_node(node_id).await? {
7575+ self.nodes.insert(node_id, node);
7676+ self.node_relationships.insert(node_id, Vec::new());
7777+ }
7878+ }
7979+8080+ // Load all relationships and build relationship index
8181+ let relationship_ids = self.storage.get_all_relationship_ids().await?;
8282+ for rel_id in relationship_ids {
8383+ if let Some(relationship) = self.storage.get_relationship(rel_id).await? {
8484+ let start_node = relationship.start_node;
8585+ let end_node = relationship.end_node;
8686+8787+ self.relationships.insert(rel_id, relationship);
8888+8989+ // Update relationship indices
9090+ self.node_relationships.entry(start_node).and_modify(|rels| rels.push(rel_id));
9191+ if start_node != end_node {
9292+ self.node_relationships.entry(end_node).and_modify(|rels| rels.push(rel_id));
9393+ }
9494+ }
9595+ }
9696+9797+ tracing::info!(
9898+ "Loaded {} nodes and {} relationships from storage",
9999+ self.nodes.len(),
100100+ self.relationships.len()
101101+ );
102102+103103+ Ok(())
104104+ }
105105+106106+ /// Persist current state to storage
107107+ pub async fn persist_to_storage(&self) -> Result<()> {
108108+ tracing::info!("Persisting graph data to storage...");
109109+110110+ // Persist schema
111111+ self.storage.store_schema(&self.schema.read()).await?;
112112+113113+ // Persist counters
114114+ self.storage.store_counter("node_counter", *self.next_node_id.read()).await?;
115115+ self.storage.store_counter("relationship_counter", *self.next_relationship_id.read()).await?;
116116+117117+ // Persist all nodes
118118+ for entry in self.nodes.iter() {
119119+ let (_node_id, node) = entry.pair();
120120+ self.storage.store_node(node).await?;
121121+ }
122122+123123+ // Persist all relationships
124124+ for entry in self.relationships.iter() {
125125+ let (_rel_id, relationship) = entry.pair();
126126+ self.storage.store_relationship(relationship).await?;
127127+ }
128128+129129+ // Flush to ensure data is written
130130+ self.storage.flush().await?;
131131+132132+ tracing::info!("Graph data persisted to storage successfully");
133133+ Ok(())
134134+ }
135135+136136+ pub async fn create_node(&self) -> Result<NodeId> {
137137+ let mut id_gen = self.next_node_id.write();
138138+ let id = NodeId(*id_gen);
139139+ *id_gen += 1;
140140+ drop(id_gen);
141141+142142+ let node = Node::new(id);
143143+144144+ // Store in cache
145145+ self.nodes.insert(id, node.clone());
146146+ self.node_relationships.insert(id, Vec::new());
147147+148148+ // Persist to storage if write-through is enabled
149149+ if self.write_through {
150150+ self.storage.store_node(&node).await?;
151151+ self.storage.store_counter("node_counter", *self.next_node_id.read()).await?;
152152+ }
153153+154154+ Ok(id)
155155+ }
156156+157157+ pub async fn update_node<F>(&self, id: NodeId, update_fn: F) -> Result<()>
158158+ where
159159+ F: FnOnce(&mut Node),
160160+ {
161161+ if let Some(mut node) = self.nodes.get_mut(&id) {
162162+ update_fn(&mut node);
163163+164164+ // Persist to storage if write-through is enabled
165165+ if self.write_through {
166166+ self.storage.store_node(&node).await?;
167167+ }
168168+169169+ Ok(())
170170+ } else {
171171+ Err(GigabrainError::NodeNotFound(id))
172172+ }
173173+ }
174174+175175+ pub async fn create_relationship(
176176+ &self,
177177+ start: NodeId,
178178+ end: NodeId,
179179+ rel_type: u32,
180180+ ) -> Result<RelationshipId> {
181181+ if !self.nodes.contains_key(&start) {
182182+ return Err(GigabrainError::NodeNotFound(start));
183183+ }
184184+ if !self.nodes.contains_key(&end) {
185185+ return Err(GigabrainError::NodeNotFound(end));
186186+ }
187187+188188+ let mut id_gen = self.next_relationship_id.write();
189189+ let id = RelationshipId(*id_gen);
190190+ *id_gen += 1;
191191+ drop(id_gen);
192192+193193+ let relationship = Relationship::new(id, start, end, rel_type);
194194+195195+ // Store in cache
196196+ self.relationships.insert(id, relationship.clone());
197197+198198+ self.node_relationships.entry(start).and_modify(|rels| rels.push(id));
199199+ if start != end {
200200+ self.node_relationships.entry(end).and_modify(|rels| rels.push(id));
201201+ }
202202+203203+ // Persist to storage if write-through is enabled
204204+ if self.write_through {
205205+ self.storage.store_relationship(&relationship).await?;
206206+ self.storage.store_counter("relationship_counter", *self.next_relationship_id.read()).await?;
207207+ }
208208+209209+ Ok(id)
210210+ }
211211+212212+ pub async fn get_node(&self, id: NodeId) -> Option<Node> {
213213+ // Try cache first
214214+ if let Some(node) = self.nodes.get(&id) {
215215+ return Some(node.clone());
216216+ }
217217+218218+ // If not in cache, try loading from storage
219219+ if let Ok(Some(node)) = self.storage.get_node(id).await {
220220+ // Cache the loaded node
221221+ self.nodes.insert(id, node.clone());
222222+ Some(node)
223223+ } else {
224224+ None
225225+ }
226226+ }
227227+228228+ pub async fn get_relationship(&self, id: RelationshipId) -> Option<Relationship> {
229229+ // Try cache first
230230+ if let Some(rel) = self.relationships.get(&id) {
231231+ return Some(rel.clone());
232232+ }
233233+234234+ // If not in cache, try loading from storage
235235+ if let Ok(Some(relationship)) = self.storage.get_relationship(id).await {
236236+ // Cache the loaded relationship
237237+ self.relationships.insert(id, relationship.clone());
238238+ Some(relationship)
239239+ } else {
240240+ None
241241+ }
242242+ }
243243+244244+ pub fn get_node_relationships(
245245+ &self,
246246+ node_id: NodeId,
247247+ direction: Direction,
248248+ rel_types: Option<&[u32]>,
249249+ ) -> Vec<Relationship> {
250250+ let rel_ids = match self.node_relationships.get(&node_id) {
251251+ Some(ids) => ids.clone(),
252252+ None => return Vec::new(),
253253+ };
254254+255255+ rel_ids
256256+ .into_iter()
257257+ .filter_map(|rel_id| {
258258+ self.relationships.get(&rel_id).and_then(|rel| {
259259+ let matches_direction = match direction {
260260+ Direction::Outgoing => rel.start_node == node_id,
261261+ Direction::Incoming => rel.end_node == node_id,
262262+ Direction::Both => true,
263263+ };
264264+265265+ let matches_type = rel_types
266266+ .map(|types| types.contains(&rel.rel_type))
267267+ .unwrap_or(true);
268268+269269+ if matches_direction && matches_type {
270270+ Some(rel.clone())
271271+ } else {
272272+ None
273273+ }
274274+ })
275275+ })
276276+ .collect()
277277+ }
278278+279279+ pub async fn delete_node(&self, id: NodeId) -> Result<()> {
280280+ // Delete all relationships connected to this node
281281+ if let Some((_, rel_ids)) = self.node_relationships.remove(&id) {
282282+ for rel_id in rel_ids {
283283+ self.delete_relationship(rel_id).await?;
284284+ }
285285+ }
286286+287287+ // Remove from cache
288288+ self.nodes
289289+ .remove(&id)
290290+ .ok_or(GigabrainError::NodeNotFound(id))?;
291291+292292+ // Remove from storage if write-through is enabled
293293+ if self.write_through {
294294+ self.storage.delete_node(id).await?;
295295+ }
296296+297297+ Ok(())
298298+ }
299299+300300+ pub async fn delete_relationship(&self, id: RelationshipId) -> Result<()> {
301301+ // Remove from cache
302302+ let rel = self
303303+ .relationships
304304+ .remove(&id)
305305+ .ok_or(GigabrainError::RelationshipNotFound(id))?
306306+ .1;
307307+308308+ // Update relationship indices
309309+ self.node_relationships.entry(rel.start_node).and_modify(|rels| {
310310+ rels.retain(|&r| r != id);
311311+ });
312312+313313+ if rel.start_node != rel.end_node {
314314+ self.node_relationships.entry(rel.end_node).and_modify(|rels| {
315315+ rels.retain(|&r| r != id);
316316+ });
317317+ }
318318+319319+ // Remove from storage if write-through is enabled
320320+ if self.write_through {
321321+ self.storage.delete_relationship(id).await?;
322322+ }
323323+324324+ Ok(())
325325+ }
326326+327327+ pub fn schema(&self) -> &Arc<RwLock<GraphSchema>> {
328328+ &self.schema
329329+ }
330330+331331+ /// Get all node IDs in the graph
332332+ pub fn get_all_nodes(&self) -> Vec<NodeId> {
333333+ self.nodes.iter().map(|entry| *entry.key()).collect()
334334+ }
335335+336336+ /// Enable or disable write-through caching
337337+ pub fn set_write_through(&mut self, enabled: bool) {
338338+ self.write_through = enabled;
339339+ }
340340+341341+ /// Flush all pending changes to storage
342342+ pub async fn flush(&self) -> Result<()> {
343343+ self.storage.flush().await
344344+ }
345345+346346+ /// Close the persistent graph and storage
347347+ pub async fn close(self) -> Result<()> {
348348+ // Ensure all data is persisted before closing
349349+ if self.write_through {
350350+ self.persist_to_storage().await?;
351351+ }
352352+353353+ self.storage.close().await
354354+ }
355355+}
356356+357357+// Implement the same interface as the original Graph for compatibility
358358+impl PersistentGraph {
359359+ /// Legacy sync method for create_node (for compatibility)
360360+ pub fn create_node_sync(&self) -> NodeId {
361361+ // For backward compatibility, use blocking async
362362+ tokio::task::block_in_place(|| {
363363+ tokio::runtime::Handle::current().block_on(async {
364364+ self.create_node().await.expect("Failed to create node")
365365+ })
366366+ })
367367+ }
368368+369369+ /// Legacy sync method for update_node (for compatibility)
370370+ pub fn update_node_sync<F>(&self, id: NodeId, update_fn: F) -> Result<()>
371371+ where
372372+ F: FnOnce(&mut Node),
373373+ {
374374+ // For backward compatibility, use blocking async
375375+ tokio::task::block_in_place(|| {
376376+ tokio::runtime::Handle::current().block_on(async {
377377+ self.update_node(id, update_fn).await
378378+ })
379379+ })
380380+ }
381381+382382+ /// Legacy sync method for create_relationship (for compatibility)
383383+ pub fn create_relationship_sync(
384384+ &self,
385385+ start: NodeId,
386386+ end: NodeId,
387387+ rel_type: u32,
388388+ ) -> Result<RelationshipId> {
389389+ // For backward compatibility, use blocking async
390390+ tokio::task::block_in_place(|| {
391391+ tokio::runtime::Handle::current().block_on(async {
392392+ self.create_relationship(start, end, rel_type).await
393393+ })
394394+ })
395395+ }
396396+397397+ /// Legacy sync method for get_node (for compatibility)
398398+ pub fn get_node_sync(&self, id: NodeId) -> Option<Node> {
399399+ // For backward compatibility, use blocking async
400400+ tokio::task::block_in_place(|| {
401401+ tokio::runtime::Handle::current().block_on(async {
402402+ self.get_node(id).await
403403+ })
404404+ })
405405+ }
406406+407407+ /// Legacy sync method for get_relationship (for compatibility)
408408+ pub fn get_relationship_sync(&self, id: RelationshipId) -> Option<Relationship> {
409409+ // For backward compatibility, use blocking async
410410+ tokio::task::block_in_place(|| {
411411+ tokio::runtime::Handle::current().block_on(async {
412412+ self.get_relationship(id).await
413413+ })
414414+ })
415415+ }
416416+}
// The only test here needs the RocksDB backend, so the whole module is gated
// on the feature; otherwise its imports would be unused in builds without it.
#[cfg(all(test, feature = "rocksdb-storage"))]
mod tests {
    use super::*;
    use crate::persistence::rocksdb_store::RocksDBStore;
    use tempfile::tempdir;

    /// Creates two nodes and a relationship, closes the graph, reopens it on
    /// the same directory and checks that everything was persisted.
    #[tokio::test]
    async fn test_persistent_graph() -> Result<()> {
        let temp_dir = tempdir().unwrap();
        let storage_backend = Arc::new(RocksDBStore::new(temp_dir.path())?);
        let graph = PersistentGraph::new(storage_backend).await?;

        // Create nodes
        let node1 = graph.create_node().await?;
        let node2 = graph.create_node().await?;

        // Update a node with properties
        graph.update_node(node1, |node| {
            let schema = graph.schema();
            let mut schema_guard = schema.write();
            let name_prop = schema_guard.get_or_create_property_key("name");
            node.properties.insert(name_prop, crate::core::PropertyValue::String("Alice".to_string()));
        }).await?;

        // Create relationship; the schema guard is dropped before awaiting.
        let schema = graph.schema();
        let mut schema_guard = schema.write();
        let knows_rel = schema_guard.get_or_create_relationship_type("KNOWS");
        drop(schema_guard);

        let rel_id = graph.create_relationship(node1, node2, knows_rel).await?;

        // Verify nodes and relationships exist
        assert!(graph.get_node(node1).await.is_some());
        assert!(graph.get_node(node2).await.is_some());
        assert!(graph.get_relationship(rel_id).await.is_some());

        // Test relationship queries
        let relationships = graph.get_node_relationships(node1, Direction::Outgoing, None);
        assert_eq!(relationships.len(), 1);
        assert_eq!(relationships[0].id, rel_id);

        // Close and reopen to test persistence
        graph.close().await?;

        let storage_backend2 = Arc::new(RocksDBStore::new(temp_dir.path())?);
        let graph2 = PersistentGraph::new(storage_backend2).await?;

        // Verify data persisted
        assert!(graph2.get_node(node1).await.is_some());
        assert!(graph2.get_node(node2).await.is_some());
        assert!(graph2.get_relationship(rel_id).await.is_some());

        let relationships2 = graph2.get_node_relationships(node1, Direction::Outgoing, None);
        assert_eq!(relationships2.len(), 1);
        assert_eq!(relationships2[0].id, rel_id);

        Ok(())
    }
}
+53
src/core/property.rs
···15151616impl Eq for PropertyValue {}
17171818+impl PartialOrd for PropertyValue {
1919+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
2020+ Some(self.cmp(other))
2121+ }
2222+}
2323+2424+impl Ord for PropertyValue {
2525+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
2626+ use std::cmp::Ordering;
2727+ use PropertyValue::*;
2828+2929+ // First compare by type for consistent ordering
3030+ let self_type_order = self.type_order();
3131+ let other_type_order = other.type_order();
3232+3333+ match self_type_order.cmp(&other_type_order) {
3434+ Ordering::Equal => {
3535+ // Same type, compare values
3636+ match (self, other) {
3737+ (Null, Null) => Ordering::Equal,
3838+ (Boolean(a), Boolean(b)) => a.cmp(b),
3939+ (Integer(a), Integer(b)) => a.cmp(b),
4040+ (Float(a), Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
4141+ (String(a), String(b)) => a.cmp(b),
4242+ (List(a), List(b)) => a.cmp(b),
4343+ (Map(a), Map(b)) => {
4444+ // Convert to sorted vectors for comparison
4545+ let mut a_sorted: Vec<_> = a.iter().collect();
4646+ let mut b_sorted: Vec<_> = b.iter().collect();
4747+ a_sorted.sort_by_key(|(k, _)| *k);
4848+ b_sorted.sort_by_key(|(k, _)| *k);
4949+ a_sorted.cmp(&b_sorted)
5050+ },
5151+ _ => unreachable!("Types should be equal due to type order check"),
5252+ }
5353+ },
5454+ other_ordering => other_ordering,
5555+ }
5656+ }
5757+}
5858+1859impl std::hash::Hash for PropertyValue {
1960 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
2061 match self {
···60101 PropertyValue::String(_) => "string",
61102 PropertyValue::List(_) => "list",
62103 PropertyValue::Map(_) => "map",
104104+ }
105105+ }
106106+107107+ fn type_order(&self) -> u8 {
108108+ match self {
109109+ PropertyValue::Null => 0,
110110+ PropertyValue::Boolean(_) => 1,
111111+ PropertyValue::Integer(_) => 2,
112112+ PropertyValue::Float(_) => 3,
113113+ PropertyValue::String(_) => 4,
114114+ PropertyValue::List(_) => 5,
115115+ PropertyValue::Map(_) => 6,
63116 }
64117 }
65118}
+411-26
src/cypher/executor.rs
···11use crate::cypher::planner::{QueryPlan, ScanPlan, CreatePlan, ProjectPlan};
22use crate::cypher::ast::{CypherQuery, MatchClause, CreateClause, ReturnClause, Expression, PatternElement};
33use crate::core::{Graph, PropertyValue};
44+use crate::index::{IndexQuery, IndexType};
45use crate::{Result, GigabrainError, NodeId, RelationshipId};
56use std::sync::Arc;
67use std::collections::HashMap;
88+use tracing::{debug, info};
79810pub struct QueryExecutor {
911 graph: Arc<Graph>,
···99101 }
100102 }
101103104104+ // Apply WHERE clause filtering if present
105105+ if let Some(where_expr) = &match_clause.where_clause {
106106+ self.apply_where_filter(context, where_expr).await?;
107107+ }
108108+102109 Ok(QueryResult::empty())
103110 }
104111112112+ async fn apply_where_filter(&self, context: &mut ExecutionContext, where_expr: &Expression) -> Result<()> {
113113+ // Create a new context to store filtered results
114114+ let mut filtered_context = ExecutionContext::new();
115115+116116+ // For each variable binding, evaluate the WHERE clause
117117+ let variable_names: Vec<String> = context.variables.keys().cloned().collect();
118118+119119+ if variable_names.is_empty() {
120120+ return Ok(());
121121+ }
122122+123123+ // Get all possible combinations of variable bindings
124124+ let combinations = self.generate_binding_combinations(context).await?;
125125+126126+ // Filter combinations based on WHERE clause
127127+ let mut filtered_combinations = Vec::new();
128128+ for combination in combinations {
129129+ // Create a temporary context with this specific binding combination
130130+ let mut temp_context = ExecutionContext::new();
131131+ for (var_name, binding) in &combination {
132132+ temp_context.bind_variable(var_name.clone(), binding.clone());
133133+ }
134134+135135+ // Evaluate WHERE clause with this binding
136136+ let result = self.evaluate_expression(where_expr, &temp_context, None).await?;
137137+138138+ if self.is_truthy(&result) {
139139+ filtered_combinations.push(combination);
140140+ }
141141+ }
142142+143143+ // Rebuild the context with filtered results
144144+ self.rebuild_context_from_combinations(context, filtered_combinations);
145145+146146+ Ok(())
147147+ }
148148+149149+ async fn generate_binding_combinations(&self, context: &ExecutionContext) -> Result<Vec<HashMap<String, VariableBinding>>> {
150150+ let mut combinations = Vec::new();
151151+ let variable_names: Vec<String> = context.variables.keys().cloned().collect();
152152+153153+ if variable_names.is_empty() {
154154+ return Ok(combinations);
155155+ }
156156+157157+ // Generate all combinations of variable bindings
158158+ // This is a simplified approach - for performance, we'd use join algorithms
159159+ if variable_names.len() == 1 {
160160+ let var_name = &variable_names[0];
161161+ if let Some(binding) = context.variables.get(var_name) {
162162+ match binding {
163163+ VariableBinding::Nodes(node_ids) => {
164164+ for &node_id in node_ids {
165165+ let mut combination = HashMap::new();
166166+ combination.insert(var_name.clone(), VariableBinding::Nodes(vec![node_id]));
167167+ combinations.push(combination);
168168+ }
169169+ },
170170+ VariableBinding::Relationships(rel_ids) => {
171171+ for &rel_id in rel_ids {
172172+ let mut combination = HashMap::new();
173173+ combination.insert(var_name.clone(), VariableBinding::Relationships(vec![rel_id]));
174174+ combinations.push(combination);
175175+ }
176176+ }
177177+ }
178178+ }
179179+ } else {
180180+ // For multiple variables, generate cartesian product
181181+ // This is simplified - real implementation would be more efficient
182182+ self.generate_cartesian_product(context, &variable_names, 0, &mut HashMap::new(), &mut combinations);
183183+ }
184184+185185+ Ok(combinations)
186186+ }
187187+188188+ fn generate_cartesian_product(
189189+ &self,
190190+ context: &ExecutionContext,
191191+ variable_names: &[String],
192192+ index: usize,
193193+ current_combination: &mut HashMap<String, VariableBinding>,
194194+ combinations: &mut Vec<HashMap<String, VariableBinding>>
195195+ ) {
196196+ if index >= variable_names.len() {
197197+ combinations.push(current_combination.clone());
198198+ return;
199199+ }
200200+201201+ let var_name = &variable_names[index];
202202+ if let Some(binding) = context.variables.get(var_name) {
203203+ match binding {
204204+ VariableBinding::Nodes(node_ids) => {
205205+ for &node_id in node_ids {
206206+ current_combination.insert(var_name.clone(), VariableBinding::Nodes(vec![node_id]));
207207+ self.generate_cartesian_product(context, variable_names, index + 1, current_combination, combinations);
208208+ }
209209+ },
210210+ VariableBinding::Relationships(rel_ids) => {
211211+ for &rel_id in rel_ids {
212212+ current_combination.insert(var_name.clone(), VariableBinding::Relationships(vec![rel_id]));
213213+ self.generate_cartesian_product(context, variable_names, index + 1, current_combination, combinations);
214214+ }
215215+ }
216216+ }
217217+ }
218218+ }
219219+220220+ fn rebuild_context_from_combinations(&self, context: &mut ExecutionContext, combinations: Vec<HashMap<String, VariableBinding>>) {
221221+ // Clear current context
222222+ context.variables.clear();
223223+224224+ // Group combinations back into variable bindings
225225+ let mut var_to_nodes: HashMap<String, Vec<NodeId>> = HashMap::new();
226226+ let mut var_to_rels: HashMap<String, Vec<RelationshipId>> = HashMap::new();
227227+228228+ for combination in combinations {
229229+ for (var_name, binding) in combination {
230230+ match binding {
231231+ VariableBinding::Nodes(node_ids) => {
232232+ var_to_nodes.entry(var_name).or_insert_with(Vec::new).extend(node_ids);
233233+ },
234234+ VariableBinding::Relationships(rel_ids) => {
235235+ var_to_rels.entry(var_name).or_insert_with(Vec::new).extend(rel_ids);
236236+ }
237237+ }
238238+ }
239239+ }
240240+241241+ // Rebuild context with filtered bindings
242242+ for (var_name, node_ids) in var_to_nodes {
243243+ context.bind_variable(var_name, VariableBinding::Nodes(node_ids));
244244+ }
245245+246246+ for (var_name, rel_ids) in var_to_rels {
247247+ context.bind_variable(var_name, VariableBinding::Relationships(rel_ids));
248248+ }
249249+ }
250250+105251 async fn find_matching_nodes(&self, node_pattern: &crate::cypher::ast::NodePattern) -> Result<Vec<NodeId>> {
106106- // For now, return all nodes - in reality this would filter by labels and properties
107107- let all_nodes = self.graph.get_all_nodes();
252252+ debug!("Finding nodes matching pattern: labels={:?}, properties={:?}",
253253+ node_pattern.labels, node_pattern.properties);
254254+255255+ let mut candidate_nodes: Option<Vec<NodeId>> = None;
256256+257257+ // Use label indexes if labels are specified
258258+ if !node_pattern.labels.is_empty() {
259259+ debug!("Using label indexes for labels: {:?}", node_pattern.labels);
260260+ let mut label_matches = Vec::new();
261261+262262+ for label_name in &node_pattern.labels {
263263+ match self.graph.find_nodes_by_label(label_name) {
264264+ Ok(nodes) => {
265265+ if label_matches.is_empty() {
266266+ label_matches = nodes;
267267+ } else {
268268+ // Intersect with previous results (AND logic for multiple labels)
269269+ label_matches.retain(|node_id| nodes.contains(node_id));
270270+ }
271271+ },
272272+ Err(e) => {
273273+ debug!("Failed to use label index for '{}': {}", label_name, e);
274274+ // Fall back to scanning all nodes
275275+ label_matches = self.graph.get_all_nodes();
276276+ break;
277277+ }
278278+ }
279279+ }
280280+281281+ candidate_nodes = Some(label_matches);
282282+ info!("Label index returned {} candidate nodes", candidate_nodes.as_ref().unwrap().len());
283283+ }
284284+285285+ // Use property indexes if properties are specified
286286+ if let Some(ref properties) = node_pattern.properties {
287287+ debug!("Using property indexes for {} properties", properties.len());
288288+ let mut property_matches: Option<Vec<NodeId>> = None;
289289+290290+ for (prop_name, value_expr) in properties {
291291+ // For now, only handle literal values in property patterns
292292+ if let Expression::Literal(prop_value) = value_expr {
293293+ match self.graph.find_nodes_by_property(prop_name, prop_value) {
294294+ Ok(nodes) => {
295295+ if let Some(ref mut existing_matches) = property_matches {
296296+ // Intersect with previous property matches
297297+ existing_matches.retain(|node_id| nodes.contains(node_id));
298298+ } else {
299299+ property_matches = Some(nodes);
300300+ }
301301+ },
302302+ Err(e) => {
303303+ debug!("Failed to use property index for '{}': {}", prop_name, e);
304304+ // Continue with other properties or fall back to scanning
305305+ }
306306+ }
307307+ }
308308+ }
309309+310310+ if let Some(prop_nodes) = property_matches {
311311+ if let Some(ref mut candidates) = candidate_nodes {
312312+ // Intersect label matches with property matches
313313+ candidates.retain(|node_id| prop_nodes.contains(node_id));
314314+ } else {
315315+ candidate_nodes = Some(prop_nodes);
316316+ }
317317+ info!("Property index intersection returned {} candidate nodes",
318318+ candidate_nodes.as_ref().unwrap().len());
319319+ }
320320+ }
321321+322322+ // If no indexes were used, scan all nodes
323323+ let result_nodes = candidate_nodes.unwrap_or_else(|| {
324324+ debug!("No indexes available, scanning all nodes");
325325+ self.graph.get_all_nodes()
326326+ });
108327109109- // TODO: Filter by labels if specified
110110- // TODO: Filter by properties if specified
328328+ // TODO: Apply additional filtering for complex property expressions
329329+ // that couldn't be handled by indexes
111330112112- Ok(all_nodes)
331331+ debug!("Final result: {} nodes matched the pattern", result_nodes.len());
332332+ Ok(result_nodes)
113333 }
114334115335 async fn get_all_relationships(&self) -> Result<Vec<RelationshipId>> {
···183403184404 created_nodes.push(node_id);
185405406406+ // Add to indexes after creating the node
407407+ if let Some(node) = self.graph.get_node(node_id) {
408408+ let labels: Vec<_> = node.labels.iter().cloned().collect();
409409+ let properties = node.properties.clone();
410410+411411+ // Ensure indexes exist for any new labels
412412+ for &label_id in &labels {
413413+ let index_type = IndexType::Label(label_id);
414414+ if let Err(e) = self.graph.index_manager().create_index(index_type, None, false) {
415415+ debug!("Label index may already exist or failed to create: {}", e);
416416+ }
417417+ }
418418+419419+ // Ensure indexes exist for any new properties
420420+ for &property_key_id in properties.keys() {
421421+ let index_type = IndexType::Property(property_key_id);
422422+ if let Err(e) = self.graph.index_manager().create_index(index_type, None, false) {
423423+ debug!("Property index may already exist or failed to create: {}", e);
424424+ }
425425+ }
426426+427427+ // Add node to indexes
428428+ if let Err(e) = self.graph.index_manager().add_node(node_id, &labels, &properties) {
429429+ debug!("Failed to add created node to indexes: {}", e);
430430+ } else {
431431+ debug!("Added newly created node {} to indexes", node_id.0);
432432+ }
433433+ }
434434+186435 // Store node variable mapping for relationships
187436 if let Some(var_name) = &node_pattern.variable {
188437 node_variables.insert(var_name.clone(), node_id);
···300549 Ok(QueryResult { rows, columns })
301550 }
302551303303- async fn evaluate_expression(&self, expr: &Expression, context: &ExecutionContext, current_node: Option<NodeId>) -> Result<Value> {
304304- match expr {
305305- Expression::Variable(var_name) => {
306306- if let Some(binding) = context.variables.get(var_name) {
307307- match binding {
308308- VariableBinding::Nodes(nodes) => {
309309- if let Some(node_id) = current_node {
310310- Ok(Value::Node(node_id))
311311- } else if let Some(&first_node) = nodes.first() {
312312- Ok(Value::Node(first_node))
313313- } else {
314314- Ok(Value::Null)
552552+ fn evaluate_expression<'a>(&'a self, expr: &'a Expression, context: &'a ExecutionContext, current_node: Option<NodeId>) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Value>> + 'a>> {
553553+ Box::pin(async move {
554554+ match expr {
555555+ Expression::Variable(var_name) => {
556556+ if let Some(binding) = context.variables.get(var_name) {
557557+ match binding {
558558+ VariableBinding::Nodes(nodes) => {
559559+ if let Some(node_id) = current_node {
560560+ Ok(Value::Node(node_id))
561561+ } else if let Some(&first_node) = nodes.first() {
562562+ Ok(Value::Node(first_node))
563563+ } else {
564564+ Ok(Value::Null)
565565+ }
566566+ },
567567+ VariableBinding::Relationships(rels) => {
568568+ if let Some(&first_rel) = rels.first() {
569569+ Ok(Value::Relationship(first_rel))
570570+ } else {
571571+ Ok(Value::Null)
572572+ }
315573 }
316316- },
317317- VariableBinding::Relationships(rels) => {
318318- if let Some(&first_rel) = rels.first() {
319319- Ok(Value::Relationship(first_rel))
320320- } else {
321321- Ok(Value::Null)
574574+ }
575575+ } else {
576576+ Ok(Value::Null)
577577+ }
578578+ },
579579+ Expression::Property(prop_expr) => {
580580+ let base_value = self.evaluate_expression(&prop_expr.expression, context, current_node).await?;
581581+ self.get_property_value(base_value, &prop_expr.property).await
582582+ },
583583+ Expression::Literal(literal) => {
584584+ Ok(self.property_value_to_executor_value(literal))
585585+ },
586586+ Expression::Equal(left, right) => {
587587+ let left_val = self.evaluate_expression(left, context, current_node).await?;
588588+ let right_val = self.evaluate_expression(right, context, current_node).await?;
589589+ Ok(Value::Boolean(self.values_equal(&left_val, &right_val)))
590590+ },
591591+ Expression::NotEqual(left, right) => {
592592+ let left_val = self.evaluate_expression(left, context, current_node).await?;
593593+ let right_val = self.evaluate_expression(right, context, current_node).await?;
594594+ Ok(Value::Boolean(!self.values_equal(&left_val, &right_val)))
595595+ },
596596+ Expression::And(left, right) => {
597597+ let left_val = self.evaluate_expression(left, context, current_node).await?;
598598+ let right_val = self.evaluate_expression(right, context, current_node).await?;
599599+ Ok(Value::Boolean(self.is_truthy(&left_val) && self.is_truthy(&right_val)))
600600+ },
601601+ Expression::Or(left, right) => {
602602+ let left_val = self.evaluate_expression(left, context, current_node).await?;
603603+ let right_val = self.evaluate_expression(right, context, current_node).await?;
604604+ Ok(Value::Boolean(self.is_truthy(&left_val) || self.is_truthy(&right_val)))
605605+ },
606606+ Expression::Not(expr) => {
607607+ let val = self.evaluate_expression(expr, context, current_node).await?;
608608+ Ok(Value::Boolean(!self.is_truthy(&val)))
609609+ },
610610+ Expression::LessThan(left, right) => {
611611+ let left_val = self.evaluate_expression(left, context, current_node).await?;
612612+ let right_val = self.evaluate_expression(right, context, current_node).await?;
613613+ Ok(Value::Boolean(self.compare_values(&left_val, &right_val) < 0))
614614+ },
615615+ Expression::LessThanOrEqual(left, right) => {
616616+ let left_val = self.evaluate_expression(left, context, current_node).await?;
617617+ let right_val = self.evaluate_expression(right, context, current_node).await?;
618618+ Ok(Value::Boolean(self.compare_values(&left_val, &right_val) <= 0))
619619+ },
620620+ Expression::GreaterThan(left, right) => {
621621+ let left_val = self.evaluate_expression(left, context, current_node).await?;
622622+ let right_val = self.evaluate_expression(right, context, current_node).await?;
623623+ Ok(Value::Boolean(self.compare_values(&left_val, &right_val) > 0))
624624+ },
625625+ Expression::GreaterThanOrEqual(left, right) => {
626626+ let left_val = self.evaluate_expression(left, context, current_node).await?;
627627+ let right_val = self.evaluate_expression(right, context, current_node).await?;
628628+ Ok(Value::Boolean(self.compare_values(&left_val, &right_val) >= 0))
629629+ },
630630+ _ => Ok(Value::Null), // TODO: Implement remaining expression types
631631+ }
632632+ })
633633+ }
634634+635635+ async fn get_property_value(&self, base_value: Value, property_name: &str) -> Result<Value> {
636636+ match base_value {
637637+ Value::Node(node_id) => {
638638+ if let Some(node) = self.graph.get_node(node_id) {
639639+ // Get property value from node
640640+ let schema = self.graph.schema();
641641+ let schema_guard = schema.read();
642642+ for (prop_key, _) in &schema_guard.property_keys {
643643+ if prop_key == property_name {
644644+ let prop_key_id = schema_guard.property_keys[prop_key];
645645+ if let Some(prop_value) = node.properties.get(&prop_key_id) {
646646+ return Ok(self.property_value_to_executor_value(prop_value));
322647 }
323648 }
324649 }
325325- } else {
326326- Ok(Value::Null)
327650 }
651651+ Ok(Value::Null)
328652 },
329329- _ => Ok(Value::Null), // TODO: Implement other expression types
653653+ Value::Relationship(_rel_id) => {
654654+ // TODO: Implement relationship property access
655655+ Ok(Value::Null)
656656+ },
657657+ _ => Ok(Value::Null),
658658+ }
659659+ }
660660+661661+ fn property_value_to_executor_value(&self, prop_val: &PropertyValue) -> Value {
662662+ match prop_val {
663663+ PropertyValue::String(s) => Value::String(s.clone()),
664664+ PropertyValue::Integer(i) => Value::Integer(*i),
665665+ PropertyValue::Float(f) => Value::Float(*f),
666666+ PropertyValue::Boolean(b) => Value::Boolean(*b),
667667+ PropertyValue::List(list) => {
668668+ let converted_list: Vec<Value> = list.iter()
669669+ .map(|item| self.property_value_to_executor_value(item))
670670+ .collect();
671671+ Value::List(converted_list)
672672+ },
673673+ PropertyValue::Null => Value::Null,
674674+ PropertyValue::Map(_) => Value::Null, // TODO: Implement map support
675675+ }
676676+ }
677677+678678+ fn values_equal(&self, left: &Value, right: &Value) -> bool {
679679+ match (left, right) {
680680+ (Value::String(a), Value::String(b)) => a == b,
681681+ (Value::Integer(a), Value::Integer(b)) => a == b,
682682+ (Value::Float(a), Value::Float(b)) => (a - b).abs() < f64::EPSILON,
683683+ (Value::Boolean(a), Value::Boolean(b)) => a == b,
684684+ (Value::Node(a), Value::Node(b)) => a == b,
685685+ (Value::Relationship(a), Value::Relationship(b)) => a == b,
686686+ (Value::Null, Value::Null) => true,
687687+ // Type coercion for numbers
688688+ (Value::Integer(a), Value::Float(b)) => (*a as f64 - b).abs() < f64::EPSILON,
689689+ (Value::Float(a), Value::Integer(b)) => (a - *b as f64).abs() < f64::EPSILON,
690690+ _ => false,
691691+ }
692692+ }
693693+694694+ fn is_truthy(&self, value: &Value) -> bool {
695695+ match value {
696696+ Value::Boolean(b) => *b,
697697+ Value::Null => false,
698698+ Value::Integer(i) => *i != 0,
699699+ Value::Float(f) => *f != 0.0,
700700+ Value::String(s) => !s.is_empty(),
701701+ Value::List(list) => !list.is_empty(),
702702+ Value::Node(_) => true,
703703+ Value::Relationship(_) => true,
704704+ }
705705+ }
706706+707707+ fn compare_values(&self, left: &Value, right: &Value) -> i32 {
708708+ match (left, right) {
709709+ (Value::Integer(a), Value::Integer(b)) => a.cmp(b) as i32,
710710+ (Value::Float(a), Value::Float(b)) => a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal) as i32,
711711+ (Value::Integer(a), Value::Float(b)) => (*a as f64).partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal) as i32,
712712+ (Value::Float(a), Value::Integer(b)) => a.partial_cmp(&(*b as f64)).unwrap_or(std::cmp::Ordering::Equal) as i32,
713713+ (Value::String(a), Value::String(b)) => a.cmp(b) as i32,
714714+ _ => 0, // Incomparable types are considered equal
330715 }
331716 }
332717
···11pub mod core;
22pub mod storage;
33+pub mod persistence;
34pub mod cypher;
45pub mod algorithms;
56pub mod index;
···1011pub mod observability;
1112pub mod cli;
12131313-pub use core::{Graph, Node, Relationship, Property};
1414+pub use core::{Graph, PersistentGraph, Node, Relationship, Property};
1415pub use error::{GigabrainError, Result};
1516pub use observability::{ObservabilitySystem, HealthLevel, HealthStatus};
1717+pub use persistence::{PersistentStorage, StorageBackend};
1818+pub use index::{IndexManager, IndexType, IndexQuery, IndexConfig};
1619use serde::{Serialize, Deserialize};
17201821#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]