from importlib.resources import open_text
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from similaritymeasures import frechet_dist
from sklearn.metrics import mean_absolute_error
try:
from sklearn.metrics import root_mean_squared_error
except ImportError:
from sklearn.metrics import mean_squared_error
def root_mean_squared_error(y_true, y_pred):
return np.sqrt(mean_squared_error(y_true, y_pred))
from self_driving_lab_demo import data as data_module
from self_driving_lab_demo.core import SelfDrivingLabDemo, SensorSimulator
from self_driving_lab_demo.utils.channel_info import (
CHANNEL_HEX_COLORS,
CHANNEL_NAMES,
CHANNEL_WAVELENGTHS_MEAN_FWHM,
)
from self_driving_lab_demo.utils.observe import mqtt_observe_sensor_data
[docs]
class SensorSimulatorLight(SensorSimulator):
def __init__(self):
self.red_interp = self.create_interpolator("neopixel_red.csv")
self.green_interp = self.create_interpolator("neopixel_green.csv")
self.blue_interp = self.create_interpolator("neopixel_blue.csv")
@property
def channel_wavelengths(self):
return CHANNEL_WAVELENGTHS_MEAN_FWHM
@property
def channel_hex_colors(self):
return CHANNEL_HEX_COLORS
[docs]
def create_interpolator(self, fname):
wavelength_lbl = "wavelength" # nm
intensity_lbl = "relative_intensity" # (uW/cm^2)/nm
df = pd.read_csv(
open_text(data_module, fname),
header=0,
names=[wavelength_lbl, intensity_lbl],
)
df["relative_intensity"] = df["relative_intensity"].clip(lower=0.0)
# average y-values for repeat x-values
# see also https://stackoverflow.com/a/51258988/13697228
df = df.groupby("wavelength", as_index=False).mean()
return interp1d(
df[wavelength_lbl],
df[intensity_lbl],
kind="linear",
bounds_error=False,
fill_value=0.0,
)
def _simulate_sensor_data(
self, wavelengths, R, G, B, atime=100, astep=999, gain=128
):
rI, gI, bI = np.array([R, G, B]) / 255
# TODO: sample based on Gaussian distributions instead of discrete wavelengths
wavelengths = np.array(wavelengths)[:, 0]
channel_data = np.sum(
[
self.red_interp(wavelengths) * rI,
self.green_interp(wavelengths) * gI,
self.blue_interp(wavelengths) * bI,
],
axis=0,
)
integration_time = (astep + 1) * (atime + 1) * 2.78 / 1000 # as7341.py
channel_data = channel_data * integration_time * gain
return {name: data for name, data in zip(CHANNEL_NAMES, channel_data)}
[docs]
def simulate_sensor_data(self, parameters):
R = parameters["R"]
G = parameters["G"]
B = parameters["B"]
atime = parameters.get("atime", 100)
astep = parameters.get("astep", 999)
gain = parameters.get("gain", 128)
return self._simulate_sensor_data(
self.channel_wavelengths, R, G, B, atime=atime, astep=astep, gain=gain
)
[docs]
class SelfDrivingLabDemoLight(SelfDrivingLabDemo):
def __init__(
self,
random_rng=np.random.default_rng(),
target_seed=604523,
target_inputs=None,
rest_seconds=0.0,
max_power=0.35,
autoload=False,
simulation=False,
simulator=SensorSimulatorLight(),
observe_sensor_data_fn=mqtt_observe_sensor_data,
observe_sensor_data_kwargs={}, # dict(PICO_ID="a123b456")
):
"""
This class extends the SelfDrivingLabDemo class to simulate a self-driving
lab demo with light sensors.
Parameters
----------
random_rng : numpy random generator, optional
The random number generator used for generating random inputs, by
default np.random.default_rng()
target_seed : int, optional
The seed used for generating target inputs, as long as target_inputs is
not set directly using the class attribute, by default 604523
target_inputs : dict, optional
The target inputs for the demo, by default None
rest_seconds : float, optional
The rest time in seconds between actions, by default 0.0
max_power : float, optional
The maximum power for the light from 0.0 to 1.0, by default 0.35
autoload : bool, optional
Whether to load target data automatically, by default False
simulation : bool, optional
Whether to run in simulation mode, by default False
simulator : SensorSimulatorLight, optional
The simulator used for the demo, by default SensorSimulatorLight()
observe_sensor_data_fn : function, optional
The function used to observe sensor data, by default
mqtt_observe_sensor_data
observe_sensor_data_kwargs : dict, optional
The keyword arguments for the observe_sensor_data function, by default
{}
Examples
--------
>>> demo = SelfDrivingLabDemoLight(
... random_rng=np.random.default_rng(),
... target_seed=604523,
... target_inputs=None,
... rest_seconds=0.0,
... max_power=0.35,
... autoload=True,
... simulation=False,
... simulator=SensorSimulatorLight(),
... observe_sensor_data_fn=mqtt_observe_sensor_data,
... observe_sensor_data_kwargs={},
... )
>>> demo.evaluate(dict(R=50, G=150, B=250))
"""
super().__init__(
random_rng=random_rng,
target_seed=target_seed,
target_inputs=target_inputs,
rest_seconds=rest_seconds,
max_power=max_power,
autoload=autoload,
simulation=simulation,
simulator=simulator,
observe_sensor_data_fn=observe_sensor_data_fn,
observe_sensor_data_kwargs=observe_sensor_data_kwargs,
)
@property
def bounds(self):
mx = int(np.round(self.max_power * 255))
return dict(
R=[0, mx],
G=[0, mx],
B=[0, mx],
atime=[0, 255],
astep=[0, 65534],
gain=[0.5, 512],
)
@property
def parameters(self):
parameters = []
for nm, bnd in self.bounds.items():
if nm in ["R", "G", "B", "atime", "astep"]:
parameters.append(dict(name=nm, type="range", bounds=bnd))
elif nm in ["gain"]:
parameters.append(
dict(
name=nm,
type="choice",
is_ordered=True,
values=[
0.5,
1.0,
2.0,
4.0,
8.0,
16.0,
32.0,
64.0,
128.0,
256.0,
512.0,
],
)
)
else:
raise ValueError(f"unknown parameter {nm}")
return parameters
@property
def channel_names(self):
return CHANNEL_NAMES
@property
def channel_wavelengths_mean_fwhm(self):
return CHANNEL_WAVELENGTHS_MEAN_FWHM
@property
def channel_wavelengths(self):
return [ch[0] for ch in self.channel_wavelengths_mean_fwhm]
[docs]
def evaluate(self, parameters):
if not hasattr(self, "target_results"):
raise ValueError(
"must call `load_target_data` first or instantiate with autoload=True"
)
results = self.observe_sensor_data(parameters)
if results.get("error", None) is not None:
raise ValueError(results["error"])
target_data = [float(self.target_results[ch]) for ch in self.channel_names]
data = [float(results[ch]) for ch in self.channel_names]
results["mae"] = mean_absolute_error(target_data, data)
results["rmse"] = root_mean_squared_error(target_data, data)
target_dist = np.array([self.channel_wavelengths, target_data]).T
dist = np.array([self.channel_wavelengths, data]).T
results["frechet"] = frechet_dist(target_dist, dist)
return results
[docs]
def clear(self):
self.observe_sensor_data(dict(R=0, G=0, B=0))