Data Processors

DataProcessor

Bases: ABC

Abstract interface for data processors that derive observables from raw Data.

Overview

Defines the minimal contract for processing layers that expose computed observables and their units.

  • Abstract methods: get_data, get_units, validate_observables.
  • Focused on returning data and unit strings for named observables.

Usage Notes

Implementations should delegate raw observables to a Data instance and compute derived observables on demand.
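The delegation pattern described above can be sketched as follows. This is a minimal illustration, not the project's implementation: `InMemoryData` is a hypothetical stand-in for `Data` (only `get_allowed_observables`, `get_data`, and `get_units` are assumed, since those are the calls the core class makes), and `PowerProcessor` with its `power` observable is invented for the example. The interface is restated locally so the block runs on its own.

```python
from abc import ABC, abstractmethod
from typing import Any


class DataProcessor(ABC):
    """Local restatement of the documented interface so the sketch is self-contained."""

    @abstractmethod
    def get_data(self, observable: str, *args, **kwargs) -> Any: ...

    @abstractmethod
    def get_units(self, observable: str, *args, **kwargs) -> str: ...

    @abstractmethod
    def validate_observables(self, *args, **kwargs) -> None: ...


class InMemoryData:
    """Hypothetical stand-in for Data; only the calls used below are assumed."""

    def __init__(self, values: dict, units: dict):
        self._values = values
        self._units = units

    def get_allowed_observables(self) -> list:
        return list(self._values)

    def get_data(self, observable: str) -> Any:
        return self._values[observable]

    def get_units(self, observable: str) -> str:
        return self._units[observable]


class PowerProcessor(DataProcessor):
    """Hypothetical processor: delegates raw observables, derives 'power' on demand."""

    def __init__(self, data: InMemoryData):
        self.data = data

    def get_data(self, observable: str, *args, **kwargs) -> Any:
        if observable in self.data.get_allowed_observables():
            return self.data.get_data(observable)  # raw: delegate to Data
        if observable == "power":
            # derived: computed only when it is requested
            voltage = self.data.get_data("voltage")
            current = self.data.get_data("current")
            return [v * i for v, i in zip(voltage, current)]
        raise ValueError(f"{type(self).__name__} does not contain {observable} data")

    def get_units(self, observable: str, *args, **kwargs) -> str:
        if observable in self.data.get_allowed_observables():
            return self.data.get_units(observable)
        if observable == "power":
            return "W"
        raise ValueError(f"{type(self).__name__} does not contain {observable} data")

    def validate_observables(self, *observables: str) -> None:
        # Assumed convention: requested names are passed positionally.
        for name in observables:
            self.get_units(name)  # raises ValueError for unknown names


data = InMemoryData(
    {"voltage": [1.0, 2.0], "current": [0.5, 0.25]},
    {"voltage": "V", "current": "A"},
)
processor = PowerProcessor(data)
print(processor.get_data("power"))   # [0.5, 0.5]
print(processor.get_units("power"))  # W
```

Because all three methods are abstract, `DataProcessor` itself cannot be instantiated; only a subclass that implements every one of them can.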

Source code in contracts\data_processors.py
class DataProcessor(ABC):
    """
        Abstract interface for data processors that derive observables from raw Data.

        Overview:
            Defines the minimal contract for processing layers that expose
            computed observables and their units.

        - Abstract methods: get_data, get_units, validate_observables.
        - Focused on returning data and unit strings for named observables.

        Usage Notes:
            Implementations should delegate raw observables to a Data instance
            and compute derived observables on demand.
        """

    @abstractmethod
    def get_data(self, observable: str, *args, **kwargs) -> Any:
        pass

    @abstractmethod
    def get_units(self, observable: str, *args, **kwargs) -> str:
        pass

    @abstractmethod
    def validate_observables(self, *args, **kwargs) -> None:
        pass

DataProcessorCore

Bases: DataProcessor

Default on‑demand processing core for derived observables.

Overview

Provides a processing-functions registry and a cache for computed results. Delegates raw observables to the wrapped Data object and computes others using registered functions.

  • Maintains _processing_functions and processed_data cache.
  • get_data/get_units compute lazily and raise ValueError if unknown.
  • validate_observables remains abstract for concrete checks.

Usage Notes

Register per-observable processing functions in _processing_functions and ensure they return {"units": str, "data": ...}.
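A simplified sketch of the registry-plus-cache behaviour may help here. `MiniCore` below is a reduced restatement written for illustration, not the `DataProcessorCore` source; `StubData`, the `doubled_signal` observable, and the `calls` counter are hypothetical names added to make the caching visible.

```python
from typing import Any, Callable, Optional


class StubData:
    """Hypothetical stand-in for Data, exposing only the calls the core relies on."""

    def __init__(self, values: dict, units: dict):
        self._values = values
        self._units = units

    def get_allowed_observables(self) -> list:
        return list(self._values)

    def get_data(self, observable: str) -> Any:
        return self._values[observable]

    def get_units(self, observable: str) -> str:
        return self._units[observable]


class MiniCore:
    """Reduced illustration of the registry and cache described above."""

    def __init__(self, data: StubData):
        self.data = data
        # Registry: observable name -> function returning {"units": str, "data": ...}
        self._processing_functions: dict[str, Callable[..., dict]] = {
            "doubled_signal": self.doubled_signal,
        }
        # Cache: None marks "registered but not yet computed"
        self.processed_data: dict[str, Optional[dict]] = {
            key: None for key in self._processing_functions
        }
        self.calls = 0  # illustration only: counts how often the function runs

    def get_data(self, observable: str, *args, **kwargs) -> Any:
        if observable in self.data.get_allowed_observables():
            return self.data.get_data(observable)
        if observable in self.processed_data:
            if self.processed_data[observable] is None:
                function = self._processing_functions[observable]
                self.processed_data[observable] = function(*args, **kwargs)
            return self.processed_data[observable]["data"]
        raise ValueError(f"{type(self).__name__} does not contain {observable} data")

    def get_units(self, observable: str, *args, **kwargs) -> str:
        if observable in self.data.get_allowed_observables():
            return self.data.get_units(observable)
        # Ensure the derived observable is computed (or raise ValueError if unknown)
        self.get_data(observable, *args, **kwargs)
        return self.processed_data[observable]["units"]

    def doubled_signal(self, *args, **kwargs) -> dict:
        self.calls += 1
        raw = self.get_data("signal")
        return {"units": self.data.get_units("signal"), "data": [2 * x for x in raw]}


core = MiniCore(StubData({"signal": [1, 2, 3]}, {"signal": "V"}))
print(core.get_data("doubled_signal"))   # [2, 4, 6]
print(core.get_data("doubled_signal"))   # [2, 4, 6], served from the cache
print(core.calls)                        # 1
print(core.get_units("doubled_signal"))  # V
```

Note that in this pattern the cache is keyed by observable name only, so arguments passed on later calls do not trigger a recomputation once a value is stored.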

Source code in contracts\data_processors.py
@decorate_class_with_logging(log_level=DEBUG_DATA_PROCESSOR)
class DataProcessorCore(DataProcessor):
    """
       Default on‑demand processing core for derived observables.

       Overview:
           Provides a processing-functions registry and a cache for computed results.
           Delegates raw observables to the wrapped Data object and computes others
           using registered functions.

       - Maintains `_processing_functions` and `processed_data` cache.
       - get_data/get_units compute lazily and raise ValueError if unknown.
       - validate_observables remains abstract for concrete checks.

       Usage Notes:
           Register per-observable processing functions in `_processing_functions`
           and ensure they return `{"units": str, "data": ...}`.
       """

    processed_data: dict[str, Observable]
    _processing_functions: dict[str, Callable]

    def __init__(self, data: Data):
        self.data = data
        self._processing_functions = {
            "elapsed_time": self.elapsed_time
        }
        self.processed_data: dict[str, Observable] = {}
        for key in self._processing_functions:
            self.processed_data[key] = None

        self._processed_observables = self.processed_data.keys()

    def get_data(self, observable: str, *args, **kwargs):
        # If observable is available from raw data delegate to Data
        if observable in self.data.get_allowed_observables():
            return self.data.get_data(observable)

        # Compute processed data if needed
        elif observable in self._processed_observables:
            if self.processed_data[observable] is None:
                # Adds the data to the processed_data dict after computing it
                self.processed_data[observable] = self._processing_functions[observable](*args, **kwargs)

            # Simply return if already set
            return self.processed_data[observable]['data']
        else:
            # FIXME: Apparently object has no attribute '__name__'. Did you mean: '__ne__'? gets triggered when ValueError is raised
            raise ValueError(f"{self.__class__.__name__} does not contain {observable} data")

    def get_units(self, observable: str, *args, **kwargs) -> str:
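        # Trigger lazy computation, forwarding arguments so processing functions receive them
        self.get_data(observable, *args, **kwargs)
        # Delegate units of raw observables to Data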
        if observable in self.data.get_allowed_observables():
            return self.data.get_units(observable)
        elif observable in self._processed_observables:
            return self.processed_data[observable]["units"]
        else:
            raise ValueError(f"{self.__class__.__name__} does not contain {observable} data")

    @abstractmethod
    def validate_observables(self, *args, **kwargs) -> None:
        """
            Check whether all requested observables are available.
            Subclasses must implement this method.
        """
        pass

    def elapsed_time(self, *args, **kwargs) -> Observable:
        # Get the reference timestamp from the "experiment_datetime" keyword argument
        reference_datetime = kwargs["experiment_datetime"]
        data_datetime = self.get_data("datetime")
        return {"units": "$Elapsed ~time ~(hrs)$", "data": data_datetime - reference_datetime}

validate_observables(*args, **kwargs) abstractmethod

Checks whether all requested observables are available. Subclasses must implement this method.
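One way a subclass might implement this check is sketched below. The source does not specify how requested observables are passed or how failures are reported, so both are assumptions here: names are taken as positional strings, and a single `ValueError` lists every missing name. `ObservableChecker` is a hypothetical helper, not part of the module.

```python
class ObservableChecker:
    """Hypothetical helper mirroring what a concrete validate_observables might do."""

    def __init__(self, raw_observables, processed_observables):
        # Everything the processor can serve: raw names plus registered derived names
        self._available = set(raw_observables) | set(processed_observables)

    def validate_observables(self, *observables: str) -> None:
        # Collect all missing names first so the caller sees them in one error
        missing = sorted(name for name in observables if name not in self._available)
        if missing:
            raise ValueError(f"Unavailable observables: {', '.join(missing)}")


checker = ObservableChecker(["datetime", "voltage"], ["elapsed_time"])
checker.validate_observables("voltage", "elapsed_time")  # passes silently

try:
    checker.validate_observables("voltage", "pressure", "flow")
except ValueError as err:
    print(err)  # Unavailable observables: flow, pressure
```

Reporting all missing names at once is a design choice for the sketch; failing on the first unknown name would satisfy the documented contract equally well.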

Source code in contracts\data_processors.py
@abstractmethod
def validate_observables(self, *args, **kwargs) -> None:
    """
        Check whether all requested observables are available.
        Subclasses must implement this method.
    """
    pass