Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions src/did/implementations/doc2sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ def _get_superclass_str(doc_props):
MATLAB produces comma-space separated, sorted unique superclass names.
For MATLAB-style definitions like "$PATH/base.json", strip path and extension.
For DID-python style ["base", "demoA"], use directly.

Schema-v2 (DID-schema V_delta / V_epsilon) names a superclass directly with a
bare ``{"class_name": ...}`` object. Read ``class_name`` first but UNION in the
``definition``-derived name rather than short-circuiting, so a mixed-shape entry
never silently narrows the superclass set. This mirrors the reference contract
in ndi-cloud-node ``api/src/dal/class_lineage.ts`` (``computeClassLineage``) and
NDI-python ``ndi.document.doc_superclass``, keeping the SQL ``meta.superclass``
column (and both isa paths that read it) consistent across all three stacks.
"""
# DID-python schema format: top-level 'superclasses' list of strings
if "superclasses" in doc_props and isinstance(
Expand All @@ -65,12 +73,16 @@ def _get_superclass_str(doc_props):
for sc in superclasses:
if isinstance(sc, str):
names.append(sc)
elif isinstance(sc, dict) and "definition" in sc:
# MATLAB-style: extract name from definition path
defn = sc["definition"]
name = re.sub(r".+/", "", defn)
name = re.sub(r"\..+$", "", name)
names.append(name)
elif isinstance(sc, dict):
# class_name-first, UNION with the definition-derived name.
if sc.get("class_name"):
names.append(sc["class_name"])
if sc.get("definition"):
# MATLAB-style: extract name from definition path
defn = sc["definition"]
name = re.sub(r".+/", "", defn)
name = re.sub(r"\..+$", "", name)
names.append(name)
names = sorted(set(names))
return ", ".join(names)

Expand All @@ -82,11 +94,15 @@ def _get_superclass_str(doc_props):
if isinstance(superclasses, list):
names = []
for sc in superclasses:
if isinstance(sc, dict) and "definition" in sc:
defn = sc["definition"]
name = re.sub(r".+/", "", defn)
name = re.sub(r"\..+$", "", name)
names.append(name)
if isinstance(sc, dict):
# class_name-first, UNION with the definition-derived name.
if sc.get("class_name"):
names.append(sc["class_name"])
if sc.get("definition"):
defn = sc["definition"]
name = re.sub(r".+/", "", defn)
name = re.sub(r"\..+$", "", name)
names.append(name)
elif isinstance(sc, str):
names.append(sc)
names = sorted(set(names))
Expand Down
92 changes: 75 additions & 17 deletions src/did/implementations/sqlitedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@ def _sql_escape(value):
return str(value).replace("'", "''")


# Escape character used in LIKE patterns (see _sql_like_escape / ESCAPE clauses).
_LIKE_ESCAPE_CHAR = "\\"


def _sql_like_escape(value):
"""Escape LIKE wildcards in a literal operand of a LIKE pattern.

