Custom Actions & Logic
Action anatomy recap
Example: hysteresis_clamp action
hysteresis_clamp action
Implementation (extensions/actions/hysteresis.py)
# All comments in English only.
from __future__ import annotations
from typing import Any, Optional
from spx_sdk.actions import Action
from spx_sdk.registry import register_class
from spx_sdk.attributes.resolve_attribute import resolve_attribute_reference_hierarchical
def _coerce_float(value: Any, default: float) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
@register_class(name="hysteresis_clamp")
class HysteresisClamp(Action):
    """Action that limits an input attribute to ``[low, high]`` with a hold band.

    On the first run after ``prepare`` the input is clamped directly into
    ``[low, high]``. On later runs the output snaps to ``low`` once the input
    drops below ``low - hysteresis``, snaps to ``high`` once it rises above
    ``high + hysteresis``, and otherwise repeats the previously emitted value.

    YAML definition:
        hysteresis_clamp:
          input: "#attr(sensor.temperature)"
          output: "#attr(controller.input)"
          low: 18.0
          high: 22.0
          hysteresis: 0.3
    """

    def _populate(self, definition: Optional[dict]) -> None:
        """Read and validate the action configuration.

        ``low``, ``high`` and ``hysteresis`` fall back to 0.0, 1.0 and 0.0
        respectively when missing or not convertible to float; a negative
        ``hysteresis`` is treated as 0.0.

        Raises:
            ValueError: If *definition* is not a dict, if ``input`` or
                ``output`` is missing/empty, or if ``low`` exceeds ``high``.
        """
        if not isinstance(definition, dict):
            raise ValueError("hysteresis_clamp definition must be a mapping")

        self._input_ref = definition.get("input")
        self._output_ref = definition.get("output")
        if not (self._input_ref and self._output_ref):
            raise ValueError("hysteresis_clamp requires 'input' and 'output' references")

        self.low = _coerce_float(definition.get("low"), 0.0)
        self.high = _coerce_float(definition.get("high"), 1.0)
        if self.low > self.high:
            raise ValueError("low threshold must be <= high threshold")

        band_width = _coerce_float(definition.get("hysteresis"), 0.0)
        self.hysteresis = max(0.0, band_width)
        # No output has been produced yet; the first run clamps directly.
        self._last_output: Optional[float] = None
        super()._populate(definition)

    def prepare(self, *args, **kwargs) -> bool:
        """Resolve the input/output references and reset the held output.

        Returns:
            The result of the base class ``prepare``.

        Raises:
            ValueError: If either reference resolves to ``None``.
        """
        self._input_attr = resolve_attribute_reference_hierarchical(
            self.parent, self._input_ref
        )
        self._output_attr = resolve_attribute_reference_hierarchical(
            self.parent, self._output_ref
        )
        unresolved = self._input_attr is None or self._output_attr is None
        if unresolved:
            raise ValueError("Unable to resolve input/output attributes for hysteresis_clamp")

        self._last_output = None
        return super().prepare(*args, **kwargs)

    def run(self, *args, **kwargs) -> bool:
        """Read the input, apply the clamp, write the output.

        Returns:
            The result of the base class ``run``, called with
            ``result=<clamped value>``.
        """
        clamped = self._apply_hysteresis(self._read_input())
        self._write_output(clamped)
        return super().run(result=clamped)

    # Internal helpers -------------------------------------------------

    def _read_input(self) -> float:
        """Return the input as a float.

        Prefers ``external_value``, then ``internal_value``, and finally
        falls back to calling ``get()`` on the resolved input attribute.
        """
        source = self._input_attr
        for field in ("external_value", "internal_value"):
            if hasattr(source, field):
                return float(getattr(source, field))
        return float(source.get())

    def _write_output(self, value: float) -> None:
        """Store *value* on the output attribute.

        Assigns ``internal_value`` when the attribute has one, otherwise
        calls ``set(value)``.

        Raises:
            AttributeError: If the output attribute offers neither.
        """
        target = self._output_attr
        if hasattr(target, "internal_value"):
            target.internal_value = value
            return
        if hasattr(target, "set"):
            target.set(value)
            return
        raise AttributeError("Output attribute does not support writing")

    def _apply_hysteresis(self, value: float) -> float:
        """Compute the next output for *value* and remember it."""
        if self._last_output is None:
            # First run: nothing to hold yet, so clamp straight into range.
            floor_applied = max(self.low, value)
            clamped = min(self.high, floor_applied)
            self._last_output = clamped
            return clamped

        if value < self.low - self.hysteresis:
            self._last_output = self.low
        elif value > self.high + self.hysteresis:
            self._last_output = self.high
        # Inside the widened band the previous output is kept unchanged.
        return self._last_output

# Registering and wiring the action
YAML snippet
Testing the extension
Packaging tips
Last updated

