Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .cspell/custom-dictionary-workspace.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ derating
devcontainer
devcontainers
dexport
deye
DEYE
Deye
digestmod
dimport
dischargeenergytotal
Expand All @@ -96,6 +99,7 @@ dstart
dwindow
Eddi
elif
EMEA
emszzzz
enctype
endfor
Expand Down
36 changes: 35 additions & 1 deletion apps/predbat/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from web import WebInterface
from ha import HAInterface, HAHistory
from db_manager import DatabaseManager
from deye import DeyeAPI
from fox import FoxAPI
from kraken import KrakenAPI
from web_mcp import PredbatMCPServer
Expand Down Expand Up @@ -238,6 +239,35 @@
},
"phase": 1,
},
"deye": {
"class": DeyeAPI,
"name": "DEYE Cloud",
"event_filter": "predbat_deye_",
"args": {
"key": {
"required": True,
"config": "deye_key",
},
"device_sn": {
"required": False,
"config": "deye_device_sn",
},
"automatic": {
"required": False,
"default": False,
"config": "deye_automatic",
},
"token_expires_at": {
"required": False,
"config": "deye_token_expires_at",
},
"token_hash": {
"required": False,
"config": "deye_token_hash",
},
},
"phase": 1,
},
"kraken": {
"class": KrakenAPI,
"name": "Kraken Energy (EDF/E.ON)",
Expand Down Expand Up @@ -444,7 +474,11 @@ def initialize(self, only=None, phase=0):
default = arg_info.get("default", None)
indirect = arg_info.get("indirect", False)
config_late_resolve = arg_info.get("config_late_resolve", False)
if config_late_resolve:
if "value" in arg_info:
# Pre-computed provider object injected directly — no config lookup needed.
arg_dict[arg] = arg_info["value"]
continue
elif config_late_resolve:
# Defer resolution of config value until later
arg_dict[arg] = arg_info["config"]
continue
Expand Down
5 changes: 5 additions & 0 deletions apps/predbat/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2123,6 +2123,11 @@
"solis_base_url": {"type": "string", "empty": False},
"solis_control_enable": {"type": "boolean"},
"solis_cloud_pv_load_ignore": {"type": "boolean"},
"deye_key": {"type": "string", "empty": False},
"deye_device_sn": {"type": "string|string_list", "empty": False},
"deye_automatic": {"type": "boolean"},
"deye_token_expires_at": {"type": "string", "empty": False},
"deye_token_hash": {"type": "string", "empty": False},
"fox_key": {"type": "string", "empty": False},
"fox_automatic": {"type": "boolean"},
"fox_automatic_ignore_pv": {"type": "boolean"},
Expand Down
178 changes: 178 additions & 0 deletions apps/predbat/deye.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# -----------------------------------------------------------------------------
# Predbat Home Battery System
# Copyright Trefor Southwell 2026 - All Rights Reserved
# This application maybe used for personal use only and not for commercial use
# -----------------------------------------------------------------------------

"""DEYE Cloud API integration.

Cloud API client for DEYE inverters via the DeyeCloud OpenAPI (EMEA data centre).
Supports real-time monitoring and Time-of-Use charge/discharge schedule control.
The bearer token is injected by the PredBat SaaS platform and refreshed via
the oauth-refresh edge function; this module never calls the token endpoint itself.
"""

import aiohttp
import asyncio
from datetime import datetime
from component_base import ComponentBase
from oauth_mixin import OAuthMixin

DEYE_BASE_URL = "https://eu1-developer.deyecloud.com"
DEYE_TIMEOUT = 30
DEYE_RETRIES = 3


class DeyeAPI(ComponentBase, OAuthMixin):
"""DEYE Cloud API component for PredBat SaaS."""

def initialise(self):
"""Initialise the DEYE API component."""
self.log("Info: DeyeAPI initialising")
self.device_sn = self.get_arg("device_sn", "")
if isinstance(self.device_sn, list):
self.device_sn = self.device_sn[0] if self.device_sn else ""

key = self.get_arg("key", "")
token_expires_at = self.get_arg("token_expires_at", None)
token_hash = self.get_arg("token_hash", "")
self._init_oauth(
auth_method="oauth",
key=key,
token_expires_at=token_expires_at,
provider_name="deye",
)
self.token_hash = token_hash
self.cached_values = {}

def _auth_headers(self):
"""Build Authorization header for a DEYE API request."""
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.access_token}",
}

