# Source code for tiptop_ipy.utils

import os
import os.path as p
import time
from tempfile import TemporaryFile

import requests
from requests_toolbelt.multipart import decoder
import yaml
from astropy.io import fits

from .ini_parser import parse_ini, write_ini


# Load the packaged default configuration once at import time.
# DEFAULTS keeps the raw YAML text; DEFAULTS_YAML is the parsed structure.
# The encoding is given explicitly so the result does not depend on the
# platform's locale encoding (e.g. cp1252 on Windows).
with open(p.join(p.dirname(__file__), "defaults.yaml"), encoding="utf-8") as f:
    DEFAULTS = f.read()
    DEFAULTS_YAML = yaml.full_load(DEFAULTS)

# Base URLs of the known TIPTOP services.
_ESO_URL = "https://www.eso.org/p2services/any/tiptop"
_UNIVIE_URL = "https://tiptop.univie.ac.at/api"

# Short server names mapped to their base URLs.
SERVERS = dict(eso=_ESO_URL, univie=_UNIVIE_URL)

# Currently selected server: the TIPTOP_SERVER_URL environment variable when
# it is set, otherwise the University of Vienna endpoint.
_server_url = os.getenv("TIPTOP_SERVER_URL", _UNIVIE_URL)


def set_server(name_or_url):
    """Set the TIPTOP server.

    Parameters
    ----------
    name_or_url : str
        Either a server name ("eso", "univie") or a full URL. Server names
        are matched case-insensitively. Surrounding whitespace is ignored,
        and trailing slashes are removed from URLs.

    Examples
    --------
    >>> set_server("univie")      # University of Vienna (default)
    >>> set_server("http://...")  # Custom URL
    """
    global _server_url
    candidate = name_or_url.strip()
    # Endpoint paths are appended to the URL as "/<script>.php" and the ESO
    # route is chosen by exact URL comparison, so a trailing slash on a
    # user-supplied URL would produce "//" paths and defeat that comparison.
    _server_url = SERVERS.get(candidate.lower(), candidate.rstrip("/"))
def get_server():
    """Return the URL of the TIPTOP server currently in use."""
    return _server_url
def list_instruments(include_path=False):
    """List available instrument template names.

    Looks for ``*.ini`` files in the ``instrument_templates`` directory that
    sits next to this module.

    Parameters
    ----------
    include_path : bool
        If True, return full paths instead of just names.

    Returns
    -------
    names : list[str]
        Instrument names (without the ``.ini`` extension), or full paths if
        ``include_path`` is True, ordered by sorted file name.
    """
    suffix = ".ini"
    dname = p.join(p.dirname(__file__), "instrument_templates")
    fnames = sorted(
        fname for fname in os.listdir(dname) if fname.endswith(suffix)
    )
    if include_path:
        return [p.join(dname, fname) for fname in fnames]
    # Strip only the trailing extension. str.replace() would also remove
    # ".ini" from the middle of a name such as "demo.initial.ini".
    return [fname[: -len(suffix)] for fname in fnames]
def query_tiptop_server(ini_content, timeout=120, force_simulation=False,
                        save_psds=False):
    """Send an INI config to the TIPTOP server and return the FITS result.

    The request goes to the ESO endpoint (synchronous) when the current
    server URL equals the ESO URL, and to the custom-server path (async
    polling) otherwise. The current server is the one selected through
    ``set_server()`` or the ``TIPTOP_SERVER_URL`` environment variable.

    Parameters
    ----------
    ini_content : str
        The raw contents of a TIPTOP config .ini file.
    timeout : int
        Request timeout in seconds.
    force_simulation : bool
        If True, bypass the server cache and force a fresh simulation.
        Only supported on custom servers (not ESO).
    save_psds : bool
        If True, include the high-order PSD in the output FITS file.
        Only supported on custom servers (not ESO).

    Returns
    -------
    hdus : fits.HDUList
        A FITS file with extensions:

        - [0] PrimaryHDU: header info
        - [1] ImageHDU: PSF cube(s)
        - [-1] ImageHDU: (2, N) cartesian coordinates of PSF positions
    """
    use_eso = _server_url == _ESO_URL
    if not use_eso:
        return _query_custom(
            ini_content,
            timeout,
            force_simulation=force_simulation,
            save_psds=save_psds,
        )
    # The ESO path takes no extra options, so the two flags are not forwarded.
    return _query_eso(ini_content, timeout)