'%' and '_' are LIKE wildcards; without escaping, a field name containing
'_' would match any single character (e.g. 'a_b' would also match 'axb'),
producing false-positive matches. Callers that embed the result inside a
LIKE pattern must also append "ESCAPE '\\'" so the backslash is treated as
the escape character. The single-quote escaping for the surrounding SQL
string literal is applied on top of this by _sql_escape.
"""
if value is None:
return ""
text = str(value)
text = text.replace(_LIKE_ESCAPE_CHAR, _LIKE_ESCAPE_CHAR * 2)
text = text.replace("%", _LIKE_ESCAPE_CHAR + "%")
text = text.replace("_", _LIKE_ESCAPE_CHAR + "_")
return text


class SQLiteDB(Database):
def __init__(self, filename):
super().__init__(connection=filename)
Expand Down Expand Up @@ -413,7 +436,13 @@ def _search_doc_ids(self, search_struct, branch_id):
return result

# Leaf query: build SQL and execute
sql_clause = self._query_struct_to_sql_str(search_struct)
try:
sql_clause = self._query_struct_to_sql_str(search_struct)
except (ValueError, TypeError):
# A numeric operation (exact_number/lessthan/greaterthan/...) was
# given a non-numeric param1, so the float() conversion failed.
# Fall back to brute force rather than aborting the whole search.
return self._brute_force_search(search_struct, branch_id)
if sql_clause is None:
# Fallback to brute-force for unsupported operations
return self._brute_force_search(search_struct, branch_id)
Expand Down Expand Up @@ -494,17 +523,33 @@ def _query_struct_to_sql_str(self, search_struct):
return f"fields.field_name = '{field}' AND CAST(doc_data.value AS REAL) >= {float(param1)}"

elif op_lower == "hasfield":
# 'field' is charset-restricted above, but it may legitimately
# contain '_', which is a LIKE wildcard. Escape LIKE wildcards in
# the literal prefix and add an ESCAPE clause so a field name like
# 'a_b' matches 'a_b[.subfield]' exactly, not 'axb'. The trailing
# '.%' is a real wildcard and is left unescaped.
field_like = _sql_like_escape(field)
return (
f"(fields.field_name = '{field}' OR fields.field_name LIKE '{field}.%')"
f"(fields.field_name = '{field}' "
f"OR fields.field_name LIKE '{field_like}.%' ESCAPE '{_LIKE_ESCAPE_CHAR}')"
)

elif op_lower == "isa":
# isa: match on meta.class (exact) OR meta.superclass (contains)
# isa: match on meta.class (exact) OR meta.superclass (contains).
# The meta.class branch is an exact string compare, so it only
# needs SQL-literal escaping. The meta.superclass branch embeds the
# class name inside a regexp() pattern; regex metacharacters in the
# class name (e.g. '.') must be regex-escaped first, otherwise a
# name like 'foo.bar' would also match 'fooxbar'. Anchor it as an
# exact list-element match between the '(^|, )' / '(,|$)' delimiters.
classname = _sql_escape(param1)
classname_re = _sql_escape(
_re.escape("" if param1 is None else str(param1))
)
return (
f"((fields.field_name = 'meta.class' AND doc_data.value = '{classname}') "
f"OR (fields.field_name = 'meta.superclass' AND "
f"regexp('(^|, ){classname}(,|$)', doc_data.value) IS NOT NULL))"
f"regexp('(^|, ){classname_re}(,|$)', doc_data.value) IS NOT NULL))"
)

elif op_lower == "depends_on":
Expand Down Expand Up @@ -603,19 +648,32 @@ def get_docs(self, document_ids, branch_id=None, OnMissing="error", **kwargs):
# and docs indexes. Branch membership is enforced by the JOIN; docs
# not in the branch simply aren't returned and are handled by the
# OnMissing pass below (same behaviour as before).
placeholders = ",".join("?" for _ in document_ids)
if branch_id is not None:
rows = self.do_run_sql_query(
f"SELECT d.doc_id, d.json_code FROM docs d "
f"JOIN branch_docs bd ON d.doc_idx = bd.doc_idx "
f"WHERE bd.branch_id = ? AND d.doc_id IN ({placeholders})",
(branch_id, *document_ids),
)
else:
rows = self.do_run_sql_query(
f"SELECT doc_id, json_code FROM docs WHERE doc_id IN ({placeholders})",
tuple(document_ids),
)
# Chunk the IN-list: SQLite caps host parameters per statement
# (SQLITE_MAX_VARIABLE_NUMBER — 999 on older builds), so a get_docs
# over thousands of ids (e.g. a cross-document query on a large cloud
# dataset) would raise "too many SQL variables". Batch under the limit
# and accumulate; order is restored from doc_map below.
_CHUNK = 900
rows = []
for _i in range(0, len(document_ids), _CHUNK):
chunk = document_ids[_i : _i + _CHUNK]
placeholders = ",".join("?" for _ in chunk)
if branch_id is not None:
rows.extend(
self.do_run_sql_query(
f"SELECT d.doc_id, d.json_code FROM docs d "
f"JOIN branch_docs bd ON d.doc_idx = bd.doc_idx "
f"WHERE bd.branch_id = ? AND d.doc_id IN ({placeholders})",
(branch_id, *chunk),
)
)
else:
rows.extend(
self.do_run_sql_query(
f"SELECT doc_id, json_code FROM docs WHERE doc_id IN ({placeholders})",
tuple(chunk),
)
)

# Build lookup dict
doc_map = {}
Expand Down
82 changes: 82 additions & 0 deletions tests/test_doc2sql_superclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,85 @@ def test_empty_superclasses(self):
def test_no_superclasses(self):
doc_props = {}
assert _get_superclass_str(doc_props) == ""


class TestGetSuperclassStrClassName:
"""Schema-v2 (V_delta / V_epsilon) names superclasses with bare
``{"class_name": ...}`` objects. ``_get_superclass_str`` must read
``class_name`` first but UNION in any ``definition``-derived name (never
short-circuit), so ``meta.superclass`` — and both isa paths that read it —
stay consistent with ndi-cloud-node ``class_lineage.ts`` and NDI-python
``ndi.document.doc_superclass``. On today's corpus every entry is
``{definition}`` (class_name count = 0), so this branch is purely additive.
"""

# --- top-level 'superclasses' branch (DID-python schema shape) ---
def test_top_level_class_name_only(self):
assert _get_superclass_str({"superclasses": [{"class_name": "base"}]}) == "base"

def test_top_level_bare_dict_class_name(self):
assert _get_superclass_str({"superclasses": {"class_name": "base"}}) == "base"

def test_top_level_class_name_list(self):
doc_props = {"superclasses": [{"class_name": "base"}, {"class_name": "demoA"}]}
assert _get_superclass_str(doc_props) == "base, demoA"

def test_top_level_union_no_shortcircuit(self):
# CONFORMANCE PIN: class_name AND a *differing* definition -> BOTH names.
# A short-circuit accessor would drop "base" and return "custom_marker".
sc = {"class_name": "custom_marker", "definition": "$NDIDOCUMENTPATH/base.json"}
assert _get_superclass_str({"superclasses": [sc]}) == "base, custom_marker"

def test_top_level_mixed_agreeing_dedup(self):
sc = {"class_name": "base", "definition": "$NDIDOCUMENTPATH/base.json"}
assert _get_superclass_str({"superclasses": [sc]}) == "base"

def test_top_level_empty_class_name_falls_back(self):
sc = {"class_name": "", "definition": "$NDIDOCUMENTPATH/base.json"}
assert _get_superclass_str({"superclasses": [sc]}) == "base"

# --- document_class.superclasses branch (NDI / MATLAB shape) ---
def test_document_class_class_name_only(self):
doc = {"document_class": {"superclasses": [{"class_name": "base"}]}}
assert _get_superclass_str(doc) == "base"

def test_document_class_bare_dict_class_name(self):
doc = {"document_class": {"superclasses": {"class_name": "base"}}}
assert _get_superclass_str(doc) == "base"

def test_document_class_union_no_shortcircuit(self):
doc = {
"document_class": {
"superclasses": [
{
"class_name": "custom_marker",
"definition": "$NDIDOCUMENTPATH/base.json",
}
]
}
}
assert _get_superclass_str(doc) == "base, custom_marker"

def test_document_class_mixed_shapes(self):
# A document may mix a v2 entry and a legacy entry in one list.
doc = {
"document_class": {
"superclasses": [
{"class_name": "element"},
{"definition": "$NDIDOCUMENTPATH/base.json"},
]
}
}
assert _get_superclass_str(doc) == "base, element"

def test_document_class_v1_definition_unchanged(self):
# Regression guard: the entire current corpus is definition-only.
doc = {
"document_class": {
"superclasses": [
{"definition": "$NDIDOCUMENTPATH/element.json"},
{"definition": "$NDIDOCUMENTPATH/base.json"},
]
}
}
assert _get_superclass_str(doc) == "base, element"
102 changes: 102 additions & 0 deletions tests/test_isa_parity.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,107 @@ def test_legitimate_dotted_field_still_works(self):
self.assertEqual(result, ["id_probe"])


class TestVEpsilonDiamondIsa(unittest.TestCase):
"""V_epsilon's observation tier is the first MULTIPLE-INHERITANCE (diamond)
hierarchy: ``body_weight_observation`` isa ``scalar_observation`` AND
``scalar_mass``, both reaching ``base``. A produced V_epsilon document
carries its FLATTENED ancestor list as bare ``{class_name}`` entries. Both
isa paths -- the SQL ``meta.superclass`` column (populated by
``doc2sql._get_superclass_str``) and the brute-force ``field_search`` -- must
resolve every ancestor, reached via either parent, with no spurious match.
"""

DB = "test_isa_v_epsilon_diamond.sqlite"

@staticmethod
def _v2doc(class_name, superclasses):
# V_epsilon shape: superclasses named by bare {class_name} (not {definition}).
return Document(
{
"document_class": {
"definition": f"$NDIDOCUMENTPATH/{class_name}.json",
"class_name": class_name,
"class_version": 1,
"property_list_name": class_name,
"superclasses": [{"class_name": s} for s in superclasses],
},
"base": {
"id": f"id_{class_name}",
"name": class_name,
"datestamp": "2026-06-24T00:00:00",
},
}
)

@classmethod
def setUpClass(cls):
if os.path.exists(cls.DB):
os.remove(cls.DB)
cls.db = SQLiteDB(cls.DB)
cls.db.add_branch("a")
# The diamond leaf carries its FLATTENED ancestor list, exactly as a real
# producer stamps it (both parents + the shared root).
specs = [
("base", []),
("scalar_observation", ["base"]),
("scalar_mass", ["base"]),
("body_weight_observation", ["scalar_observation", "scalar_mass", "base"]),
]
cls.by_class = {}
cls.docs = []
for class_name, supers in specs:
d = cls._v2doc(class_name, supers)
cls.by_class[class_name] = d
cls.docs.append(d)
cls.db._do_add_doc(d, "a")

@classmethod
def tearDownClass(cls):
cls.db._close_db()
if os.path.exists(cls.DB):
os.remove(cls.DB)

def _sql(self, class_name):
return sorted(self.db.search(Query("", "isa", class_name), branch_id="a"))

def _brute(self, class_name):
ss = Query("", "isa", class_name).to_search_structure()
return sorted(
d.id() for d in self.docs if field_search(d.document_properties, ss)
)

def _expect(self, *class_names):
return sorted(self.by_class[c].id() for c in class_names)

def test_diamond_leaf_isa_both_parents_and_shared_ancestor(self):
leaf = "id_body_weight_observation"
for ancestor in ("scalar_observation", "scalar_mass", "base"):
self.assertIn(
leaf, self._sql(ancestor), f"SQL: leaf must be isa({ancestor})"
)
self.assertIn(leaf, self._brute(ancestor), f"brute: leaf isa({ancestor})")

def test_base_matches_whole_diamond(self):
# base is the root: every class in the diamond is isa(base).
self.assertEqual(
self._sql("base"),
self._expect(
"base", "scalar_observation", "scalar_mass", "body_weight_observation"
),
)

def test_diamond_sql_and_brute_agree(self):
for c in (
"body_weight_observation",
"scalar_observation",
"scalar_mass",
"base",
"nonexistent",
):
self.assertEqual(
self._sql(c), self._brute(c), f"isa({c}) SQL vs brute-force mismatch"
)


if __name__ == "__main__":
unittest.main()
Loading
Loading