diff --git a/.gitignore b/.gitignore index 21010f5..6b49697 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,8 @@ share/python-wheels/ *.egg MANIFEST +# Virtual environments +.venv/ + # Symlink examples/growattServer diff --git a/growattServer/__init__.py b/growattServer/__init__.py index e6914e8..8c66a1b 100755 --- a/growattServer/__init__.py +++ b/growattServer/__init__.py @@ -26,3 +26,17 @@ "Timespan", "hash_password", ] + + +def __getattr__(name: str): + """Lazy imports for async classes (requires httpx).""" + if name == "AsyncGrowattApi": + from .async_base_api import AsyncGrowattApi # noqa: PLC0415 + + return AsyncGrowattApi + if name == "AsyncOpenApiV1": + from .open_api_v1.async_open_api_v1 import AsyncOpenApiV1 # noqa: PLC0415 + + return AsyncOpenApiV1 + msg = f"module {__name__!r} has no attribute {name!r}" + raise AttributeError(msg) diff --git a/growattServer/async_base_api.py b/growattServer/async_base_api.py new file mode 100644 index 0000000..98d75bb --- /dev/null +++ b/growattServer/async_base_api.py @@ -0,0 +1,707 @@ +"""Async Growatt API client using httpx.""" + +from __future__ import annotations + +import datetime +import json +import re +import secrets +import warnings +from typing import TYPE_CHECKING, Any + +import httpx + +from .base_api import Timespan, hash_password +from .exceptions import GrowattError + +if TYPE_CHECKING: + from typing import Self + +name = "growattServer" + + +async def _raise_for_status(response: httpx.Response) -> None: + response.raise_for_status() + + +class AsyncGrowattApi: + """Async client for Growatt API endpoints using httpx.""" + + server_url = "https://openapi.growatt.com/" + agent_identifier = "Dalvik/2.1.0 (Linux; U; Android 12; https://github.com/indykoning/PyPi_GrowattServer)" + + def __init__(self, add_random_user_id: bool = False, agent_identifier: str | None = None, session: httpx.AsyncClient | None = None) -> None: + """ + Initialize the async Growatt API client. + + Args: + add_random_user_id: Append a short random suffix to the user-agent. + agent_identifier: Optional override for the user-agent string. + session: Optional httpx.AsyncClient to reuse. + + """ + if agent_identifier is not None: + self.agent_identifier = agent_identifier + + if add_random_user_id: + random_number = "".join(str(secrets.randbelow(10)) for _ in range(5)) + self.agent_identifier += " - " + random_number + + if session is not None: + self.session = session + self._owns_session = False + else: + self.session = httpx.AsyncClient( + headers={"User-Agent": self.agent_identifier}, + follow_redirects=True, + timeout=None, # noqa: S113 + event_hooks={"response": [_raise_for_status]}, + ) + self._owns_session = True + + async def aclose(self) -> None: + """Close the underlying HTTP session if we own it.""" + if self._owns_session: + await self.session.aclose() + + async def __aenter__(self) -> Self: + """Enter the async context manager.""" + return self + + async def __aexit__(self, *args: object) -> None: + """Exit the async context manager.""" + await self.aclose() + + def __get_date_string(self, timespan: Timespan | None = None, date: datetime.datetime | None = None) -> str: + if timespan is not None and not isinstance(timespan, Timespan): + raise ValueError("timespan must be a Timespan enum value") + + if date is None: + date = datetime.datetime.now(datetime.UTC) + + date_str = "" + if timespan == Timespan.month: + date_str = date.strftime("%Y-%m") + else: + date_str = date.strftime("%Y-%m-%d") + + return date_str + + def get_url(self, page: str) -> str: + """Return the page URL.""" + return self.server_url + page + + async def login(self, username: str, password: str, is_password_hashed: bool = False) -> dict[str, Any]: + """ + Log the user in. + + Returns + ------- + 'data' -- A List containing Objects containing the folowing + 'plantName' -- Friendly name of the plant + 'plantId' -- The ID of the plant + 'service' + 'quality' + 'isOpenSmartFamily' + 'totalData' -- An Object + 'success' -- True or False + 'msg' + 'app_code' + 'user' -- An Object containing a lot of user information + 'uid' + 'userLanguage' + 'inverterGroup' -- A List + 'timeZone' -- A Number + 'lat' + 'lng' + 'dataAcqList' -- A List + 'type' + 'accountName' -- The username + 'password' -- The password hash of the user + 'isValiPhone' + 'kind' + 'mailNotice' -- True or False + 'id' + 'lasLoginIp' + 'lastLoginTime' + 'userDeviceType' + 'phoneNum' + 'approved' -- True or False + 'area' -- Continent of the user + 'smsNotice' -- True or False + 'isAgent' + 'token' + 'nickName' + 'parentUserId' + 'customerCode' + 'country' + 'isPhoneNumReg' + 'createDate' + 'rightlevel' + 'appType' + 'serverUrl' + 'roleId' + 'enabled' -- True or False + 'agentCode' + 'inverterList' -- A list + 'email' + 'company' + 'activeName' + 'codeIndex' + 'appAlias' + 'isBigCustomer' + 'noticeType' + + """ + if not is_password_hashed: + password = hash_password(password) + + response = await self.session.post(self.get_url("newTwoLoginAPI.do"), data={ + "userName": username, + "password": password + }) + + data = response.json()["back"] + if data["success"]: + data.update({ + "userId": data["user"]["id"], + "userLevel": data["user"]["rightlevel"] + }) + return data + + async def plant_list(self, user_id: str) -> list[dict[str, Any]]: + """ + Get a list of plants connected to this account. + + Args: + user_id (str): The ID of the user. + + Returns: + list: A list of plants connected to the account. + + Raises: + httpx.HTTPError: If the request to the server fails. + + """ + response = await self.session.get( + self.get_url("PlantListAPI.do"), + params={"userId": user_id}, + follow_redirects=False + ) + + return response.json().get("back", []) + + async def plant_detail(self, plant_id: str, timespan: Timespan, date: datetime.datetime | None = None) -> dict[str, Any]: + """ + Get plant details for specified timespan. + + Args: + plant_id (str): The ID of the plant. + timespan (Timespan): The ENUM value conforming to the time window you want e.g. hours from today, days, or months. + date (datetime, optional): The date you are interested in. Defaults to datetime.datetime.now(). + + Returns: + dict: A dictionary containing the plant details. + + Raises: + httpx.HTTPError: If the request to the server fails. + + """ + date_str = self.__get_date_string(timespan, date) + + response = await self.session.get(self.get_url("PlantDetailAPI.do"), params={ + "plantId": plant_id, + "type": timespan.value, + "date": date_str + }) + + return response.json().get("back", {}) + + async def plant_list_two(self) -> list[dict[str, Any]]: + """ + Get a list of all plants with detailed information. + + Returns: + list: A list of plants with detailed information. + + """ + response = await self.session.post( + self.get_url("newTwoPlantAPI.do"), + params={"op": "getAllPlantListTwo"}, + data={ + "language": "1", + "nominalPower": "", + "order": "1", + "pageSize": "15", + "plantName": "", + "plantStatus": "", + "toPageNum": "1" + } + ) + + return response.json().get("PlantList", []) + + async def inverter_data(self, inverter_id: str, date: datetime.datetime | None = None) -> dict[str, Any]: + """ + Get inverter data for specified date or today. + + Args: + inverter_id (str): The ID of the inverter. + date (datetime, optional): The date you are interested in. Defaults to datetime.datetime.now(). + + Returns: + dict: A dictionary containing the inverter data. + + Raises: + httpx.HTTPError: If the request to the server fails. + + """ + date_str = self.__get_date_string(date=date) + response = await self.session.get(self.get_url("newInverterAPI.do"), params={ + "op": "getInverterData", + "id": inverter_id, + "type": 1, + "date": date_str + }) + + return response.json() + + async def inverter_detail(self, inverter_id: str) -> dict[str, Any]: + """ + Get detailed data from PV inverter. + + Args: + inverter_id (str): The ID of the inverter. + + Returns: + dict: A dictionary containing the inverter details. + + Raises: + httpx.HTTPError: If the request to the server fails. + + """ + response = await self.session.get(self.get_url("newInverterAPI.do"), params={ + "op": "getInverterDetailData", + "inverterId": inverter_id + }) + + return response.json() + + async def inverter_detail_two(self, inverter_id: str) -> dict[str, Any]: + """ + Get detailed data from PV inverter (alternative endpoint). + + Args: + inverter_id (str): The ID of the inverter. + + Returns: + dict: A dictionary containing the inverter details. + + Raises: + httpx.HTTPError: If the request to the server fails. + + """ + response = await self.session.get(self.get_url("newInverterAPI.do"), params={ + "op": "getInverterDetailData_two", + "inverterId": inverter_id + }) + + return response.json() + + async def tlx_system_status(self, plant_id: str, tlx_id: str) -> dict[str, Any]: + """Get status of the system.""" + response = await self.session.post( + self.get_url("newTlxApi.do"), + params={"op": "getSystemStatus_KW"}, + data={"plantId": plant_id, "id": tlx_id} + ) + return response.json().get("obj", {}) + + async def tlx_energy_overview(self, plant_id: str, tlx_id: str) -> dict[str, Any]: + """Get energy overview.""" + response = await self.session.post( + self.get_url("newTlxApi.do"), + params={"op": "getEnergyOverview"}, + data={"plantId": plant_id, "id": tlx_id} + ) + return response.json().get("obj", {}) + + async def tlx_energy_prod_cons(self, plant_id: str, tlx_id: str, timespan: Timespan = Timespan.hour, date: datetime.datetime | None = None) -> dict[str, Any]: + """Get energy production and consumption (kW).""" + date_str = self.__get_date_string(timespan, date) + response = await self.session.post( + self.get_url("newTlxApi.do"), + params={"op": "getEnergyProdAndCons_KW"}, + data={"date": date_str, "plantId": plant_id, "language": "1", + "id": tlx_id, "type": timespan.value} + ) + return response.json().get("obj", {}) + + async def tlx_data(self, tlx_id: str, date: datetime.datetime | None = None) -> dict[str, Any]: + """Get TLX inverter data for specified date or today.""" + date_str = self.__get_date_string(date=date) + response = await self.session.get(self.get_url("newTlxApi.do"), params={ + "op": "getTlxData", "id": tlx_id, "type": 1, "date": date_str + }) + return response.json() + + async def tlx_detail(self, tlx_id: str) -> dict[str, Any]: + """Get detailed data from TLX inverter.""" + response = await self.session.get(self.get_url("newTlxApi.do"), params={ + "op": "getTlxDetailData", "id": tlx_id + }) + return response.json() + + async def tlx_params(self, tlx_id: str) -> dict[str, Any]: + """Get parameters for TLX inverter.""" + response = await self.session.get(self.get_url("newTlxApi.do"), params={ + "op": "getTlxParams", "id": tlx_id + }) + return response.json() + + async def tlx_all_settings(self, tlx_id: str) -> dict[str, Any] | None: + """Get all possible settings from TLX inverter.""" + response = await self.session.post(self.get_url("newTlxApi.do"), params={ + "op": "getTlxSetData" + }, data={"serialNum": tlx_id}) + return response.json().get("obj", {}).get("tlxSetBean") + + async def tlx_enabled_settings(self, tlx_id: str) -> dict[str, Any]: + """Get "Enabled settings" from TLX inverter.""" + string_time = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%d") + response = await self.session.post( + self.get_url("newLoginAPI.do"), + params={"op": "getSetPass"}, + data={"deviceSn": tlx_id, "stringTime": string_time, "type": "5"} + ) + return response.json().get("obj", {}) + + async def tlx_battery_info(self, serial_num: str) -> dict[str, Any]: + """Get battery information.""" + response = await self.session.post( + self.get_url("newTlxApi.do"), + params={"op": "getBatInfo"}, + data={"lan": 1, "serialNum": serial_num} + ) + return response.json().get("obj", {}) + + async def tlx_battery_info_detailed(self, plant_id: str, serial_num: str) -> dict[str, Any]: + """Get detailed battery information.""" + response = await self.session.post( + self.get_url("newTlxApi.do"), + params={"op": "getBatDetailData"}, + data={"lan": 1, "plantId": plant_id, "id": serial_num} + ) + return response.json() + + async def mix_info(self, mix_id: str, plant_id: str | None = None) -> dict[str, Any]: + """Get high-level values from a Mix device.""" + request_params = {"op": "getMixInfo", "mixId": mix_id} + if plant_id: + request_params["plantId"] = plant_id + response = await self.session.get(self.get_url("newMixApi.do"), params=request_params) + return response.json().get("obj", {}) + + async def mix_totals(self, mix_id: str, plant_id: str) -> dict[str, Any]: + """Get totals values from a Mix device.""" + response = await self.session.post(self.get_url("newMixApi.do"), params={ + "op": "getEnergyOverview", "mixId": mix_id, "plantId": plant_id + }) + return response.json().get("obj", {}) + + async def mix_system_status(self, mix_id: str, plant_id: str) -> dict[str, Any]: + """Get current status from a Mix device.""" + response = await self.session.post(self.get_url("newMixApi.do"), params={ + "op": "getSystemStatus_KW", "mixId": mix_id, "plantId": plant_id + }) + return response.json().get("obj", {}) + + async def mix_detail(self, mix_id: str, plant_id: str, timespan: Timespan = Timespan.hour, date: datetime.datetime | None = None) -> dict[str, Any]: + """Get Mix details for the given timespan.""" + date_str = self.__get_date_string(timespan, date) + response = await self.session.post( + self.get_url("newMixApi.do"), + params={ + "op": "getEnergyProdAndCons_KW", "plantId": plant_id, + "mixId": mix_id, "type": timespan.value, "date": date_str, + }, + ) + return response.json().get("obj", {}) + + async def get_mix_inverter_settings(self, serial_number: str) -> dict[str, Any]: + """Get the inverter settings related to battery modes.""" + default_params = {"op": "getMixSetParams", "serialNum": serial_number, "kind": 0} + response = await self.session.get(self.get_url("newMixApi.do"), params=default_params) + return response.json() + + async def dashboard_data(self, plant_id: str, timespan: Timespan = Timespan.hour, date: datetime.datetime | None = None) -> dict[str, Any]: + """Get dashboard data for a plant over a timespan.""" + date_str = self.__get_date_string(timespan, date) + response = await self.session.post(self.get_url("newPlantAPI.do"), params={ + "action": "getEnergyStorageData", "date": date_str, + "type": timespan.value, "plantId": plant_id, + }) + return response.json() + + async def plant_settings(self, plant_id: str) -> dict[str, Any]: + """Get a dictionary containing the settings for the specified plant.""" + response = await self.session.get(self.get_url("newPlantAPI.do"), params={ + "op": "getPlant", "plantId": plant_id + }) + return response.json() + + async def storage_detail(self, storage_id: str) -> dict[str, Any]: + """Get "All parameters" from battery storage.""" + response = await self.session.get(self.get_url("newStorageAPI.do"), params={ + "op": "getStorageInfo_sacolar", "storageId": storage_id + }) + return response.json() + + async def storage_params(self, storage_id: str) -> dict[str, Any]: + """Get much more detail from battery storage.""" + response = await self.session.get(self.get_url("newStorageAPI.do"), params={ + "op": "getStorageParams_sacolar", "storageId": storage_id + }) + return response.json() + + async def storage_energy_overview(self, plant_id: str, storage_id: str) -> dict[str, Any]: + """Get some energy/generation overview data.""" + response = await self.session.post(self.get_url("newStorageAPI.do?op=getEnergyOverviewData_sacolar"), params={ + "plantId": plant_id, "storageSn": storage_id + }) + return response.json().get("obj", {}) + + async def inverter_list(self, plant_id: str) -> list[dict[str, Any]]: + """Use device_list, it's more descriptive since the list contains more than inverters.""" + warnings.warn( + "This function may be deprecated in the future because naming is not correct, use device_list instead", DeprecationWarning, stacklevel=2) + return await self.device_list(plant_id) + + async def __get_all_devices(self, plant_id: str) -> dict[str, Any]: + """Get basic plant information with device list.""" + response = await self.session.get(self.get_url("newTwoPlantAPI.do"), + params={"op": "getAllDeviceList", + "plantId": plant_id, + "language": 1}) + return response.json().get("deviceList", {}) + + async def device_list(self, plant_id: str) -> list[dict[str, Any]]: + """Get a list of all devices connected to plant.""" + device_list = (await self.plant_info(plant_id)).get("deviceList", []) + + if not device_list: + device_list = await self.__get_all_devices(plant_id) + + return device_list + + async def plant_info(self, plant_id: str) -> dict[str, Any]: + """Get basic plant information with device list.""" + response = await self.session.get(self.get_url("newTwoPlantAPI.do"), params={ + "op": "getAllDeviceListTwo", "plantId": plant_id, "pageNum": 1, "pageSize": 1 + }) + return response.json() + + async def plant_energy_data(self, plant_id: str) -> dict[str, Any]: + """Get the energy data used in the 'Plant' tab in the phone.""" + response = await self.session.post(self.get_url("newTwoPlantAPI.do"), + params={"op": "getUserCenterEnertyDataByPlantid"}, + data={"language": 1, "plantId": plant_id}) + return response.json() + + async def is_plant_noah_system(self, plant_id: str) -> dict[str, Any]: + """Check whether a plant is a Noah system.""" + response = await self.session.post(self.get_url("noahDeviceApi/noah/isPlantNoahSystem"), data={ + "plantId": plant_id + }) + return response.json() + + async def noah_system_status(self, serial_number: str) -> dict[str, Any]: + """Get the Noah device status.""" + response = await self.session.post(self.get_url("noahDeviceApi/noah/getSystemStatus"), data={ + "deviceSn": serial_number + }) + return response.json() + + async def noah_info(self, serial_number: str) -> dict[str, Any]: + """Get detailed Noah device information.""" + response = await self.session.post(self.get_url("noahDeviceApi/noah/getNoahInfoBySn"), data={ + "deviceSn": serial_number + }) + return response.json() + + async def update_plant_settings(self, plant_id: str, changed_settings: dict[str, Any], current_settings: dict[str, Any] | None = None) -> dict[str, Any]: + """ + Update plant settings. + + Args: + plant_id: Plant identifier. + changed_settings: Dict of settings to change. + current_settings: Current settings dict or None. + + Returns: + dict: Server response indicating success or failure. + + """ + if current_settings is None: + current_settings = await self.plant_settings(plant_id) + + form_settings = { + "plantCoal": (None, str(current_settings["formulaCoal"])), + "plantSo2": (None, str(current_settings["formulaSo2"])), + "accountName": (None, str(current_settings["userAccount"])), + "plantID": (None, str(current_settings["id"])), + "plantFirm": (None, "0"), + "plantCountry": (None, str(current_settings["country"])), + "plantType": (None, str(current_settings["plantType"])), + "plantIncome": (None, str(current_settings["formulaMoneyStr"])), + "plantAddress": (None, str(current_settings["plantAddress"])), + "plantTimezone": (None, str(current_settings["timezone"])), + "plantLng": (None, str(current_settings["plant_lng"])), + "plantCity": (None, str(current_settings["city"])), + "plantCo2": (None, str(current_settings["formulaCo2"])), + "plantMoney": (None, str(current_settings["formulaMoneyUnitId"])), + "plantPower": (None, str(current_settings["nominalPower"])), + "plantLat": (None, str(current_settings["plant_lat"])), + "plantDate": (None, str(current_settings["createDateText"])), + "plantName": (None, str(current_settings["plantName"])), + } + + for setting, value in changed_settings.items(): + form_settings[setting] = (None, str(value)) + + response = await self.session.post(self.get_url( + "newTwoPlantAPI.do?op=updatePlant"), files=form_settings) + + return response.json() + + async def update_inverter_setting(self, serial_number: str, setting_type: str, + default_parameters: dict[str, Any], parameters: dict[str, Any] | list[Any]) -> dict[str, Any]: + """Apply inverter settings.""" + _ = serial_number + _ = setting_type + settings_parameters = parameters + + if isinstance(parameters, list): + settings_parameters = {} + for index, param in enumerate(parameters, start=1): + settings_parameters["param" + str(index)] = param + + settings_parameters = {**default_parameters, **settings_parameters} + + response = await self.session.post(self.get_url("newTcpsetAPI.do"), + params=settings_parameters) + return response.json() + + async def update_mix_inverter_setting(self, serial_number: str, setting_type: str, parameters: dict[str, Any] | list[Any]) -> dict[str, Any]: + """Set inverter parameters for a Mix inverter.""" + default_parameters = { + "op": "mixSetApiNew", "serialNum": serial_number, "type": setting_type + } + return await self.update_inverter_setting(serial_number, setting_type, + default_parameters, parameters) + + async def update_ac_inverter_setting(self, serial_number: str, setting_type: str, parameters: dict[str, Any] | list[Any]) -> dict[str, Any]: + """Set inverter parameters for an AC-coupled inverter.""" + default_parameters = { + "op": "spaSetApi", "serialNum": serial_number, "type": setting_type + } + return await self.update_inverter_setting(serial_number, setting_type, + default_parameters, parameters) + + async def update_tlx_inverter_time_segment(self, serial_number: str, segment_id: int, batt_mode: int, start_time: datetime.time, end_time: datetime.time, enabled: bool) -> dict[str, Any]: + """Update a TLX inverter time segment.""" + params = {"op": "tlxSet"} + data = { + "serialNum": serial_number, + "type": f"time_segment{segment_id}", + "param1": batt_mode, + "param2": start_time.strftime("%H"), + "param3": start_time.strftime("%M"), + "param4": end_time.strftime("%H"), + "param5": end_time.strftime("%M"), + "param6": "1" if enabled else "0" + } + + response = await self.session.post(self.get_url("newTcpsetAPI.do"), params=params, data=data) + result = response.json() + + if not result.get("success", False): + msg = f"Failed to update TLX inverter time segment: {result.get('msg', 'Unknown error')}" + raise GrowattError(msg) + + return result + + async def update_tlx_inverter_setting(self, serial_number: str, setting_type: str, parameter: dict[str, Any] | list[Any] | str) -> dict[str, Any]: + """Set parameters on a TLX inverter.""" + default_parameters = { + "op": "tlxSet", "serialNum": serial_number, "type": setting_type + } + + if not isinstance(parameter, (dict, list)): + parameter = {"param1": parameter} + elif isinstance(parameter, list): + parameter = {f"param{index+1}": param for index, param in enumerate(parameter)} + + return await self.update_inverter_setting(serial_number, setting_type, + default_parameters, parameter) + + async def update_noah_settings(self, serial_number: str, setting_type: str, parameters: dict[str, Any] | list[Any]) -> dict[str, Any]: + """Apply settings for a Noah device.""" + default_parameters = {"serialNum": serial_number, "type": setting_type} + settings_parameters = parameters + + if isinstance(parameters, list): + settings_parameters = {} + for index, param in enumerate(parameters, start=1): + settings_parameters["param" + str(index)] = param + + settings_parameters = {**default_parameters, **settings_parameters} + + response = await self.session.post(self.get_url("noahDeviceApi/noah/set"), + data=settings_parameters) + return response.json() + + async def classic_inverter_info(self, device_sn: str) -> dict[str, Any]: + """ + Get classic inverter information by scraping the inverter settings page. + + Args: + device_sn: The serial number of the inverter. + + Returns: + dict: A dictionary containing the inverter information. + + Raises: + GrowattError: If the inverter data cannot be extracted from the response. + + """ + response = await self.session.get( + self.get_url("commonDeviceSetC/setInverter"), + params={"type": "server", "invSn": device_sn}, + ) + + match = re.search(r"inv=JSON\.parse\('(\{.*?\})'\)", response.text) + if not match: + msg = f"Could not find inverter data in response for device {device_sn}" + raise GrowattError(msg) + + try: + return json.loads(match.group(1)) + except json.JSONDecodeError as err: + msg = f"Failed to parse inverter data JSON for device {device_sn}" + raise GrowattError(msg) from err + + async def update_classic_inverter_setting(self, default_parameters: dict[str, Any], parameters: dict[str, Any] | list[Any]) -> dict[str, Any]: + """Apply classic inverter settings.""" + settings_parameters = parameters + + if isinstance(parameters, list): + settings_parameters = {} + for index, param in enumerate(parameters, start=1): + settings_parameters["param" + str(index)] = param + + settings_parameters = {**default_parameters, **settings_parameters} + + response = await self.session.post(self.get_url("tcpSet.do"), + params=settings_parameters) + return response.json() diff --git a/growattServer/open_api_v1/async_open_api_v1.py b/growattServer/open_api_v1/async_open_api_v1.py new file mode 100644 index 0000000..85a19ab --- /dev/null +++ b/growattServer/open_api_v1/async_open_api_v1.py @@ -0,0 +1,355 @@ +"""Async OpenApi V1 extensions for Growatt API client using httpx.""" + +from __future__ import annotations + +import platform +import warnings +from datetime import UTC, date, datetime, time +from typing import Any + +from growattServer.async_base_api import AsyncGrowattApi +from growattServer.exceptions import GrowattV1ApiError + +from .devices.async_min import AsyncMin +from .devices.async_sph import AsyncSph + + +class AsyncOpenApiV1(AsyncGrowattApi): + """ + Async extended Growatt API client with V1 API support using httpx. + + This class extends the base AsyncGrowattApi class with methods for MIN and SPH devices using + the public V1 API described here: https://www.showdoc.com.cn/262556420217021/0. + """ + + def _create_user_agent(self) -> str: + python_version = platform.python_version() + system = platform.system() + release = platform.release() + machine = platform.machine() + + return f"Python/{python_version} ({system} {release}; {machine})" + + def __init__(self, token: str, session: Any = None) -> None: + """ + Initialize the async Growatt API client with V1 API support. + + Args: + token (str): API token for authentication (required for V1 API access). + session: Optional httpx.AsyncClient to reuse. + + """ + super().__init__(agent_identifier=self._create_user_agent(), session=session) + + self.api_url = f"{self.server_url}v1/" + self.session.headers.update({"token": token}) + + def process_response(self, response: dict[str, Any], operation_name: str = "API operation") -> dict[str, Any]: + """ + Process API response and handle errors. + + Args: + response (dict): The JSON response from the API + operation_name (str): Name of the operation for error messages + + Returns: + dict: The 'data' field from the response + + Raises: + GrowattV1ApiError: If the API returns an error response + + """ + if response.get("error_code", 1) != 0: + msg = f"Error during {operation_name}" + raise GrowattV1ApiError( + msg, + error_code=response["error_code"], + error_msg=response.get("error_msg", "Unknown error"), + ) + return response.get("data") + + def get_url(self, page: str) -> str: + """Return the page URL for the v1 API.""" + return self.api_url + page + + async def plant_list(self) -> dict[str, Any]: + """ + Get a list of all plants with detailed information. + + Returns: + dict: A dictionary containing plants information with 'count' and 'plants' keys. + + Raises: + GrowattV1ApiError: If the API returns an error response. + httpx.HTTPError: If there is an issue with the HTTP request. + + """ + request_data = { + "page": "", "perpage": "", "search_type": "", "search_keyword": "", + } + response = await self.session.get(url=self.get_url("plant/list"), data=request_data) + return self.process_response(response.json(), "getting plant list") + + async def plant_details(self, plant_id: int) -> dict[str, Any]: + """ + Get basic information about a power station. + + Args: + plant_id (int): Power Station ID + + Returns: + dict: A dictionary containing the plant details. + + Raises: + GrowattV1ApiError: If the API returns an error response. + httpx.HTTPError: If there is an issue with the HTTP request. + + """ + response = await self.session.get( + self.get_url("plant/details"), params={"plant_id": plant_id} + ) + return self.process_response(response.json(), "getting plant details") + + async def plant_energy_overview(self, plant_id: int) -> dict[str, Any]: + """ + Get an overview of a plant's energy data. + + Args: + plant_id (int): Power Station ID + + Returns: + dict: A dictionary containing the plant energy overview. + + Raises: + GrowattV1ApiError: If the API returns an error response. + httpx.HTTPError: If there is an issue with the HTTP request. + + """ + response = await self.session.get( + self.get_url("plant/data"), params={"plant_id": plant_id} + ) + return self.process_response(response.json(), "getting plant energy overview") + + async def plant_power_overview(self, plant_id: int, day: str | date | None = None) -> dict: + """ + Obtain power data of a certain power station. + + Args: + plant_id (int): Power Station ID + day (date): Date - defaults to today in the local system timezone + + Returns: + dict: A dictionary containing the plants power data. + + Raises: + GrowattV1ApiError: If the API returns an error response. + httpx.HTTPError: If there is an issue with the HTTP request. + + """ + if day is None: + day = datetime.now(tz=UTC).astimezone().date() + + response = await self.session.get( + self.get_url("plant/power"), + params={"plant_id": plant_id, "date": day}, + ) + return self.process_response(response.json(), "getting plant power overview") + + async def plant_energy_history( + self, + plant_id: int, + start_date: date | None = None, + end_date: date | None = None, + time_unit: str = "day", + page: int | None = None, + perpage: int | None = None, + ) -> dict[str, Any]: + """ + Retrieve plant energy data for multiple days/months/years. + + Args: + plant_id (int): Power Station ID + start_date (date, optional): Start Date - defaults to today in the local system timezone + end_date (date, optional): End Date - defaults to today in the local system timezone + time_unit (str, optional): Time unit ('day', 'month', 'year') - defaults to 'day' + page (int, optional): Page number - defaults to 1 + perpage (int, optional): Number of items per page - defaults to 20, max 100 + + Returns: + dict: A dictionary containing the plant energy history. + + Raises: + GrowattV1ApiError: If the API returns an error response. + httpx.HTTPError: If there is an issue with the HTTP request. + + """ + max_day_interval = 7 + max_year_interval = 20 + + if start_date is None and end_date is None: + start_date = datetime.now(tz=UTC).astimezone().date() + end_date = datetime.now(tz=UTC).astimezone().date() + elif start_date is None: + start_date = end_date + elif end_date is None: + end_date = start_date + + if time_unit == "day" and (end_date - start_date).days > max_day_interval: + warnings.warn( + "Date interval must not exceed 7 days in 'day' mode.", + RuntimeWarning, stacklevel=2, + ) + elif time_unit == "month" and (end_date.year - start_date.year > 1): + warnings.warn( + "Start date must be within same or previous year in 'month' mode.", + RuntimeWarning, stacklevel=2, + ) + elif time_unit == "year" and (end_date.year - start_date.year > max_year_interval): + warnings.warn( + "Date interval must not exceed 20 years in 'year' mode.", + RuntimeWarning, stacklevel=2, + ) + + response = await self.session.get( + self.get_url("plant/energy"), + params={ + "plant_id": plant_id, + "start_date": start_date.strftime("%Y-%m-%d"), + "end_date": end_date.strftime("%Y-%m-%d"), + "time_unit": time_unit, + "page": page, + "perpage": perpage, + }, + ) + return self.process_response(response.json(), "getting plant energy history") + + async def device_list(self, plant_id: int) -> dict[str, Any]: + """ + Get devices associated with plant. + + Args: + plant_id (int): Power Station ID + + Returns: + dict: A dictionary containing device list data. + + """ + response = await self.session.get( + url=self.get_url("device/list"), + params={"plant_id": plant_id, "page": "", "perpage": ""}, + ) + return self.process_response(response.json(), "getting device list") + + def get_device(self, device_sn: str, device_type: int): + """Get the device class by serial number and device_type id.""" + match device_type: + case AsyncSph.DEVICE_TYPE_ID: + return AsyncSph(self, device_sn) + case AsyncMin.DEVICE_TYPE_ID: + return AsyncMin(self, device_sn) + case _: + warnings.warn( + f"Device for type id: {device_type} has not been implemented yet.", + stacklevel=2, + ) + return None + + async def min_detail(self, device_sn: str) -> dict[str, Any]: + """Get detailed data for a MIN inverter.""" + return await AsyncMin(self, device_sn).detail() + + async def min_energy(self, device_sn: str) -> dict[str, Any]: + """Get energy data for a MIN inverter.""" + return await AsyncMin(self, device_sn).energy() + + async def min_energy_history( + self, device_sn: str, start_date: date | None = None, end_date: date | None = None, + timezone: str | None = None, page: int | None = None, limit: int | None = None, + ) -> dict[str, Any]: + """Get MIN inverter data history.""" + return await AsyncMin(self, device_sn).energy_history( + start_date, end_date, timezone, page, limit + ) + + async def min_settings(self, device_sn: str) -> dict[str, Any]: + """Get settings for a MIN inverter.""" + return await AsyncMin(self, device_sn).settings() + + async def min_read_parameter( + self, device_sn: str, parameter_id: str, start_address: int | None = None, end_address: int | None = None + ) -> dict[str, Any]: + """Read setting from MIN inverter.""" + return await AsyncMin(self, device_sn).read_parameter( + parameter_id, start_address, end_address + ) + + async def min_write_parameter(self, device_sn: str, parameter_id: str, parameter_values=None) -> dict[str, Any]: + """Set parameters on a MIN inverter.""" + return await AsyncMin(self, device_sn).write_parameter(parameter_id, parameter_values) + + async def min_write_time_segment( + self, device_sn: str, segment_id: int, batt_mode: int, start_time: time, end_time: time, enabled: bool = True + ) -> dict[str, Any]: + """Set a time segment for a MIN inverter.""" + return await AsyncMin(self, device_sn).write_time_segment( + segment_id, batt_mode, start_time, end_time, enabled + ) + + async def min_read_time_segments(self, device_sn: str, settings_data: dict[str, Any] | None = None) -> list[dict[str, Any]]: + """Read Time-of-Use (TOU) settings from a Growatt MIN/TLX inverter.""" + return await AsyncMin(self, device_sn).read_time_segments(settings_data) + + # SPH Device Methods (Device Type 5) + + async def sph_detail(self, device_sn: str) -> dict[str, Any]: + """Get detailed data for an SPH inverter.""" + return await AsyncSph(self, device_sn).detail() + + async def sph_energy(self, device_sn: str) -> dict[str, Any]: + """Get energy data for an SPH inverter.""" + return await AsyncSph(self, device_sn).energy() + + async def sph_energy_history( + self, device_sn: str, start_date: date | None = None, end_date: date | None = None, + timezone: str | None = None, page: int | None = None, limit: int | None = None, + ) -> dict[str, Any]: + """Get SPH inverter data history.""" + return await AsyncSph(self, device_sn).energy_history( + start_date, end_date, timezone, page, limit + ) + + async def sph_read_parameter( + self, device_sn: str, parameter_id: str | None = None, start_address: int | None = None, end_address: int | None = None + ) -> dict[str, Any]: + """Read setting from SPH inverter.""" + return await AsyncSph(self, device_sn).read_parameter( + parameter_id, start_address, end_address + ) + + async def sph_write_parameter(self, device_sn: str, parameter_id: str, parameter_values=None) -> dict[str, Any]: + """Set parameters on an SPH inverter.""" + return await AsyncSph(self, device_sn).write_parameter(parameter_id, parameter_values) + + async def sph_write_ac_charge_times( + self, device_sn: str, charge_power: int, charge_stop_soc: int, mains_enabled: bool, periods: list[dict[str, Any]] + ) -> dict[str, Any]: + """Set AC charge time periods for an SPH inverter.""" + return await AsyncSph(self, device_sn).write_ac_charge_times( + charge_power, charge_stop_soc, mains_enabled, periods + ) + + async def sph_write_ac_discharge_times( + self, device_sn: str, discharge_power: int, discharge_stop_soc: int, periods: list[dict[str, Any]] + ) -> dict[str, Any]: + """Set AC discharge time periods for an SPH inverter.""" + return await AsyncSph(self, device_sn).write_ac_discharge_times( + discharge_power, discharge_stop_soc, periods + ) + + async def sph_read_ac_charge_times(self, device_sn: str, settings_data: dict[str, Any] | None = None) -> dict[str, Any]: + """Read AC charge time periods and settings from an SPH inverter.""" + return await AsyncSph(self, device_sn).read_ac_charge_times(settings_data) + + async def sph_read_ac_discharge_times(self, device_sn: str, settings_data: dict[str, Any] | None = None) -> dict[str, Any]: + """Read AC discharge time periods and settings from an SPH inverter.""" + return await AsyncSph(self, device_sn).read_ac_discharge_times(settings_data) diff --git a/growattServer/open_api_v1/devices/__init__.py b/growattServer/open_api_v1/devices/__init__.py index 86c616e..846d266 100644 --- a/growattServer/open_api_v1/devices/__init__.py +++ b/growattServer/open_api_v1/devices/__init__.py @@ -4,3 +4,17 @@ from .abstract_device import AbstractDevice, ParameterValue # noqa: F401 from .min import Min # noqa: F401 from .sph import Sph # noqa: F401 + + +def __getattr__(name: str): + """Lazy imports for async device classes (requires httpx).""" + if name == "AsyncMin": + from .async_min import AsyncMin # noqa: PLC0415 + + return AsyncMin + if name == "AsyncSph": + from .async_sph import AsyncSph # noqa: PLC0415 + + return AsyncSph + msg = f"module {__name__!r} has no attribute {name!r}" + raise AttributeError(msg) diff --git a/growattServer/open_api_v1/devices/async_min.py b/growattServer/open_api_v1/devices/async_min.py new file mode 100644 index 0000000..188915b --- /dev/null +++ b/growattServer/open_api_v1/devices/async_min.py @@ -0,0 +1,222 @@ +"""Async Min/TLX device file using httpx.""" + +from __future__ import annotations + +from datetime import UTC, date, datetime, time, timedelta +from typing import Any + +from growattServer.exceptions import GrowattParameterError + +from .abstract_device import AbstractDevice, ParameterValue + + +class AsyncMin(AbstractDevice): + """Async Min/TLX device type.""" + + DEVICE_TYPE_ID = 7 + + async def detail(self) -> dict[str, Any]: + """Get detailed data for a MIN inverter.""" + response = await self.api.session.get( + self.api.get_url("device/tlx/tlx_data_info"), + params={"device_sn": self.device_sn}, + ) + return self.api.process_response( + response.json(), "getting MIN inverter details" + ) + + async def energy(self) -> dict[str, Any]: + """Get energy data for a MIN inverter.""" + response = await self.api.session.post( + url=self.api.get_url("device/tlx/tlx_last_data"), + data={"tlx_sn": self.device_sn}, + ) + return self.api.process_response( + response.json(), "getting MIN inverter energy data" + ) + + async def energy_history( + self, start_date: date | None = None, end_date: date | None = None, timezone: str | None = None, page: int | None = None, limit: int | None = None + ) -> dict[str, Any]: + """Get MIN inverter data history.""" + if start_date is None and end_date is None: + start_date = datetime.now(tz=UTC).astimezone().date() + end_date = datetime.now(tz=UTC).astimezone().date() + elif start_date is None: + start_date = end_date + elif end_date is None: + end_date = start_date + + if end_date - start_date > timedelta(days=7): + raise GrowattParameterError("date interval must not exceed 7 days") + + response = await self.api.session.post( + url=self.api.get_url("device/tlx/tlx_data"), + data={ + "tlx_sn": self.device_sn, + "start_date": start_date.strftime("%Y-%m-%d"), + "end_date": end_date.strftime("%Y-%m-%d"), + "timezone_id": timezone, + "page": page, + "perpage": limit, + }, + ) + return self.api.process_response( + response.json(), "getting MIN inverter energy history" + ) + + async def settings(self) -> dict[str, Any]: + """Get settings for a MIN inverter.""" + response = await self.api.session.get( + self.api.get_url("device/tlx/tlx_set_info"), + params={"device_sn": self.device_sn}, + ) + return self.api.process_response( + response.json(), "getting MIN inverter settings" + ) + + async def read_parameter( + self, parameter_id: str, start_address: int | None = None, end_address: int | None = None + ) -> dict[str, Any]: + """Read setting from MIN inverter.""" + self.validate_read_parameter_input(parameter_id, start_address, end_address) + + if parameter_id is not None: + start_address = 0 + end_address = 0 + else: + parameter_id = "set_any_reg" + if start_address is None: + start_address = end_address + if end_address is None: + end_address = start_address + + response = await self.api.session.post( + self.api.get_url("readMinParam"), + data={ + "device_sn": self.device_sn, + "paramId": parameter_id, + "startAddr": start_address, + "endAddr": end_address, + }, + ) + return self.api.process_response( + response.json(), f"reading parameter {parameter_id}" + ) + + async def write_parameter(self, parameter_id: str, parameter_values: ParameterValue | None = None) -> dict[str, Any]: + """Set parameters on a MIN inverter.""" + max_min_params = 19 + parameters = dict.fromkeys(range(1, max_min_params + 1), "") + + if parameter_values is not None: + if isinstance(parameter_values, (str, int, float, bool)): + parameters[1] = str(parameter_values) + elif isinstance(parameter_values, list): + for i, value in enumerate(parameter_values, 1): + if i <= max_min_params: + parameters[i] = str(value) + elif isinstance(parameter_values, dict): + for pos_raw, value in parameter_values.items(): + pos = int(pos_raw) if not isinstance(pos_raw, int) else pos_raw + if 1 <= pos <= max_min_params: + parameters[pos] = str(value) + + request_data = {"tlx_sn": self.device_sn, "type": parameter_id} + for i in range(1, max_min_params + 1): + request_data[f"param{i}"] = str(parameters[i]) + + response = await self.api.session.post(self.api.get_url("tlxSet"), data=request_data) + return self.api.process_response( + response.json(), f"writing parameter {parameter_id}" + ) + + async def write_time_segment( + self, segment_id: int, batt_mode: int, start_time: time, end_time: time, enabled: bool = True + ) -> dict[str, Any]: + """Set a time segment for a MIN inverter.""" + max_min_params = 19 + max_min_segments = 9 + max_batt_mode = 2 + + if not 1 <= segment_id <= max_min_segments: + msg = f"segment_id must be between 1 and {max_min_segments}" + raise GrowattParameterError(msg) + + if not 0 <= batt_mode <= max_batt_mode: + msg = f"batt_mode must be between 0 and {max_batt_mode}" + raise GrowattParameterError(msg) + + all_params = {"tlx_sn": self.device_sn, "type": f"time_segment{segment_id}"} + all_params["param1"] = str(batt_mode) + all_params["param2"] = str(start_time.hour) + all_params["param3"] = str(start_time.minute) + all_params["param4"] = str(end_time.hour) + all_params["param5"] = str(end_time.minute) + all_params["param6"] = "1" if enabled else "0" + + for i in range(7, max_min_params + 1): + all_params[f"param{i}"] = "" + + response = await self.api.session.post(self.api.get_url("tlxSet"), data=all_params) + return self.api.process_response( + response.json(), f"writing time segment {segment_id}" + ) + + async def read_time_segments(self, settings_data: dict[str, Any] | None = None) -> list[dict[str, Any]]: + """Read Time-of-Use (TOU) settings from a Growatt MIN/TLX inverter.""" + if settings_data is None: + settings_data = await self.settings() + + mode_names = {0: "Load First", 1: "Battery First", 2: "Grid First"} + segments = [] + + for i in range(1, 10): + start_time_raw = settings_data.get(f"forcedTimeStart{i}", "0:0") + end_time_raw = settings_data.get(f"forcedTimeStop{i}", "0:0") + + if start_time_raw == "null" or not start_time_raw: + start_time_raw = "0:0" + if end_time_raw == "null" or not end_time_raw: + end_time_raw = "0:0" + + try: + start_parts = start_time_raw.split(":") + start_time = f"{int(start_parts[0]):02d}:{int(start_parts[1]):02d}" + except (ValueError, IndexError): + start_time = "00:00" + + try: + end_parts = end_time_raw.split(":") + end_time = f"{int(end_parts[0]):02d}:{int(end_parts[1]):02d}" + except (ValueError, IndexError): + end_time = "00:00" + + mode_raw = settings_data.get(f"time{i}Mode") + if mode_raw == "null" or mode_raw is None: + batt_mode = None + else: + try: + batt_mode = int(mode_raw) + except (ValueError, TypeError): + batt_mode = None + + enabled_raw = settings_data.get(f"forcedStopSwitch{i}", 0) + if enabled_raw == "null" or enabled_raw is None: + enabled = False + else: + try: + enabled = int(enabled_raw) == 1 + except (ValueError, TypeError): + enabled = False + + segments.append({ + "segment_id": i, + "batt_mode": batt_mode, + "mode_name": mode_names.get(batt_mode, "Unknown"), + "start_time": start_time, + "end_time": end_time, + "enabled": enabled, + }) + + return segments diff --git a/growattServer/open_api_v1/devices/async_sph.py b/growattServer/open_api_v1/devices/async_sph.py new file mode 100644 index 0000000..c428382 --- /dev/null +++ b/growattServer/open_api_v1/devices/async_sph.py @@ -0,0 +1,270 @@ +"""Async SPH/MIX device file using httpx.""" + +from __future__ import annotations + +from datetime import UTC, date, datetime, timedelta +from typing import Any + +from growattServer.exceptions import GrowattParameterError + +from .abstract_device import AbstractDevice, ParameterValue + + +class AsyncSph(AbstractDevice): + """Async SPH/MIX device type.""" + + DEVICE_TYPE_ID = 5 + + async def detail(self) -> dict[str, Any]: + """Get detailed data for an SPH inverter.""" + response = await self.api.session.get( + self.api.get_url("device/mix/mix_data_info"), + params={"device_sn": self.device_sn}, + ) + return self.api.process_response( + response.json(), "getting SPH inverter details" + ) + + async def energy(self) -> dict[str, Any]: + """Get energy data for an SPH inverter.""" + response = await self.api.session.post( + url=self.api.get_url("device/mix/mix_last_data"), + data={"mix_sn": self.device_sn}, + ) + return self.api.process_response( + response.json(), "getting SPH inverter energy data" + ) + + async def energy_history( + self, start_date: date | None = None, end_date: date | None = None, timezone: str | None = None, page: int | None = None, limit: int | None = None + ) -> dict[str, Any]: + """Get SPH inverter data history.""" + if start_date is None and end_date is None: + start_date = datetime.now(tz=UTC).astimezone().date() + end_date = datetime.now(tz=UTC).astimezone().date() + elif start_date is None: + start_date = end_date + elif end_date is None: + end_date = start_date + + if end_date - start_date > timedelta(days=7): + raise GrowattParameterError("date interval must not exceed 7 days") + + response = await self.api.session.post( + url=self.api.get_url("device/mix/mix_data"), + data={ + "mix_sn": self.device_sn, + "start_date": start_date.strftime("%Y-%m-%d"), + "end_date": end_date.strftime("%Y-%m-%d"), + "timezone_id": timezone, + "page": page, + "perpage": limit, + }, + ) + return self.api.process_response( + response.json(), "getting SPH inverter energy history" + ) + + async def read_parameter(self, parameter_id: str | None = None, start_address: int | None = None, end_address: int | None = None) -> dict[str, Any]: + """Read setting from SPH inverter.""" + if parameter_id is None and start_address is None: + raise GrowattParameterError( + "specify either parameter_id or start_address/end_address" + ) + if parameter_id is not None and start_address is not None: + raise GrowattParameterError( + "specify either parameter_id or start_address/end_address - not both." + ) + if parameter_id is not None: + start_address = 0 + end_address = 0 + else: + parameter_id = "set_any_reg" + + response = await self.api.session.post( + self.api.get_url("readMixParam"), + data={ + "device_sn": self.device_sn, + "paramId": parameter_id, + "startAddr": start_address, + "endAddr": end_address, + }, + ) + return self.api.process_response( + response.json(), f"reading parameter {parameter_id}" + ) + + async def write_parameter(self, parameter_id: str, parameter_values: ParameterValue | None = None) -> dict[str, Any]: + """Set parameters on an SPH inverter.""" + max_sph_params = 18 + parameters = dict.fromkeys(range(1, max_sph_params + 1), "") + + if parameter_values is not None: + if isinstance(parameter_values, (str, int, float, bool)): + parameters[1] = str(parameter_values) + elif isinstance(parameter_values, list): + for i, value in enumerate(parameter_values, 1): + if i <= max_sph_params: + parameters[i] = str(value) + elif isinstance(parameter_values, dict): + for pos_raw, value in parameter_values.items(): + pos = int(pos_raw) if not isinstance(pos_raw, int) else pos_raw + if 1 <= pos <= max_sph_params: + parameters[pos] = str(value) + + request_data = {"mix_sn": self.device_sn, "type": parameter_id} + for i in range(1, max_sph_params + 1): + request_data[f"param{i}"] = str(parameters[i]) + + response = await self.api.session.post(self.api.get_url("mixSet"), data=request_data) + return self.api.process_response( + response.json(), f"writing parameter {parameter_id}" + ) + + async def write_ac_charge_times( + self, charge_power: int, charge_stop_soc: int, mains_enabled: bool, periods: list[dict[str, Any]] + ) -> dict[str, Any]: + """Set AC charge time periods for an SPH inverter.""" + if not 0 <= charge_power <= 100: # noqa: PLR2004 + raise GrowattParameterError("charge_power must be between 0 and 100") + if not 0 <= charge_stop_soc <= 100: # noqa: PLR2004 + raise GrowattParameterError("charge_stop_soc must be between 0 and 100") + if len(periods) != 3: # noqa: PLR2004 + raise GrowattParameterError("periods must contain exactly 3 period definitions") + + request_data = { + "mix_sn": self.device_sn, + "type": "mix_ac_charge_time_period", + "param1": str(charge_power), + "param2": str(charge_stop_soc), + "param3": "1" if mains_enabled else "0", + } + + for i, period in enumerate(periods): + base = i * 5 + 4 + request_data[f"param{base}"] = str(period["start_time"].hour) + request_data[f"param{base + 1}"] = str(period["start_time"].minute) + request_data[f"param{base + 2}"] = str(period["end_time"].hour) + request_data[f"param{base + 3}"] = str(period["end_time"].minute) + request_data[f"param{base + 4}"] = "1" if period["enabled"] else "0" + + response = await self.api.session.post(self.api.get_url("mixSet"), data=request_data) + return self.api.process_response( + response.json(), "writing AC charge time periods" + ) + + async def write_ac_discharge_times(self, discharge_power: int, discharge_stop_soc: int, periods: list[dict[str, Any]]) -> dict[str, Any]: + """Set AC discharge time periods for an SPH inverter.""" + if not 0 <= discharge_power <= 100: # noqa: PLR2004 + raise GrowattParameterError("discharge_power must be between 0 and 100") + if not 0 <= discharge_stop_soc <= 100: # noqa: PLR2004 + raise GrowattParameterError("discharge_stop_soc must be between 0 and 100") + if len(periods) != 3: # noqa: PLR2004 + raise GrowattParameterError("periods must contain exactly 3 period definitions") + + request_data = { + "mix_sn": self.device_sn, + "type": "mix_ac_discharge_time_period", + "param1": str(discharge_power), + "param2": str(discharge_stop_soc), + } + + for i, period in enumerate(periods): + base = i * 5 + 3 + request_data[f"param{base}"] = str(period["start_time"].hour) + request_data[f"param{base + 1}"] = str(period["start_time"].minute) + request_data[f"param{base + 2}"] = str(period["end_time"].hour) + request_data[f"param{base + 3}"] = str(period["end_time"].minute) + request_data[f"param{base + 4}"] = "1" if period["enabled"] else "0" + + response = await self.api.session.post(self.api.get_url("mixSet"), data=request_data) + return self.api.process_response( + response.json(), "writing AC discharge time periods" + ) + + def _parse_time_periods(self, settings_data: dict[str, Any], time_type: str) -> list[dict[str, Any]]: + """Parse time periods from settings data.""" + periods = [] + + for i in range(1, 4): + start_time_raw = settings_data.get(f"forced{time_type}TimeStart{i}", "0:0") + end_time_raw = settings_data.get(f"forced{time_type}TimeStop{i}", "0:0") + enabled_raw = settings_data.get(f"forced{time_type}StopSwitch{i}", 0) + + if start_time_raw == "null" or not start_time_raw: + start_time_raw = "0:0" + if end_time_raw == "null" or not end_time_raw: + end_time_raw = "0:0" + + try: + start_parts = start_time_raw.split(":") + start_time = f"{int(start_parts[0]):02d}:{int(start_parts[1]):02d}" + except (ValueError, IndexError): + start_time = "00:00" + + try: + end_parts = end_time_raw.split(":") + end_time = f"{int(end_parts[0]):02d}:{int(end_parts[1]):02d}" + except (ValueError, IndexError): + end_time = "00:00" + + if enabled_raw == "null" or enabled_raw is None: + enabled = False + else: + try: + enabled = int(enabled_raw) == 1 + except (ValueError, TypeError): + enabled = False + + periods.append({ + "period_id": i, + "start_time": start_time, + "end_time": end_time, + "enabled": enabled, + }) + + return periods + + async def read_ac_charge_times(self, settings_data: dict[str, Any] | None = None) -> dict[str, Any]: + """Read AC charge time periods and settings from an SPH inverter.""" + if settings_data is None: + settings_data = await self.detail() + + charge_power = settings_data.get("chargePowerCommand", 0) + charge_stop_soc = settings_data.get("wchargeSOCLowLimit", 100) + mains_enabled_raw = settings_data.get("acChargeEnable", 0) + + if charge_power == "null" or charge_power is None or charge_power == "": + charge_power = 0 + if charge_stop_soc == "null" or charge_stop_soc is None or charge_stop_soc == "": + charge_stop_soc = 100 + if mains_enabled_raw == "null" or mains_enabled_raw is None or mains_enabled_raw == "": + mains_enabled = False + else: + mains_enabled = int(mains_enabled_raw) == 1 + + return { + "charge_power": int(charge_power), + "charge_stop_soc": int(charge_stop_soc), + "mains_enabled": mains_enabled, + "periods": self._parse_time_periods(settings_data, "Charge"), + } + + async def read_ac_discharge_times(self, settings_data: dict[str, Any] | None = None) -> dict[str, Any]: + """Read AC discharge time periods and settings from an SPH inverter.""" + if settings_data is None: + settings_data = await self.detail() + + discharge_power = settings_data.get("disChargePowerCommand", 0) + discharge_stop_soc = settings_data.get("wdisChargeSOCLowLimit", 10) + + if discharge_power == "null" or discharge_power is None or discharge_power == "": + discharge_power = 0 + if discharge_stop_soc == "null" or discharge_stop_soc is None or discharge_stop_soc == "": + discharge_stop_soc = 10 + + return { + "discharge_power": int(discharge_power), + "discharge_stop_soc": int(discharge_stop_soc), + "periods": self._parse_time_periods(settings_data, "Discharge"), + } diff --git a/setup.py b/setup.py index 387b3ac..6720dd6 100755 --- a/setup.py +++ b/setup.py @@ -27,4 +27,7 @@ install_requires=[ "requests", ], + extras_require={ + "async": ["httpx"], + }, )