Source code for tiptop_ipy.result

"""TipTopResult class wrapping FITS responses from the TIPTOP server."""
import numpy as np
from astropy.io import fits


[docs] class TipTopResult: """Wraps the FITS HDUList returned by the TIPTOP server. Provides convenient access to PSF data, coordinates, plotting, and Jupyter display. The TIPTOP server returns a FITS file with 5 HDUs: - [0] PrimaryHDU — header with config parameters and timing - [1] ImageHDU ``PSF CUBE`` — AO-corrected PSF(s), shape (N, FOV, FOV) - [2] ImageHDU ``OPEN-LOOP PSF`` — seeing-limited PSF, shape (FOV, FOV) - [3] ImageHDU ``DIFFRACTION LIMITED PSF`` — perfect PSF, shape (FOV, FOV) - [4] ImageHDU ``Final PSFs profiles`` — radial profiles For multi-wavelength requests, there is one PSF CUBE HDU per wavelength. Coordinates are stored in the PSF CUBE header cards (CCX0000, CCY0000, ...). Parameters ---------- hdulist : fits.HDUList The FITS HDUList from the TIPTOP server. Attributes ---------- hdulist : fits.HDUList The underlying FITS data. """ def __init__(self, hdulist): self.hdulist = hdulist self._psf_indices = [] self._open_loop_index = None self._diffraction_index = None self._profile_index = None self._psd_index = None self._detect_structure() def _detect_structure(self): """Detect HDU roles using the CONTENT header card. Falls back to shape-based heuristics if CONTENT is missing. """ for i, hdu in enumerate(self.hdulist): if i == 0: continue if not isinstance(hdu, fits.ImageHDU) or hdu.data is None: continue content = hdu.header.get("CONTENT", "").upper() if "PSF CUBE" in content: self._psf_indices.append(i) elif "OPEN-LOOP" in content: self._open_loop_index = i elif "DIFFRACTION" in content: self._diffraction_index = i elif "PSD" in content: self._psd_index = i elif "PROFILE" in content: self._profile_index = i else: # Fallback heuristic: 3D data is a PSF cube if hdu.data.ndim == 3 and hdu.data.shape[1] == hdu.data.shape[2]: self._psf_indices.append(i) def _read_coordinates_from_header(self): """Read PSF coordinates from the PSF CUBE header cards.""" if not self._psf_indices: return np.array([]), np.array([]) header = self.hdulist[self._psf_indices[0]].header xs, ys = [], [] for j in range(10000): xkey = f"CCX{j:04d}" ykey = f"CCY{j:04d}" if xkey in header: xs.append(header[xkey]) ys.append(header[ykey]) else: break return np.array(xs, dtype=float), np.array(ys, dtype=float) @property def psf(self): """The PSF image data from the first PSF CUBE HDU. For single-position results this is shape (1, H, W). For multi-position results this is shape (N, H, W). """ if not self._psf_indices: raise ValueError("No PSF data found in FITS file") return self.hdulist[self._psf_indices[0]].data @property def open_loop_psf(self): """The seeing-limited (open-loop) PSF, shape (H, W).""" if self._open_loop_index is None: raise ValueError("No open-loop PSF found in FITS file") return self.hdulist[self._open_loop_index].data @property def diffraction_psf(self): """The diffraction-limited PSF, shape (H, W).""" if self._diffraction_index is None: raise ValueError("No diffraction-limited PSF found in FITS file") return self.hdulist[self._diffraction_index].data @property def has_psd(self): """Whether the result contains a Power Spectral Density HDU.""" return self._psd_index is not None @property def psd(self): """The high-order PSD, shape (nPointings, N, N), units nm².""" if self._psd_index is None: raise ValueError( "No PSD data found. Re-run with save_psds=True." ) return self.hdulist[self._psd_index].data @property def x(self): """X-axis coordinates of PSF positions in arcsec.""" xs, _ = self._read_coordinates_from_header() return xs @property def y(self): """Y-axis coordinates of PSF positions in arcsec.""" _, ys = self._read_coordinates_from_header() return ys @property def n_wavelengths(self): """Number of wavelength channels (PSF CUBE HDUs).""" return len(self._psf_indices) @property def strehl(self): """Strehl ratios for each position (from PSF CUBE header).""" if not self._psf_indices: return np.array([]) header = self.hdulist[self._psf_indices[0]].header values = [] for j in range(10000): key = f"SR{j:04d}" if key in header: values.append(header[key]) else: break return np.array(values, dtype=float) @property def fwhm(self): """FWHM values in mas for each position (from PSF CUBE header).""" if not self._psf_indices: return np.array([]) header = self.hdulist[self._psf_indices[0]].header values = [] for j in range(10000): key = f"FWHM{j:04d}" if key in header: values.append(header[key]) else: break return np.array(values, dtype=float)
[docs] def psf_cube(self, wavelength_index=0): """Get PSF cube for a specific wavelength index. Parameters ---------- wavelength_index : int Index into the list of wavelength channels. Returns ------- data : np.ndarray PSF cube or image. """ if wavelength_index >= len(self._psf_indices): raise IndexError( f"wavelength_index {wavelength_index} out of range " f"(have {len(self._psf_indices)} channels)" ) return self.hdulist[self._psf_indices[wavelength_index]].data
[docs] def nearest_psf(self, x, y, wavelength_index=0): """Return the PSF nearest to (x, y) in arcsec. Parameters ---------- x, y : float Position in arcsec. wavelength_index : int Which wavelength channel to use. Returns ------- psf : np.ndarray 2D PSF image. """ r2 = (x - self.x) ** 2 + (y - self.y) ** 2 i = np.argmin(r2) cube = self.psf_cube(wavelength_index) if cube.ndim == 3: return cube[i, :, :] return cube # single PSF
[docs] def plot(self, log_scale=True, wavelength_index=0, position_index=0, **kwargs): """Quick PSF plot with matplotlib. Works in Jupyter. Parameters ---------- log_scale : bool Use log10 scaling for the colormap. wavelength_index : int Which wavelength channel to plot. position_index : int Which position in the cube to plot. **kwargs Passed to matplotlib.pyplot.imshow. """ import matplotlib.pyplot as plt cube = self.psf_cube(wavelength_index) if cube.ndim == 3: image = cube[position_index, :, :] else: image = cube plot_data = np.log10(np.clip(image, a_min=1e-20, a_max=None)) if log_scale else image fig, ax = plt.subplots(1, 1, figsize=(6, 6)) defaults = {"origin": "lower", "cmap": "inferno"} defaults.update(kwargs) im = ax.imshow(plot_data, **defaults) plt.colorbar(im, ax=ax, label="log10(flux)" if log_scale else "flux") title = "PSF" if self.n_wavelengths > 1: title += f" [wave {wavelength_index}]" if cube.ndim == 3 and cube.shape[0] > 1: title += f" [pos {position_index}]" ax.set_title(title) ax.set_xlabel("pixels") ax.set_ylabel("pixels") plt.tight_layout() return fig, ax
[docs] def writeto(self, filename, overwrite=False): """Save FITS to disk. Parameters ---------- filename : str Output file path. overwrite : bool Overwrite existing file. """ self.hdulist.writeto(filename, overwrite=overwrite)
def _repr_html_(self): """Rich Jupyter display: PSF thumbnail + HDU summary table.""" rows = [] for i, hdu in enumerate(self.hdulist): hdu_type = type(hdu).__name__ shape = str(hdu.data.shape) if hdu.data is not None else "—" content = hdu.header.get("CONTENT", "") if i == 0: content = "Primary header" rows.append(f"<tr><td>{i}</td><td>{hdu_type}</td>" f"<td>{shape}</td><td>{content}</td></tr>") table = ( "<table>" "<tr><th>HDU</th><th>Type</th><th>Shape</th><th>Content</th></tr>" + "".join(rows) + "</table>" ) n_pos = len(self.x) sr = self.strehl sr_str = f", Strehl={sr[0]:.3f}" if len(sr) > 0 else "" return ( f"<b>TipTopResult</b>: {self.n_wavelengths} wavelength(s), " f"{n_pos} position(s){sr_str}<br>{table}" )
[docs] def __repr__(self): n_hdus = len(self.hdulist) n_pos = len(self.x) sr = self.strehl sr_str = f", Strehl={sr[0]:.3f}" if len(sr) > 0 else "" return ( f"TipTopResult({self.n_wavelengths} wavelength(s), " f"{n_pos} position(s), {n_hdus} HDUs{sr_str})" )