def _post_config(url, ini_content, timeout, data=None):
    """Upload an INI config plus the service description to ``url``.

    Shared by the ESO and custom-server code paths, which send the same
    multipart form.

    Parameters
    ----------
    url : str
        Endpoint that receives the POST request.
    ini_content : str
        Raw contents of a TIPTOP config .ini file. Must be ASCII-encodable.
    timeout : int or float
        Timeout in seconds passed to ``requests.post``.
    data : dict, optional
        Extra form fields to send along with the two files.

    Returns
    -------
    response : requests.Response
        The server response, guaranteed to have status code 200.

    Raises
    ------
    ValueError
        If the server answers with any status code other than 200.
    """
    desc_file = p.join(
        p.dirname(__file__), "instrument_templates", "serviceDescription.json"
    )
    with TemporaryFile() as ini_stream:
        ini_stream.write(ini_content.encode("ascii"))
        ini_stream.seek(0)
        with open(desc_file, "rb") as df:
            files = {
                "serviceDescription": (desc_file, df, "application/json"),
                "parameterFile": ("tiptop_ipy.ini", ini_stream, "text/plain"),
            }
            response = requests.post(
                url, files=files, data=data, timeout=timeout
            )

    if response.status_code != 200:
        raise ValueError(
            f"TIPTOP server returned HTTP {response.status_code}: "
            f"{response.text[:200]}"
        )
    return response


def _describe_parts(parts, max_len=None):
    """Return a readable, one-entry-per-part summary of multipart body parts.

    Parameters
    ----------
    parts : iterable
        Objects exposing ``headers`` (mapping with bytes keys and values) and
        ``content`` (bytes), as produced by the multipart decoder.
    max_len : int or None
        Maximum number of characters of each body to include. ``None``
        (default) includes the whole body.
    """
    lines = []
    for i, part in enumerate(parts):
        ct = part.headers.get(b"Content-Type", b"unknown").decode()
        disp = part.headers.get(b"Content-Disposition", b"").decode()
        try:
            body = part.content.decode(errors="replace")[:max_len]
        except Exception:
            # Best effort: this text only feeds an error message, so fall
            # back to a repr rather than hiding the real failure.
            body = repr(part.content[:max_len])
        lines.append(
            f" Part {i}: Content-Type={ct}, "
            f"Disposition={disp}\n {body}"
        )
    return "\n".join(lines)


def _query_eso(ini_content, timeout=120):
    """Send config to the ESO TIPTOP endpoint (synchronous).

    Raises ``ValueError`` on a non-200 response or when the response does not
    contain a FITS file.
    """
    response = _post_config(_ESO_URL, ini_content, timeout)
    return _parse_multipart_fits(response)


