Source code for koa_middleware.keck_client

import os
import json
from base64 import b64encode
from pathlib import Path

import requests
from platformdirs import user_state_dir

import logging
logger = logging.getLogger(__name__)

__all__ = ["KeckObserverAuthClient"]

_APP_NAME = "koa_middleware"
_STATE_DIR = Path(user_state_dir(_APP_NAME))
_STATE_DIR.mkdir(parents=True, exist_ok=True)
_COOKIE_PATH = _STATE_DIR / "cookies.json"

_KECK_LOGIN_URL = "https://www3.keck.hawaii.edu"


[docs] class KeckObserverAuthClient: """ Client for authorizing credentials to login to the Keck Observatory. This client manages authentication with the Keck Observatory login system, handling session management and cookie persistence. Eventually a similar client will be made for the Keck Observatory Archive (KOA). """ _cached_session = None _cached_observer_id = None def __init__(self): """ Initialize the KeckObserverAuthClient. Attempts to load cached credentials from disk. If no valid cached credentials are found, performs a new login using KECK_EMAIL and KECK_PASSWORD environment variables. """ self.login_url = _KECK_LOGIN_URL if KeckObserverAuthClient._cached_session is not None: self.session = KeckObserverAuthClient._cached_session self._observer_id = KeckObserverAuthClient._cached_observer_id else: self.session = requests.Session() self._observer_id = None if not self._load_cookies() or not self._validate_login(): logger.info("No valid login detected, logging in...") oid, obs_cookie = self._perform_login() for k, v in obs_cookie.items(): self.session.cookies.set(k, v) self._observer_id = oid logger.info(f"Keck Observer login successful! Observer ID = {oid}") KeckObserverAuthClient._cached_session = self.session KeckObserverAuthClient._cached_observer_id = self._observer_id ############### #### Login #### ############### def _validate_login(self) -> bool: """ Validate that the current session has valid login credentials. Returns ------- bool True if the session is valid, False otherwise. """ r = self.session.get(f"{self.login_url}/userinfo/odb-cookie") if r.status_code != 200: logger.info(f"Session validation returned HTTP {r.status_code}; re-login required.") return False try: valid = r.json().get("Id") is not None if not valid: logger.info("Session validation: no observer ID found; re-login required.") return valid except Exception: logger.info("Session validation: failed to parse response; re-login required.") return False def _perform_login(self) -> tuple[str, dict]: """ Perform login to the Keck Observatory using credentials from environment variables. Requires KECK_EMAIL and KECK_PASSWORD environment variables to be set. Returns ------- tuple[str, dict] A tuple containing: - The observer ID as a string - The observer cookie dictionary Raises ------ ValueError If KECK_EMAIL or KECK_PASSWORD are not set, or if login fails. RuntimeError If the account requires email verification (not supported). """ email = os.getenv("KECK_OBSERVER_EMAIL") password = os.getenv("KECK_OBSERVER_PASSWORD") if not email or not password: msg = "KECK_OBSERVER_EMAIL and KECK_OBSERVER_PASSWORD must be set as environment variables." logger.error(msg) raise ValueError(msg) login_params = dict(email=email, password=password, url=self.login_url) r = requests.get(f"{self.login_url}/login/script", params=login_params) if r.status_code == 401: try: err = r.json() except Exception: err = {"comment": r.text} comment = err.get("comment", "").lower() if "verification" in comment: msg = "Account requires email verification code; token flow not supported here." logger.error(msg) raise RuntimeError(msg) error_msg = err.get("comment", "Invalid credentials.") logger.error(f"Login failed: {error_msg}") raise ValueError(error_msg) api = r.json() uid_cookie = {"KECK-AUTH-UID": api["py_uid"]} u = requests.get(f"{self.login_url}/userinfo/odb-cookie", cookies=uid_cookie) assert u.status_code == 200, f"{u} not successful" logger.info(f"User info request successful: observer ID={u.json().get('Id')!r}") observer_id = str(u.json()["Id"]) encoded = b64encode(observer_id.encode()).decode() observer_cookie = {"observer": f"obsid={encoded}"} _COOKIE_PATH.write_text(json.dumps(observer_cookie)) return observer_id, observer_cookie @property def cookies_dict(self) -> dict: """ Get the session cookies as a dictionary. Returns ------- dict The cookies from the current session as a dictionary. """ return requests.utils.dict_from_cookiejar(self.cookies) @property def cookies(self): """ Get the session cookies. Returns ------- requests.cookies.RequestsCookieJar The cookies from the current session. """ return self.session.cookies def _load_cookies(self) -> bool: """ Load cookies from disk if they exist. Returns ------- bool True if cookies were successfully loaded, False otherwise. """ if not _COOKIE_PATH.exists(): return False try: data = json.loads(_COOKIE_PATH.read_text()) except Exception: logger.warning(f"Cookie file at {_COOKIE_PATH} exists but could not be parsed; will re-authenticate.") return False for k, v in data.items(): self.session.cookies.set(k, v) return True