Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apps/predbat/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,18 @@
"enable": "expert_mode",
"default": 0.0,
},
{
"name": "rate_history_days_average",
"friendly_name": "Rate History Days Average",
"type": "input_number",
"min": 0,
"max": 30,
"step": 1,
"unit": "days",
"icon": "mdi:chart-bell-curve",
"enable": "expert_mode",
"default": 0,
},
{
"name": "metric_inday_adjust_damping",
"friendly_name": "In-day adjustment damping factor",
Expand Down Expand Up @@ -2164,6 +2176,10 @@
"futurerate_adjust_import": {"type": "boolean"},
"futurerate_adjust_export": {"type": "boolean"},
"futurerate_adjust_auto": {"type": "boolean"},
"rate_history_source_import": {"type": "string", "empty": False},
"rate_history_source_export": {"type": "string", "empty": False},
"rate_history_scaling_import": {"type": "float"},
"rate_history_scaling_export": {"type": "float"},
"futurerate_peak_start": {"type": "string", "empty": False},
"futurerate_peak_end": {"type": "string", "empty": False},
"octopus_region": {"type": "string", "empty": False},
Expand Down
131 changes: 126 additions & 5 deletions apps/predbat/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
dictionaries for use by the prediction engine.
"""

from datetime import datetime, timedelta
import bisect
from datetime import datetime, timedelta, timezone
from utils import minutes_to_time, str2time, dp1, dp2, dp3, dp4, time_string_to_stamp, minute_data, get_now_from_cumulative
from const import MINUTE_WATT, PREDICT_STEP, TIME_FORMAT, PREDBAT_MODE_OPTIONS, PREDBAT_MODE_CONTROL_SOC, PREDBAT_MODE_CONTROL_CHARGEDISCHARGE, PREDBAT_MODE_CONTROL_CHARGE, PREDBAT_MODE_MONITOR
from predbat_metrics import metrics
Expand Down Expand Up @@ -939,7 +940,17 @@ def fetch_sensor_data(self, save=True):
self.rate_scan(self.rate_import, print=False)
self.rate_max_base = self.rate_max # True peak rate before saving sessions / overrides inflate it
self.rate_min_base = self.rate_min # True off-peak rate before free sessions / overrides deflate it
self.rate_import, self.rate_import_replicated = self.rate_replicate(self.rate_import, self.io_adjusted, is_import=True)
history_buckets_import = None
if self.rate_history_days_average > 0:
source = self.rate_history_source_import or self.get_arg("metric_octopus_import", indirect=False)
history_buckets_import = self.build_rate_history_buckets(
source, self.rate_history_days_average, scaling=self.rate_history_scaling_import
)
if history_buckets_import:
self.log("Built rate history buckets from {} days of {}".format(self.rate_history_days_average, source))
self.rate_import, self.rate_import_replicated = self.rate_replicate(
self.rate_import, self.io_adjusted, is_import=True, history_buckets=history_buckets_import
)
self.rate_import_no_io = self.rate_import.copy()
for car_n in range(self.num_cars):
self.rate_import = self.rate_add_io_slots(car_n, self.rate_import, self.octopus_slots[car_n])
Expand All @@ -957,7 +968,15 @@ def fetch_sensor_data(self, save=True):
# Replicate and scan export rates
if self.rate_export:
self.rate_scan_export(self.rate_export, print=False)
self.rate_export, self.rate_export_replicated = self.rate_replicate(self.rate_export, is_import=False)
history_buckets_export = None
if self.rate_history_days_average > 0:
source = self.rate_history_source_export or self.get_arg("metric_octopus_export", indirect=False)
history_buckets_export = self.build_rate_history_buckets(
source, self.rate_history_days_average, scaling=self.rate_history_scaling_export
)
self.rate_export, self.rate_export_replicated = self.rate_replicate(
self.rate_export, is_import=False, history_buckets=history_buckets_export
)
# For export tariff only load the saving session if enabled
if self.rate_export_max > 0:
self.load_saving_slot(self.octopus_saving_slots, export=True, rate_replicate=self.rate_export_replicated)
Expand Down Expand Up @@ -1372,9 +1391,99 @@ def download_ge_data(self, now_utc):
self.log("Downloaded {} datapoints from GECloudData going back {} days".format(len(self.load_minutes), self.load_minutes_age))
return True

def rate_replicate(self, rates, rate_io={}, is_import=True, is_gas=False):
def build_rate_history_buckets(self, entity_id, days, scaling=1.0):
"""
We don't get enough hours of data for Octopus, so lets assume it repeats until told others
Build 48 half-hour-of-day mean rate buckets from N days of HA recorder history.

For each historical half-hour slot in the lookback window, the rate in effect
at that slot's start is sampled via step-function lookup (latest history row
with timestamp <= slot start). This yields one sample per slot per day,
avoiding the spike bias that comes from counting state-change events directly
(HA's ``minimal_response`` history returns only state transitions, so a brief
spike contributes the same weight as a long plateau).