def _query_custom(ini_content, timeout=300, force_simulation=False,
                  save_psds=False):
    """Send config to a custom TIPTOP server (async polling).

    The config is posted to ``submit.php``. If the reply is already a
    multipart body (cache hit), it is parsed directly. Otherwise the reply is
    JSON carrying a ``job_id``, and ``result.php`` is polled every two
    seconds until the result is ready.

    Parameters
    ----------
    ini_content : str
        Raw contents of a TIPTOP config .ini file.
    timeout : int or float
        Maximum number of seconds to wait for the job while polling. The
        submit and poll requests use their own fixed timeouts.
    force_simulation : bool
        If True, send ``force_simulation=1`` with the submission.
    save_psds : bool
        If True, send ``save_psds=1`` with the submission.

    Returns
    -------
    hdus : fits.HDUList

    Raises
    ------
    ValueError
        On an unexpected HTTP status, a submit reply without ``job_id``, or a
        job reported as failed (or retrying with an error message).
    TimeoutError
        If the job does not complete within ``timeout`` seconds.
    """
    # Tolerate a configured URL with a trailing slash (avoids "//submit.php").
    base_url = _server_url.rstrip("/")

    form_fields = {}
    if force_simulation:
        form_fields["force_simulation"] = "1"
    if save_psds:
        form_fields["save_psds"] = "1"

    response = _post_config(
        f"{base_url}/submit.php", ini_content, timeout=60, data=form_fields
    )

    # Cache hit: server returns multipart FITS directly
    content_type = response.headers.get("Content-Type", "")
    if "multipart" in content_type:
        return _parse_multipart_fits(response)

    # Cache miss: server returns JSON with job_id
    job = response.json()
    try:
        job_id = job["job_id"]
    except (KeyError, TypeError) as err:
        raise ValueError(
            f"TIPTOP server response did not include a job_id: "
            f"{response.text[:200]}"
        ) from err

    # Poll for completion. A monotonic clock keeps the deadline immune to
    # wall-clock adjustments while waiting.
    result_url = f"{base_url}/result.php"
    poll_interval = 2
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
        resp = requests.get(result_url, params={"job_id": job_id}, timeout=30)
        if resp.status_code == 200:
            return _parse_multipart_fits(resp)
        if resp.status_code != 409:
            raise ValueError(
                f"TIPTOP result endpoint returned HTTP {resp.status_code}: "
                f"{resp.text[:200]}"
            )
        # 409: job not finished; the JSON body reports its state.
        status = resp.json()
        error_msg = status.get("error_message")
        state = status["status"]
        if state == "failed":
            raise ValueError(
                f"TIPTOP simulation failed: "
                f"{error_msg or 'Unknown server error'}"
            )
        # "retry" with an error message means the job is stuck failing
        if state == "retry" and error_msg:
            raise ValueError(
                f"TIPTOP job {job_id} failed (retrying): {error_msg}"
            )

    raise TimeoutError(
        f"TIPTOP job {job_id} did not complete within {timeout}s"
    )


def _parse_multipart_fits(response):
    """Parse a multipart response containing a FITS file.

    Parameters
    ----------
    response : requests.Response
        Response whose body is a multipart message.

    Returns
    -------
    hdus : fits.HDUList
        FITS data from the part with content type
        ``application/octet-stream`` whose disposition mentions
        ``tiptop_ipy.fits``, with all HDU data already loaded.

    Raises
    ------
    ValueError
        If the server reports that it could not parse the config, or if no
        part of the response holds the FITS file. In the latter case the
        message lists every part received.
    """
    payload = decoder.MultipartDecoder.from_response(response)
    parts = payload.parts

    # Guard against a body with no parts before looking at the first one.
    if parts and (
        b"cannot extract JSON structure from service output"
        in parts[0].content
    ):
        raise ValueError("Config file cannot be parsed by server")

    with TemporaryFile(mode="w+b") as tmp:
        found_fits_file = False
        for part in parts:
            content_type = part.headers.get(b"Content-Type")
            disposition = part.headers.get(
                b"Content-Disposition", b""
            ).decode()
            if (content_type == b"application/octet-stream"
                    and "tiptop_ipy.fits" in disposition):
                tmp.write(part.content)
                hdus = fits.open(tmp, mode="update", lazy_load_hdus=False)
                # Force loading data into RAM before temp file closes
                _ = [hdu.data for hdu in hdus]
                found_fits_file = True

        if not found_fits_file:
            # Include the complete part bodies: they usually carry the
            # server's own explanation of what went wrong.
            detail = _describe_parts(parts)
            raise ValueError(
                f"TIPTOP did not return a FITS file. "
                f"Server response ({len(parts)} parts):\n{detail}"
            )

    return hdus