async def _post(self, path: str, body: dict) -> dict:
"""POST to DEYE API, retrying on transient errors. Returns parsed JSON or raises."""
url = f"{DEYE_BASE_URL}{path}"
timeout = aiohttp.ClientTimeout(total=DEYE_TIMEOUT)
last_err = None

for attempt in range(DEYE_RETRIES):
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, headers=self._auth_headers(), json=body) as resp:
if resp.status in (401, 403):
self.log(f"Warn: DEYE API 401/403 on {path}, attempt {attempt + 1}")
if await self.handle_oauth_401():
continue # retry with refreshed token
raise RuntimeError(f"DEYE OAuth auth failed on {path}")
resp.raise_for_status()
return await resp.json()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
last_err = e
self.log(f"Warn: DEYE API network error on {path} attempt {attempt + 1}: {e}")
await asyncio.sleep(2**attempt)

raise RuntimeError(f"DEYE API failed after {DEYE_RETRIES} retries on {path}: {last_err}")

async def fetch_device_data(self) -> dict:
"""Fetch latest real-time data for the configured device."""
data = await self._post("/v1.0/device/latest", {"deviceSnList": [self.device_sn]})
if not data.get("success"):
raise RuntimeError(f"DEYE device/latest failed: {data.get('msg', 'unknown')}")

raw_list = data.get("data") or []
if not raw_list:
return {}

device = raw_list[0]
result = {
"soc": float(device.get("batteryPower", {}).get("soc", 0)),
"battery_power": float(device.get("batteryPower", {}).get("power", 0)),
"grid_power": float(device.get("gridPower", {}).get("power", 0)),
"pv_power": float(device.get("pvPower", {}).get("power", 0)),
"load_power": float(device.get("loadOrEpsPower", {}).get("power", 0)),
}
self.cached_values[self.device_sn] = result
return result

async def set_tou_schedule(self, slots: list) -> bool:
"""Write a Time-of-Use schedule to the inverter."""
if not slots:
return False

payload = {
"deviceSn": self.device_sn,
"timeUseSettingItems": slots,
"timeoutSeconds": 30,
}
resp = await self._post("/v1.0/order/sys/tou/update", payload)
if not resp.get("success"):
self.log(f"Warn: DEYE TOU update failed: {resp.get('msg', 'unknown')}")
return False

order_id = resp.get("data", {}).get("orderId")
if order_id:
self.log(f"Info: DEYE TOU update submitted, orderId={order_id}")
return True

async def run(self, seconds: int = 0, first: bool = False) -> None:
"""Main component loop — fetch data and apply charge schedule."""
if not await self.check_and_refresh_oauth_token():
self.log("Warn: DEYE OAuth token invalid, skipping run")
return

if not self.device_sn:
self.log("Warn: DEYE device_sn not configured, skipping run")
return

try:
device_data = await self.fetch_device_data()
self.log(f"Info: DEYE SoC={device_data.get('soc', '?')}% " f"battery={device_data.get('battery_power', '?')}W " f"grid={device_data.get('grid_power', '?')}W " f"pv={device_data.get('pv_power', '?')}W")
except Exception as e:
self.log(f"Warn: DEYE data fetch failed: {e}")

async def charge(self, charge_start: datetime, charge_end: datetime, power_w: int, soc_target: int) -> None:
"""Schedule a grid charge window on the inverter."""
slots = [
{
"time": charge_start.strftime("%H:%M"),
"soc": soc_target,
"power": power_w,
"enableGridCharge": True,
"enableGeneration": True,
},
{
"time": charge_end.strftime("%H:%M"),
"soc": 10,
"power": power_w,
"enableGridCharge": False,
"enableGeneration": True,
},
]
await self.set_tou_schedule(slots)

async def discharge(self, discharge_start: datetime, discharge_end: datetime, power_w: int) -> None:
"""Schedule a forced discharge window on the inverter."""
slots = [
{
"time": discharge_start.strftime("%H:%M"),
"soc": 10,
"power": power_w,
"enableGridCharge": False,
"enableGeneration": True,
},
{
"time": discharge_end.strftime("%H:%M"),
"soc": 10,
"power": power_w,
"enableGridCharge": False,
"enableGeneration": True,
},
]
await self.set_tou_schedule(slots)

async def final(self) -> None:
"""Cleanup on shutdown."""
self.log("Info: DeyeAPI shutdown")
Loading