Source code for koa_middleware.database.metadata_database
from contextlib import contextmanager
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy import func
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
import datetime
from .orm_base import CalibrationORM
__all__ = ['CalibrationDB']
[docs]
class CalibrationDB:
"""Generic utility class to interface with a local SQLite DB or remote PostgreSQL DB."""
def __init__(self, url : str, orm_class : type[CalibrationORM]):
"""
Initialize the CalibrationDB.
Args:
url (str): Database connection URL for SQLite or PostgreSQL.
orm_class (type[CalibrationORM]): The ORM class that defines the database schema.
"""
self.engine = self.get_engine(url=url)
orm_class.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self.orm_class = orm_class
[docs]
def get_engine(self, url : str, echo : bool = True):
return create_engine(url, echo=echo)
[docs]
def close(self):
self.engine.dispose()
[docs]
@contextmanager
def session_manager(self, external_session: Session | None = None):
"""
Context manager to handle database sessions.
"""
own_session = external_session is None
session = self.Session() if own_session else external_session
try:
yield session
except SQLAlchemyError as e:
session.rollback()
raise e
finally:
if own_session:
session.close()
[docs]
def get_last_updated(self, session : Session | None = None) -> str:
"""
Get most recent LAST_UPDATED timestamp from the database.
Args:
session (Session | None): Optional SQLAlchemy session to use. If None, a new session will be created.
Returns:
str: The most recent LAST_UPDATED timestamp in ISO format.
"""
with self.session_manager(session) as session:
last_updated = session.query(func.max(self.orm_class.last_updated)).scalar()
return last_updated
[docs]
def query(
self,
session : Session | None = None,
cal_type : str | None = None,
date_time_start : str | None = None,
date_time_end : str | None = None,
fetch : str = 'all',
) -> list[CalibrationORM]:
"""
Higher level query method to retrieve calibrations based on a specified calibration type and datetime start/end.
The utility of this method will be revisited as the DRP is developed.
Args:
session (Session | None): Optional SQLAlchemy session to use. If None, a new session will be created.
cal_type (str | None): Optional calibration type to filter results. If None, all types are included.
date_time_start (str | None): Start datetime in ISO format. Defaults to the minimum datetime.
date_time_end (str | None): End datetime in ISO format. Defaults to the maximum datetime.
fetch (str): Specifies whether to fetch 'all' results or just the 'first' result. Defaults to 'all'.
Returns:
list[CalibrationORM]: A list of CalibrationORM objects matching the query criteria.
"""
if date_time_start is None:
date_time_start = datetime.datetime.min.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
if date_time_end is None:
date_time_end = datetime.datetime.max.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
with self.session_manager(session) as session:
_query = session.query(self.orm_class).filter(
self.orm_class.datetime_obs >= date_time_start,
self.orm_class.datetime_obs <= date_time_end
)
if cal_type is not None:
_query = _query.filter(self.orm_class.cal_type == cal_type)
if fetch == 'all':
result = _query.all()
elif fetch == 'first':
result = _query.first()
return result
[docs]
def add(
self,
calibration : CalibrationORM | list[CalibrationORM],
session : Session | None = None,
commit : bool = True,
):
"""
Add one or many calibrations to the database.
Args:
calibration (CalibrationORM | list[CalibrationORM]): A single CalibrationORM object or a list of them to add.
session (Session | None): Optional SQLAlchemy session to use. If None, a new session will be created.
commit (bool): Whether to commit the transaction after adding the calibration(s). Defaults to True.
"""
if not isinstance(calibration, list):
calibration = [calibration]
with self.session_manager(session) as session:
for item in calibration:
session.merge(item)
if commit:
session.commit()
[docs]
def query_by_id(self, calibration_id : str, session : Session | None = None) -> CalibrationORM | None:
"""
Query a calibration by its ID.
Args:
calibration_id (str): The ID of the calibration to query.
session (Session | None): Optional SQLAlchemy session to use. If None, a new session will be created.
Returns:
list[CalibrationORM] | None: The calibration objects found by ID.
"""
with self.session_manager(session) as session:
return session.query(self.orm_class).filter_by(id=calibration_id).all()