Source code for ska_oso_pdm._shared.target

"""
The ska_oso_pdm._shared.target module defines a Python representation of the
target of the observation.
"""

import operator
from abc import ABC, abstractmethod
from typing import Any, List, Literal, Tuple, Union
from warnings import warn

from astropy import units
from astropy.coordinates import SkyCoord, get_body
from astropy.time import Time
from pydantic import (
    AfterValidator,
    ConfigDict,
    Discriminator,
    Field,
    Tag,
    WithJsonSchema,
    field_validator,
    model_validator,
)
from typing_extensions import Annotated

from ska_oso_pdm._shared import (
    AstropyQuantity,
    PdmObject,
    TargetID,
    TerseStrEnum,
    UnitHelpers,
)
from ska_oso_pdm._shared.custom_types import AstropyUnit
from ska_oso_pdm._shared.custom_types import Quantity as PydanticQuantityAnnotations


def _deprecation(old_name, new_name):
    msg = f"please use {new_name}() instead. {old_name}() will be removed in a future release."
    warn(msg, DeprecationWarning, stacklevel=2)


[docs] class RadialVelocityDefinition(TerseStrEnum): """ Enumeration of reference definitions supported by a RadialVelocity. The sky frequency (ν) at which we must observe a spectral line is derived from the rest frequency of the spectral line (ν₀), the line-of-sight velocity of the source (V), and the speed of light (c). The relativistic velocity, or true line-of-sight velocity, is related to the observed and rest frequencies by V= c * (ν₀²− ν²) / (v₀² + v²) This equation is a bit cumbersome to use; in astronomy two different approximations are typically used: Optical Velocity: Voptical = c * (λ − λ₀) / λ₀ = cz (z is the redshift of the source; λ and λ₀ are the corresponding observed and rest wavelengths, respectively) Radio Velocity: Vradio = c * (ν₀ − ν) / v₀ = c * (λ−λ₀) / λ The radio and optical velocities are not identical. Particularly, Voptical and Vradio diverge for large velocities. Optical velocities are commonly used for (Helio/Barycentric) extragalactic observations; (LSRK) radio velocities are typical for Galactic observations. Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line """ OPTICAL = "OPTICAL" RADIO = "RADIO" RELATIVISTIC = "RELATIVISTIC"
[docs] class RadialVelocityReferenceFrame(TerseStrEnum): """ Reference frame in which the radial velocity is defined. The Earth rotates, revolves around the Sun, rotates around the Galaxy, moves within the Local Group, and shows motion against the Cosmic Microwave Background. As for the convention above, any source velocity must therefore also always be specified relative to a reference frame. Various velocity rest frames are used in the literature. The following table lists their name, the motion that is corrected for, and the maximum amplitude of the velocity correction. Each rest frame correction is incremental to the preceding row: Velocity Rest Frame Correct for + max correction (km/s) =========================== ====================================== ====================================== Topocentric Telescope Nothing (0) Geocentric Earth Center Earth rotation (0.5) Earth-Moon Barycentric Earth+Moon center of mass Motion around Earth+Moon center of mass (0.013) Heliocentric Center of the Sun Earth orbital motion (30) Barycentric Earth+Sun center of mass Earth+Sun center of mass (0.012) Local Standard of Rest Center of Mass of local stars Solar motion relative to nearby stars (20) Galactocentric Center of Milky Way Milky Way Rotation (230) Local Group Barycentric Local Group center of mass Milky Way Motion (100) Virgocentric Center of the Local Virgo supercluster Local Group motion (300) Cosmic Microwave Background CMB Local Supercluster Motion (600) The velocity frame should be chosen based on the science. For most observations, however, one of the following three reference frames is commonly used: - Topocentric is the reference frame of the observatory (defining the sky frequency of the observations). Visibilities in a measurement set are typically stored in this frame. - Local Standard of Rest is the native output of images in CASA. Note that there are two varieties of LSR: the kinematic LSR (LSRK) and the dynamic (LSRD) definitions for the kinematic and dynamic centers, respectively. In almost all cases LSRK is being used and the less precise name LSR is usually used synonymously with the more modern LSRK definition. - Barycentric is a commonly used frame that has virtually replaced the older Heliocentric standard. Given the small difference between the Barycentric and Heliocentric frames, they are frequently used interchangeably. Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line """ TOPOCENTRIC = "TOPOCENTRIC" LSRK = "LSRK" BARYCENTRIC = "BARYCENTRIC"
[docs] class RadialVelocityUnits(TerseStrEnum): """ Units for radial velocities. """ # Values MUST be str of an AstroPy unit. M_PER_SEC = "m / s" KM_PER_SEC = "km / s"
RadialVelocityUnitType = Annotated[ AstropyUnit, AfterValidator(UnitHelpers.constrain_unit_to(RadialVelocityUnits)), WithJsonSchema(UnitHelpers.enum_jsonschema(RadialVelocityUnits)), ]
[docs] class RadialVelocityQuantity(PydanticQuantityAnnotations): model_config = ConfigDict(json_schema_extra={"title": "RadialVelocityQuantity"}) unit: RadialVelocityUnitType = units.Unit(RadialVelocityUnits.KM_PER_SEC)
RadialVelocityQuantityType = Annotated[ AstropyQuantity, RadialVelocityQuantity, AfterValidator(UnitHelpers.constrain_unit_to(RadialVelocityUnits)), ]
[docs] class RadialVelocity(PdmObject): """ Radial velocity measures the line-of-sight velocity of an astronomical source. In principle, the radial velocity can be converted to and from the target redshift. However, these values are persisted separately to give the user the option of inputting either value. A velocity must also define the reference frame and definition that are applicable to the velocity. By default, these have values of: - definition = RADIO - reference_frame = LSRK - redshift = 0.0 """ quantity: RadialVelocityQuantityType = units.Quantity( value=0.0, unit=RadialVelocityUnits.KM_PER_SEC ) definition: RadialVelocityDefinition = RadialVelocityDefinition.RADIO reference_frame: RadialVelocityReferenceFrame = RadialVelocityReferenceFrame.LSRK redshift: float = 0.0
[docs] class PointingKind(TerseStrEnum): FIVE_POINT = "FivePointParameters" SINGLE_POINT = "SinglePointParameters" CROSS_SCAN = "CrossScanParameters" RASTER = "RasterParameters" STAR_RASTER = "StarRasterParameters" SPIRAL = "SpiralParameters" POINTED_MOSAIC = "PointedMosaicParameters"
[docs] class PointingPatternParameters(PdmObject, ABC): """ PointingPatternParameters is an abstract base class extended by classes that define receptor pointing patterns. """
[docs] class FivePointParameters(PointingPatternParameters): """ FivePointParameters defines the properties of an observing pattern that uses a five-point observing pattern centred on a reference position. """ kind: Literal[PointingKind.FIVE_POINT] = PointingKind.FIVE_POINT offset_arcsec: float = 0.0
[docs] class CrossScanParameters(PointingPatternParameters): """ CrossScanParameters defines the properties of an observing pattern that uses a cross scan observing pattern, typically used for pointing calibrations. """ kind: Literal[PointingKind.CROSS_SCAN] = PointingKind.CROSS_SCAN offset_arcsec: float = 0.0
[docs] class SinglePointParameters(PointingPatternParameters): """ SinglePointParameters defines the properties for an observing pattern consisting of a single receptor pointing with an optional offset from the reference position. """ kind: Literal[PointingKind.SINGLE_POINT] = PointingKind.SINGLE_POINT offset_x_arcsec: float = 0.0 offset_y_arcsec: float = 0.0
[docs] class CoordinatesOffset(PdmObject): """ CoordinatesOffset defines a unitless pointing offset from a reference position. Units are expected to be provided in a separate field. """ x: float = 0.0 y: float = 0.0
[docs] class AngleUnits(TerseStrEnum): """ Enumeration of units used to measure angles. """ # Values MUST be str of an AstroPy angle unit. DEGREES = "deg" ARCMINUTES = "arcmin" ARCSECONDS = "arcsec"
[docs] class PointedMosaicParameters(PointingPatternParameters): """ PointedMosaicParameters defines the properties for an observing pattern consisting of a single receptor pointing with n optional offsets from the reference position. """ kind: Literal[PointingKind.POINTED_MOSAIC] = PointingKind.POINTED_MOSAIC offsets: list[CoordinatesOffset] = Field(default_factory=list) units: AngleUnits
[docs] class RasterParameters(PointingPatternParameters): """ RasterParameters defines the properties of an observing pattern that uses a raster pattern centred on a reference position. """ kind: Literal[PointingKind.RASTER] = PointingKind.RASTER row_length_arcsec: float = 0.0 row_offset_arcsec: float = 0.0 n_rows: int = 1 pa: float = 0.0 unidirectional: bool = False
[docs] class StarRasterParameters(PointingPatternParameters): """ StarRasterParameters defines the properties of an observing pattern that uses a star raster pattern centred on a reference position. """ kind: Literal[PointingKind.STAR_RASTER] = PointingKind.STAR_RASTER row_length_arcsec: float = 0.0 n_rows: int = 1 row_offset_angle: float = 0.0 unidirectional: bool = False
[docs] class SpiralParameters(PointingPatternParameters): """ SpiralParameters defines the properties of holography observations to support MID holography observing These parameters are accepted by the MeerKAT holography script but are not included in the trajectory JSON. * scan_extent: Diameter of beam pattern to measure, in degrees (default 10.0). * track_time: Extra time in seconds for scanning antennas to track when passing over the target (default 10.0) * cycle_track_time: Extra time or scanning antennas to track when passing over target (default 30.0) * slow_time: Time in seconds to slow down at start and end of each spiral arm (default 6.0) * sample_time: Time in seconds to spend on each sample point generated (default 0.25) * scan_speed: Scan speed in degrees per second (default 0.1) * slew_speed: Speed at which to slew in degrees per second, or if negative number then this multiplied by scan_speed (default -1.0) * twist_factor Spiral twist factor (0.0 for straight radial, 1.0 standard spiral) (default standard spiral = 1.0) * high_el_slowdown_factor: Factor by which to slow down nominal scanning speed at 90 degree elevation, linearly scaled from factor of 1 at 60 degrees elevation (default 2.0) """ kind: Literal[PointingKind.SPIRAL] = PointingKind.SPIRAL scan_extent: AstropyQuantity = units.Quantity(value=10.0, unit="deg") track_time: AstropyQuantity = units.Quantity(value=10.0, unit="s") cycle_track_time: AstropyQuantity = units.Quantity(value=30.0, unit="s") slow_time: AstropyQuantity = units.Quantity(value=6.0, unit="s") sample_time: AstropyQuantity = units.Quantity(value=0.25, unit="s") scan_speed: AstropyQuantity = units.Quantity(value=0.1, unit="deg / s") slew_speed: AstropyQuantity = units.Quantity(value=-1.0, unit="deg / s") twist_factor: float = 1.0 high_el_slowdown_factor: float = 2.0
PointingParametersUnion = Annotated[ Union[ FivePointParameters, CrossScanParameters, SinglePointParameters, RasterParameters, StarRasterParameters, SpiralParameters, PointedMosaicParameters, ], Field(discriminator="kind"), ]
[docs] class PointingPattern(PdmObject): """ PointingPattern holds the user-configured pointing patterns and current active pattern for receptor pointing patterns associated with a target. One of each pointing pattern type can be held in the parameters list. Only the active pattern will be used for observing; the remainder provide an easy way to recover previously edited observing parameters for the target. """ active: PointingKind = PointingKind.SINGLE_POINT parameters: list[PointingParametersUnion] = Field( default_factory=lambda: [SinglePointParameters()], min_length=1 ) @model_validator(mode="before") @classmethod def active_xor_parameters(cls, data: Any) -> Any: if (data.get("active") is None) ^ (data.get("parameters") is None): raise ValueError("Must provide active and parameters. Only one specified") return data @model_validator(mode="after") def active_must_be_among_params(self): parameter_kinds = {p.kind for p in self.parameters} # complain if active not in the given parameters or duplicate detected if self.active not in parameter_kinds: raise ValueError( f"Invalid pointing parameters state: active={self.active} parameters={self.parameters}" ) if len(parameter_kinds) != len(self.parameters): raise ValueError(f"Duplicate parameter types in input: {self.parameters}") return self def __eq__(self, other): if not isinstance(other, PointingPattern): return False self_parameters_by_kind = sorted( self.parameters, key=operator.attrgetter("kind") ) other_parameters_by_kind = sorted( other.parameters, key=operator.attrgetter("kind") ) return ( other.active == self.active and self_parameters_by_kind == other_parameters_by_kind )
[docs] class CoordinateKind(TerseStrEnum): EQUATORIAL = "equatorial" HORIZONTAL = "horizontal" SSO = "sso" # SolarSystem Object GALACTIC = "galactic" ICRS = "icrs" ALTAZ = "altaz" SPECIAL = "special"
[docs] class Coordinates(PdmObject, ABC): """ Coordinates is an abstract base class for pointing coordinates. """ @abstractmethod def to_sky_coord(self) -> SkyCoord: raise NotImplementedError
[docs] class EquatorialCoordinatesReferenceFrame(TerseStrEnum): """ Enumeration of reference frames supported by an EquatorialCoordinates """ ICRS = "icrs" FK5 = "fk5"
[docs] class EquatorialCoordinates(Coordinates): """ SiderealTarget represents the argument for SKA scheduling block. """ def __init__(self, *args, **kwargs): _deprecation("EquatorialCoordinates", "ICRSCoordinates") super().__init__(*args, **kwargs) kind: Literal[CoordinateKind.EQUATORIAL] = CoordinateKind.EQUATORIAL ra: float | str = 0.0 # str for degrees:minutes:seconds representation dec: float | str = 0.0 reference_frame: EquatorialCoordinatesReferenceFrame = ( EquatorialCoordinatesReferenceFrame.ICRS ) epoch: float = Field( default=2000.0, description=( "Epoch of proper motion, date when proper motion offset was zero. " "Note this is an astronomical epoch, not a Unix epoch timestamp" ), ) unit: Tuple[str, str] = ("hourangle", "deg") def to_sky_coord(self) -> SkyCoord: return SkyCoord( self.ra, self.dec, unit=self.unit, frame=self.reference_frame.value )
[docs] class HorizontalCoordinatesReferenceFrame(TerseStrEnum): """ Enumeration of reference frames supported by a HorizontalCoordinates. """ ALTAZ = "altaz"
[docs] class HorizontalCoordinates(Coordinates): def __init__(self, *args, **kwargs): _deprecation("HorizontalCoordinates", "AltAzCoordinates") super().__init__(*args, **kwargs) kind: Literal[CoordinateKind.ALTAZ, CoordinateKind.HORIZONTAL] = ( CoordinateKind.HORIZONTAL ) az: float el: float unit: Tuple[str, str] = ("deg", "deg") reference_frame: HorizontalCoordinatesReferenceFrame = ( HorizontalCoordinatesReferenceFrame.ALTAZ ) def to_sky_coord(self) -> SkyCoord: return SkyCoord( az=self.az, alt=self.el, unit=self.unit, frame=self.reference_frame.value )
[docs] class AltAzCoordinates(HorizontalCoordinates): kind: Literal[CoordinateKind.ALTAZ, CoordinateKind.HORIZONTAL] = ( CoordinateKind.ALTAZ ) def __init__(self, *args, **kwargs): # pylint: disable=W0231 # Call the grandparent init to bypass parent deprecation Coordinates.__init__(self, *args, **kwargs) # pylint: disable=W0233 # Backwards-compatibility @model_validator(mode="before") @classmethod def coerce_to_new_class(cls, data: Any) -> Any: if isinstance(data, HorizontalCoordinates): data = data.model_dump(mode="python") if isinstance(data, dict): if data.get("kind") == CoordinateKind.HORIZONTAL: data["kind"] = CoordinateKind.ALTAZ return data
[docs] class SolarSystemObjectName(TerseStrEnum): """ SolarSystemObjectName represents name of the solar system object. """ SUN = "Sun" MOON = "Moon" MERCURY = "Mercury" VENUS = "Venus" MARS = "Mars" JUPITER = "Jupiter" SATURN = "Saturn" URANUS = "Uranus" NEPTUNE = "Neptune"
[docs] class SolarSystemObject(Coordinates): """ Planet represents the argument for SKA scheduling block. """ def __init__(self, *args, **kwargs): _deprecation("SolarSystemObject", "SpecialCoordinates") super().__init__(*args, **kwargs) kind: Literal[CoordinateKind.SSO, CoordinateKind.SPECIAL] = CoordinateKind.SSO reference_frame: Literal["special"] = "special" name: SolarSystemObjectName def to_sky_coord(self) -> SkyCoord: return get_body(self.name.value, Time.now())
[docs] class SpecialCoordinates(SolarSystemObject): kind: Literal[CoordinateKind.SSO, CoordinateKind.SPECIAL] = CoordinateKind.SPECIAL def __init__(self, *args, **kwargs): # pylint: disable=W0231 # Call the grandparent init to bypass parent deprecation Coordinates.__init__(self, *args, **kwargs) # pylint: disable=W0233 # Backwards-compatibility @model_validator(mode="before") @classmethod def coerce_to_new_class(cls, data: Any) -> Any: if isinstance(data, SolarSystemObject): data = data.model_dump(mode="python") if isinstance(data, dict): if data.get("kind") == CoordinateKind.SSO: data["kind"] = CoordinateKind.SPECIAL return data
[docs] class GalacticCoordinates(Coordinates): """ Coordinates defined in a Galactic frame """ kind: Literal[CoordinateKind.GALACTIC] = CoordinateKind.GALACTIC l: float = Field(description="Galactic longitude in degrees") b: float = Field(description="Galactic latitude in degrees") pm_l: float = Field( default=0, description="Proper motion in the longitudinal direction, in mas/yr" ) pm_b: float = Field( default=0, description="Proper motion in the latitudinal direction, in mas/yr" ) epoch: float = Field( default=2000.0, description=( "Epoch of proper motion, date when proper motion offset was zero. " "Note this is an astronomical epoch, not a Unix epoch timestamp" ), ) parallax: float = 0 def to_sky_coord(self) -> SkyCoord: return SkyCoord( frame="galactic", l=self.l * units.Unit("deg"), b=self.b * units.Unit("deg"), pm_l_cosb=self.pm_l * units.Unit("mas/yr"), pm_b=self.pm_b * units.Unit("mas/yr"), )
class _ICRSCoordBase(Coordinates): ra_str: str dec_str: str pm_ra: float | None = 0.0 pm_dec: float | None = 0.0 parallax: float | None = 0.0 epoch: float = Field( default=2000.0, description=( "Epoch of proper motion, date when proper motion offset was zero. " "Note this is an astronomical epoch, not a Unix epoch timestamp" ), ) def to_sky_coord(self) -> SkyCoord: return SkyCoord( self.ra_str, self.dec_str, unit=("hourangle", "deg"), pm_ra_cosdec=self.pm_ra * units.Unit("mas/yr"), pm_dec=self.pm_dec * units.Unit("mas/yr"), frame=EquatorialCoordinatesReferenceFrame.ICRS, )
[docs] class EquatorialCoordinatesPST(_ICRSCoordBase): def __init__(self, *args, **kwargs): _deprecation("EquatorialCoordinatesPST", "ICRSCoordinates") super().__init__(*args, **kwargs) target_id: str reference_frame: EquatorialCoordinatesReferenceFrame = ( EquatorialCoordinatesReferenceFrame.ICRS )
[docs] class ICRSCoordinates(_ICRSCoordBase): kind: Literal[CoordinateKind.ICRS] = CoordinateKind.ICRS reference_frame: Literal[EquatorialCoordinatesReferenceFrame.ICRS] = ( EquatorialCoordinatesReferenceFrame.ICRS ) # Backwards-compatibility @field_validator("kind", mode="before") @classmethod def accept_legacy_value(cls, given: Any) -> CoordinateKind: as_enum = CoordinateKind(given) if as_enum == CoordinateKind.EQUATORIAL: return CoordinateKind.ICRS return as_enum
compatibility_map = { # Map from deprecated => new ADR-63 kinds. CoordinateKind.HORIZONTAL: CoordinateKind.ALTAZ, CoordinateKind.SSO: CoordinateKind.SPECIAL, # These ones are the same... CoordinateKind.ICRS: CoordinateKind.ICRS, CoordinateKind.ALTAZ: CoordinateKind.ALTAZ, CoordinateKind.SPECIAL: CoordinateKind.SPECIAL, CoordinateKind.GALACTIC: CoordinateKind.GALACTIC, # This one will change, once we add a shim CoordinateKind.EQUATORIAL: CoordinateKind.EQUATORIAL, } # https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions-with-callable-discriminator # Only needed to support backwards-compatibility. # Replace with field discriminator on 'kind' def get_coord_kind(val: Any) -> CoordinateKind | None: if isinstance(val, dict): given_value = val.get("kind", None) else: given_value = getattr(val, "kind", None) if given_value: return compatibility_map[given_value] # The Tag() can be removed once legacy class names are dropped. CoordinatesUnion = Annotated[ Union[ Annotated[SpecialCoordinates, Tag(CoordinateKind.SPECIAL)], Annotated[GalacticCoordinates, Tag(CoordinateKind.GALACTIC)], Annotated[AltAzCoordinates, Tag(CoordinateKind.ALTAZ)], Annotated[ICRSCoordinates, Tag(CoordinateKind.ICRS)], Annotated[EquatorialCoordinates, Tag(CoordinateKind.EQUATORIAL)], Annotated[SolarSystemObject, Tag(CoordinateKind.SSO)], Annotated[HorizontalCoordinates, Tag(CoordinateKind.HORIZONTAL)], ], Discriminator(get_coord_kind), ]
[docs] class Beam(PdmObject): """ Tied Array Beam arguments. Args: beam_id (int): integer identifying this tied-array beam. It should be unique within each tied-array-beam class beam_name (str): string beam_coordinate (EquatorialCoordinatesPST): position on sky at which beam will be formed. It should lie within the HPBW of the station beam stn_weights (List(float)): array of weights, one for each station. """ beam_id: int beam_name: str | None = None beam_coordinate: EquatorialCoordinatesPST | CoordinatesUnion stn_weights: List[float] = Field(default_factory=list)
[docs] class TiedArrayBeams(PdmObject): """ Tied Array Beam argument lists. """ pst_beams: List[Beam] = Field(default_factory=list) pss_beams: List[Beam] = Field(default_factory=list) vlbi_beams: List[Beam] = Field(default_factory=list)
[docs] class Target(PdmObject): """ Target represents the receptor pointing for an SKA observation, consisting of a reference position and a pointing pattern to be used when observing the target. Default pointing patterns and equatorial coordinates will be set if not provided. """ target_id: TargetID = "" name: str = "" pointing_pattern: PointingPattern = Field(default_factory=PointingPattern) reference_coordinate: CoordinatesUnion = Field( default_factory=EquatorialCoordinates, ) radial_velocity: RadialVelocity = Field(default_factory=RadialVelocity) tied_array_beams: TiedArrayBeams | None = Field(default_factory=TiedArrayBeams) def __str__(self): # pylint: disable=no-member, useless-suppression kind = { "CrossScanParameters": "cross scan", "FivePointParameters": "five-point", "RasterParameters": "raster", "SinglePointParameters": "single point", "SpiralParameters": "spiral", "StarRasterParameters": "star raster", "PointingMosaicParameters": "pointed mosaic", }.get(self.pointing_pattern.active, self.pointing_pattern.active) return f"<Target={self.target_id} | {kind} on {self.reference_coordinate.to_sky_coord().to_string()}>"