diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 8fbb4c99d..59302470f 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -76,6 +76,7 @@ dayname daynumber daysymbol dedup +delayedstart dend denorm derating diff --git a/apps/predbat/config.py b/apps/predbat/config.py index fe483bbf6..3efeb9b0e 100644 --- a/apps/predbat/config.py +++ b/apps/predbat/config.py @@ -1133,6 +1133,16 @@ "restore": False, "api": True, }, + { + "name": "load_forecast_delta_api", + "friendly_name": "Load forecast delta API controls", + "type": "select", + "options": ["off"], + "icon": "mdi:dishwasher", + "default": "off", + "restore": False, + "api": True, + }, { "name": "manual_freeze_charge", "friendly_name": "Manual force charge freeze", @@ -2043,6 +2053,7 @@ "type": "sensor_list", "sensor_type": "dict|list", }, + "house_load_additional_forecast": {"type": "dict_list"}, "ge_cloud_data": {"type": "boolean"}, "ge_cloud_serial": {"type": "string", "empty": False}, "ge_cloud_key": {"type": "string", "empty": False}, diff --git a/apps/predbat/fetch.py b/apps/predbat/fetch.py index b4150e0b3..66f6e5fcb 100644 --- a/apps/predbat/fetch.py +++ b/apps/predbat/fetch.py @@ -26,6 +26,7 @@ from axle import fetch_axle_sessions, load_axle_slot, fetch_axle_active import copy +import re class Fetch: @@ -61,6 +62,819 @@ def get_cloud_factor(self, minutes_now, pv_data, pv_data10): else: return None + def parse_additional_load_weighting(self, weighting, periods): + """ + Parse additional load forecast weighting into per-slot multipliers. + """ + if periods <= 0: + return [] + if weighting is None: + return [1.0 for _ in range(periods)] + if isinstance(weighting, (int, float)): + return [float(weighting) for _ in range(periods)] + + weighting = str(weighting).strip() + if not weighting: + return [1.0 for _ in range(periods)] + + weights = [] + weight_separator = "," + if "|" in weighting: + weight_separator = "|" + for weight in weighting.split(weight_separator): + weight = weight.strip() + if weight == "*": + weights.append(1.0) + else: + try: + weights.append(float(weight)) + except (ValueError, TypeError): + self.log("Warn: Bad weighting {} provided in house_load_additional_forecast, using 1.0".format(weight)) + weights.append(1.0) + + if not weights: + weights = [1.0] + while len(weights) < periods: + weights.append(weights[-1]) + return weights[:periods] + + def get_additional_load_time_minutes(self, load_item, key, default=None): + """ + Resolve a time field on an additional load forecast item to minutes from midnight. + """ + value = load_item.get(key, default) + if value is None: + return None + value = self.resolve_arg(key, value, default) + if value is None: + return None + value = str(value) + if value.count(":") < 2: + value += ":00" + try: + stamp = time_string_to_stamp(value) + except (ValueError, TypeError): + self.log("Warn: Bad {} {} provided in house_load_additional_forecast".format(key, value)) + self.record_status("Warn: Bad {} {} provided in house_load_additional_forecast".format(key, value), had_errors=True) + return None + return minutes_to_time(stamp, time_string_to_stamp("00:00:00")) + + def get_additional_load_float(self, load_item, key, default=0.0): + """ + Resolve a numeric field on an additional load forecast item. + """ + value = load_item.get(key, default) + if isinstance(value, str): + try: + return float(value) + except (ValueError, TypeError): + pass + value = self.resolve_arg(key, value, default) + try: + return float(value) + except (ValueError, TypeError): + self.log("Warn: Bad {} {} provided in house_load_additional_forecast".format(key, value)) + self.record_status("Warn: Bad {} {} provided in house_load_additional_forecast".format(key, value), had_errors=True) + return float(default) + + def get_additional_load_bool(self, load_item, key, default=True): + """ + Resolve a boolean field on an additional load forecast item. + """ + value = load_item.get(key, default) + value = self.resolve_arg(key, value, default) + if isinstance(value, str): + return value.lower() in ["on", "true", "yes", "enable", "enabled", "1"] + return bool(value) + + def additional_load_entity_name(self, name): + """ + Make the binary sensor entity name for a named additional load forecast. + """ + safe_name = self.additional_load_safe_name(name) + return "binary_sensor.{}_load_forecast_delta_{}".format(self.prefix, safe_name) + + def additional_load_safe_name(self, name): + """ + Return the Home Assistant-safe suffix for a named additional load forecast. + """ + safe_name = re.sub(r"[^a-z0-9_]+", "_", str(name).lower()).strip("_") + if not safe_name: + safe_name = "unknown" + return safe_name + + def additional_load_delete_entity_name(self, name): + """ + Make the delete button entity name for a named additional load forecast. + """ + return self.additional_load_entity_name(name).replace("binary_sensor.", "button.", 1) + "_delete" + + def additional_load_name_from_entity(self, entity_id): + """ + Return additional load forecast name from a binary sensor or button entity id. + """ + marker = "_load_forecast_delta_" + if entity_id and marker in entity_id: + safe_name = entity_id.split(marker, 1)[1].replace("_delete", "") + for name in list(getattr(self, "house_load_additional_forecasts", {}).keys()) + list(getattr(self, "house_load_additional_forecast_overrides", {}).keys()): + if self.additional_load_safe_name(name) == safe_name: + return str(name) + return self.resolve_additional_load_name(safe_name) + return None + + def additional_load_command_name(self, value): + """ + Return the forecast name from a load_forecast_delta_api command. + """ + return value.split("?", 1)[0].split("=", 1)[0].replace("[", "").replace("]", "") + + def additional_load_command_args(self, value): + """ + Return a load_forecast_delta_api command name and query arguments. + """ + value = value.replace("[", "").replace("]", "") + if "?" not in value: + return self.additional_load_command_name(value), {} + name, command_args = value.split("?", 1) + args = {} + for arg in command_args.split("&"): + arg_split = arg.split("=", 1) + if len(arg_split) > 1: + args[arg_split[0]] = arg_split[1] + else: + args[arg_split[0]] = True + return name, args + + def additional_load_build_api_command(self, name, args): + """ + Build a load_forecast_delta_api command from a name and arguments. + """ + return "{}?{}".format(name, "&".join("{}={}".format(key, value) if value is not True else key for key, value in args.items())) + + def additional_load_minutes_to_stamp(self, minutes): + """ + Convert forecast minutes from midnight into a durable timestamp string. + """ + return str(int((self.midnight_utc + timedelta(minutes=int(minutes))).timestamp())) + + def additional_load_stamp_to_minutes(self, stamp): + """ + Convert a durable timestamp string into forecast minutes from the current midnight. + """ + try: + stamp_datetime = datetime.fromtimestamp(int(stamp), tz=self.midnight_utc.tzinfo) if str(stamp).isdigit() else datetime.fromisoformat(str(stamp)) + except (ValueError, TypeError): + return None + return int((stamp_datetime - self.midnight_utc).total_seconds() / 60) + + def additional_load_minutes_to_iso(self, minutes): + """ + Convert forecast minutes from midnight into an ISO timestamp. + """ + return (self.midnight_utc + timedelta(minutes=minutes)).isoformat() if minutes is not None else None + + def additional_load_forecast_record( + self, + entity_id, + enabled, + mode, + energy_total, + slot_energy, + duration, + weighting, + load_mode, + plan_interval, + requested_start_minutes, + requested_end_minutes, + periods, + weights, + weight_total, + source, + auto_expire, + expires_minutes, + target_times=None, + total_energy=0.0, + suggested_start_minutes=None, + suggested_end_minutes=None, + selection_reason=None, + candidate_count=0, + selected_metric=None, + baseline_metric=None, + selection_locked=False, + state=None, + ): + """ + Build the published and internal metadata for one additional load forecast. + """ + if target_times is None: + target_times = [] + if state is None: + state = "on" if target_times else "off" + return { + "entity_id": entity_id, + "state": state, + "target_times": target_times, + "enabled": enabled, + "mode": mode, + "energy": energy_total, + "slot_energy": slot_energy, + "duration": duration, + "weighting": weighting, + "load_mode": load_mode, + "plan_interval_minutes": plan_interval, + "slots": len(target_times), + "total_energy": dp4(total_energy), + "requested_start": self.additional_load_minutes_to_iso(requested_start_minutes), + "requested_end": self.additional_load_minutes_to_iso(requested_end_minutes), + "suggested_start": self.additional_load_minutes_to_iso(suggested_start_minutes), + "suggested_end": self.additional_load_minutes_to_iso(suggested_end_minutes), + "selection_reason": selection_reason, + "candidate_count": candidate_count, + "selected_metric": selected_metric, + "baseline_metric": baseline_metric, + "selection_locked": selection_locked, + "source": source, + "auto_expire": auto_expire, + "expires_at": self.additional_load_minutes_to_iso(expires_minutes), + "_requested_start_minutes": requested_start_minutes, + "_requested_end_minutes": requested_end_minutes, + "_periods": periods, + "_weights": weights, + "_weight_total": weight_total, + } + + def additional_load_api_metadata(self, name): + """ + Return persisted hidden metadata for a stored load_forecast_delta_api command. + """ + item = self.config_index.get("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else None + if not item: + return {} + values = (item.get("value", "") or "").replace("+", "") + for value in values.split(",") if values else []: + command_name, args = self.additional_load_command_args(value) + if command_name == name or self.additional_load_safe_name(command_name) == self.additional_load_safe_name(name): + return {key: value for key, value in args.items() if str(key).startswith("_")} + return {} + + def preserve_additional_load_api_metadata(self, value): + """ + Preserve hidden one-shot metadata when an active API command is sent again. + """ + name, args = self.additional_load_command_args(value) + metadata = self.additional_load_api_metadata(name) + if not metadata: + return value + for key, metadata_value in metadata.items(): + args.setdefault(key, metadata_value) + return self.additional_load_build_api_command(name, args) + + def update_additional_load_api_command_metadata(self, name, metadata): + """ + Persist one-shot runtime metadata into the stored API selector command. + """ + item = self.config_index.get("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else None + if not item: + return + values = (item.get("value", "") or "").replace("+", "") + if not values: + return + changed = False + new_values = [] + for value in values.split(","): + if value == "off": + continue + command_name, args = self.additional_load_command_args(value) + if command_name == name or self.additional_load_safe_name(command_name) == self.additional_load_safe_name(name): + for key, metadata_value in metadata.items(): + if metadata_value is not None and args.get(key) != str(metadata_value): + args[key] = str(metadata_value) + changed = True + value = self.additional_load_build_api_command(command_name, args) + new_values.append(value) + if changed: + self.api_select_update("load_forecast_delta_api", new_value="+" + ",".join(new_values) if new_values else "off") + + def resolve_additional_load_name(self, name): + """ + Resolve a forecast name or safe entity suffix to the active configured name. + """ + name = str(name) + safe_name = self.additional_load_safe_name(name) + candidates = list(getattr(self, "house_load_additional_forecasts", {}).keys()) + list(getattr(self, "house_load_additional_forecast_overrides", {}).keys()) + item = self.config_index.get("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else None + if item: + values = (item.get("value", "") or "").replace("+", "") + candidates += [self.additional_load_command_name(value) for value in values.split(",") if value and value != "off"] + for candidate in candidates: + if str(candidate) == name or self.additional_load_safe_name(candidate) == safe_name: + return str(candidate) + return name + + def delete_additional_load_forecast(self, name): + """ + Delete a named one-shot additional load forecast. + """ + name = self.resolve_additional_load_name(name) + if not self.has_additional_load_api_command(name) and name not in self.house_load_additional_forecast_overrides: + self.log("Warn: Ignoring delete for inactive additional load forecast {}".format(name)) + self.unpublish_additional_load_name(name) + return False + self.house_load_additional_forecast_overrides.pop(name, None) + self.remove_additional_load_api_command(name) + self.refresh_additional_load_forecast_api() + return True + + def unpublish_additional_load_name(self, name): + """ + Remove stale additional load forecast entities for a named forecast without replanning. + """ + for entity_id in [self.additional_load_entity_name(name), self.additional_load_delete_entity_name(name)]: + self.unpublish_additional_load_entity(entity_id) + if hasattr(self, "house_load_additional_forecast_entities"): + self.house_load_additional_forecast_entities.discard(entity_id) + + def has_additional_load_api_command(self, name): + """ + Return True if a named forecast command is active in the load_forecast_delta_api selector. + """ + item = self.config_index.get("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else None + if not item: + return False + values = item.get("value", "") or "" + values = values.replace("+", "") + values_list = values.split(",") if values else [] + for value in values_list: + if value == "off": + continue + command_name = self.additional_load_command_name(value) + if command_name == name or self.additional_load_safe_name(command_name) == self.additional_load_safe_name(name): + return True + return False + + def remove_additional_load_api_command(self, name): + """ + Remove a named forecast command from the load_forecast_delta_api selector. + """ + item = self.config_index.get("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else None + if not item: + return + values = item.get("value", "") or "" + values = values.replace("+", "") + values_list = values.split(",") if values else [] + new_values_list = [] + for value in values_list: + if value == "off": + continue + command_name = self.additional_load_command_name(value) + if command_name != name and self.additional_load_safe_name(command_name) != self.additional_load_safe_name(name): + new_values_list.append(value) + new_value = "+" + ",".join(new_values_list) if new_values_list else "off" + self.api_select_update("load_forecast_delta_api", new_value=new_value) + + def additional_load_slot_energies(self, energy_total, slot_energy, weights, weight_total, period, slot_minutes, plan_interval): + """ + Return the published slot energy and adjustment rate for one forecast slot. + """ + if energy_total is not None: + target_energy = dp4(energy_total * weights[period] / weight_total) if weight_total else 0.0 + else: + target_energy = dp4(slot_energy * weights[period]) + adjustment_energy = dp4(target_energy * plan_interval / float(slot_minutes)) if slot_minutes else 0.0 + return target_energy, adjustment_energy + + def get_additional_load_window(self, load_item, mode, duration, plan_interval, minutes_now_slot): + """ + Return start/end minutes for fixed or flexible additional load scheduling. + """ + start_minutes = self.get_additional_load_time_minutes(load_item, "start_time") if "start_time" in load_item else load_item.get("_requested_start_minutes", None) + end_minutes = self.get_additional_load_time_minutes(load_item, "end_time") if "end_time" in load_item else None + duration_minutes = int(duration * 60) + + if mode == "flexible": + if start_minutes is None and end_minutes is None: + return minutes_now_slot, minutes_now_slot + self.forecast_minutes + if start_minutes is None: + start_minutes = minutes_now_slot + if end_minutes is None: + end_minutes = start_minutes + self.forecast_minutes + + windows = [] + for day_offset in [0, 24 * 60]: + window_start = start_minutes + day_offset + window_end = end_minutes + day_offset + if window_end <= window_start: + window_end += 24 * 60 + windows.append((window_start, window_end)) + if end_minutes <= start_minutes: + windows.append((window_start - 24 * 60, window_end - 24 * 60)) + + for window_start, window_end in sorted(windows): + usable_start = max(window_start, minutes_now_slot) + if usable_start + duration_minutes > window_end: + window_end += 24 * 60 + if usable_start + duration_minutes <= window_end and usable_start < minutes_now_slot + self.forecast_minutes: + return usable_start, window_end + return None, None + + if start_minutes is None: + return None, end_minutes + if end_minutes is None: + end_minutes = start_minutes + int(duration * 60) + elif end_minutes <= start_minutes: + end_minutes += 24 * 60 + + if end_minutes <= minutes_now_slot: + start_minutes += 24 * 60 + end_minutes += 24 * 60 + return start_minutes, end_minutes + + def parse_additional_load_api_command(self, api_command): + """ + Parse one load_forecast_delta_api command into a forecast override. + """ + if "?" not in api_command: + self.log("Warn: Bad load_forecast_delta_api command {}, expected name?start_time=...&duration=...".format(api_command)) + return None + + name, command_args = self.additional_load_command_args(api_command) + if not name: + self.log("Warn: Bad load_forecast_delta_api command {}, missing name".format(api_command)) + return None + + override = {"name": name, "_source": "api", "_auto_expire": True} + override.update(command_args) + requested_start_minutes = self.additional_load_stamp_to_minutes(override.get("_requested_start", None)) if "_requested_start" in override else None + selected_start_minutes = self.additional_load_stamp_to_minutes(override.get("_selected_start", None)) if "_selected_start" in override else None + expires_minutes = self.additional_load_stamp_to_minutes(override.get("_expires_at", None)) if "_expires_at" in override else None + if requested_start_minutes is not None: + override["_requested_start_minutes"] = requested_start_minutes + if selected_start_minutes is not None: + override["_selected_start_minutes"] = selected_start_minutes + if expires_minutes is not None: + override["_expires_minutes"] = expires_minutes + for key in ["_candidate_count"]: + if key in override: + try: + override[key] = int(override[key]) + except (ValueError, TypeError): + override.pop(key, None) + for key in ["_selected_metric", "_baseline_metric"]: + if key in override: + try: + override[key] = float(override[key]) + except (ValueError, TypeError): + override.pop(key, None) + if "start_time" not in override: + existing_override = self.house_load_additional_forecast_overrides.get(str(name), {}) + requested_start_minutes = override.get("_requested_start_minutes", existing_override.get("_requested_start_minutes", None)) + if requested_start_minutes is None: + plan_interval = self.get_arg("plan_interval_minutes", 30) + requested_start_minutes = int(self.minutes_now / plan_interval) * plan_interval + override["_requested_start_minutes"] = requested_start_minutes + self.house_load_additional_forecast_overrides.setdefault(str(name), {"name": str(name)})["_requested_start_minutes"] = requested_start_minutes + self.update_additional_load_api_command_metadata(str(name), {"_requested_start": self.additional_load_minutes_to_stamp(requested_start_minutes)}) + return override + + def expire_additional_load_api_commands(self): + """ + Remove expired one-shot additional load forecast API commands. + """ + expired_names = [] + minutes_now_slot = int(self.minutes_now / self.get_arg("plan_interval_minutes", 30)) * self.get_arg("plan_interval_minutes", 30) + item = self.config_index.get("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else None + values = (item.get("value", "") or "").replace("+", "") if item else "" + for value in values.split(",") if values else []: + name, args = self.additional_load_command_args(value) + expires_minutes = self.additional_load_stamp_to_minutes(args.get("_expires_at", None)) if "_expires_at" in args else None + if expires_minutes is not None and expires_minutes <= minutes_now_slot: + expired_names.append(name) + for name, override in list(self.house_load_additional_forecast_overrides.items()): + expires_minutes = override.get("_expires_minutes", None) + if expires_minutes is not None and expires_minutes <= minutes_now_slot: + expired_names.append(name) + for name in set(expired_names): + self.log("Expired additional load forecast {}".format(name)) + self.house_load_additional_forecast_overrides.pop(name, None) + self.remove_additional_load_api_command(name) + + def get_additional_load_api_overrides(self): + """ + Return load_forecast_delta_api overrides by name. + """ + api_forecast_overrides = {} + api_overrides = self.api_select_update("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else [] + for api_command in api_overrides: + override = self.parse_additional_load_api_command(api_command) + if override: + api_forecast_overrides[str(override["name"])] = override + return api_forecast_overrides + + def refresh_additional_load_forecast_api(self): + """ + Rebuild additional load forecast data after the HA select API changes. + """ + self.load_forecast_delta_api = self.api_select_update("load_forecast_delta_api") if "load_forecast_delta_api" in self.config_index else [] + self.house_load_additional_forecast_adjust, self.house_load_additional_forecasts = self.fetch_additional_load_forecast() + self.publish_additional_load_forecasts() + + def get_additional_load_forecast_config(self): + """ + Return additional load forecast config with runtime API overrides applied by name. + """ + config = self.get_arg("house_load_additional_forecast", [], indirect=False) + if not config: + config = [] + if isinstance(config, dict): + config = [config] + if isinstance(config, str): + self.log("Warn: house_load_additional_forecast should be a list of dictionaries") + return [] + + forecast_items = [] + for load_item in config: + if not isinstance(load_item, dict): + self.log("Warn: Bad house_load_additional_forecast item {}, expected dictionary".format(load_item)) + continue + load_item = load_item.copy() + load_item["_source"] = "yaml" + load_item["_auto_expire"] = False + forecast_items.append(load_item) + + self.expire_additional_load_api_commands() + runtime_overrides = self.get_additional_load_api_overrides() + for name, override in self.house_load_additional_forecast_overrides.items(): + runtime_overrides.setdefault(name, {}).update(override) + for name, override in runtime_overrides.items(): + found = False + for index, load_item in enumerate(forecast_items): + if str(load_item.get("name", "")) == name: + forecast_items[index].update(override) + found = True + break + if not found: + forecast_items.append(override.copy()) + return forecast_items + + def fetch_additional_load_forecast(self, selected_flexible=None): + """ + Build per-minute additional load adjustments from named forecast config. + """ + if selected_flexible is None: + selected_flexible = {} + load_adjust = {} + forecasts = {} + plan_interval = self.get_arg("plan_interval_minutes", 30) + minutes_now_slot = int(self.minutes_now / plan_interval) * plan_interval + + for load_item in self.get_additional_load_forecast_config(): + name = load_item.get("name") + if not name: + self.log("Warn: house_load_additional_forecast item missing name") + continue + name = str(name) + entity_id = self.additional_load_entity_name(name) + mode = str(self.resolve_arg("mode", load_item.get("mode", "fixed"), "fixed")).lower() + if mode not in ["fixed", "flexible"]: + self.log("Warn: Bad mode {} provided in house_load_additional_forecast {}, using fixed".format(mode, name)) + mode = "fixed" + if mode == "flexible" and name in selected_flexible: + load_item.update(selected_flexible[name]) + enabled = self.get_additional_load_bool(load_item, "enabled", True) + duration_configured = "duration" in load_item + duration = self.get_additional_load_float(load_item, "duration", 0.0) + slot_energy = self.get_additional_load_float(load_item, "slot_energy", 0.0) + energy_total = self.get_additional_load_float(load_item, "energy", 0.0) if "energy" in load_item else None + weighting = self.resolve_arg("weighting", load_item.get("weighting", None), None) + source = load_item.get("_source", "yaml") + auto_expire = load_item.get("_auto_expire", False) + expires_minutes = load_item.get("_expires_minutes", None) + target_times = [] + load_mode = "total_energy" if energy_total is not None else "slot_energy" + total_energy = 0.0 + start_minutes, end_minutes = self.get_additional_load_window(load_item, mode, duration, plan_interval, minutes_now_slot) + requested_start_minutes = load_item.get("_requested_start_minutes", start_minutes) if "start_time" not in load_item else start_minutes + requested_end_minutes = end_minutes + if mode == "fixed" and duration <= 0 and not duration_configured and start_minutes is not None and end_minutes is not None: + duration = (end_minutes - start_minutes) / 60.0 + periods = int((int(duration * 60) + plan_interval - 1) / plan_interval) if duration > 0 else 0 + weights = self.parse_additional_load_weighting(weighting, periods) + weight_total = sum(weights) + + selected_start_minutes = load_item.get("_selected_start_minutes", None) + selection_locked = False + if selected_start_minutes is not None: + start_minutes = int(selected_start_minutes) + if requested_start_minutes is not None and start_minutes < requested_start_minutes: + start_minutes = requested_start_minutes + end_minutes = start_minutes + int(duration * 60) + selection_locked = mode == "flexible" and minutes_now_slot >= start_minutes and minutes_now_slot < end_minutes + if auto_expire: + expires_minutes = end_minutes + if selection_locked and source != "yaml": + self.house_load_additional_forecast_overrides.setdefault(name, {"name": name})["_selection_locked"] = True + if auto_expire and expires_minutes is None and end_minutes is not None: + expires_minutes = end_minutes + if auto_expire and source != "yaml" and expires_minutes is not None: + self.house_load_additional_forecast_overrides.setdefault(name, {"name": name})["_expires_minutes"] = expires_minutes + + if source == "yaml" and energy_total is None and slot_energy == 0 and duration == 0 and not duration_configured: + continue + + if not enabled or start_minutes is None or (energy_total is None and slot_energy == 0) or (energy_total == 0) or duration == 0 or end_minutes is None: + forecasts[name] = self.additional_load_forecast_record( + entity_id, + enabled, + mode, + energy_total, + slot_energy, + duration, + weighting, + load_mode, + plan_interval, + requested_start_minutes, + requested_end_minutes, + periods, + weights, + weight_total, + source, + auto_expire, + expires_minutes, + selection_locked=load_item.get("_selection_locked", False) or selection_locked, + state="off", + ) + continue + + if mode == "flexible" and selected_start_minutes is None: + forecasts[name] = self.additional_load_forecast_record( + entity_id, + enabled, + mode, + energy_total, + slot_energy, + duration, + weighting, + load_mode, + plan_interval, + requested_start_minutes, + requested_end_minutes, + periods, + weights, + weight_total, + source, + auto_expire, + expires_minutes, + selection_reason="pending_prediction_metric", + selection_locked=load_item.get("_selection_locked", False) or selection_locked, + state="off", + ) + continue + + if mode == "flexible" and selected_start_minutes is not None and not selection_locked: + forecasts[name] = self.additional_load_forecast_record( + entity_id, + enabled, + mode, + energy_total, + slot_energy, + duration, + weighting, + load_mode, + plan_interval, + requested_start_minutes, + requested_end_minutes, + periods, + weights, + weight_total, + source, + auto_expire, + expires_minutes, + suggested_start_minutes=start_minutes, + suggested_end_minutes=end_minutes, + selection_reason=load_item.get("_selection_reason", "prediction_metric"), + candidate_count=load_item.get("_candidate_count", 0), + selected_metric=load_item.get("_selected_metric", None), + baseline_metric=load_item.get("_baseline_metric", None), + state="off", + ) + continue + + for period in range(periods): + slot_start = start_minutes + period * plan_interval + slot_end = min(slot_start + plan_interval, end_minutes) + slot_minutes = slot_end - slot_start + if slot_end <= minutes_now_slot: + continue + if (slot_start - minutes_now_slot) >= self.forecast_minutes: + continue + energy, adjustment_energy = self.additional_load_slot_energies(energy_total, slot_energy, weights, weight_total, period, slot_minutes, plan_interval) + total_energy += energy + for minute in range(slot_start, slot_end): + load_adjust[minute] = dp4(load_adjust.get(minute, 0.0) + adjustment_energy) + target_times.append( + { + "start": self.additional_load_minutes_to_iso(slot_start), + "end": self.additional_load_minutes_to_iso(slot_end), + "energy": energy, + } + ) + + forecasts[name] = self.additional_load_forecast_record( + entity_id, + enabled, + mode, + energy_total, + slot_energy, + duration, + weighting, + load_mode, + plan_interval, + requested_start_minutes, + requested_end_minutes, + periods, + weights, + weight_total, + source, + auto_expire, + expires_minutes, + target_times=target_times, + total_energy=total_energy, + suggested_start_minutes=start_minutes if mode == "flexible" and target_times else None, + suggested_end_minutes=end_minutes if mode == "flexible" and target_times else None, + selection_reason=load_item.get("_selection_reason", "prediction_metric" if mode == "flexible" and target_times else None), + candidate_count=load_item.get("_candidate_count", 0), + selected_metric=load_item.get("_selected_metric", None), + baseline_metric=load_item.get("_baseline_metric", None), + selection_locked=load_item.get("_selection_locked", False) or selection_locked, + ) + + return load_adjust, forecasts + + def publish_additional_load_forecasts(self): + """ + Publish named additional load forecast binary sensors for visibility and automation targeting. + """ + if not hasattr(self, "house_load_additional_forecast_entities"): + self.house_load_additional_forecast_entities = set() + published_entities = set() + for name, forecast in self.house_load_additional_forecasts.items(): + attributes = { + "friendly_name": "Predbat load forecast delta {}".format(name), + "icon": "mdi:dishwasher", + "name": name, + "enabled": forecast.get("enabled", False), + "mode": forecast.get("mode", "fixed"), + "energy": forecast.get("energy", None), + "slot_energy": forecast.get("slot_energy", 0.0), + "duration": forecast.get("duration", 0.0), + "weighting": forecast.get("weighting", None), + "load_mode": forecast.get("load_mode", "total_energy"), + "plan_interval_minutes": forecast.get("plan_interval_minutes", self.plan_interval_minutes), + "slots": forecast.get("slots", 0), + "total_energy": forecast.get("total_energy", 0.0), + "requested_start": forecast.get("requested_start", None), + "requested_end": forecast.get("requested_end", None), + "suggested_start": forecast.get("suggested_start", None), + "suggested_end": forecast.get("suggested_end", None), + "selection_reason": forecast.get("selection_reason", None), + "candidate_count": forecast.get("candidate_count", 0), + "selected_metric": forecast.get("selected_metric", None), + "baseline_metric": forecast.get("baseline_metric", None), + "selection_locked": forecast.get("selection_locked", False), + "source": forecast.get("source", "yaml"), + "auto_expire": forecast.get("auto_expire", False), + "expires_at": forecast.get("expires_at", None), + "target_times": forecast.get("target_times", []), + } + self.dashboard_item(forecast["entity_id"], state=forecast.get("state", "off"), attributes=attributes) + published_entities.add(forecast["entity_id"]) + if forecast.get("source", "yaml") != "yaml": + delete_entity = self.additional_load_delete_entity_name(name) + self.dashboard_item( + delete_entity, + state="idle", + attributes={ + "friendly_name": "Delete Predbat load forecast delta {}".format(name), + "icon": "mdi:delete", + "name": name, + "source": forecast.get("source", "api"), + }, + ) + published_entities.add(delete_entity) + for entity_id in self.house_load_additional_forecast_entities - published_entities: + self.unpublish_additional_load_entity(entity_id) + self.house_load_additional_forecast_entities = published_entities + + def unpublish_additional_load_entity(self, entity_id): + """ + Remove a stale additional load forecast entity from HA and the local dashboard cache. + """ + if hasattr(self, "delete_state_wrapper"): + self.delete_state_wrapper(entity_id) + self.dashboard_values.pop(entity_id, None) + if entity_id in self.dashboard_index: + self.dashboard_index.remove(entity_id) + def filtered_today(self, time_data, resetmidnight=False, stamp=None): """ Grab figure for today (midnight) @@ -700,6 +1514,8 @@ def fetch_sensor_data(self, save=True): self.load_minutes_age = 0 self.load_forecast = {} self.load_forecast_array = [] + self.house_load_additional_forecast_adjust = {} + self.house_load_additional_forecasts = {} self.pv_forecast_minute = {} self.pv_forecast_minute10 = {} self.load_scaling_dynamic = {} @@ -739,6 +1555,10 @@ def fetch_sensor_data(self, save=True): # Fetch extra load forecast self.load_forecast, self.load_forecast_array = self.fetch_extra_load_forecast(self.now_utc, load_ml_forecast) + # Fetch named additional load forecast deltas + self.house_load_additional_forecast_adjust, self.house_load_additional_forecasts = self.fetch_additional_load_forecast() + self.publish_additional_load_forecasts() + # Load previous load data if self.get_arg("ge_cloud_data", False): self.download_ge_data(self.now_utc) @@ -2337,6 +3157,7 @@ def fetch_config_options(self): self.manual_demand_times = self.manual_times("manual_demand") self.manual_all_times = self.manual_charge_times + self.manual_export_times + self.manual_demand_times + self.manual_freeze_charge_times + self.manual_freeze_export_times self.manual_api = self.api_select_update("manual_api") + self.load_forecast_delta_api = self.api_select_update("load_forecast_delta_api") self.manual_import_rates = self.manual_rates("manual_import_rates", default_rate=self.get_arg("manual_import_value")) self.manual_export_rates = self.manual_rates("manual_export_rates", default_rate=self.get_arg("manual_export_value")) self.manual_load_adjust = self.manual_rates("manual_load_adjust", default_rate=self.get_arg("manual_load_value")) diff --git a/apps/predbat/ha.py b/apps/predbat/ha.py index 5e5fea180..a90161c77 100644 --- a/apps/predbat/ha.py +++ b/apps/predbat/ha.py @@ -890,13 +890,26 @@ def call_service(self, service, **kwargs): data_frame = {"domain": domain, "service": service, "service_data": data} return run_async(self.base.trigger_callback(data_frame)) - def api_call(self, endpoint, data_in=None, post=False, core=True, silent=False): + def delete_state(self, entity_id): + """ + Delete a state from Home Assistant. + """ + entity_id_lower = entity_id.lower() + self.db_mirror_list.pop(entity_id, None) + self.db_mirror_list.pop(entity_id_lower, None) + self.state_data.pop(entity_id_lower, None) + if self.ha_key: + self.api_call("/api/states/{}".format(entity_id), delete=True) + return True + + def api_call(self, endpoint, data_in=None, post=False, delete=False, core=True, silent=False): """ Make an API call to Home Assistant. :param endpoint: The API endpoint to call. :param data_in: The data to send in the body of the request. :param post: True if this is a POST request, False for GET. + :param delete: True if this is a DELETE request :param core: True is this is a call to HA Core, False if it is a Supervisor call :param silent: True if warning message from the API call is to be suppressed :return: The response from the API. @@ -918,7 +931,9 @@ def api_call(self, endpoint, data_in=None, post=False, core=True, silent=False): "Accept": "application/json", } try: - if post: + if delete: + response = requests.delete(url, headers=headers, timeout=TIMEOUT) + elif post: if data_in: response = requests.post(url, headers=headers, json=data_in, timeout=TIMEOUT) else: @@ -928,7 +943,7 @@ def api_call(self, endpoint, data_in=None, post=False, core=True, silent=False): response = requests.get(url, headers=headers, params=data_in, timeout=TIMEOUT) else: response = requests.get(url, headers=headers, timeout=TIMEOUT) - data = response.json() + data = {} if delete and not response.text else response.json() self.api_errors = 0 except requests.exceptions.JSONDecodeError: if not silent: # suppress warning message for call to get slug id from supervisor because in docker installs this will always error (no supervisor) diff --git a/apps/predbat/output.py b/apps/predbat/output.py index d7efa23da..ab8d3630c 100644 --- a/apps/predbat/output.py +++ b/apps/predbat/output.py @@ -32,6 +32,66 @@ class Output: charging schedules, and financial metric summaries. """ + def additional_load_plan_time(self, timestamp): + """ + Return a compact local time string for an additional load timestamp. + """ + return datetime.fromisoformat(timestamp).strftime("%H:%M") + + def get_additional_load_text(self): + """ + Return a textual summary of planned and suggested additional load forecasts. + """ + planned_loads = [] + for name, forecast in sorted(getattr(self, "house_load_additional_forecasts", {}).items()): + if not forecast.get("enabled", False): + continue + target_times = forecast.get("target_times", []) + total_energy = forecast.get("total_energy", 0.0) + if target_times and total_energy > 0: + running = forecast.get("selection_locked", False) + start = forecast.get("suggested_start") if running and forecast.get("suggested_start") else target_times[0].get("start") + end = forecast.get("suggested_end") if running and forecast.get("suggested_end") else target_times[-1].get("end") + if running: + total_energy = forecast.get("energy", total_energy) or total_energy + status = "running" + text = "{} is running from {} to {} using {:.2f} kWh".format(name, self.additional_load_plan_time(start), self.additional_load_plan_time(end), dp2(total_energy)) if start and end else None + else: + status = "planned" + text = "{} from {} to {} using {:.2f} kWh is planned".format(name, self.additional_load_plan_time(start), self.additional_load_plan_time(end), dp2(total_energy)) if start and end else None + else: + start = forecast.get("suggested_start") + end = forecast.get("suggested_end") + total_energy = forecast.get("energy", 0.0) + if not total_energy: + plan_interval = forecast.get("plan_interval_minutes", self.plan_interval_minutes) + periods = int((int(forecast.get("duration", 0.0) * 60) + plan_interval - 1) / plan_interval) if plan_interval > 0 else 0 + total_energy = forecast.get("slot_energy", 0.0) * periods + status = "suggested" + text = "{} is suggested from {} to {} using {:.2f} kWh".format(name, self.additional_load_plan_time(start), self.additional_load_plan_time(end), dp2(total_energy)) if start and end and total_energy > 0 else None + + if not start or not end: + continue + if not text: + continue + planned_loads.append( + { + "name": name, + "start": start, + "end": end, + "status": status, + "text": text, + } + ) + + if not planned_loads: + return "" + + planned_loads = sorted(planned_loads, key=lambda load: load["start"]) + if len(planned_loads) == 1: + return "- Additional load {}.\n".format(planned_loads[0]["text"]) + return "- Additional loads are planned/suggested: {}.\n".format("; ".join(load["text"] for load in planned_loads)) + def publish_car_plan(self): """ Publish the car charging plan @@ -915,6 +975,8 @@ def short_textual_plan(self, soc_min, soc_min_minute, pv_forecast_minute_step, p if car_charging_kwh > 0: sentence += "- Your car is currently charging.\n" + sentence += self.get_additional_load_text() + charge_window_n_next = self.get_next_charge_window(self.minutes_now) export_window_n_next = self.get_next_export_window(self.minutes_now) if charge_window_n < 0 and charge_window_n_next >= 0: diff --git a/apps/predbat/plan.py b/apps/predbat/plan.py index da575048b..b6069f89a 100644 --- a/apps/predbat/plan.py +++ b/apps/predbat/plan.py @@ -67,6 +67,134 @@ class Plan: runs to minimise the overall cost metric. """ + def additional_load_candidate_profile(self, forecast, start_minutes): + """ + Build absolute-minute adjustment and target metadata for one flexible load candidate. + """ + plan_interval = forecast.get("plan_interval_minutes", self.plan_interval_minutes) + duration_minutes = int(forecast.get("duration", 0.0) * 60) + end_minutes = start_minutes + duration_minutes + periods = forecast.get("_periods", 0) + weights = forecast.get("_weights", []) + weight_total = forecast.get("_weight_total", sum(weights)) + energy_total = forecast.get("energy", None) + slot_energy = forecast.get("slot_energy", 0.0) + load_adjust = {} + target_times = [] + total_energy = 0.0 + + for period in range(periods): + slot_start = start_minutes + period * plan_interval + slot_end = min(slot_start + plan_interval, end_minutes) + slot_minutes = slot_end - slot_start + if slot_end <= self.minutes_now: + continue + if (slot_start - self.minutes_now) >= self.forecast_minutes: + continue + energy, adjustment_energy = self.additional_load_slot_energies(energy_total, slot_energy, weights, weight_total, period, slot_minutes, plan_interval) + total_energy += energy + for minute in range(slot_start, slot_end): + load_adjust[minute] = dp4(load_adjust.get(minute, 0.0) + adjustment_energy) + target_times.append({"start": (self.midnight_utc + timedelta(minutes=slot_start)).isoformat(), "end": (self.midnight_utc + timedelta(minutes=slot_end)).isoformat(), "energy": energy}) + + return load_adjust, target_times, dp4(total_energy) + + def add_additional_load_to_step_data(self, load_minutes_step, load_adjust): + """ + Add absolute-minute additional load adjustment into prediction step data. + """ + modified_load = copy.deepcopy(load_minutes_step) + for minute_absolute, energy in load_adjust.items(): + minute_relative = minute_absolute - self.minutes_now + if minute_relative < 0 or minute_relative >= self.forecast_minutes: + continue + step_minute = int(minute_relative / PREDICT_STEP) * PREDICT_STEP + modified_load[step_minute] = dp4(modified_load.get(step_minute, 0.0) + energy * PREDICT_STEP / float(self.plan_interval_minutes)) + return modified_load + + def select_flexible_additional_loads(self, load_minutes_step, load_minutes_step10, pv_forecast_minute_step, pv_forecast_minute10_step): + """ + Select flexible additional load start times using full prediction metric impact. + """ + flexible_forecasts = {name: forecast for name, forecast in self.house_load_additional_forecasts.items() if forecast.get("enabled") and forecast.get("mode") == "flexible" and not forecast.get("selection_locked", False)} + if not flexible_forecasts: + return False, load_minutes_step, load_minutes_step10 + + selected_flexible = {} + working_load_step = load_minutes_step + working_load_step10 = load_minutes_step10 + + for name, forecast in flexible_forecasts.items(): + start_minutes = forecast.get("_requested_start_minutes", None) + end_minutes = forecast.get("_requested_end_minutes", None) + duration_minutes = int(forecast.get("duration", 0.0) * 60) + plan_interval = forecast.get("plan_interval_minutes", self.plan_interval_minutes) + if start_minutes is None or end_minutes is None or duration_minutes <= 0: + continue + + candidate = max(start_minutes, self.minutes_now) + candidate = int((candidate + plan_interval - 1) / plan_interval) * plan_interval + latest_start = min(end_minutes - duration_minutes, self.minutes_now + self.forecast_minutes - duration_minutes) + if latest_start < candidate: + continue + + baseline_prediction = Prediction(self, pv_forecast_minute_step, pv_forecast_minute10_step, working_load_step, working_load_step10) + baseline_metric = baseline_prediction.run_prediction(self.charge_limit_best, self.charge_window_best, self.export_window_best, self.export_limits_best, False, self.end_record)[0] + best_start = None + best_metric = None + candidate_count = 0 + + while candidate <= latest_start: + candidate_adjust, _, _ = self.additional_load_candidate_profile(forecast, candidate) + candidate_load_step = self.add_additional_load_to_step_data(working_load_step, candidate_adjust) + candidate_load_step10 = self.add_additional_load_to_step_data(working_load_step10, candidate_adjust) + candidate_prediction = Prediction(self, pv_forecast_minute_step, pv_forecast_minute10_step, candidate_load_step, candidate_load_step10) + candidate_metric = candidate_prediction.run_prediction(self.charge_limit_best, self.charge_window_best, self.export_window_best, self.export_limits_best, False, self.end_record)[0] + candidate_count += 1 + if best_metric is None or candidate_metric < best_metric: + best_metric = candidate_metric + best_start = candidate + candidate += plan_interval + + if best_start is not None: + best_adjust, _, _ = self.additional_load_candidate_profile(forecast, best_start) + working_load_step = self.add_additional_load_to_step_data(working_load_step, best_adjust) + working_load_step10 = self.add_additional_load_to_step_data(working_load_step10, best_adjust) + selected_flexible[name] = { + "_requested_start_minutes": start_minutes, + "_requested_end_minutes": end_minutes, + "_selected_start_minutes": best_start, + "_selection_reason": "prediction_metric", + "_candidate_count": candidate_count, + "_selected_metric": dp2(best_metric) if best_metric is not None else None, + "_baseline_metric": dp2(baseline_metric), + "_expires_minutes": best_start + duration_minutes if forecast.get("auto_expire", False) else None, + } + if forecast.get("auto_expire", False): + self.house_load_additional_forecast_overrides[name] = {"name": name, **selected_flexible[name]} + self.update_additional_load_api_command_metadata( + name, + { + "_requested_start": self.additional_load_minutes_to_stamp(start_minutes), + "_requested_end": self.additional_load_minutes_to_stamp(end_minutes), + "_selected_start": self.additional_load_minutes_to_stamp(best_start), + "_selected_end": self.additional_load_minutes_to_stamp(best_start + duration_minutes), + "_expires_at": self.additional_load_minutes_to_stamp(best_start + duration_minutes), + "_selection_reason": "prediction_metric", + "_candidate_count": candidate_count, + "_selected_metric": dp2(best_metric) if best_metric is not None else None, + "_baseline_metric": dp2(baseline_metric), + }, + ) + self.log("Flexible additional load {} selected {}-{} using prediction metric {} from {} candidates".format(name, self.time_abs_str(best_start), self.time_abs_str(best_start + duration_minutes), dp2(best_metric), candidate_count)) + + if not selected_flexible: + return False, load_minutes_step, load_minutes_step10 + + self.house_load_additional_forecast_adjust, self.house_load_additional_forecasts = self.fetch_additional_load_forecast(selected_flexible=selected_flexible) + self.publish_additional_load_forecasts() + return True, working_load_step, working_load_step10 + def dynamic_load(self): """ Adjust load prediction based on current load @@ -930,6 +1058,9 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True): # Created optimised step data self.metric_cloud_coverage = self.get_cloud_factor(self.minutes_now, self.pv_forecast_minute, self.pv_forecast_minute10) self.metric_load_divergence = self.get_load_divergence(self.minutes_now, self.load_minutes) + load_adjust = self.manual_load_adjust.copy() + for minute, adjustment in self.house_load_additional_forecast_adjust.items(): + load_adjust[minute] = load_adjust.get(minute, 0.0) + adjustment load_minutes_step = self.step_data_history( self.load_minutes, self.minutes_now, @@ -940,7 +1071,7 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True): load_forecast=self.load_forecast, load_scaling_dynamic=self.load_scaling_dynamic, cloud_factor=self.metric_load_divergence, - load_adjust=self.manual_load_adjust, + load_adjust=load_adjust, load_baseline=self.dynamic_load_baseline, ) load_minutes_step10 = self.step_data_history( @@ -953,7 +1084,7 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True): load_forecast=self.load_forecast, load_scaling_dynamic=self.load_scaling_dynamic, cloud_factor=min(self.metric_load_divergence + 0.5, 1.0) if self.metric_load_divergence else None, - load_adjust=self.manual_load_adjust, + load_adjust=load_adjust, load_baseline=self.dynamic_load_baseline, ) pv_forecast_minute_step = self.step_data_history(self.pv_forecast_minute, self.minutes_now, forward=True, cloud_factor=self.metric_cloud_coverage) @@ -972,6 +1103,12 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True): # Creation prediction object self.prediction = Prediction(self, pv_forecast_minute_step, pv_forecast_minute10_step, load_minutes_step, load_minutes_step10) + flexible_selected, load_minutes_step, load_minutes_step10 = self.select_flexible_additional_loads(load_minutes_step, load_minutes_step10, pv_forecast_minute_step, pv_forecast_minute10_step) + if flexible_selected: + self.load_minutes_step = load_minutes_step + self.load_minutes_step10 = load_minutes_step10 + self.prediction = Prediction(self, pv_forecast_minute_step, pv_forecast_minute10_step, load_minutes_step, load_minutes_step10) + # Check if LoadML is active and disable thread pools as it causes lockup due to race conditions with NumPy load_ml_comp = self.components.get_component("load_ml") if self.components else None load_ml_calculating = False diff --git a/apps/predbat/predbat.py b/apps/predbat/predbat.py index f44cfbf82..64c9aef67 100644 --- a/apps/predbat/predbat.py +++ b/apps/predbat/predbat.py @@ -333,6 +333,18 @@ def set_state_wrapper(self, entity_id, state, attributes={}, required_unit=None) state = self.unit_conversion(entity_id, state, None, required_unit, going_to=True) return self.ha_interface.set_state(entity_id, state, attributes=attributes) + def delete_state_wrapper(self, entity_id): + """ + Wrapper function to delete state from HA. + """ + if not self.ha_interface: + self.log("Error: delete_state_wrapper - No HA interface available") + return False + if not hasattr(self.ha_interface, "delete_state"): + return False + + return self.ha_interface.delete_state(entity_id) + def fire_event_wrapper(self, domain, service): """ Wrapper function to fire a HA event @@ -462,9 +474,13 @@ def reset(self): self.manual_demand_times = [] self.manual_all_times = [] self.manual_api = [] + self.load_forecast_delta_api = [] self.manual_import_rates = {} self.manual_export_rates = {} self.manual_load_adjust = {} + self.house_load_additional_forecast_adjust = {} + self.house_load_additional_forecasts = {} + self.house_load_additional_forecast_overrides = {} self.config_index = {} self.dashboard_index = [] self.dashboard_index_app = {} diff --git a/apps/predbat/tests/test_additional_load_forecast.py b/apps/predbat/tests/test_additional_load_forecast.py new file mode 100644 index 000000000..100bc08d0 --- /dev/null +++ b/apps/predbat/tests/test_additional_load_forecast.py @@ -0,0 +1,1001 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +# fmt off +# pylint: disable=consider-using-f-string +# pylint: disable=line-too-long +# pylint: disable=attribute-defined-outside-init + +"""Tests for named additional house load forecasts.""" + +from datetime import timedelta + +import plan as plan_module +from tests.test_infra import run_async + + +def configure_additional_load_test(my_predbat): + """Configure deterministic clock and plan settings for additional load tests.""" + my_predbat.minutes_now = 10 * 60 + my_predbat.plan_interval_minutes = 30 + my_predbat.forecast_minutes = 24 * 60 + my_predbat.args["plan_interval_minutes"] = 30 + my_predbat.house_load_additional_forecast_overrides = {} + my_predbat.house_load_additional_forecast_entities = set() + my_predbat.house_load_additional_forecasts = {} + my_predbat.house_load_additional_forecast_adjust = {} + + +def configure_additional_load_rates(my_predbat, cheap_start, cheap_end): + """Configure deterministic import rates for flexible load tests.""" + my_predbat.rate_import = {minute: 30.0 for minute in range(0, 3 * 24 * 60)} + for minute in range(cheap_start, cheap_end): + my_predbat.rate_import[minute] = 5.0 + + +def check_slot(load_adjust, minute, expected, label): + """Check one generated load adjustment minute value.""" + actual = load_adjust.get(minute, 0.0) + if actual != expected: + print("ERROR: {} expected {} at minute {} got {}".format(label, expected, minute, actual)) + return 1 + return 0 + + +def test_additional_load_disabled(my_predbat): + """Test duration 0 disables an additional load forecast.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 0, "energy": 1.2}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + if load_adjust: + print("ERROR: Disabled additional load should not produce adjustments, got {}".format(load_adjust)) + failed = 1 + if forecasts.get("dishwasher", {}).get("state") != "off": + print("ERROR: Disabled additional load should publish off state") + failed = 1 + return failed + + +def test_additional_load_enabled_false_profile(my_predbat): + """Test enabled false publishes a disabled profile without load adjustment.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "enabled": False, "start_time": "20:00", "duration": 2.0, "energy": 1.2}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + if load_adjust: + print("ERROR: enabled false profile should not produce adjustments, got {}".format(load_adjust)) + failed = 1 + forecast = forecasts.get("dishwasher", {}) + if forecast.get("state") != "off" or forecast.get("enabled"): + print("ERROR: enabled false profile should publish off and enabled false, got {}".format(forecast)) + failed = 1 + return failed + + +def test_additional_load_dishwasher_simple(my_predbat): + """Test a simple dishwasher total energy forecast.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 2.0, "energy": 2.0}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + for minute in [20 * 60, 20 * 60 + 30, 21 * 60, 21 * 60 + 30]: + failed |= check_slot(load_adjust, minute, 0.5, "dishwasher simple") + failed |= check_slot(load_adjust, 22 * 60, 0.0, "dishwasher simple end") + if forecasts.get("dishwasher", {}).get("state") != "on": + print("ERROR: Dishwasher additional load should publish on state") + failed = 1 + if len(forecasts.get("dishwasher", {}).get("target_times", [])) != 4: + print("ERROR: Dishwasher target_times should contain 4 slots") + failed = 1 + my_predbat.house_load_additional_forecasts = forecasts + my_predbat.publish_additional_load_forecasts() + sensor = my_predbat.dashboard_values.get("binary_sensor.predbat_load_forecast_delta_dishwasher", {}) + if sensor.get("state") != "on": + print("ERROR: Dishwasher binary sensor should be published on") + failed = 1 + if len(sensor.get("attributes", {}).get("target_times", [])) != 4: + print("ERROR: Dishwasher binary sensor should publish target_times") + failed = 1 + return failed + + +def test_additional_load_end_time_without_duration(my_predbat): + """Test fixed additional load can use end_time instead of duration.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "cooking", "start_time": "18:00", "end_time": "19:30", "energy": 1.2}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 18 * 60, 0.4, "end time without duration") + failed |= check_slot(load_adjust, 18 * 60 + 30, 0.4, "end time without duration") + failed |= check_slot(load_adjust, 19 * 60, 0.4, "end time without duration") + if forecasts.get("cooking", {}).get("slots") != 3: + print("ERROR: end_time without duration should create 3 slots") + failed = 1 + return failed + + +def test_additional_load_slot_energy_weighting(my_predbat): + """Test advanced slot energy weighting multiplies the per-slot energy.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "heating", "start_time": "20:00", "duration": 2.0, "slot_energy": 0.5, "weighting": "2,2,*"}, + ] + + load_adjust, _ = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 20 * 60, 1.0, "slot energy weighting") + failed |= check_slot(load_adjust, 20 * 60 + 30, 1.0, "slot energy weighting") + failed |= check_slot(load_adjust, 21 * 60, 0.5, "slot energy weighting") + failed |= check_slot(load_adjust, 21 * 60 + 30, 0.5, "slot energy weighting") + return failed + + +def test_additional_load_dishwasher_total_energy(my_predbat): + """Test dishwasher total energy is distributed across plan slots.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 2.0, "energy": 1.2}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + for minute in [20 * 60, 20 * 60 + 15, 20 * 60 + 30, 20 * 60 + 45, 21 * 60, 21 * 60 + 15, 21 * 60 + 30, 21 * 60 + 45]: + failed |= check_slot(load_adjust, minute, 0.15, "dishwasher total energy") + forecast = forecasts.get("dishwasher", {}) + if forecast.get("load_mode") != "total_energy": + print("ERROR: Dishwasher energy mode should be total_energy") + failed = 1 + if forecast.get("slots") != 8: + print("ERROR: Dishwasher energy mode should create 8 slots, got {}".format(forecast.get("slots"))) + failed = 1 + if forecast.get("total_energy") != 1.2: + print("ERROR: Dishwasher total energy should be 1.2, got {}".format(forecast.get("total_energy"))) + failed = 1 + return failed + + +def test_additional_load_dishwasher_total_energy_weighting(my_predbat): + """Test total energy weighting redistributes, rather than increases, energy.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 2.0, "energy": 1.2, "weighting": "2,2,*"}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 20 * 60, 0.4, "dishwasher total energy weighting") + failed |= check_slot(load_adjust, 20 * 60 + 30, 0.4, "dishwasher total energy weighting") + failed |= check_slot(load_adjust, 21 * 60, 0.2, "dishwasher total energy weighting") + failed |= check_slot(load_adjust, 21 * 60 + 30, 0.2, "dishwasher total energy weighting") + if forecasts.get("dishwasher", {}).get("total_energy") != 1.2: + print("ERROR: Dishwasher weighted total energy should remain 1.2") + failed = 1 + return failed + + +def test_additional_load_partial_duration_keeps_total_energy(my_predbat): + """Test partial final slots preserve the configured total energy.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 1.25, "energy": 1.2}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 20 * 60, 0.4, "partial duration first slot") + failed |= check_slot(load_adjust, 20 * 60 + 30, 0.4, "partial duration second slot") + failed |= check_slot(load_adjust, 21 * 60, 0.8, "partial duration final half slot") + forecast = forecasts.get("dishwasher", {}) + target_total = round(sum(slot.get("energy", 0.0) for slot in forecast.get("target_times", [])), 4) + if forecast.get("total_energy") != 1.2 or target_total != 1.2: + print("ERROR: Partial duration should publish the configured total energy, got forecast {} target total {}".format(forecast, target_total)) + failed = 1 + return failed + + +def test_additional_load_multiple_and_api_override(my_predbat): + """Test multiple loads add together and API override updates one named load.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 2.0, "energy": 2.0}, + {"name": "heating", "start_time": "20:30", "duration": 1.0, "energy": 0.5}, + ] + + load_adjust, _ = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 20 * 60, 0.5, "multiple loads dishwasher") + failed |= check_slot(load_adjust, 20 * 60 + 30, 0.75, "multiple loads overlap") + failed |= check_slot(load_adjust, 21 * 60, 0.75, "multiple loads overlap") + + my_predbat.api_select("load_forecast_delta_api", "dishwasher?start_time=18:00&duration=1.0&energy=0.8") + load_adjust, _ = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 18 * 60, 0.4, "api override dishwasher") + failed |= check_slot(load_adjust, 18 * 60 + 30, 0.4, "api override dishwasher") + failed |= check_slot(load_adjust, 20 * 60, 0.0, "api override removed old dishwasher") + failed |= check_slot(load_adjust, 20 * 60 + 30, 0.25, "api override kept heating") + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_select_api_override(my_predbat): + """Test standard HA select API updates a named load forecast.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 0, "energy": 0}, + ] + + my_predbat.api_select("load_forecast_delta_api", "dishwasher?start_time=18:00&duration=2.0&energy=1.2") + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 18 * 60, 0.3, "select API dishwasher") + failed |= check_slot(load_adjust, 18 * 60 + 30, 0.3, "select API dishwasher") + failed |= check_slot(load_adjust, 19 * 60, 0.3, "select API dishwasher") + failed |= check_slot(load_adjust, 19 * 60 + 30, 0.3, "select API dishwasher") + forecast = forecasts.get("dishwasher", {}) + if forecast.get("state") != "on" or forecast.get("total_energy") != 1.2: + print("ERROR: Select API dishwasher forecast not enabled correctly: {}".format(forecast)) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_select_api_weighting(my_predbat): + """Test select API accepts pipe-separated weighting.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [] + + my_predbat.api_select("load_forecast_delta_api", "dishwasher?start_time=18:00&duration=2.0&energy=1.2&weighting=2|2|*") + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + failed |= check_slot(load_adjust, 18 * 60, 0.4, "select API weighting dishwasher") + failed |= check_slot(load_adjust, 18 * 60 + 30, 0.4, "select API weighting dishwasher") + failed |= check_slot(load_adjust, 19 * 60, 0.2, "select API weighting dishwasher") + failed |= check_slot(load_adjust, 19 * 60 + 30, 0.2, "select API weighting dishwasher") + if forecasts.get("dishwasher", {}).get("total_energy") != 1.2: + print("ERROR: Select API weighted dishwasher total energy should remain 1.2") + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_select_event_updates_adjustment(my_predbat): + """Test HA select event immediately rebuilds additional load adjustment.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 0, "energy": 0}, + ] + + service_data = { + "domain": "select", + "service": "select_option", + "service_data": { + "entity_id": "select.predbat_load_forecast_delta_api", + "option": "dishwasher?start_time=18:00&duration=2.0&energy=1.2", + }, + } + run_async(my_predbat.trigger_callback(service_data)) + + failed |= check_slot(my_predbat.house_load_additional_forecast_adjust, 18 * 60, 0.3, "select event immediate adjustment") + failed |= check_slot(my_predbat.house_load_additional_forecast_adjust, 19 * 60 + 30, 0.3, "select event immediate adjustment") + sensor = my_predbat.dashboard_values.get("binary_sensor.predbat_load_forecast_delta_dishwasher", {}) + attributes = sensor.get("attributes", {}) + if sensor.get("state") != "on" or attributes.get("total_energy") != 1.2: + print("ERROR: Select event should immediately publish dishwasher forecast, got {}".format(sensor)) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_delete_button_removes_api_forecast(my_predbat): + """Test delete button removes a one-shot API forecast.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?start_time=20:00&duration=2.0&energy=1.2") + my_predbat.refresh_additional_load_forecast_api() + + button = my_predbat.dashboard_values.get("button.predbat_load_forecast_delta_dishwasher_delete", {}) + if button.get("state") != "idle": + print("ERROR: Dishwasher delete button should publish idle, got {}".format(button)) + failed = 1 + + service_data = { + "domain": "button", + "service": "press", + "service_data": {"entity_id": "button.predbat_load_forecast_delta_dishwasher_delete"}, + } + run_async(my_predbat.trigger_callback(service_data)) + failed |= check_slot(my_predbat.house_load_additional_forecast_adjust, 20 * 60, 0.0, "delete button removed dishwasher") + if my_predbat.api_select_update("load_forecast_delta_api"): + print("ERROR: Delete button should clear API forecast") + failed = 1 + if "binary_sensor.predbat_load_forecast_delta_dishwasher" in my_predbat.dashboard_values: + print("ERROR: Delete button should remove dishwasher binary sensor") + failed = 1 + if "button.predbat_load_forecast_delta_dishwasher_delete" in my_predbat.dashboard_values: + print("ERROR: Delete button should remove dishwasher delete button") + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_delete_button_removes_sanitized_api_forecast(my_predbat): + """Test delete buttons remove API forecasts whose names require entity sanitizing.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "Dishwasher Eco?start_time=20:00&duration=2.0&energy=1.2") + my_predbat.refresh_additional_load_forecast_api() + + service_data = { + "domain": "button", + "service": "press", + "service_data": {"entity_id": "button.predbat_load_forecast_delta_dishwasher_eco_delete"}, + } + run_async(my_predbat.trigger_callback(service_data)) + if my_predbat.api_select_update("load_forecast_delta_api") or my_predbat.house_load_additional_forecasts: + print("ERROR: Sanitized delete button should remove API forecast, got forecasts {} api {}".format(my_predbat.house_load_additional_forecasts, my_predbat.api_select_update("load_forecast_delta_api"))) + failed = 1 + if "button.predbat_load_forecast_delta_dishwasher_eco_delete" in my_predbat.dashboard_values: + print("ERROR: Sanitized delete button should be unpublished") + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_yaml_does_not_publish_delete_button(my_predbat): + """Test YAML forecasts do not get one-shot delete buttons.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.dashboard_values.pop("button.predbat_load_forecast_delta_dishwasher_delete", None) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "start_time": "20:00", "duration": 2.0, "energy": 1.2}, + ] + my_predbat.refresh_additional_load_forecast_api() + + if "button.predbat_load_forecast_delta_dishwasher_delete" in my_predbat.dashboard_values: + print("ERROR: YAML forecast should not publish delete button") + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_api_forecast_auto_expires(my_predbat): + """Test one-shot API forecasts are removed after their finish time.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?start_time=18:00&duration=1.0&energy=0.8") + my_predbat.refresh_additional_load_forecast_api() + failed |= check_slot(my_predbat.house_load_additional_forecast_adjust, 18 * 60, 0.4, "api forecast before expiry") + + my_predbat.minutes_now = 19 * 60 + my_predbat.refresh_additional_load_forecast_api() + if my_predbat.api_select_update("load_forecast_delta_api"): + print("ERROR: API forecast should be removed after expiry") + failed = 1 + if my_predbat.house_load_additional_forecasts: + print("ERROR: Expired API forecast should not remain active, got {}".format(my_predbat.house_load_additional_forecasts)) + failed = 1 + if "binary_sensor.predbat_load_forecast_delta_dishwasher" in my_predbat.dashboard_values: + print("ERROR: Expired API forecast should remove dishwasher binary sensor") + failed = 1 + if "button.predbat_load_forecast_delta_dishwasher_delete" in my_predbat.dashboard_values: + print("ERROR: Expired API forecast should remove dishwasher delete button") + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_yaml_placeholder_not_published(my_predbat): + """Test empty YAML placeholders do not publish dead forecast entities.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher"}, + ] + my_predbat.refresh_additional_load_forecast_api() + + if "binary_sensor.predbat_load_forecast_delta_dishwasher" in my_predbat.dashboard_values: + print("ERROR: Empty YAML placeholder should not publish dishwasher binary sensor") + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_stale_delete_button_no_replan(my_predbat): + """Test stale delete button press does not invalidate a plan.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.dashboard_values["binary_sensor.predbat_load_forecast_delta_dishwasher"] = {"state": "off", "attributes": {}} + my_predbat.dashboard_values["button.predbat_load_forecast_delta_dishwasher_delete"] = {"state": "idle", "attributes": {}} + my_predbat.dashboard_index.append("binary_sensor.predbat_load_forecast_delta_dishwasher") + my_predbat.dashboard_index.append("button.predbat_load_forecast_delta_dishwasher_delete") + my_predbat.update_pending = False + my_predbat.plan_valid = True + + service_data = { + "domain": "button", + "service": "press", + "service_data": {"entity_id": "button.predbat_load_forecast_delta_dishwasher_delete"}, + } + run_async(my_predbat.trigger_callback(service_data)) + if my_predbat.update_pending or not my_predbat.plan_valid: + print("ERROR: Stale delete button should not invalidate plan") + failed = 1 + if "binary_sensor.predbat_load_forecast_delta_dishwasher" in my_predbat.dashboard_values: + print("ERROR: Stale delete button should remove stale dishwasher binary sensor") + failed = 1 + if "button.predbat_load_forecast_delta_dishwasher_delete" in my_predbat.dashboard_values: + print("ERROR: Stale delete button should remove stale dishwasher delete button") + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_api_selection_survives_refresh(my_predbat): + """Test selected flexible API metadata augments, not replaces, the API command.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?enabled=true&mode=flexible&end_time=22:00&duration=2.0&energy=1.2") + my_predbat.house_load_additional_forecast_overrides["dishwasher"] = { + "name": "dishwasher", + "_selected_start_minutes": 11 * 60 + 15, + "_selection_reason": "prediction_metric", + "_candidate_count": 50, + "_selected_metric": 1615.32, + "_baseline_metric": 1600.0, + "_expires_minutes": 13 * 60 + 15, + } + my_predbat.refresh_additional_load_forecast_api() + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if forecast.get("source") != "api" or not forecast.get("auto_expire"): + print("ERROR: Flexible API forecast should keep API source after selection refresh, got {}".format(forecast)) + failed = 1 + if forecast.get("mode") != "flexible" or forecast.get("energy") != 1.2 or forecast.get("duration") != 2.0: + print("ERROR: Flexible API forecast should keep command fields after selection refresh, got {}".format(forecast)) + failed = 1 + if forecast.get("state") != "off" or forecast.get("slots") != 0 or forecast.get("target_times") or forecast.get("selection_locked"): + print("ERROR: Flexible API forecast before suggested start should publish suggestion only, got {}".format(forecast)) + failed = 1 + if "T11:15:00" not in forecast.get("suggested_start", "") or "T13:15:00" not in forecast.get("suggested_end", ""): + print("ERROR: Flexible API forecast should publish selected window after refresh, got {}".format(forecast)) + failed = 1 + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_api_reselects_before_suggested_start(my_predbat): + """Test selected flexible API forecasts can be reselected before their suggested start.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 10 * 60 + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?enabled=true&mode=flexible&start_time=10:00&end_time=22:00&duration=2.0&energy=1.2") + my_predbat.house_load_additional_forecast_overrides["dishwasher"] = { + "name": "dishwasher", + "_selected_start_minutes": 18 * 60, + "_selection_reason": "prediction_metric", + "_candidate_count": 20, + "_selected_metric": 200.0, + "_baseline_metric": 100.0, + "_expires_minutes": 20 * 60, + } + my_predbat.refresh_additional_load_forecast_api() + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if "T18:00:00" not in forecast.get("suggested_start", "") or forecast.get("target_times") or forecast.get("selection_locked"): + print("ERROR: Pre-start selected flexible API forecast should be suggestion only before reselection, got {}".format(forecast)) + failed = 1 + + my_predbat.charge_limit_best = [] + my_predbat.charge_window_best = [] + my_predbat.export_window_best = [] + my_predbat.export_limits_best = [] + my_predbat.end_record = my_predbat.forecast_minutes + original_prediction = plan_module.Prediction + + class FakePrediction: + """Fake prediction scores 12:00 as cheapest regardless of the previous suggestion.""" + + def __init__(self, base, pv_step, pv10_step, load_step, load10_step): + """Store load step data.""" + self.load_step = load_step + + def run_prediction(self, charge_limit, charge_window, export_window, export_limits, pv10, end_record): + """Return a metric based on when the injected load appears.""" + first_load_minute = None + for minute, load in self.load_step.items(): + if load > 0: + first_load_minute = my_predbat.minutes_now + minute if first_load_minute is None else min(first_load_minute, my_predbat.minutes_now + minute) + metric = abs(first_load_minute - 12 * 60) if first_load_minute is not None else 1000.0 + return (metric, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + try: + plan_module.Prediction = FakePrediction + selected, load_step, _ = my_predbat.select_flexible_additional_loads({}, {}, {}, {}) + finally: + plan_module.Prediction = original_prediction + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if not selected or "T12:00:00" not in forecast.get("suggested_start", "") or forecast.get("target_times") or forecast.get("selection_locked"): + print("ERROR: Pre-start flexible API forecast should reselect to 12:00 without committing target slots, got {}".format(forecast)) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_api_stale_selection_not_before_requested_start(my_predbat): + """Test stale selected flexible metadata is not published before the frozen requested start.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 12 * 60 + 30 + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=5.0&energy=0.7") + my_predbat.house_load_additional_forecast_overrides["dishwasher"] = { + "name": "dishwasher", + "_requested_start_minutes": 12 * 60 + 30, + "_selected_start_minutes": 12 * 60, + "_selection_reason": "prediction_metric", + "_candidate_count": 57, + "_selected_metric": -1737.07, + "_baseline_metric": -2007.2, + "_expires_minutes": 17 * 60, + } + my_predbat.refresh_additional_load_forecast_api() + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if "T12:30:00" not in forecast.get("requested_start", "") or "T12:30:00" not in forecast.get("suggested_start", ""): + print("ERROR: Flexible API stale selection should not start before requested_start, got {}".format(forecast)) + failed = 1 + if forecast.get("total_energy") != 0.7 or forecast.get("slots") != 20 or "T17:30:00" not in forecast.get("expires_at", ""): + print("ERROR: Flexible API stale selection should keep full shifted load and expiry, got {}".format(forecast)) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_api_locks_after_suggested_start(my_predbat): + """Test a selected flexible API forecast locks once the suggested start is reached and then expires.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 13 * 60 + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=5.0&energy=0.7") + my_predbat.house_load_additional_forecast_overrides["dishwasher"] = { + "name": "dishwasher", + "_requested_start_minutes": 12 * 60 + 30, + "_selected_start_minutes": 12 * 60 + 30, + "_selection_reason": "prediction_metric", + "_candidate_count": 57, + "_selected_metric": -1737.07, + "_baseline_metric": -2007.2, + "_expires_minutes": 17 * 60 + 30, + } + my_predbat.refresh_additional_load_forecast_api() + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + target_times = forecast.get("target_times", []) + if not forecast.get("selection_locked") or not my_predbat.house_load_additional_forecast_overrides.get("dishwasher", {}).get("_selection_locked"): + print("ERROR: Flexible API forecast should lock after suggested start, got {}".format(forecast)) + failed = 1 + if "T12:30:00" not in forecast.get("suggested_start", "") or forecast.get("slots") != 18 or forecast.get("total_energy") != 0.63: + print("ERROR: Locked flexible API forecast should keep original start with remaining slots, got {}".format(forecast)) + failed = 1 + if not target_times or "T13:00:00" not in target_times[0].get("start", ""): + print("ERROR: Locked flexible API forecast should only publish remaining target slots, got {}".format(target_times)) + failed = 1 + + my_predbat.minutes_now = 17 * 60 + 30 + my_predbat.refresh_additional_load_forecast_api() + if my_predbat.house_load_additional_forecasts or my_predbat.api_select_update("load_forecast_delta_api"): + print("ERROR: Locked flexible API forecast should expire at suggested end, got forecasts {} api {}".format(my_predbat.house_load_additional_forecasts, my_predbat.api_select_update("load_forecast_delta_api"))) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_api_metadata_survives_restart(my_predbat): + """Test omitted start_time and selected flexible metadata survive API command reparse.""" + failed = 0 + configure_additional_load_test(my_predbat) + original_midnight = my_predbat.midnight_utc + my_predbat.minutes_now = 20 * 60 + 45 + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=5.0&energy=0.7") + my_predbat.refresh_additional_load_forecast_api() + my_predbat.update_additional_load_api_command_metadata( + "dishwasher", + { + "_selected_start": my_predbat.additional_load_minutes_to_stamp(21 * 60), + "_selected_end": my_predbat.additional_load_minutes_to_stamp(26 * 60), + "_expires_at": my_predbat.additional_load_minutes_to_stamp(26 * 60), + }, + ) + + api_command = my_predbat.api_select_update("load_forecast_delta_api")[0] + if "_requested_start=" not in api_command or "_selected_start=" not in api_command or "_expires_at=" not in api_command: + print("ERROR: API command should persist requested, selected, and expiry metadata, got {}".format(api_command)) + failed = 1 + + my_predbat.house_load_additional_forecast_overrides = {} + my_predbat.midnight_utc = original_midnight + timedelta(days=1) + my_predbat.minutes_now = 30 + my_predbat.refresh_additional_load_forecast_api() + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if not forecast.get("selection_locked") or "T21:00:00" not in forecast.get("suggested_start", "") or "T02:00:00" not in forecast.get("suggested_end", ""): + print("ERROR: Reparsed API command should restore old locked selection after restart, got {}".format(forecast)) + failed = 1 + + my_predbat.minutes_now = 20 * 60 + 45 + my_predbat.refresh_additional_load_forecast_api() + if my_predbat.house_load_additional_forecasts or my_predbat.api_select_update("load_forecast_delta_api"): + print("ERROR: Reparsed expired API command should be removed, got forecasts {} api {}".format(my_predbat.house_load_additional_forecasts, my_predbat.api_select_update("load_forecast_delta_api"))) + failed = 1 + + my_predbat.midnight_utc = original_midnight + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_api_repeat_preserves_metadata(my_predbat): + """Test repeating an active API command preserves existing one-shot metadata.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 20 * 60 + 45 + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + api_command = "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=5.0&energy=0.7" + my_predbat.api_select("load_forecast_delta_api", api_command) + my_predbat.refresh_additional_load_forecast_api() + my_predbat.update_additional_load_api_command_metadata( + "dishwasher", + { + "_selected_start": my_predbat.additional_load_minutes_to_stamp(21 * 60), + "_selected_end": my_predbat.additional_load_minutes_to_stamp(26 * 60), + "_expires_at": my_predbat.additional_load_minutes_to_stamp(26 * 60), + }, + ) + + my_predbat.minutes_now = 21 * 60 + 15 + my_predbat.api_select("load_forecast_delta_api", api_command) + my_predbat.refresh_additional_load_forecast_api() + + stored_command = my_predbat.api_select_update("load_forecast_delta_api")[0] + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if "_selected_start=" not in stored_command or "T21:00:00" not in forecast.get("suggested_start", "") or not forecast.get("selection_locked"): + print("ERROR: Repeated active API command should preserve selected metadata, command {} forecast {}".format(stored_command, forecast)) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_pending_until_plan(my_predbat): + """Test flexible additional load is left for plan-time prediction selection.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "mode": "flexible", "duration": 2.0, "energy": 1.2}, + ] + + load_adjust, forecasts = my_predbat.fetch_additional_load_forecast() + if load_adjust: + print("ERROR: Flexible load should not produce adjustments until plan-time selection, got {}".format(load_adjust)) + failed = 1 + forecast = forecasts.get("dishwasher", {}) + if forecast.get("state") != "off" or forecast.get("selection_reason") != "pending_prediction_metric": + print("ERROR: Flexible load should publish pending prediction selection, got {}".format(forecast)) + failed = 1 + return failed + + +def test_additional_load_flexible_done_by_window(my_predbat): + """Test flexible end_time means done by, with omitted start_time using now.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 16 * 60 + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "mode": "flexible", "end_time": "07:00", "duration": 2.0, "energy": 1.2}, + ] + + _, forecasts = my_predbat.fetch_additional_load_forecast() + forecast = forecasts.get("dishwasher", {}) + if "T16:00:00" not in forecast.get("requested_start", "") or "T07:00:00" not in forecast.get("requested_end", ""): + print("ERROR: Flexible done-by window should run from now until 07:00, got {}".format(forecast)) + failed = 1 + return failed + + +def test_additional_load_flexible_done_by_next_reachable_deadline(my_predbat): + """Test flexible end_time rolls to the next reachable deadline when today's deadline cannot fit.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.forecast_minutes = 30 * 60 + my_predbat.minutes_now = 6 * 60 + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "mode": "flexible", "end_time": "07:00", "duration": 5.0, "energy": 0.7}, + ] + + _, forecasts = my_predbat.fetch_additional_load_forecast() + forecast = forecasts.get("dishwasher", {}) + if "T06:00:00" not in forecast.get("requested_start", "") or "T07:00:00" not in forecast.get("requested_end", ""): + print("ERROR: Flexible done-by window should run from 06:00 to the next reachable 07:00, got {}".format(forecast)) + failed = 1 + if forecast.get("_requested_end_minutes") != 31 * 60: + print("ERROR: Flexible done-by deadline should roll to tomorrow 07:00, got {}".format(forecast)) + failed = 1 + + my_predbat.minutes_now = 30 + _, forecasts = my_predbat.fetch_additional_load_forecast() + forecast = forecasts.get("dishwasher", {}) + if forecast.get("_requested_end_minutes") != 7 * 60: + print("ERROR: Flexible done-by deadline should use today's 07:00 when the load fits, got {}".format(forecast)) + failed = 1 + + my_predbat.minutes_now = 3 * 60 + _, forecasts = my_predbat.fetch_additional_load_forecast() + forecast = forecasts.get("dishwasher", {}) + if forecast.get("_requested_end_minutes") != 31 * 60: + print("ERROR: Flexible done-by deadline should roll when 5h cannot fit by today's 07:00, got {}".format(forecast)) + failed = 1 + return failed + + +def test_additional_load_flexible_api_omitted_start_is_frozen(my_predbat): + """Test API flexible forecasts without start_time keep their initial requested start.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 16 * 60 + 15 + my_predbat.plan_interval_minutes = 15 + my_predbat.args["plan_interval_minutes"] = 15 + my_predbat.args["house_load_additional_forecast"] = [] + my_predbat.api_select("load_forecast_delta_api", "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=2.0&energy=1.2") + my_predbat.refresh_additional_load_forecast_api() + + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + first_requested_start = forecast.get("requested_start", "") + if "T16:15:00" not in first_requested_start: + print("ERROR: Flexible API omitted start should stamp initial plan slot, got {}".format(forecast)) + failed = 1 + + my_predbat.minutes_now = 17 * 60 + my_predbat.refresh_additional_load_forecast_api() + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if forecast.get("requested_start", "") != first_requested_start: + print("ERROR: Flexible API omitted start should not drift after refresh, got {}".format(forecast)) + failed = 1 + + my_predbat.api_select("load_forecast_delta_api", "off") + my_predbat.house_load_additional_forecast_overrides = {} + return failed + + +def test_additional_load_flexible_yaml_omitted_start_rolls(my_predbat): + """Test YAML flexible forecasts without start_time continue using the current plan slot.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 16 * 60 + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "mode": "flexible", "end_time": "07:00", "duration": 2.0, "energy": 1.2}, + ] + + _, forecasts = my_predbat.fetch_additional_load_forecast() + first_requested_start = forecasts.get("dishwasher", {}).get("requested_start", "") + my_predbat.minutes_now = 17 * 60 + _, forecasts = my_predbat.fetch_additional_load_forecast() + second_requested_start = forecasts.get("dishwasher", {}).get("requested_start", "") + + if "T16:00:00" not in first_requested_start or "T17:00:00" not in second_requested_start: + print("ERROR: Flexible YAML omitted start should roll with current time, got {} then {}".format(first_requested_start, second_requested_start)) + failed = 1 + return failed + + +def test_additional_load_flexible_prediction_metric_selection(my_predbat): + """Test flexible additional load uses prediction metric, not raw import rate order.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.minutes_now = 16 * 60 + my_predbat.forecast_minutes = 24 * 60 + my_predbat.args["house_load_additional_forecast"] = [ + {"name": "dishwasher", "mode": "flexible", "end_time": "07:00", "duration": 2.0, "energy": 1.2}, + ] + my_predbat.house_load_additional_forecast_adjust, my_predbat.house_load_additional_forecasts = my_predbat.fetch_additional_load_forecast() + my_predbat.charge_limit_best = [] + my_predbat.charge_window_best = [] + my_predbat.export_window_best = [] + my_predbat.export_limits_best = [] + my_predbat.end_record = my_predbat.forecast_minutes + + original_prediction = plan_module.Prediction + + class FakePrediction: + """Fake prediction scores 01:00 as cheapest regardless of candidate order.""" + + def __init__(self, base, pv_step, pv10_step, load_step, load10_step): + """Store load step data.""" + self.load_step = load_step + + def run_prediction(self, charge_limit, charge_window, export_window, export_limits, pv10, end_record): + """Return a metric based on when the injected load appears.""" + metric = 1000.0 + first_load_minute = None + for minute, load in self.load_step.items(): + if load > 0: + first_load_minute = my_predbat.minutes_now + minute if first_load_minute is None else min(first_load_minute, my_predbat.minutes_now + minute) + if first_load_minute is not None: + metric = abs(first_load_minute - 25 * 60) + return (metric, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + try: + plan_module.Prediction = FakePrediction + selected, load_step, _ = my_predbat.select_flexible_additional_loads({}, {}, {}, {}) + finally: + plan_module.Prediction = original_prediction + + if not selected: + print("ERROR: Flexible prediction metric selection should select a slot") + failed = 1 + forecast = my_predbat.house_load_additional_forecasts.get("dishwasher", {}) + if "T01:00:00" not in forecast.get("suggested_start", "") or forecast.get("selection_reason") != "prediction_metric" or forecast.get("target_times"): + print("ERROR: Flexible prediction metric should select 01:00, got {}".format(forecast)) + failed = 1 + if not load_step: + print("ERROR: Flexible prediction metric should include selected load in returned plan step data") + failed = 1 + return failed + + +def test_additional_load_textual_plan_summary(my_predbat): + """Test textual plan includes confirmed and suggested additional load forecasts only.""" + failed = 0 + configure_additional_load_test(my_predbat) + my_predbat.house_load_additional_forecasts = { + "dishwasher": { + "enabled": True, + "total_energy": 1.2, + "target_times": [ + {"start": "2026-05-07T10:00:00+02:00", "end": "2026-05-07T10:30:00+02:00", "energy": 0.6}, + {"start": "2026-05-07T10:30:00+02:00", "end": "2026-05-07T11:00:00+02:00", "energy": 0.6}, + ], + }, + "washer": { + "enabled": True, + "mode": "flexible", + "energy": 0.7, + "slot_energy": 0.0, + "duration": 5.0, + "plan_interval_minutes": 15, + "target_times": [], + "total_energy": 0.0, + "suggested_start": "2026-05-07T20:15:00+02:00", + "suggested_end": "2026-05-08T01:15:00+02:00", + }, + "dryer": { + "enabled": True, + "mode": "flexible", + "energy": 0.9, + "slot_energy": 0.0, + "duration": 3.0, + "plan_interval_minutes": 15, + "selection_locked": True, + "target_times": [ + {"start": "2026-05-07T21:15:00+02:00", "end": "2026-05-07T21:30:00+02:00", "energy": 0.075}, + {"start": "2026-05-07T21:30:00+02:00", "end": "2026-05-08T00:00:00+02:00", "energy": 0.825}, + ], + "total_energy": 0.825, + "suggested_start": "2026-05-07T21:00:00+02:00", + "suggested_end": "2026-05-08T00:00:00+02:00", + }, + "pending": {"enabled": True, "total_energy": 1.0, "target_times": []}, + } + + text = my_predbat.get_additional_load_text() + if "dishwasher from 10:00 to 11:00 using 1.20 kWh is planned" not in text: + print("ERROR: Textual plan should include planned dishwasher load, got {}".format(text)) + failed = 1 + if "washer is suggested from 20:15 to 01:15 using 0.70 kWh" not in text: + print("ERROR: Textual plan should include suggested washer load, got {}".format(text)) + failed = 1 + if "dryer is running from 21:00 to 00:00 using 0.90 kWh" not in text: + print("ERROR: Textual plan should include running dryer load, got {}".format(text)) + failed = 1 + if "pending" in text: + print("ERROR: Textual plan should not include pending load, got {}".format(text)) + failed = 1 + my_predbat.house_load_additional_forecasts = {} + return failed + + +def run_additional_load_forecast_tests(my_predbat): + """Run additional load forecast tests.""" + failed = 0 + print("Test additional load forecast") + failed |= test_additional_load_disabled(my_predbat) + failed |= test_additional_load_enabled_false_profile(my_predbat) + failed |= test_additional_load_dishwasher_simple(my_predbat) + failed |= test_additional_load_end_time_without_duration(my_predbat) + failed |= test_additional_load_slot_energy_weighting(my_predbat) + failed |= test_additional_load_dishwasher_total_energy(my_predbat) + failed |= test_additional_load_dishwasher_total_energy_weighting(my_predbat) + failed |= test_additional_load_partial_duration_keeps_total_energy(my_predbat) + failed |= test_additional_load_multiple_and_api_override(my_predbat) + failed |= test_additional_load_select_api_override(my_predbat) + failed |= test_additional_load_select_api_weighting(my_predbat) + failed |= test_additional_load_select_event_updates_adjustment(my_predbat) + failed |= test_additional_load_delete_button_removes_api_forecast(my_predbat) + failed |= test_additional_load_delete_button_removes_sanitized_api_forecast(my_predbat) + failed |= test_additional_load_yaml_does_not_publish_delete_button(my_predbat) + failed |= test_additional_load_api_forecast_auto_expires(my_predbat) + failed |= test_additional_load_yaml_placeholder_not_published(my_predbat) + failed |= test_additional_load_stale_delete_button_no_replan(my_predbat) + failed |= test_additional_load_flexible_api_selection_survives_refresh(my_predbat) + failed |= test_additional_load_flexible_api_reselects_before_suggested_start(my_predbat) + failed |= test_additional_load_flexible_api_stale_selection_not_before_requested_start(my_predbat) + failed |= test_additional_load_flexible_api_locks_after_suggested_start(my_predbat) + failed |= test_additional_load_flexible_api_metadata_survives_restart(my_predbat) + failed |= test_additional_load_flexible_api_repeat_preserves_metadata(my_predbat) + failed |= test_additional_load_flexible_pending_until_plan(my_predbat) + failed |= test_additional_load_flexible_done_by_window(my_predbat) + failed |= test_additional_load_flexible_done_by_next_reachable_deadline(my_predbat) + failed |= test_additional_load_flexible_api_omitted_start_is_frozen(my_predbat) + failed |= test_additional_load_flexible_yaml_omitted_start_rolls(my_predbat) + failed |= test_additional_load_flexible_prediction_metric_selection(my_predbat) + failed |= test_additional_load_textual_plan_summary(my_predbat) + return failed diff --git a/apps/predbat/unit_test.py b/apps/predbat/unit_test.py index 5d03b84af..6933c5a43 100644 --- a/apps/predbat/unit_test.py +++ b/apps/predbat/unit_test.py @@ -118,6 +118,7 @@ from tests.test_discard_unused_charge_slots import run_discard_unused_charge_slots_tests from tests.test_discard_unused_export_slots import run_discard_unused_export_slots_tests from tests.test_marginal_costs import test_marginal_costs +from tests.test_additional_load_forecast import run_additional_load_forecast_tests from tests.test_savings_stability import test_savings_stability @@ -215,6 +216,7 @@ def main(): ("dynamic_load_car", test_dynamic_load_car_slot_cancellation, "Dynamic load car slot cancellation tests", False), ("units", run_test_units, "Unit tests", False), ("manual_api", run_test_manual_api, "Manual API tests", False), + ("additional_load_forecast", run_additional_load_forecast_tests, "Additional load forecast tests", False), ("manual_soc", run_test_manual_soc, "Manual SOC target tests", False), ("manual_times", run_test_manual_times, "Manual times tests", False), ("manual_select", run_test_manual_select, "Manual select tests", False), diff --git a/apps/predbat/userinterface.py b/apps/predbat/userinterface.py index d2617f4dc..2d1212b6f 100644 --- a/apps/predbat/userinterface.py +++ b/apps/predbat/userinterface.py @@ -309,8 +309,9 @@ async def select_event(self, event, data, kwargs): if isinstance(entities, str): entities = [entities] - for entity_id in entities: - await self.components.select_event(entity_id, value) + if self.components: + for entity_id in entities: + await self.components.select_event(entity_id, value) for item in self.CONFIG_ITEMS: if ("entity" in item) and (item["entity"] in entities): @@ -331,6 +332,8 @@ async def select_event(self, event, data, kwargs): await self.async_manual_select(item["name"], value) elif item.get("api"): await self.async_api_select(item["name"], value) + if item["name"] == "load_forecast_delta_api": + await self.run_in_executor(self.refresh_additional_load_forecast_api) else: if item.get("value", None) != value: await self.async_expose_config(item["name"], value, event=True) @@ -406,8 +409,9 @@ async def switch_event(self, event, data, kwargs): if isinstance(entities, str): entities = [entities] - for entity_id in entities: - await self.components.switch_event(entity_id, service) + if self.components: + for entity_id in entities: + await self.components.switch_event(entity_id, service) for item in self.CONFIG_ITEMS: if ("entity" in item) and (item["entity"] in entities): @@ -428,6 +432,24 @@ async def switch_event(self, event, data, kwargs): self.update_pending = True self.plan_valid = False + async def button_event(self, event, data, kwargs): + """ + Catch HA button press events. + """ + service_data = data.get("service_data", {}) + entities = service_data.get("entity_id", []) + + if isinstance(entities, str): + entities = [entities] + + for entity_id in entities: + if entity_id.startswith("button.{}_load_forecast_delta_".format(self.prefix)) and entity_id.endswith("_delete"): + name = self.additional_load_name_from_entity(entity_id) + if name: + if self.delete_additional_load_forecast(name): + self.update_pending = True + self.plan_valid = False + def get_ha_config(self, name, default): """ Get Home assistant config value, use default if not set @@ -866,6 +888,7 @@ def define_service_list(self): {"domain": "switch", "service": "turn_on"}, {"domain": "switch", "service": "turn_off"}, {"domain": "switch", "service": "toggle"}, + {"domain": "button", "service": "press"}, {"domain": "select", "service": "select_option"}, {"domain": "select", "service": "select_first"}, {"domain": "select", "service": "select_last"}, @@ -876,6 +899,7 @@ def define_service_list(self): {"domain": "switch", "service": "turn_on", "callback": self.switch_event}, {"domain": "switch", "service": "turn_off", "callback": self.switch_event}, {"domain": "switch", "service": "toggle", "callback": self.switch_event}, + {"domain": "button", "service": "press", "callback": self.button_event}, {"domain": "input_number", "service": "set_value", "callback": self.number_event}, {"domain": "input_number", "service": "increment", "callback": self.number_event}, {"domain": "input_number", "service": "decrement", "callback": self.number_event}, @@ -1165,6 +1189,12 @@ def manual_select(self, config_item, value): if value.startswith("+"): # Ignore selections which are just the current value return + if config_item == "load_forecast_delta_api" and value != "off": + name = value.split("?", 1)[0].split("=", 1)[0].replace("[", "").replace("]", "") + if "[" not in value and self.has_additional_load_api_command(name): + value = self.preserve_additional_load_api_metadata(value) + else: + self.house_load_additional_forecast_overrides.pop(name, None) values = item.get("value", "") if not values: values = "" @@ -1247,6 +1277,12 @@ def api_select(self, config_item, value): if value.startswith("+"): # Ignore selections which are just the current value return + if config_item == "load_forecast_delta_api" and value != "off" and "[" not in value: + name = value.split("?", 1)[0].split("=", 1)[0] + if self.has_additional_load_api_command(name): + value = self.preserve_additional_load_api_metadata(value) + else: + self.house_load_additional_forecast_overrides.pop(name, None) values = item.get("value", "") if not values: values = "" diff --git a/docs/apps-yaml.md b/docs/apps-yaml.md index 26b48fa79..751ee8b07 100644 --- a/docs/apps-yaml.md +++ b/docs/apps-yaml.md @@ -1647,6 +1647,149 @@ Set **load_forecast_only** to `true` if you do not wish to use the Predbat forec - sensor.givtcp_{geserial}_load_energy_today_kwh_prediction$results ``` +## Additional House Load Forecast + +In addition to the normal historical or ML load forecast, Predbat can add named future load deltas to the forward plan. This is intended for known loads that may not be represented well by history, such as a dishwasher, cooking, hot water, or heating demand. + +Each item in **house_load_additional_forecast** is labelled by **name** and can be updated later from a Home Assistant automation using **select.predbat_load_forecast_delta_api**. + +For appliances where you know the total cycle energy, use **energy** in kWh. Predbat will divide the total across the generated plan slots: + +```yaml + house_load_additional_forecast: + - name: dishwasher + start_time: "20:00" + duration: 2.0 + energy: 1.2 +``` + +With a 15-minute plan interval this adds 0.15kWh to each of the eight slots from 20:00 to 22:00, for a total of 1.2kWh. + +For advanced cases, use **slot_energy** when you want to set the kWh for each Predbat plan slot directly: + +```yaml + house_load_additional_forecast: + - name: heating + start_time: "20:00" + duration: 2.0 + slot_energy: 0.5 +``` + +The **slot_energy** value is kWh per Predbat plan slot, not the total energy for the full duration. With the default 30-minute plan interval, the example above adds 0.5kWh to each of the four slots from 20:00 to 22:00, for a total of 2.0kWh. With a 15-minute plan interval it would add 0.5kWh to each of eight slots, for a total of 4.0kWh. + +Set **enabled** to `false` to leave a named load configured but disabled by default. Predbat will still publish the binary sensor, but it will not add any load to the plan until the API sends `enabled=true`. + +```yaml + house_load_additional_forecast: + - name: dishwasher_eco + enabled: false + start_time: "20:00" + duration: 2.0 + energy: 1.2 +``` + +Set **duration** to `0` to disable an entry completely. + +You can optionally set **end_time** instead of **duration**: + +```yaml + house_load_additional_forecast: + - name: cooking + start_time: "18:00" + end_time: "19:30" + energy: 1.2 +``` + +You can optionally set **weighting** to change the load profile across the duration. A `*` means normal weight `1.0`; if fewer weights are supplied than slots, the final weight is repeated. With **energy**, weighting redistributes the total energy without changing the total. With **slot_energy**, weighting multiplies the per-slot energy. + +```yaml + house_load_additional_forecast: + - name: heating + start_time: "20:00" + duration: 2.0 + slot_energy: 0.5 + weighting: "2,2,*" +``` + +With a 30-minute plan interval this adds 1.0kWh, 1.0kWh, 0.5kWh, and 0.5kWh over the four slots. + +Using total **energy** with weighting: + +```yaml + house_load_additional_forecast: + - name: dishwasher + start_time: "20:00" + duration: 2.0 + energy: 1.2 + weighting: "2,2,*" +``` + +With a 30-minute plan interval this redistributes 1.2kWh as 0.4kWh, 0.4kWh, 0.2kWh, and 0.2kWh. + +Set **mode** to `flexible` when the load can run at any time before a deadline. For flexible loads, **start_time** means the earliest allowed start and **end_time** means the load must be done by that time. If **start_time** is omitted in `apps.yaml`, Predbat uses the current plan slot each time the forecast is refreshed; if **end_time** is omitted, Predbat uses the remaining forecast horizon. + +Predbat scores flexible candidates with the prediction metric, not just the import rate. This means the suggested time considers the current plan, solar forecast, battery state, import/export rates, losses, and other predicted load. + +```yaml + house_load_additional_forecast: + - name: dishwasher_eco + enabled: false + mode: flexible + duration: 2.0 + energy: 1.2 +``` + +You can also restrict a flexible load to a same-day or overnight done-by window: + +```yaml + house_load_additional_forecast: + - name: dishwasher_eco + enabled: false + mode: flexible + start_time: "22:00" + end_time: "07:00" + duration: 2.0 + energy: 1.2 +``` + +If this is enabled at 16:00, the example above means the load may start any time from 22:00 and must finish by 07:00. If **start_time** is omitted, for example `end_time: "07:00"`, the YAML load may start any time from now and must finish by 07:00. Because YAML entries are static configuration, this "now" rolls forward on each forecast refresh. + +Predbat publishes each named load as a binary sensor, for example **binary_sensor.predbat_load_forecast_delta_dishwasher**, with a **target_times** attribute showing the generated slots. The sensor attributes also show **enabled**, **mode**, **energy**, **slot_energy**, **load_mode**, **plan_interval_minutes**, **slots**, **total_energy**, **source**, **auto_expire**, **expires_at**, and for flexible loads **requested_start**, **requested_end**, **suggested_start**, **suggested_end**, **selection_reason**, and **candidate_count** so you can confirm how much load will be added and when. + +Forecasts created through **select.predbat_load_forecast_delta_api** are one-shot dynamic loads. Predbat publishes a delete button for these forecasts, for example **button.predbat_load_forecast_delta_dishwasher_delete**, and automatically removes them after their finish time. YAML entries are static load injections and do not get delete buttons; remove or edit them in `apps.yaml` instead. + +To update a named load from a Home Assistant automation, call **select.select_option** on **select.predbat_load_forecast_delta_api** with the format `name?start_time=HH:MM&duration=hours&energy=kWh`: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher?start_time=20:00&duration=2.0&energy=1.2" +``` + +For a flexible API update, include `mode=flexible` and optionally `enabled=true`: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher_eco?enabled=true&mode=flexible&start_time=22:00&end_time=07:00&duration=2.0&energy=1.2" +``` + +To run any time from now but be done by 07:00, omit **start_time**: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher_eco?enabled=true&mode=flexible&end_time=07:00&duration=2.0&energy=1.2" +``` + +For one-shot forecasts created through **select.predbat_load_forecast_delta_api**, an omitted **start_time** is frozen to the current Predbat plan slot when the command is received. This prevents the published **requested_start** drifting forward on later replans. Sending the command again creates a fresh request and freezes a new start time. + ## Balance Inverters When you have two or more inverters it's possible they get out of sync so they are at different charge levels or they start to cross-charge (one discharges into another). diff --git a/docs/manual-api.md b/docs/manual-api.md index 47276f29b..7de97353f 100644 --- a/docs/manual-api.md +++ b/docs/manual-api.md @@ -155,3 +155,250 @@ entities: You simply enter the date, start time, end time and load percentage adjustment (e.g. 0.5=50%), then click the 'Execute' button. The load adjustment details will be sent to the Predbat manual API and you will see the load change and a small +/- symbol against the export rate in the Predbat plan. + +## Updating additional house load forecasts + +Named entries configured with [house_load_additional_forecast](apps-yaml.md#additional-house-load-forecast) can be updated from a Home Assistant automation using **select.predbat_load_forecast_delta_api**. This uses Home Assistant's standard **select.select_option** action, so it is visible in Developer Tools and automations. + +For example, to schedule a dishwasher load: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher?start_time=20:00&duration=2.0&energy=1.2" +``` + +The **energy** value is the total kWh across the full duration. Predbat divides it across the generated plan slots. + +Forecasts created through **select.predbat_load_forecast_delta_api** are one-shot dynamic loads. Predbat publishes a delete button for each of these forecasts, for example **button.predbat_load_forecast_delta_dishwasher_delete**, and automatically removes the forecast after its finish time. If you want the same forecast again, send the select command again. + +While a one-shot forecast is active, Predbat preserves hidden request and selected-window metadata in the stored selector option so the schedule survives Home Assistant or Predbat restarts. Sending the same command again while it is still active does not reset the frozen request time or move a locked/running forecast. To force a fresh schedule, press the forecast delete button first and then send the request again. + +If the appliance can run at any time before a deadline, send `mode=flexible`. For flexible loads, `start_time` is the earliest allowed start and `end_time` means done by. Predbat chooses the best block using the full prediction metric, so the selection considers solar, battery state, import/export rates, losses, and the current plan rather than just the import rate: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher?enabled=true&mode=flexible&start_time=22:00&end_time=07:00&duration=2.0&energy=1.2" +``` + +To allow the dishwasher to start any time from now but be done by 07:00, omit `start_time`: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=2.0&energy=1.2" +``` + +For one-shot forecasts created through **select.predbat_load_forecast_delta_api**, an omitted `start_time` is frozen to the current Predbat plan slot when the command is received. This prevents the published `requested_start` moving forward on later replans. If you send the same command again, Predbat treats it as a fresh request and freezes a new start time. Static YAML forecasts are different: if a YAML flexible load omits `start_time`, it continues to mean the current plan slot each time Predbat refreshes the forecast. + +Predbat publishes the result as a binary sensor, for example **binary_sensor.predbat_load_forecast_delta_dishwasher**. For flexible loads, the most useful attributes are **requested_start**, **requested_end**, **suggested_start**, **suggested_end**, **target_times**, and **expires_at**. + +A typical dishwasher automation can therefore send a one-shot flexible request when the dishwasher is ready: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=5&energy=0.7" +``` + +Use **suggested_start** from **binary_sensor.predbat_load_forecast_delta_dishwasher** to trigger the appliance start automation. Use **button.predbat_load_forecast_delta_dishwasher_delete** if you need to cancel the one-shot request before it expires. + +### Example dishwasher scheduling automations + +The following example uses four Home Assistant automations: + +- Request a Predbat flexible load schedule when the dishwasher is ready. +- Start the dishwasher when Predbat reaches the published **suggested_start**. +- Clear the Predbat schedule if the dishwasher is started manually instead. +- Reset the Predbat-start helper after the scheduled run is no longer active. + +First create a helper to track whether Predbat started the dishwasher. This prevents the manual-start cleanup automation deleting the schedule when Predbat itself starts the appliance: + +```yaml +input_boolean: + dishwasher_started_by_predbat: + name: Dishwasher started by Predbat + icon: mdi:dishwasher +``` + +Replace the switch and sensor entity IDs with the entities for your dishwasher. + +```yaml +alias: Dishwasher - Request Predbat Schedule +description: Request cheapest dishwasher run window from Predbat +triggers: + - trigger: state + entity_id: switch.dishwasher_power + to: "on" +conditions: + - condition: state + entity_id: input_boolean.dishwasher_started_by_predbat + state: "off" +actions: + - delay: + seconds: 20 + - condition: state + entity_id: input_boolean.dishwasher_started_by_predbat + state: "off" + - condition: state + entity_id: sensor.dishwasher_operation_state + state: ready + - wait_template: | + {{ is_state('sensor.dishwasher_door', 'closed') }} + timeout: "00:01:00" + continue_on_timeout: false + - action: select.select_option + target: + entity_id: select.predbat_load_forecast_delta_api + data: + option: >- + dishwasher?enabled=true&mode=flexible&end_time=07:00&duration=5&energy=0.7 +mode: single +``` + +The next automation checks every 15 minutes and starts the dishwasher once the current time reaches Predbat's **suggested_start**. Replace **YOUR_DISHWASHER_DEVICE_ID** and the program value with the values for your appliance. + +```yaml +alias: Dishwasher - Start On Predbat Schedule +description: Start dishwasher at Predbat suggested start time +triggers: + - trigger: time_pattern + minutes: /15 +conditions: + - condition: template + value_template: | + {{ + state_attr( + 'binary_sensor.predbat_load_forecast_delta_dishwasher', + 'suggested_start' + ) is not none + }} + - condition: template + value_template: | + {% set start = state_attr( + 'binary_sensor.predbat_load_forecast_delta_dishwasher', + 'suggested_start' + ) %} + {% if start %} + {{ now().timestamp() >= as_timestamp(start) }} + {% else %} + false + {% endif %} + - condition: state + entity_id: input_boolean.dishwasher_started_by_predbat + state: "off" +actions: + - action: input_boolean.turn_on + target: + entity_id: input_boolean.dishwasher_started_by_predbat + - if: + - condition: state + entity_id: switch.dishwasher_power + state: "off" + then: + - action: switch.turn_on + target: + entity_id: switch.dishwasher_power + - delay: + seconds: 20 + - condition: state + entity_id: sensor.dishwasher_operation_state + state: ready + - condition: state + entity_id: sensor.dishwasher_door + state: closed + - action: home_connect.set_program_and_options + data: + device_id: YOUR_DISHWASHER_DEVICE_ID + affects_to: active_program + program: YOUR_DISHWASHER_PROGRAM +mode: single +``` + +The next automation removes the Predbat request if the dishwasher is started manually before Predbat reaches **suggested_start**. It only runs when **input_boolean.dishwasher_started_by_predbat** is off, so it will not delete the schedule when the previous automation started the dishwasher for Predbat: + +```yaml +alias: Dishwasher - Clear Predbat Schedule When Manually Started +description: Remove Predbat schedule if dishwasher starts manually +triggers: + - trigger: state + entity_id: sensor.dishwasher_operation_state + from: ready + to: + - run + - delayedstart + - pause +conditions: + - condition: state + entity_id: input_boolean.dishwasher_started_by_predbat + state: "off" + - condition: template + value_template: | + {{ + state_attr( + 'binary_sensor.predbat_load_forecast_delta_dishwasher', + 'suggested_start' + ) is not none + }} +actions: + - action: button.press + target: + entity_id: button.predbat_load_forecast_delta_dishwasher_delete +mode: single +``` + +The final automation resets **input_boolean.dishwasher_started_by_predbat** after the Predbat one-shot forecast has disappeared or after the dishwasher returns to a non-running state. This makes the manual-start cleanup automation ready for the next dishwasher cycle without clearing the current Predbat schedule too early: + +```yaml +alias: Dishwasher - Reset Predbat Started Marker +description: Reset Predbat helper after scheduled dishwasher run +triggers: + - trigger: state + entity_id: binary_sensor.predbat_load_forecast_delta_dishwasher + to: + - unavailable + - unknown + - trigger: state + entity_id: sensor.dishwasher_operation_state + to: + - ready + - inactive + - finished + - off +conditions: + - condition: state + entity_id: input_boolean.dishwasher_started_by_predbat + state: "on" +actions: + - action: input_boolean.turn_off + target: + entity_id: input_boolean.dishwasher_started_by_predbat +mode: single +``` + +You can add appliance-specific options, such as quiet or night mode, inside the Home Connect action if your appliance supports them. + +Use `enabled=false` in `apps.yaml` to keep static load injection profiles visible but inactive until an automation sends an API forecast with the same name. + +For advanced cases, you can use **slot_energy** instead when you want to set kWh per Predbat plan slot directly. With the default 30-minute plan interval, `slot_energy: 0.5` adds 0.5kWh to each slot for two hours. + +You can also include **weighting** to model a higher load at the start of a cycle: + +```yaml +action: select.select_option +target: + entity_id: select.predbat_load_forecast_delta_api +data: + option: "dishwasher?start_time=20:00&duration=2.0&energy=1.2&weighting=2|2|*" +``` + +With **energy**, weighting redistributes the total energy without changing the total. With **slot_energy**, weighting multiplies the per-slot energy. Use `|` as the weighting separator when sending commands through **select.predbat_load_forecast_delta_api**, because Home Assistant select options are stored as comma-separated values internally.