Returns a ``{0..47: mean_rate}`` dict in the planner's rate units (after
applying ``scaling`` to the raw sensor state), or ``None`` if no usable
history is available. Empty buckets fall back to the overall mean across all
sampled slots so every bucket has a value.

Args:
entity_id: HA entity to query (e.g. ``sensor.amber_general_price``).
days: Number of days of history to look back (must be > 0).
scaling: Multiplier applied to each raw state value to convert into the
planner's rate units (e.g. ``100.0`` to convert $/kWh to c/kWh).
"""
if not entity_id or days <= 0:
return None

history = self.get_history_wrapper(entity_id=entity_id, days=days, required=False)
if not history or not history[0]:
self.log("Warn: rate_history_days_average enabled but no history for {}".format(entity_id))
return None

# Build a sorted timeline of (utc_timestamp, value) samples from history.
timeline = []
for record in history[0]:
state = record.get("state")
if state in (None, "", "unknown", "unavailable"):
continue
try:
value = float(state) * scaling
except (TypeError, ValueError):
continue
ts_raw = record.get("last_changed") or record.get("last_updated")
if isinstance(ts_raw, datetime):
ts = ts_raw
elif isinstance(ts_raw, str):
try:
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
except ValueError:
continue
else:
continue
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
timeline.append((ts, value))

if not timeline:
return None
timeline.sort(key=lambda x: x[0])
timeline_ts = [t for t, _ in timeline]

# Walk every 30-minute slot start over the lookback window. For each slot,
# the rate in effect is the latest sample with ts <= slot_start.
buckets = {i: [] for i in range(48)}
slot = self.now_utc - timedelta(days=days)
# Align to a clean half-hour boundary at or after the lookback start.
if slot.minute % 30 or slot.second or slot.microsecond:
offset = 30 - (slot.minute % 30)
slot = (slot + timedelta(minutes=offset)).replace(second=0, microsecond=0)
Comment on lines +1459 to +1460

end = self.now_utc
while slot < end:
idx = bisect.bisect_right(timeline_ts, slot) - 1
if idx >= 0:
rate_value = timeline[idx][1]
slot_local = slot.astimezone(self.local_tz)
bucket = slot_local.hour * 2 + (1 if slot_local.minute >= 30 else 0)
buckets[bucket].append(rate_value)
slot += timedelta(minutes=30)

all_samples = [v for vs in buckets.values() for v in vs]
if not all_samples:
return None
overall_mean = sum(all_samples) / len(all_samples)
return {i: (sum(vs) / len(vs)) if vs else overall_mean for i, vs in buckets.items()}

def rate_replicate(self, rates, rate_io={}, is_import=True, is_gas=False, history_buckets=None):
"""
We don't get enough hours of data for Octopus, so lets assume it repeats until told others.

When ``history_buckets`` is supplied (from :meth:`build_rate_history_buckets`),
gaps not satisfied by 24h-back copy are filled from the historical half-hour-of-day
mean instead of the ``rate_last`` constant fallback. This restores diurnal shape
for users whose live rate sensor only publishes a short forward window
(e.g. Amber Australia's 16h window).
"""
minute = -24 * 60
rate_last = 0
Expand Down Expand Up @@ -1403,6 +1512,13 @@ def rate_replicate(self, rates, rate_io={}, is_import=True, is_gas=False):
elif minute_mod in rates:
rate_offset = rates[minute_mod]
using_last = False
elif history_buckets:
# No 24h-back or modulo match: use historical half-hour-of-day mean.
slot_local = (self.midnight_utc + timedelta(minutes=minute)).astimezone(self.local_tz)
bucket = slot_local.hour * 2 + (1 if slot_local.minute >= 30 else 0)
rate_offset = history_buckets[bucket]
adjust_type = "history_avg"
using_last = False
else:
# Missing rate within 24 hours - fill with dummy last rate
rate_offset = rate_last
Expand Down Expand Up @@ -2144,6 +2260,11 @@ def fetch_config_options(self):
self.metric_self_sufficiency = self.get_arg("metric_self_sufficiency")
self.metric_future_rate_offset_import = self.get_arg("metric_future_rate_offset_import")
self.metric_future_rate_offset_export = self.get_arg("metric_future_rate_offset_export")
self.rate_history_days_average = int(self.get_arg("rate_history_days_average", 0) or 0)
self.rate_history_source_import = self.get_arg("rate_history_source_import", None, indirect=False) or None
self.rate_history_source_export = self.get_arg("rate_history_source_export", None, indirect=False) or None
self.rate_history_scaling_import = float(self.get_arg("rate_history_scaling_import", 1.0) or 1.0)
self.rate_history_scaling_export = float(self.get_arg("rate_history_scaling_export", 1.0) or 1.0)
self.metric_inday_adjust_damping = self.get_arg("metric_inday_adjust_damping")
self.metric_pv_calibration_enable = self.get_arg("metric_pv_calibration_enable")
self.metric_dynamic_load_adjust = self.get_arg("metric_dynamic_load_adjust")
Expand Down
Loading