feat(records): ingest endpoint #2652
Code Review
This pull request adds support for ingesting records into data modeling streams by introducing the ingest method to both the sync and async records API, along with supporting data classes (RecordWrite, RecordSource, RecordSourceReference, and RecordWriteList). Feedback on the changes focuses on adding missing type hints to class attributes across the new data classes to comply with the repository style guide, and safely handling potential null values for sources in RecordWrite._load to prevent a TypeError.
| @classmethod | ||
| def _load(cls, resource: dict[str, Any]) -> Self: | ||
| return cls( | ||
| space=resource["space"], | ||
| external_id=resource["externalId"], | ||
| sources=[RecordSource._load(s) for s in resource.get("sources", [])], | ||
| ) |
If the API response contains sources as null (which is represented as None in Python), resource.get("sources", []) will return None. This will cause a TypeError when attempting to iterate over it. Use resource.get("sources") or [] to safely handle this case.
Suggested change:
  @classmethod
  def _load(cls, resource: dict[str, Any]) -> Self:
      return cls(
          space=resource["space"],
          external_id=resource["externalId"],
-         sources=[RecordSource._load(s) for s in resource.get("sources", [])],
+         sources=[RecordSource._load(s) for s in resource.get("sources") or []],
      )
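To illustrate the difference between the two forms, here is a minimal sketch. The function names are hypothetical and the payloads are made up; the only point is that the default passed to `dict.get` applies when the key is missing, not when the key is present with a `None` value.

```python
from typing import Any


def load_sources_with_default(resource: dict[str, Any]) -> list[Any]:
    # The default [] is only used when "sources" is absent from the dict.
    return [s for s in resource.get("sources", [])]


def load_sources_with_or(resource: dict[str, Any]) -> list[Any]:
    # `or []` also covers the case where "sources" is present but None.
    return [s for s in resource.get("sources") or []]


missing = {"space": "sp", "externalId": "rec-1"}
explicit_null = {"space": "sp", "externalId": "rec-1", "sources": None}

print(load_sources_with_default(missing))   # key absent: both forms work
print(load_sources_with_or(explicit_null))  # key is None: only this form works
```

Calling `load_sources_with_default(explicit_null)` raises `TypeError` because `None` is not iterable.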
| class RecordWriteList(CogniteResourceList[RecordWrite]): | ||
| """A list of :class:`RecordWrite` objects.""" | ||
|
|
||
| _RESOURCE = RecordWrite |
Please add a type hint for the class attribute _RESOURCE to comply with the repository style guide requirement that all class attributes must have type hints.
Suggested change:
  class RecordWriteList(CogniteResourceList[RecordWrite]):
      """A list of :class:`RecordWrite` objects."""
-     _RESOURCE = RecordWrite
+     _RESOURCE: type[RecordWrite] = RecordWrite
References
- All functions, methods, and class attributes must have type hints.
| resource_path=self._records_url(stream_id), | ||
| ) | ||
|
|
||
| async def ingest(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None: |
I suggest we adopt the "items as posarg, stream_id as kwarg" pattern here too:
Suggested change:
- async def ingest(self, stream_id: str, items: RecordWrite | Sequence[RecordWrite]) -> None:
+ async def ingest(
+     self,
+     items: RecordWrite | Sequence[RecordWrite],
+     *,
+     stream_id: str,
+ ) -> None:
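For reference, a small sketch of how the keyword-only signature behaves at the call site. `FakeRecordsAPI` and the `RecordWrite` dataclass here are simplified stand-ins written for this example, not the SDK classes, and the method only records what it was given instead of calling any endpoint.

```python
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from dataclasses import dataclass


@dataclass
class RecordWrite:
    """Simplified stand-in for the real RecordWrite."""

    space: str
    external_id: str


class FakeRecordsAPI:
    """Hypothetical stand-in that only records calls."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, int]] = []

    async def ingest(
        self,
        items: RecordWrite | Sequence[RecordWrite],
        *,
        stream_id: str,
    ) -> None:
        # Everything after the bare `*` must be passed by keyword.
        batch = [items] if isinstance(items, RecordWrite) else items
        self.sent.append((stream_id, len(batch)))


api = FakeRecordsAPI()
asyncio.run(api.ingest(RecordWrite("sp", "rec-1"), stream_id="my-stream"))
print(api.sent)
```

With this signature, `api.ingest("my-stream", RecordWrite("sp", "rec-1"))` fails with a `TypeError` at call time, so the two arguments cannot be swapped by accident.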
| assert list_cls is not None | ||
| assert resource_cls is not None | ||
| assert input_resource_cls is not None |
| from cognite.client.utils._identifier import RecordId | ||
|
|
| from cognite.client.utils._identifier import RecordId |
| } | ||
|
|
||
|
|
||
| class RecordWrite(WriteableCogniteResource["RecordWrite"]): |
We have used the word "Apply" in Data Modeling, e.g. NodeApply and EdgeApply, but honestly, I like Write better.
| return self | ||
|
|
||
|
|
||
| class RecordWriteList(CogniteResourceList[RecordWrite]): |
This is missing an as_ids (not that write-list-type classes ever see any real use, don't even know why we have them 😆)
Let's remove all Record* classes from here (cognite/client/data_classes/__init__.py) and keep them in cognite/client/data_classes/data_modeling/__init__.py.
| ... ) | ||
| """ | ||
| self._warning.warn() | ||
| item_list: list[RecordWrite] = [items] if isinstance(items, RecordWrite) else list(items) |
Does mypy complain about this? Would be nice to avoid making a full copy
Suggested change:
- item_list: list[RecordWrite] = [items] if isinstance(items, RecordWrite) else list(items)
+ item_list: list[RecordWrite] = [items] if isinstance(items, RecordWrite) else items
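On the mypy question: with the `list[RecordWrite]` annotation kept, a type checker would likely flag the `else items` branch, since a `Sequence` is not necessarily a `list`. One possible way around it, sketched below, is to widen the annotation to `Sequence[RecordWrite]`. This assumes the code that follows only iterates over or measures the items; `as_sequence` and the `RecordWrite` dataclass are stand-ins for this example.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass


@dataclass
class RecordWrite:
    """Simplified stand-in for the real RecordWrite."""

    space: str
    external_id: str


def as_sequence(items: RecordWrite | Sequence[RecordWrite]) -> Sequence[RecordWrite]:
    # Wrap a single item; pass an existing sequence through without copying.
    return [items] if isinstance(items, RecordWrite) else items


batch = [RecordWrite("sp", "a"), RecordWrite("sp", "b")]
print(as_sequence(batch) is batch)  # same object, no copy made
```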
| return len(self) == len({(r.space, r.external_id) for r in self._identifiers}) | ||
|
|
||
|
|
||
| class RecordSourceReference(CogniteResource): |
I believe we can remove this class entirely and use the existing ContainerId
Or make a shallow subclass RecordContainerId
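To make the "shallow subclass" option concrete, a sketch of what it could look like. The `ContainerId` below is a simplified stand-in with assumed `space` and `external_id` fields; the real SDK class may differ, and `RecordContainerId` is only the name proposed above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ContainerId:
    """Simplified stand-in for the existing ContainerId (assumed fields)."""

    space: str
    external_id: str


@dataclass(frozen=True)
class RecordContainerId(ContainerId):
    """Adds no fields; exists only to give Records a distinct type name."""


source = RecordContainerId(space="my_space", external_id="my_container")
print(isinstance(source, ContainerId))
```

The subclass can be passed anywhere a `ContainerId` is accepted, while leaving room for Records-specific behaviour later.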
| } | ||
|
|
||
|
|
||
| class RecordSource(CogniteResource): |
If RecordSourceReference is replaced with ContainerId, then RecordSource becomes nearly identical to NodeOrEdgeData, but I think we should keep it. The node-or-edge thingy is very bloated due to support for TypedInstance, which for the foreseeable future should not make it into Records.
So I just suggest updating source to ContainerId here.
|
One last question, the API has both |
https://cognitedata.atlassian.net/browse/HVD-1261
Created a new PR (again) due to GitHub issues.