Source code for almanac.utils

import numpy as np
from time import time
from datetime import datetime
from itertools import cycle, groupby
from typing import Tuple, Union, Optional, List, Dict, Any
from astropy.table import Table
from astropy.time import Time
from astropy.io.registry import register_identifier, register_reader, register_writer
from pydl.pydlutils.yanny import is_yanny, read_table_yanny, write_table_yanny


register_identifier("yanny", Table, is_yanny)
register_reader("yanny", Table, read_table_yanny)
register_writer("yanny", Table, write_table_yanny)

[docs] def adjusted_fiber_index_to_fiber_id(adjusted_fiber_index): """ Adjusted fiber index runs from 1 to 600 (inclusive) to account for APO and LCO. Separate from whether it is APO or LCO, the fiber index runs in the opposite direction as what we call `fiber_id`. """ adjusted_fiber_index = np.atleast_1d(adjusted_fiber_index) adjusted_fiber_index = np.where( adjusted_fiber_index > 300, adjusted_fiber_index - 300, adjusted_fiber_index ) return 301 - adjusted_fiber_index
[docs] def group_contiguous(v): groups = [] for k, g in groupby(enumerate(sorted(v)), lambda x: x[1] - x[0]): group = list(map(lambda x: x[1], g)) groups.append((group[0], group[-1])) return groups
[docs] def get_observatories(apo: bool, lco: bool) -> Tuple[str, ...]: """Get observatory names based on boolean flags. Args: apo: Whether to include APO observatory lco: Whether to include LCO observatory Returns: Tuple of observatory names ("apo", "lco", or both) """ if apo and not lco: return ("apo",) elif lco and not apo: return ("lco",) else: return ("apo", "lco")
[docs] def timestamp_to_mjd(v: float) -> float: """Convert Unix timestamp to Modified Julian Date (MJD). Args: v: Unix timestamp in seconds Returns: Modified Julian Date as float """ return (v / 86400.0) + 40587.5
[docs] def get_current_mjd() -> int: """Get current Modified Julian Date as integer. Returns: Current MJD as integer """ return int(timestamp_to_mjd(time()))
[docs] def datetime_to_mjd(date: str) -> int: """Convert date string to Modified Julian Date. Args: date: Date string in format "YYYY-MM-DD" Returns: Modified Julian Date as integer """ return int(timestamp_to_mjd(datetime.strptime(date, "%Y-%m-%d").timestamp()))
[docs] def mjd_to_datetime(mjd: float) -> datetime: """Convert Modified Julian Date to datetime object. Args: mjd: Modified Julian Date Returns: Datetime object """ return Time(mjd, format='mjd').datetime
[docs] def parse_mjds( mjd: Optional[int], mjd_start: Optional[int], mjd_end: Optional[int], date: Optional[str], date_start: Optional[str], date_end: Optional[str], earliest_mjd: int = 0, return_nones: bool = False ) -> Tuple[Union[int, range, Tuple[int, ...]], int, int]: """Parse MJD and date parameters to determine observation date range. Args: mjd: Single MJD value (can be negative for relative to current) mjd_start: Start MJD for range (can be negative for relative to current) mjd_end: End MJD for range (can be negative for relative to current) date: Single date string in "YYYY-MM-DD" format date_start: Start date string in "YYYY-MM-DD" format date_end: End date string in "YYYY-MM-DD" format earliest_mjd: Earliest allowed MJD value (default: 0) return_nones: If True, return None for mjd range if no dates are specified (default: False) Returns: Tuple containing: - MJD values (single int, range, or tuple) - Start MJD (int) - End MJD (int) Raises: ValueError: If more than one time specification method is provided RuntimeError: If no valid time specification is found """ has_mjd_range = mjd_start is not None or mjd_end is not None has_date_range = date_start is not None or date_end is not None current_mjd = get_current_mjd() n_given = sum([has_mjd_range, has_date_range, mjd is not None, date is not None]) if n_given > 1: raise ValueError( "Cannot specify more than one of --mjd, --mjd-start/--mjd-end, --date, --date-start/--date-end" ) if n_given == 0: if return_nones: return (None, None, None) else: return ((current_mjd, ), current_mjd, current_mjd) if mjd is not None: if mjd < 0: mjd += current_mjd return ((mjd, ), mjd, mjd) if has_mjd_range: mjd_start = mjd_start or earliest_mjd if mjd_start < 0: mjd_start += current_mjd mjd_end = mjd_end or current_mjd if mjd_end < 0: mjd_end += current_mjd return (range(mjd_start, 1 + mjd_end), mjd_start, mjd_end) if date is not None: mjd = datetime_to_mjd(date) return ((mjd, ), mjd, mjd) if has_date_range: mjd_start = earliest_mjd if date_start is None else datetime_to_mjd(date_start) mjd_end = current_mjd if date_end is None else datetime_to_mjd(date_end) return (range(mjd_start, 1 + mjd_end), mjd_start, mjd_end) raise RuntimeError("Should not be able to get here")