Bases: DataProcessor
Default on‑demand processing core for derived observables.
Overview
Provides a processing-functions registry and a cache for computed results.
Delegates raw observables to the wrapped Data object and computes others
using registered functions.
- Maintains
_processing_functions and processed_data cache.
- get_data/get_units compute lazily and raise ValueError if unknown.
- validate_observables remains abstract for concrete checks.
Usage Notes
Register per-observable processing functions in _processing_functions
and ensure they return {"units": str, "data": ...}.
Source code in contracts\data_processors.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 | @decorate_class_with_logging(log_level=DEBUG_DATA_PROCESSOR)
class DataProcessorCore(DataProcessor):
"""
Default on‑demand processing core for derived observables.
Overview:
Provides a processing-functions registry and a cache for computed results.
Delegates raw observables to the wrapped Data object and computes others
using registered functions.
- Maintains `_processing_functions` and `processed_data` cache.
- get_data/get_units compute lazily and raise ValueError if unknown.
- validate_observables remains abstract for concrete checks.
Usage Notes:
Register per-observable processing functions in `_processing_functions`
and ensure they return `{"units": str, "data": ...}`.
"""
processed_data: dict[str, Observable]
processing_functions: dict[str, Callable]
def __init__(self, data: Data):
self.data = data
self._processing_functions = {
"elapsed_time": self.elapsed_time
}
self.processed_data: dict[str, Observable] = {}
for key in self._processing_functions:
self.processed_data[key] = None
self._processed_observables = self.processed_data.keys()
def get_data(self, observable: str, *args, **kwargs):
# If observable is available from raw data delegate to Data
if observable in self.data.get_allowed_observables():
return self.data.get_data(observable)
# Compute processed data if needed
elif observable in self._processed_observables:
if self.processed_data[observable] is None:
# Adds the data to the processed_data dict after computing it
self.processed_data[observable] = self._processing_functions[observable](*args, **kwargs)
# Simply return if already set
return self.processed_data[observable]['data']
else:
# FIXME: Apparently object has no attribute '__name__'. Did you mean: '__ne__'? gets triggered when ValueError is raised
raise ValueError(f"{self.__class__.__name__} does not contain {observable} data")
def get_units(self, observable: str, *args, **kwargs) -> str:
self.get_data(observable)
# Return raw data
if observable in self.data.get_allowed_observables():
return self.data.get_units(observable)
elif observable in self._processed_observables:
return self.processed_data[observable]["units"]
else:
raise ValueError(f"{self.__class__.__name__} does not contain {observable} data")
@abstractmethod
def validate_observables(self, *args, **kwargs) -> None:
"""
This function will check whether all requested observables are available.
This should be implemented by the individual subclasses
"""
pass
def elapsed_time(self, *args, **kwargs) -> Observable:
# Get a reference timestamp from *args
reference_datetime = kwargs["experiment_datetime"]
data_datetime = self.get_data("datetime")
return {"units": "$Elapsed ~time ~(hrs)$", "data": data_datetime - reference_datetime}
|
validate_observables(*args, **kwargs)
abstractmethod
This function will check whether all requested observables are available.
This should be implemented by the individual subclasses
Source code in contracts\data_processors.py
| @abstractmethod
def validate_observables(self, *args, **kwargs) -> None:
"""
This function will check whether all requested observables are available.
This should be implemented by the individual subclasses
"""
pass
|