"""
The ska_oso_pdm._shared.target module defines a Python representation of the
target of the observation.
"""
import operator
from abc import ABC, abstractmethod
from typing import Any, List, Literal, Tuple, Union
from warnings import warn
from astropy import units
from astropy.coordinates import SkyCoord, get_body
from astropy.time import Time
from pydantic import (
AfterValidator,
ConfigDict,
Discriminator,
Field,
Tag,
WithJsonSchema,
field_validator,
model_validator,
)
from typing_extensions import Annotated
from ska_oso_pdm._shared import (
AstropyQuantity,
PdmObject,
TargetID,
TerseStrEnum,
UnitHelpers,
)
from ska_oso_pdm._shared.custom_types import AstropyUnit
from ska_oso_pdm._shared.custom_types import Quantity as PydanticQuantityAnnotations
def _deprecation(old_name, new_name):
msg = f"please use {new_name}() instead. {old_name}() will be removed in a future release."
warn(msg, DeprecationWarning, stacklevel=2)
[docs]
class RadialVelocityDefinition(TerseStrEnum):
"""
Enumeration of reference definitions supported by a RadialVelocity.
The sky frequency (ν) at which we must observe a spectral line is derived
from the rest frequency of the spectral line (ν₀), the line-of-sight
velocity of the source (V), and the speed of light (c). The relativistic
velocity, or true line-of-sight velocity, is related to the observed and
rest frequencies by
V= c * (ν₀²− ν²) / (v₀² + v²)
This equation is a bit cumbersome to use; in astronomy two different
approximations are typically used:
Optical Velocity:
Voptical = c * (λ − λ₀) / λ₀ = cz
(z is the redshift of the source; λ and λ₀ are the corresponding observed
and rest wavelengths, respectively)
Radio Velocity:
Vradio = c * (ν₀ − ν) / v₀ = c * (λ−λ₀) / λ
The radio and optical velocities are not identical. Particularly, Voptical
and Vradio diverge for large velocities. Optical velocities are commonly
used for (Helio/Barycentric) extragalactic observations; (LSRK) radio
velocities are typical for Galactic observations.
Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line
"""
OPTICAL = "OPTICAL"
RADIO = "RADIO"
RELATIVISTIC = "RELATIVISTIC"
[docs]
class RadialVelocityReferenceFrame(TerseStrEnum):
"""
Reference frame in which the radial velocity is defined.
The Earth rotates, revolves around the Sun, rotates around the Galaxy,
moves within the Local Group, and shows motion against the Cosmic
Microwave Background. As for the convention above, any source velocity
must therefore also always be specified relative to a reference frame.
Various velocity rest frames are used in the literature. The following
table lists their name, the motion that is corrected for, and the maximum
amplitude of the velocity correction. Each rest frame correction is
incremental to the preceding row:
Velocity Rest Frame Correct for + max correction (km/s)
=========================== ====================================== ======================================
Topocentric Telescope Nothing (0)
Geocentric Earth Center Earth rotation (0.5)
Earth-Moon Barycentric Earth+Moon center of mass Motion around Earth+Moon center of mass (0.013)
Heliocentric Center of the Sun Earth orbital motion (30)
Barycentric Earth+Sun center of mass Earth+Sun center of mass (0.012)
Local Standard of Rest Center of Mass of local stars Solar motion relative to nearby stars (20)
Galactocentric Center of Milky Way Milky Way Rotation (230)
Local Group Barycentric Local Group center of mass Milky Way Motion (100)
Virgocentric Center of the Local Virgo supercluster Local Group motion (300)
Cosmic Microwave Background CMB Local Supercluster Motion (600)
The velocity frame should be chosen based on the science. For most
observations, however, one of the following three reference frames is
commonly used:
- Topocentric is the reference frame of the observatory (defining the sky
frequency of the observations). Visibilities in a measurement set are
typically stored in this frame.
- Local Standard of Rest is the native output of images in CASA. Note that
there are two varieties of LSR: the kinematic LSR (LSRK) and the dynamic
(LSRD) definitions for the kinematic and dynamic centers, respectively.
In almost all cases LSRK is being used and the less precise name LSR is
usually used synonymously with the more modern LSRK definition.
- Barycentric is a commonly used frame that has virtually replaced the
older Heliocentric standard. Given the small difference between the
Barycentric and Heliocentric frames, they are frequently used
interchangeably.
Taken from https://science.nrao.edu/facilities/vla/docs/manuals/obsguide/modes/line
"""
TOPOCENTRIC = "TOPOCENTRIC"
LSRK = "LSRK"
BARYCENTRIC = "BARYCENTRIC"
[docs]
class RadialVelocityUnits(TerseStrEnum):
"""
Units for radial velocities.
"""
# Values MUST be str of an AstroPy unit.
M_PER_SEC = "m / s"
KM_PER_SEC = "km / s"
RadialVelocityUnitType = Annotated[
AstropyUnit,
AfterValidator(UnitHelpers.constrain_unit_to(RadialVelocityUnits)),
WithJsonSchema(UnitHelpers.enum_jsonschema(RadialVelocityUnits)),
]
[docs]
class RadialVelocityQuantity(PydanticQuantityAnnotations):
model_config = ConfigDict(json_schema_extra={"title": "RadialVelocityQuantity"})
unit: RadialVelocityUnitType = units.Unit(RadialVelocityUnits.KM_PER_SEC)
RadialVelocityQuantityType = Annotated[
AstropyQuantity,
RadialVelocityQuantity,
AfterValidator(UnitHelpers.constrain_unit_to(RadialVelocityUnits)),
]
[docs]
class RadialVelocity(PdmObject):
"""
Radial velocity measures the line-of-sight velocity of an astronomical
source.
In principle, the radial velocity can be converted to and from the target
redshift. However, these values are persisted separately to give the user
the option of inputting either value.
A velocity must also define the reference frame and definition that are
applicable to the velocity. By default, these have values of:
- definition = RADIO
- reference_frame = LSRK
- redshift = 0.0
"""
quantity: RadialVelocityQuantityType = units.Quantity(
value=0.0, unit=RadialVelocityUnits.KM_PER_SEC
)
definition: RadialVelocityDefinition = RadialVelocityDefinition.RADIO
reference_frame: RadialVelocityReferenceFrame = RadialVelocityReferenceFrame.LSRK
redshift: float = 0.0
[docs]
class PointingKind(TerseStrEnum):
FIVE_POINT = "FivePointParameters"
SINGLE_POINT = "SinglePointParameters"
CROSS_SCAN = "CrossScanParameters"
RASTER = "RasterParameters"
STAR_RASTER = "StarRasterParameters"
SPIRAL = "SpiralParameters"
POINTED_MOSAIC = "PointedMosaicParameters"
[docs]
class PointingPatternParameters(PdmObject, ABC):
"""
PointingPatternParameters is an abstract base class extended by classes
that define receptor pointing patterns.
"""
[docs]
class FivePointParameters(PointingPatternParameters):
"""
FivePointParameters defines the properties of an observing pattern that
uses a five-point observing pattern centred on a reference position.
"""
kind: Literal[PointingKind.FIVE_POINT] = PointingKind.FIVE_POINT
offset_arcsec: float = 0.0
[docs]
class CrossScanParameters(PointingPatternParameters):
"""
CrossScanParameters defines the properties of an observing pattern that
uses a cross scan observing pattern, typically used for pointing
calibrations.
"""
kind: Literal[PointingKind.CROSS_SCAN] = PointingKind.CROSS_SCAN
offset_arcsec: float = 0.0
[docs]
class SinglePointParameters(PointingPatternParameters):
"""
SinglePointParameters defines the properties for an observing pattern
consisting of a single receptor pointing with an optional offset from
the reference position.
"""
kind: Literal[PointingKind.SINGLE_POINT] = PointingKind.SINGLE_POINT
offset_x_arcsec: float = 0.0
offset_y_arcsec: float = 0.0
[docs]
class CoordinatesOffset(PdmObject):
"""
CoordinatesOffset defines a unitless pointing offset from a reference
position. Units are expected to be provided in a separate field.
"""
x: float = 0.0
y: float = 0.0
[docs]
class AngleUnits(TerseStrEnum):
"""
Enumeration of units used to measure angles.
"""
# Values MUST be str of an AstroPy angle unit.
DEGREES = "deg"
ARCMINUTES = "arcmin"
ARCSECONDS = "arcsec"
[docs]
class PointedMosaicParameters(PointingPatternParameters):
"""
PointedMosaicParameters defines the properties for an observing pattern
consisting of a single receptor pointing with n optional offsets from
the reference position.
"""
kind: Literal[PointingKind.POINTED_MOSAIC] = PointingKind.POINTED_MOSAIC
offsets: list[CoordinatesOffset] = Field(default_factory=list)
units: AngleUnits
[docs]
class RasterParameters(PointingPatternParameters):
"""
RasterParameters defines the properties of an observing pattern that
uses a raster pattern centred on a reference position.
"""
kind: Literal[PointingKind.RASTER] = PointingKind.RASTER
row_length_arcsec: float = 0.0
row_offset_arcsec: float = 0.0
n_rows: int = 1
pa: float = 0.0
unidirectional: bool = False
[docs]
class StarRasterParameters(PointingPatternParameters):
"""
StarRasterParameters defines the properties of an observing pattern that
uses a star raster pattern centred on a reference position.
"""
kind: Literal[PointingKind.STAR_RASTER] = PointingKind.STAR_RASTER
row_length_arcsec: float = 0.0
n_rows: int = 1
row_offset_angle: float = 0.0
unidirectional: bool = False
[docs]
class SpiralParameters(PointingPatternParameters):
"""
SpiralParameters defines the properties of holography observations
to support MID holography observing
These parameters are accepted by the MeerKAT holography script but are not
included in the trajectory JSON.
* scan_extent: Diameter of beam pattern to measure, in degrees (default 10.0).
* track_time: Extra time in seconds for scanning antennas to track when
passing over the target (default 10.0)
* cycle_track_time: Extra time or scanning antennas to track when passing
over target (default 30.0)
* slow_time: Time in seconds to slow down at start and end of each spiral arm
(default 6.0)
* sample_time: Time in seconds to spend on each sample point generated
(default 0.25)
* scan_speed: Scan speed in degrees per second (default 0.1)
* slew_speed: Speed at which to slew in degrees per second, or if negative number
then this multiplied by scan_speed (default -1.0)
* twist_factor Spiral twist factor (0.0 for straight radial, 1.0 standard spiral)
(default standard spiral = 1.0)
* high_el_slowdown_factor: Factor by which to slow down nominal scanning speed at
90 degree elevation, linearly scaled from factor of 1 at 60 degrees elevation
(default 2.0)
"""
kind: Literal[PointingKind.SPIRAL] = PointingKind.SPIRAL
scan_extent: AstropyQuantity = units.Quantity(value=10.0, unit="deg")
track_time: AstropyQuantity = units.Quantity(value=10.0, unit="s")
cycle_track_time: AstropyQuantity = units.Quantity(value=30.0, unit="s")
slow_time: AstropyQuantity = units.Quantity(value=6.0, unit="s")
sample_time: AstropyQuantity = units.Quantity(value=0.25, unit="s")
scan_speed: AstropyQuantity = units.Quantity(value=0.1, unit="deg / s")
slew_speed: AstropyQuantity = units.Quantity(value=-1.0, unit="deg / s")
twist_factor: float = 1.0
high_el_slowdown_factor: float = 2.0
PointingParametersUnion = Annotated[
Union[
FivePointParameters,
CrossScanParameters,
SinglePointParameters,
RasterParameters,
StarRasterParameters,
SpiralParameters,
PointedMosaicParameters,
],
Field(discriminator="kind"),
]
[docs]
class PointingPattern(PdmObject):
"""
PointingPattern holds the user-configured pointing patterns and current active
pattern for receptor pointing patterns associated with a target.
One of each pointing pattern type can be held in the parameters list. Only the
active pattern will be used for observing; the remainder provide an easy way to
recover previously edited observing parameters for the target.
"""
active: PointingKind = PointingKind.SINGLE_POINT
parameters: list[PointingParametersUnion] = Field(
default_factory=lambda: [SinglePointParameters()], min_length=1
)
@model_validator(mode="before")
@classmethod
def active_xor_parameters(cls, data: Any) -> Any:
if (data.get("active") is None) ^ (data.get("parameters") is None):
raise ValueError("Must provide active and parameters. Only one specified")
return data
@model_validator(mode="after")
def active_must_be_among_params(self):
parameter_kinds = {p.kind for p in self.parameters}
# complain if active not in the given parameters or duplicate detected
if self.active not in parameter_kinds:
raise ValueError(
f"Invalid pointing parameters state: active={self.active} parameters={self.parameters}"
)
if len(parameter_kinds) != len(self.parameters):
raise ValueError(f"Duplicate parameter types in input: {self.parameters}")
return self
def __eq__(self, other):
if not isinstance(other, PointingPattern):
return False
self_parameters_by_kind = sorted(
self.parameters, key=operator.attrgetter("kind")
)
other_parameters_by_kind = sorted(
other.parameters, key=operator.attrgetter("kind")
)
return (
other.active == self.active
and self_parameters_by_kind == other_parameters_by_kind
)
[docs]
class CoordinateKind(TerseStrEnum):
EQUATORIAL = "equatorial"
HORIZONTAL = "horizontal"
SSO = "sso" # SolarSystem Object
GALACTIC = "galactic"
ICRS = "icrs"
ALTAZ = "altaz"
SPECIAL = "special"
[docs]
class Coordinates(PdmObject, ABC):
"""
Coordinates is an abstract base class for pointing coordinates.
"""
@abstractmethod
def to_sky_coord(self) -> SkyCoord:
raise NotImplementedError
[docs]
class EquatorialCoordinatesReferenceFrame(TerseStrEnum):
"""
Enumeration of reference frames supported by an EquatorialCoordinates
"""
ICRS = "icrs"
FK5 = "fk5"
[docs]
class EquatorialCoordinates(Coordinates):
"""
SiderealTarget represents the argument for SKA scheduling block.
"""
def __init__(self, *args, **kwargs):
_deprecation("EquatorialCoordinates", "ICRSCoordinates")
super().__init__(*args, **kwargs)
kind: Literal[CoordinateKind.EQUATORIAL] = CoordinateKind.EQUATORIAL
ra: float | str = 0.0 # str for degrees:minutes:seconds representation
dec: float | str = 0.0
reference_frame: EquatorialCoordinatesReferenceFrame = (
EquatorialCoordinatesReferenceFrame.ICRS
)
epoch: float = Field(
default=2000.0,
description=(
"Epoch of proper motion, date when proper motion offset was zero. "
"Note this is an astronomical epoch, not a Unix epoch timestamp"
),
)
unit: Tuple[str, str] = ("hourangle", "deg")
def to_sky_coord(self) -> SkyCoord:
return SkyCoord(
self.ra, self.dec, unit=self.unit, frame=self.reference_frame.value
)
[docs]
class HorizontalCoordinatesReferenceFrame(TerseStrEnum):
"""
Enumeration of reference frames supported by a HorizontalCoordinates.
"""
ALTAZ = "altaz"
[docs]
class HorizontalCoordinates(Coordinates):
def __init__(self, *args, **kwargs):
_deprecation("HorizontalCoordinates", "AltAzCoordinates")
super().__init__(*args, **kwargs)
kind: Literal[CoordinateKind.ALTAZ, CoordinateKind.HORIZONTAL] = (
CoordinateKind.HORIZONTAL
)
az: float
el: float
unit: Tuple[str, str] = ("deg", "deg")
reference_frame: HorizontalCoordinatesReferenceFrame = (
HorizontalCoordinatesReferenceFrame.ALTAZ
)
def to_sky_coord(self) -> SkyCoord:
return SkyCoord(
az=self.az, alt=self.el, unit=self.unit, frame=self.reference_frame.value
)
[docs]
class AltAzCoordinates(HorizontalCoordinates):
kind: Literal[CoordinateKind.ALTAZ, CoordinateKind.HORIZONTAL] = (
CoordinateKind.ALTAZ
)
def __init__(self, *args, **kwargs): # pylint: disable=W0231
# Call the grandparent init to bypass parent deprecation
Coordinates.__init__(self, *args, **kwargs) # pylint: disable=W0233
# Backwards-compatibility
@model_validator(mode="before")
@classmethod
def coerce_to_new_class(cls, data: Any) -> Any:
if isinstance(data, HorizontalCoordinates):
data = data.model_dump(mode="python")
if isinstance(data, dict):
if data.get("kind") == CoordinateKind.HORIZONTAL:
data["kind"] = CoordinateKind.ALTAZ
return data
[docs]
class SolarSystemObjectName(TerseStrEnum):
"""
SolarSystemObjectName represents name of the solar system object.
"""
SUN = "Sun"
MOON = "Moon"
MERCURY = "Mercury"
VENUS = "Venus"
MARS = "Mars"
JUPITER = "Jupiter"
SATURN = "Saturn"
URANUS = "Uranus"
NEPTUNE = "Neptune"
[docs]
class SolarSystemObject(Coordinates):
"""
Planet represents the argument for SKA scheduling block.
"""
def __init__(self, *args, **kwargs):
_deprecation("SolarSystemObject", "SpecialCoordinates")
super().__init__(*args, **kwargs)
kind: Literal[CoordinateKind.SSO, CoordinateKind.SPECIAL] = CoordinateKind.SSO
reference_frame: Literal["special"] = "special"
name: SolarSystemObjectName
def to_sky_coord(self) -> SkyCoord:
return get_body(self.name.value, Time.now())
[docs]
class SpecialCoordinates(SolarSystemObject):
kind: Literal[CoordinateKind.SSO, CoordinateKind.SPECIAL] = CoordinateKind.SPECIAL
def __init__(self, *args, **kwargs): # pylint: disable=W0231
# Call the grandparent init to bypass parent deprecation
Coordinates.__init__(self, *args, **kwargs) # pylint: disable=W0233
# Backwards-compatibility
@model_validator(mode="before")
@classmethod
def coerce_to_new_class(cls, data: Any) -> Any:
if isinstance(data, SolarSystemObject):
data = data.model_dump(mode="python")
if isinstance(data, dict):
if data.get("kind") == CoordinateKind.SSO:
data["kind"] = CoordinateKind.SPECIAL
return data
[docs]
class GalacticCoordinates(Coordinates):
"""
Coordinates defined in a Galactic frame
"""
kind: Literal[CoordinateKind.GALACTIC] = CoordinateKind.GALACTIC
l: float = Field(description="Galactic longitude in degrees")
b: float = Field(description="Galactic latitude in degrees")
pm_l: float = Field(
default=0, description="Proper motion in the longitudinal direction, in mas/yr"
)
pm_b: float = Field(
default=0, description="Proper motion in the latitudinal direction, in mas/yr"
)
epoch: float = Field(
default=2000.0,
description=(
"Epoch of proper motion, date when proper motion offset was zero. "
"Note this is an astronomical epoch, not a Unix epoch timestamp"
),
)
parallax: float = 0
def to_sky_coord(self) -> SkyCoord:
return SkyCoord(
frame="galactic",
l=self.l * units.Unit("deg"),
b=self.b * units.Unit("deg"),
pm_l_cosb=self.pm_l * units.Unit("mas/yr"),
pm_b=self.pm_b * units.Unit("mas/yr"),
)
class _ICRSCoordBase(Coordinates):
ra_str: str
dec_str: str
pm_ra: float | None = 0.0
pm_dec: float | None = 0.0
parallax: float | None = 0.0
epoch: float = Field(
default=2000.0,
description=(
"Epoch of proper motion, date when proper motion offset was zero. "
"Note this is an astronomical epoch, not a Unix epoch timestamp"
),
)
def to_sky_coord(self) -> SkyCoord:
return SkyCoord(
self.ra_str,
self.dec_str,
unit=("hourangle", "deg"),
pm_ra_cosdec=self.pm_ra * units.Unit("mas/yr"),
pm_dec=self.pm_dec * units.Unit("mas/yr"),
frame=EquatorialCoordinatesReferenceFrame.ICRS,
)
[docs]
class EquatorialCoordinatesPST(_ICRSCoordBase):
def __init__(self, *args, **kwargs):
_deprecation("EquatorialCoordinatesPST", "ICRSCoordinates")
super().__init__(*args, **kwargs)
target_id: str
reference_frame: EquatorialCoordinatesReferenceFrame = (
EquatorialCoordinatesReferenceFrame.ICRS
)
[docs]
class ICRSCoordinates(_ICRSCoordBase):
kind: Literal[CoordinateKind.ICRS] = CoordinateKind.ICRS
reference_frame: Literal[EquatorialCoordinatesReferenceFrame.ICRS] = (
EquatorialCoordinatesReferenceFrame.ICRS
)
# Backwards-compatibility
@field_validator("kind", mode="before")
@classmethod
def accept_legacy_value(cls, given: Any) -> CoordinateKind:
as_enum = CoordinateKind(given)
if as_enum == CoordinateKind.EQUATORIAL:
return CoordinateKind.ICRS
return as_enum
compatibility_map = {
# Map from deprecated => new ADR-63 kinds.
CoordinateKind.HORIZONTAL: CoordinateKind.ALTAZ,
CoordinateKind.SSO: CoordinateKind.SPECIAL,
# These ones are the same...
CoordinateKind.ICRS: CoordinateKind.ICRS,
CoordinateKind.ALTAZ: CoordinateKind.ALTAZ,
CoordinateKind.SPECIAL: CoordinateKind.SPECIAL,
CoordinateKind.GALACTIC: CoordinateKind.GALACTIC,
# This one will change, once we add a shim
CoordinateKind.EQUATORIAL: CoordinateKind.EQUATORIAL,
}
# https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions-with-callable-discriminator
# Only needed to support backwards-compatibility.
# Replace with field discriminator on 'kind'
def get_coord_kind(val: Any) -> CoordinateKind | None:
if isinstance(val, dict):
given_value = val.get("kind", None)
else:
given_value = getattr(val, "kind", None)
if given_value:
return compatibility_map[given_value]
# The Tag() can be removed once legacy class names are dropped.
CoordinatesUnion = Annotated[
Union[
Annotated[SpecialCoordinates, Tag(CoordinateKind.SPECIAL)],
Annotated[GalacticCoordinates, Tag(CoordinateKind.GALACTIC)],
Annotated[AltAzCoordinates, Tag(CoordinateKind.ALTAZ)],
Annotated[ICRSCoordinates, Tag(CoordinateKind.ICRS)],
Annotated[EquatorialCoordinates, Tag(CoordinateKind.EQUATORIAL)],
Annotated[SolarSystemObject, Tag(CoordinateKind.SSO)],
Annotated[HorizontalCoordinates, Tag(CoordinateKind.HORIZONTAL)],
],
Discriminator(get_coord_kind),
]
[docs]
class Beam(PdmObject):
"""
Tied Array Beam arguments.
Args:
beam_id (int): integer identifying this tied-array beam. It should be unique within each tied-array-beam class
beam_name (str): string
beam_coordinate (EquatorialCoordinatesPST): position on sky at which beam will be formed. It should lie within the HPBW of the station beam
stn_weights (List(float)): array of weights, one for each station.
"""
beam_id: int
beam_name: str | None = None
beam_coordinate: EquatorialCoordinatesPST | CoordinatesUnion
stn_weights: List[float] = Field(default_factory=list)
[docs]
class TiedArrayBeams(PdmObject):
"""
Tied Array Beam argument lists.
"""
pst_beams: List[Beam] = Field(default_factory=list)
pss_beams: List[Beam] = Field(default_factory=list)
vlbi_beams: List[Beam] = Field(default_factory=list)
[docs]
class Target(PdmObject):
"""
Target represents the receptor pointing for an SKA observation, consisting
of a reference position and a pointing pattern to be used when observing
the target.
Default pointing patterns and equatorial coordinates will be set if not
provided.
"""
target_id: TargetID = ""
name: str = ""
pointing_pattern: PointingPattern = Field(default_factory=PointingPattern)
reference_coordinate: CoordinatesUnion = Field(
default_factory=EquatorialCoordinates,
)
radial_velocity: RadialVelocity = Field(default_factory=RadialVelocity)
tied_array_beams: TiedArrayBeams | None = Field(default_factory=TiedArrayBeams)
def __str__(self):
# pylint: disable=no-member, useless-suppression
kind = {
"CrossScanParameters": "cross scan",
"FivePointParameters": "five-point",
"RasterParameters": "raster",
"SinglePointParameters": "single point",
"SpiralParameters": "spiral",
"StarRasterParameters": "star raster",
"PointingMosaicParameters": "pointed mosaic",
}.get(self.pointing_pattern.active, self.pointing_pattern.active)
return f"<Target={self.target_id} | {kind} on {self.reference_coordinate.to_sky_coord().to_string()}>"