Filters & Signal Processing
Pattern: “read input attribute → write output attribute”
Minimal example: moving average
from collections import deque
from spx_sdk.actions import Action
from spx_sdk.registry import register_class
@register_class(name="moving_average")
class MovingAverage(Action):
    """Smooth a stream of samples with a simple moving average.

    On every ``run`` the action reads its ``source`` attribute, appends the
    value to a bounded buffer holding the most recent ``window`` samples, and
    hands the arithmetic mean of the buffered samples to ``write_outputs``.

    Attributes:
        window: Number of most recent samples averaged (default 8, must be
            at least 1).
    """

    def _populate(self, definition):
        """Process ``definition`` and create the sample buffer.

        ``window`` is read from the instance after the base class has handled
        ``definition``; when it is absent the default of 8 is used.
        NOTE(review): presumably the base ``_populate`` copies definition keys
        onto the instance -- confirm against the SDK.

        Raises:
            ValueError: If ``window`` is smaller than 1.
        """
        super()._populate(definition)
        self.window = int(getattr(self, "window", 8))
        # A zero-length window would leave the buffer permanently empty and
        # make ``run`` divide by zero; reject it up front with a clear error.
        if self.window < 1:
            raise ValueError(
                f"moving_average window must be >= 1, got {self.window}"
            )
        # deque(maxlen=...) silently discards the oldest sample once full.
        self._buf = deque(maxlen=self.window)

    def prepare(self, *args, **kwargs):
        """Run the base ``prepare`` and drop any previously buffered samples."""
        super().prepare(*args, **kwargs)
        self._buf.clear()

    def run(self, *args, **kwargs):
        """Fold the current ``source`` sample into the average and emit it.

        Returns:
            ``False`` when ``source`` is missing or ``None`` (nothing is
            buffered or written); otherwise whatever ``write_outputs``
            returns for the new average.
        """
        self.apply_wrappers()
        sample = getattr(self, "source", None)
        if sample is None:
            return False
        self._buf.append(float(sample))
        # The buffer holds at least the sample just appended, so the
        # division is safe even before the window has filled up.
        avg = sum(self._buf) / len(self._buf)
        return self.write_outputs(avg)

