diff --git a/db4-storage/src/pages/node_page/writer.rs b/db4-storage/src/pages/node_page/writer.rs index 5c462bc638..4b07c8b634 100644 --- a/db4-storage/src/pages/node_page/writer.rs +++ b/db4-storage/src/pages/node_page/writer.rs @@ -31,6 +31,7 @@ pub struct NodeWriter<'a, MP: DerefMut + 'a, NS: NodeSe impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWriter<'a, MP, NS> { pub fn new(page: &'a NS, global_num_nodes: &'a GraphStats, writer: MP) -> Self { let old_est_size = writer.est_size(); + Self { page, mut_segment: writer, @@ -38,6 +39,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri old_est_size, } } + #[inline(always)] pub fn resolve_pos(&self, node_id: VID) -> Option { let (page, pos) = resolve_pos(node_id, self.mut_segment.max_page_len()); @@ -270,6 +272,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> Drop fn drop(&mut self) { self.mut_segment .increment_global_est_size(self.mut_segment.est_size() - self.old_est_size); + self.page .notify_write(self.mut_segment.deref_mut()) .expect("Failed to persist node page"); diff --git a/raphtory-api/src/core/entities/properties/meta.rs b/raphtory-api/src/core/entities/properties/meta.rs index 6113f87596..aaa932c236 100644 --- a/raphtory-api/src/core/entities/properties/meta.rs +++ b/raphtory-api/src/core/entities/properties/meta.rs @@ -234,11 +234,17 @@ impl Meta { } } +/// Manages the mapping of property names to their IDs and types. #[derive(Default, Debug, Serialize, Deserialize)] pub struct PropMapper { + /// Maps property names to their IDs. id_mapper: DictMapper, - row_size: AtomicUsize, + + /// Property types indexed by property ID. dtypes: Arc>>, + + /// Estimated size in bytes of a single row of properties maintained by this mapper. + row_size: AtomicUsize, } impl Deref for PropMapper { @@ -300,8 +306,10 @@ impl PropMapper { let wrapped_id = self.id_mapper.get_or_create_id(prop); let id = wrapped_id.inner(); let dtype_read = self.dtypes.read_recursive(); + if let Some(old_type) = dtype_read.get(id) { let mut unified = false; + if unify_types(&dtype, old_type, &mut unified).is_ok() { if !unified { // means the types were equal, no change needed @@ -315,11 +323,25 @@ impl PropMapper { }); } } - drop(dtype_read); // drop the read lock and wait for write lock as type did not exist yet + + // Drop the read lock and grab the write lock in order to add the new + // prop type or unify the existing prop type. + drop(dtype_read); + let mut dtype_write = self.dtypes.write(); + match dtype_write.get(id).cloned() { Some(old_type) => { - if let Ok(tpe) = unify_types(&dtype, &old_type, &mut false) { + let mut unified = false; + + if let Ok(tpe) = unify_types(&dtype, &old_type, &mut unified) { + if unified { + // The row size needs to account for the difference in sizes + // between the newly unified type and the old type. + let delta = tpe.est_size() - old_type.est_size(); + self.row_size.fetch_add(delta, atomic::Ordering::Relaxed); + } + dtype_write[id] = tpe; Ok(wrapped_id) } else { @@ -331,10 +353,12 @@ impl PropMapper { } } None => { - // vector not resized yet, resize it and set the dtype and return id + // vector not resized yet; resize it, set the new dtype and return the id. dtype_write.resize(id + 1, PropType::Empty); + self.row_size .fetch_add(dtype.est_size(), atomic::Ordering::Relaxed); + dtype_write[id] = dtype; Ok(wrapped_id) } @@ -371,37 +395,37 @@ impl PropMapper { WriteLockedPropMapper { dict_mapper: self.id_mapper.write(), d_types: self.dtypes.write(), + row_size: &self.row_size, } } } -pub struct LockedPropMapper<'a> { - dict_mapper: LockedDictMapper<'a>, - d_types: RwLockReadGuard<'a, Vec>, -} - +/// Write-locked view of a [`PropMapper`]. pub struct WriteLockedPropMapper<'a> { + /// Maps property names to their IDs. dict_mapper: WriteLockedDictMapper<'a>, + + /// Property types indexed by property ID. d_types: RwLockWriteGuard<'a, Vec>, + + /// Estimated size in bytes of a single row of properties maintained by this mapper. + row_size: &'a AtomicUsize, } impl<'a> WriteLockedPropMapper<'a> { - pub fn get_dtype(&'a self, prop_id: usize) -> Option<&'a PropType> { - self.d_types.get(prop_id) - } + pub fn new_id_and_dtype(&mut self, key: impl Into, dtype: PropType) -> usize { + let id = self.dict_mapper.get_or_create_id(&key.into()); + let dtypes = self.d_types.deref_mut(); - /// Fast check for property type without unifying the types - /// Returns: - /// - `Some(Either::Left(id))` if the property type can be unified - /// - `Some(Either::Right(id))` if the property type is already set and no unification is needed - /// - `None` if the property type is not set - /// - `Err(PropError::PropertyTypeError)` if the property type cannot be unified - pub fn fast_proptype_check( - &mut self, - prop: &str, - dtype: PropType, - ) -> Result>, PropError> { - fast_proptype_check(self.dict_mapper.map(), &self.d_types, prop, dtype) + if dtypes.len() <= id.inner() { + dtypes.resize(id.inner() + 1, PropType::Empty); + } + + self.row_size + .fetch_add(dtype.est_size(), atomic::Ordering::Relaxed); + + dtypes[id.inner()] = dtype; + id.inner() } pub fn set_id_and_dtype(&mut self, key: impl Into, id: usize, dtype: PropType) { @@ -409,6 +433,19 @@ impl<'a> WriteLockedPropMapper<'a> { self.set_dtype(id, dtype); } + pub fn set_dtype(&mut self, id: usize, dtype: PropType) { + let dtypes = self.d_types.deref_mut(); + + if dtypes.len() <= id { + dtypes.resize(id + 1, PropType::Empty); + } + + self.row_size + .fetch_add(dtype.est_size(), atomic::Ordering::Relaxed); + + dtypes[id] = dtype; + } + pub fn set_or_unify_id_and_dtype( &mut self, key: impl Into, @@ -419,41 +456,60 @@ impl<'a> WriteLockedPropMapper<'a> { self.set_or_unify_dtype(id, dtype) } - pub fn set_dtype(&mut self, id: usize, dtype: PropType) { - let dtypes = self.d_types.deref_mut(); - if dtypes.len() <= id { - dtypes.resize(id + 1, PropType::Empty); - } - dtypes[id] = dtype; - } - pub fn set_or_unify_dtype(&mut self, id: usize, dtype: PropType) -> Result<(), PropError> { let dtypes = self.d_types.deref_mut(); + match dtypes.get_mut(id) { None => { dtypes.resize(id + 1, PropType::Empty); + + self.row_size + .fetch_add(dtype.est_size(), atomic::Ordering::Relaxed); + dtypes[id] = dtype; } Some(old_dtype) => { let mut unified = false; let unified_type = unify_types(&old_dtype, &dtype, &mut unified)?; + + if unified { + // The row size needs to account for the difference in sizes + // between the newly unified type and the old type. + let delta = unified_type.est_size() - old_dtype.est_size(); + self.row_size.fetch_add(delta, atomic::Ordering::Relaxed); + } + *old_dtype = unified_type; } } + Ok(()) } - pub fn new_id_and_dtype(&mut self, key: impl Into, dtype: PropType) -> usize { - let id = self.dict_mapper.get_or_create_id(&key.into()); - let dtypes = self.d_types.deref_mut(); - if dtypes.len() <= id.inner() { - dtypes.resize(id.inner() + 1, PropType::Empty); - } - dtypes[id.inner()] = dtype; - id.inner() + pub fn get_dtype(&'a self, prop_id: usize) -> Option<&'a PropType> { + self.d_types.get(prop_id) + } + + /// Fast check for property type without unifying the types + /// Returns: + /// - `Some(Either::Left(id))` if the property type can be unified + /// - `Some(Either::Right(id))` if the property type is already set and no unification is needed + /// - `None` if the property type is not set + /// - `Err(PropError::PropertyTypeError)` if the property type cannot be unified + pub fn fast_proptype_check( + &mut self, + prop: &str, + dtype: PropType, + ) -> Result>, PropError> { + fast_proptype_check(self.dict_mapper.map(), &self.d_types, prop, dtype) } } +pub struct LockedPropMapper<'a> { + dict_mapper: LockedDictMapper<'a>, + d_types: RwLockReadGuard<'a, Vec>, +} + impl<'a> LockedPropMapper<'a> { pub fn get_id(&self, prop: &str) -> Option { self.dict_mapper.get_id(prop) @@ -517,38 +573,46 @@ fn fast_proptype_check( } #[cfg(test)] -mod tests { +mod prop_mapper_tests { use super::*; #[test] - fn test_get_or_create_and_validate_new_property() { + fn get_or_create_and_validate_new_property() { let prop_mapper = PropMapper::default(); let result = prop_mapper.get_or_create_and_validate("new_prop", PropType::U8); + assert!(result.is_ok()); assert_eq!(result.unwrap().inner(), 0); assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); } #[test] - fn test_get_or_create_and_validate_existing_property_same_type() { + fn get_or_create_and_validate_existing_property_same_type() { let prop_mapper = PropMapper::default(); + prop_mapper .get_or_create_and_validate("existing_prop", PropType::U8) .unwrap(); + let result = prop_mapper.get_or_create_and_validate("existing_prop", PropType::U8); + assert!(result.is_ok()); assert_eq!(result.unwrap().inner(), 0); assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); } #[test] - fn test_get_or_create_and_validate_existing_property_different_type() { + fn get_or_create_and_validate_existing_property_different_type() { let prop_mapper = PropMapper::default(); + prop_mapper .get_or_create_and_validate("existing_prop", PropType::U8) .unwrap(); + let result = prop_mapper.get_or_create_and_validate("existing_prop", PropType::U16); + assert!(result.is_err()); + if let Err(PropError { name, expected, @@ -564,32 +628,39 @@ mod tests { } #[test] - fn test_get_or_create_and_validate_unify_types() { + fn get_or_create_and_validate_unify_types() { let prop_mapper = PropMapper::default(); + prop_mapper .get_or_create_and_validate("prop", PropType::Empty) .unwrap(); + let result = prop_mapper.get_or_create_and_validate("prop", PropType::U8); + assert!(result.is_ok()); assert_eq!(result.unwrap().inner(), 0); assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); } #[test] - fn test_get_or_create_and_validate_resize_vector() { + fn get_or_create_and_validate_resize_vector() { let prop_mapper = PropMapper::default(); + prop_mapper.set_id_and_dtype("existing_prop", 5, PropType::U8); + let result = prop_mapper.get_or_create_and_validate("new_prop", PropType::U16); + assert!(result.is_ok()); assert_eq!(result.unwrap().inner(), 6); assert_eq!(prop_mapper.get_dtype(6), Some(PropType::U16)); } #[test] - fn test_get_or_create_and_validate_two_independent_properties() { + fn get_or_create_and_validate_two_independent_properties() { let prop_mapper = PropMapper::default(); let result1 = prop_mapper.get_or_create_and_validate("prop1", PropType::U8); let result2 = prop_mapper.get_or_create_and_validate("prop2", PropType::U16); + assert!(result1.is_ok()); assert!(result2.is_ok()); assert_eq!(result1.unwrap().inner(), 0); @@ -597,4 +668,172 @@ mod tests { assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); assert_eq!(prop_mapper.get_dtype(1), Some(PropType::U16)); } + + #[test] + fn unify_types_increases_row_size() { + let map_1 = PropType::map([("name", PropType::Str)]); + let map_2 = PropType::map([("location", PropType::Str)]); + + let mut unified = false; + let expected_type = unify_types(&map_1, &map_2, &mut unified).unwrap(); + let expected_delta = expected_type.est_size() - map_1.est_size(); + + assert!(unified); + assert!(expected_delta > 0, "should grow est_size on unify"); + + let prop_mapper = PropMapper::default(); + prop_mapper + .get_or_create_and_validate("attrs", map_1.clone()) + .unwrap(); + + let before = prop_mapper.row_size(); + + assert_eq!(before, map_1.est_size()); + + prop_mapper + .get_or_create_and_validate("attrs", map_2.clone()) + .unwrap(); + + let after = prop_mapper.row_size(); + + assert_eq!(after, before + expected_delta); + assert_eq!(after, expected_type.est_size()); + assert_eq!(prop_mapper.get_dtype(0), Some(expected_type)); + } +} + +#[cfg(test)] +mod write_locked_prop_mapper_tests { + use super::*; + + #[test] + fn new_id_and_dtype() { + let prop_mapper = PropMapper::default(); + + let id = { + let mut locked = prop_mapper.write_locked(); + locked.new_id_and_dtype("new_prop", PropType::U8) + }; + + assert_eq!(id, 0); + assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); + } + + #[test] + fn set_or_unify_existing_property_same_type() { + let prop_mapper = PropMapper::default(); + + let id = { + let mut locked = prop_mapper.write_locked(); + let id = locked.new_id_and_dtype("existing_prop", PropType::U8); + locked.set_or_unify_dtype(id, PropType::U8).unwrap(); + id + }; + + assert_eq!(id, 0); + assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); + } + + #[test] + fn set_or_unify_existing_property_different_type() { + let prop_mapper = PropMapper::default(); + + let result = { + let mut locked = prop_mapper.write_locked(); + let id = locked.new_id_and_dtype("existing_prop", PropType::U8); + + locked.set_or_unify_dtype(id, PropType::U16) + }; + + assert!(result.is_err()); + + if let Err(PropError { + expected, actual, .. + }) = result + { + assert_eq!(expected, PropType::U8); + assert_eq!(actual, PropType::U16); + } else { + panic!("Expected PropError"); + } + } + + #[test] + fn set_or_unify_types() { + let prop_mapper = PropMapper::default(); + + let id = { + let mut locked = prop_mapper.write_locked(); + let id = locked.new_id_and_dtype("prop", PropType::Empty); + locked.set_or_unify_dtype(id, PropType::U8).unwrap(); + id + }; + + assert_eq!(id, 0); + assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); + } + + #[test] + fn new_id_and_dtype_resize_vector() { + let prop_mapper = PropMapper::default(); + + let id = { + let mut locked = prop_mapper.write_locked(); + locked.set_id_and_dtype("existing_prop", 5, PropType::U8); + locked.new_id_and_dtype("new_prop", PropType::U16) + }; + + assert_eq!(id, 6); + assert_eq!(prop_mapper.get_dtype(6), Some(PropType::U16)); + } + + #[test] + fn new_id_and_dtype_two_independent_properties() { + let prop_mapper = PropMapper::default(); + + let (id1, id2) = { + let mut locked = prop_mapper.write_locked(); + let id1 = locked.new_id_and_dtype("prop1", PropType::U8); + let id2 = locked.new_id_and_dtype("prop2", PropType::U16); + + (id1, id2) + }; + + assert_eq!(id1, 0); + assert_eq!(id2, 1); + assert_eq!(prop_mapper.get_dtype(0), Some(PropType::U8)); + assert_eq!(prop_mapper.get_dtype(1), Some(PropType::U16)); + } + + #[test] + fn unify_types_increases_row_size() { + let map_1 = PropType::map([("name", PropType::Str)]); + let map_2 = PropType::map([("location", PropType::Str)]); + + let mut unified = false; + let expected_type = unify_types(&map_1, &map_2, &mut unified).unwrap(); + let expected_delta = expected_type.est_size() - map_1.est_size(); + + assert!(unified); + assert!(expected_delta > 0, "should grow est_size on unify"); + + let prop_mapper = PropMapper::default(); + let id = { + let mut locked = prop_mapper.write_locked(); + locked.new_id_and_dtype("attrs", map_1.clone()) + }; + + let before = prop_mapper.row_size(); + assert_eq!(before, map_1.est_size()); + + { + let mut locked = prop_mapper.write_locked(); + locked.set_or_unify_dtype(id, map_2.clone()).unwrap(); + } + + let after = prop_mapper.row_size(); + assert_eq!(after, before + expected_delta); + assert_eq!(after, expected_type.est_size()); + assert_eq!(prop_mapper.get_dtype(0), Some(expected_type)); + } } diff --git a/raphtory-tests/src/assertions.rs b/raphtory-tests/src/assertions.rs index f13b503202..b00e725969 100644 --- a/raphtory-tests/src/assertions.rs +++ b/raphtory-tests/src/assertions.rs @@ -2,7 +2,7 @@ use raphtory::{ db::api::view::{filter_ops::Filter, StaticGraphViewOps}, prelude::{EdgeViewOps, Graph, GraphViewOps, NodeViewOps}, }; -use std::ops::Range; +use std::{ops::Range, thread, time::Duration}; #[cfg(feature = "search")] use raphtory::prelude::{IndexMutationOps, SearchableGraphOps}; @@ -382,18 +382,7 @@ fn assert_results( variants: Vec, apply: impl ApplyFilter, ) { - fn sorted(iter: I) -> Vec - where - I: IntoIterator, - S: AsRef, - { - let mut v: Vec = iter.into_iter().map(|s| s.as_ref().to_string()).collect(); - v.sort(); - v - } - let graph = init_graph(Graph::new()); - let expected = sorted(expected.iter()); for v in variants { @@ -415,6 +404,16 @@ fn assert_results( } } +fn sorted(iter: I) -> Vec +where + I: IntoIterator, + S: AsRef, +{ + let mut v: Vec = iter.into_iter().map(|s| s.as_ref().to_string()).collect(); + v.sort(); + v +} + pub fn filter_nodes(graph: &Graph, filter: impl CreateFilter) -> Vec { let mut results = graph .filter(filter) diff --git a/raphtory-tests/tests/test_filters.rs b/raphtory-tests/tests/test_filters.rs index a6785a98fa..b8b8cc6891 100644 --- a/raphtory-tests/tests/test_filters.rs +++ b/raphtory-tests/tests/test_filters.rs @@ -5091,6 +5091,7 @@ mod test_node_property_filter_agg { Prop::List(v.into()) } + /// Writes a set of node temporal properties and node metadata to the given graph. pub fn init_nodes_graph< G: StaticGraphViewOps + AdditionOps @@ -5100,6 +5101,7 @@ mod test_node_property_filter_agg { >( graph: G, ) -> G { + // Each tuple represents (timestamp, node_name, properties). let nodes: [(i64, &str, Vec<(&str, Prop)>); 12] = [ ( 1, @@ -5300,6 +5302,7 @@ mod test_node_property_filter_agg { graph.add_node(t, id, props, None, None).unwrap(); } + // Each tuple represents (node_name, properties). let metadata: [(&str, Vec<(&str, Prop)>); 8] = [ ( "n1",