"""
loc.py - Localisation module for the das4whales package.
This module provides functions for localizing the source of a sound source recorded by a DAS array.
Author: Quentin Goestchel, Léa Bouffaut
Date: 2024-06-18/2025-03-05
"""
from __future__ import annotations
import sys
from typing import Dict, List, Tuple, Union, Optional, Any, NamedTuple
import numpy as np
from tqdm import tqdm
from das4whales.spatial import calc_das_section_bearing, calc_source_position_lat_lon, calc_dist_lat_lon
[docs]
class LocalizationResult(NamedTuple):
"""Container for localization results with uncertainty information."""
position: np.ndarray # [x, y, z, t0]
residuals: np.ndarray
rms: float
weighted_rms: float
covariance: np.ndarray
uncertainties: np.ndarray
weights: np.ndarray
n_iterations: int
[docs]
def calc_arrival_times(t0: Union[float, np.ndarray], cable_pos: np.ndarray, pos: Union[Tuple[np.ndarray, np.ndarray, float], Tuple[float, float, float]], c0: float) -> np.ndarray:
"""
Calculate theoretical arrival times of a whale call at a grid of positions or a single point.
Parameters
----------
t0 : float or np.ndarray
Initial time offset. Can be a scalar or an array of time offsets.
cable_pos : np.ndarray
Array of cable positions with shape (N, 3), where N is the number of channels.
Each row represents [x, y, z] coordinates of a cable position.
pos : tuple of np.ndarray or float
If a grid is used, provide a tuple (xg, yg, zg) with xg and yg as 2D arrays.
For a single point, provide (x, y, z) as floats or 1D arrays.
c0 : float
Speed of sound in water (in meters per second).
Returns
-------
th_arrtimes : np.ndarray
Theoretical arrival times at each grid point or point source.
Shape is (M, L, N) for a grid, or (N,) for a single point.
"""
# Extract cable positions
x_cable, y_cable, z_cable = cable_pos[:, 0], cable_pos[:, 1], cable_pos[:, 2]
# Check if pos is a grid, a flattened grid or a single point
if isinstance(pos[0], np.ndarray) and pos[0].ndim == 2:
# Grid case (np.meshgrid)
xg, yg, zg = pos
x_exp = xg[:, :, np.newaxis] # Shape (M, L, 1)
y_exp = yg[:, :, np.newaxis]
z_exp = zg # Scalar or array
# Calculate distances for grid
dist = np.sqrt((x_cable[np.newaxis, np.newaxis, :] - x_exp) ** 2 +
(y_cable[np.newaxis, np.newaxis, :] - y_exp) ** 2 +
(z_cable[np.newaxis, np.newaxis, :] - z_exp) ** 2)
elif isinstance(pos[0], np.ndarray) and pos[0].ndim == 1: #flattened grid case
# Flattened grid case
xg, yg, zg = pos
x_exp = xg[:, np.newaxis]
y_exp = yg[:, np.newaxis]
z_exp = zg
# Calculate distances for flattened grid
dist = np.sqrt((x_cable[np.newaxis, :] - x_exp) ** 2 +
(y_cable[np.newaxis, :] - y_exp) ** 2 +
(z_cable[np.newaxis, :] - z_exp) ** 2)
else:
# Single point case
x, y, z = pos
dist = np.sqrt((x_cable - x) ** 2 +
(y_cable - y) ** 2 +
(z_cable - z) ** 2)
# Calculate arrival times
th_arrtimes = t0 + dist / c0
return th_arrtimes
[docs]
def calc_theory_toa(das_position, whale_position, dist, c0=1490):
"""
Calculate theoretical Time of Arrival (TOA) for a whale call with known position relative to the cable.
Args:
das_position (dict): A dictionary containing latitude, longitude, and depth information of the cable (DAS) positions.
whale_position (dict): A dictionary containing at least whale apex, offset, side and depth
dist (numpy.ndarray): An array containing distances along the cable.
c0 (float, optional): The speed of sound in water in meters per second. Defaults to 1490 m/s.
Returns:
numpy.ndarray: An array containing the theoretical TOAs for each position along the cable.
"""
# Find the index of whale_apex_m in dist
ind_whale_apex = np.where(dist >= whale_position['apex'])[0][0]
# Get the bearing of the DAS cable around whale position
step = 3
das_bearing = calc_das_section_bearing(
das_position['lat'][ind_whale_apex - step],
das_position['lon'][ind_whale_apex - step],
das_position['lat'][ind_whale_apex + step],
das_position['lon'][ind_whale_apex + step])
# Get the whale position
whale_position['lat'], whale_position['lon'] = calc_source_position_lat_lon(
das_position['lat'][ind_whale_apex],
das_position['lon'][ind_whale_apex],
whale_position['offset'],
das_bearing,
whale_position['side'])
# Create an updated whale position dictionary
# print(f"whale position {whale_position}")
# Calculate 3D distance for each element in DAS_position
distances = calc_dist_lat_lon(whale_position, das_position)
# Calculate depth differences
depth_diff = np.array(das_position['depth']) - whale_position['depth']
# Calculate total distance (3D)
total_distance = np.sqrt(distances ** 2 + depth_diff ** 2)
# Calculate theoretical TOAs
toa = (total_distance - np.min(total_distance)) / c0
return toa
[docs]
def calc_distance_matrix(cable_pos, whale_pos):
"""
Compute the distance matrix between the cable and the whale
"""
return np.sqrt(np.sum((cable_pos - whale_pos) ** 2, axis=1))
[docs]
def calc_radii_matrix(cable_pos, whale_pos):
"""
Compute the radii matrix between the cable and the whale
"""
return np.sqrt(np.sum((cable_pos[:,:2] - whale_pos[:2]) ** 2, axis=1))
[docs]
def calc_theta_vector(cable_pos, whale_pos):
"""
Compute the elevation angle between the cable and the whale for each cable position
"""
rj = calc_radii_matrix(cable_pos, whale_pos)
return np.arctan2(abs(whale_pos[2]-cable_pos[:, 2]), rj)
[docs]
def calc_phi_vector(cable_pos, whale_pos):
"""
Compute the azimuthal angle between the cable and the whale for each cable position
"""
return np.arctan2(whale_pos[1]-cable_pos[:,1], whale_pos[0]-cable_pos[:,0])
[docs]
def solve_lq(Ti, cable_pos, c0, Nbiter=10, SNR=None, fix_z=False, ninit=None, residuals=False, verbose=False):
"""
Solve the least squares problem to localize the whale with optional SNR weighting
Parameters
----------
Ti : np.ndarray
Array of arrival times at each cable position [channel x 1]
cable_pos : np.ndarray
Array of cable positions [channel x 3]
c0 : float
Speed of sound in water considered constant
SNR : np.ndarray, optional
Array of signal-to-noise ratios in dB for each measurement [channel x 1]
Nbiter : int, optional (default=10)
Number of iterations for the least squares algorithm
fix_z : bool, optional (default=False)
Whether to fix the z-coordinate
ninit : np.ndarray, optional
Initial guess for n
residuals : bool, optional (default=False)
Whether to return residuals
verbose : bool, optional (default=False)
Whether to print iteration details
Returns
-------
n : np.ndarray
Estimated whale position and time of emission vector [x, y, z, t0]
res : np.ndarray, optional
Residuals if residuals=True
"""
# Make a first guess of the whale position
n = np.array([40000, 1000, -30, np.min(Ti)])
if ninit is not None:
n = ninit
# Regularization parameter
lambda_reg = 1e-5
# Create weight matrix from SNR values if provided
if SNR is not None:
# Convert SNR from dB to linear scale
linear_snr = 10**(SNR/10)
# Use SNR as weights - higher SNR = more weight
W = np.diag(linear_snr)
# Optional: normalize weights to sum to number of measurements
# This keeps the overall influence of the regularization term similar
W = W * (len(SNR) / np.sum(linear_snr))
else:
# Use uniform weights if no SNR provided
W = np.eye(len(Ti))
for j in range(Nbiter):
thj = calc_theta_vector(cable_pos, n)
phij = calc_phi_vector(cable_pos, n)
dt = Ti - calc_arrival_times(n[-1], cable_pos, n[:3], c0)
# Fixed z case
if fix_z:
# Save z value to reappend it after the least squares computation
dz = n[2]
n_fz = np.delete(n, 2) # Remove z from the vector n
del n # Delete n to reassign it with the new value
n = n_fz # Reassign n without z
# Compute the least squares coefficients matrix
G = np.array([np.cos(thj) * np.cos(phij) / c0, np.cos(thj) * np.sin(phij) / c0, np.ones_like(thj)]).T
# Free z case
else:
# Compute the least squares coefficients matrix
G = np.array([np.cos(thj) * np.cos(phij) / c0, np.cos(thj) * np.sin(phij) / c0, np.sin(thj) / c0, np.ones_like(thj)]).T
# Adding regularization to avoid singular matrix error
lambda_identity = lambda_reg * np.eye(G.shape[1])
# Weighted least squares solution: (G^T W G + λI)^(-1) G^T W dt
dn = np.linalg.inv(G.T @ W @ G + lambda_identity) @ G.T @ W @ dt
# Damping factor
if j < 4:
n += 0.7 * dn
else:
n += dn
if fix_z:
# reappend z to n in index 2 (before index 3)
n = np.insert(n, 2, dz)
if verbose:
print(f'Iteration {j+1}: x = {n[0]:.4f} m, y = {n[1]:.4f}, z = {n[2]:.4f}, ti = {n[3]:.4f}')
# Compute final residuals
res = Ti - calc_arrival_times(n[-1], cable_pos, n[:3], c0)
if residuals:
return n, res
else:
return n
[docs]
def calc_dist_weighting(dist, discut, disw1, disw2):
# Initialize weights
weights = np.zeros_like(dist)
# Find second minimum distance
if len(dist) > 1:
dmin2 = np.partition(dist, 1)[1]
if dmin2 > discut: # Event outside the network
# Set weights to 1 for distances less than dmin2 * disw1
weights[dist < dmin2 * disw1] = 1
# Apply cosine taper for distances between dmin2 * disw1 and dmin2 * disw2
taper_indices = (dist >= dmin2 * disw1) & (dist < dmin2 * disw2)
if np.any(taper_indices):
weights[taper_indices] = 0.5 * (1 + np.cos(np.pi * (dist[taper_indices] - dmin2 * disw1) /
(dmin2 * (disw2 - disw1))))
else: # Event inside the network
# Set weights to 1 for distances less than discut * disw1
weights[dist < discut * disw1] = 1
# Apply cosine taper for distances between discut * disw1 and discut * disw2
taper_indices = (dist >= discut * disw1) & (dist < discut * disw2)
if np.any(taper_indices):
weights[taper_indices] = 0.5 * (1 + np.cos(np.pi * (dist[taper_indices] - discut * disw1) /
(discut * (disw2 - disw1))))
# Create weight matrix:
return weights
[docs]
def calc_res_weighting(res, rmscut, rmsw1, rmsw2):
weights = np.zeros_like(res)
rms = np.sqrt(np.mean(res**2))
abs_res = np.abs(res)
if rms > rmscut: # poor solution
# Set weights to 1 for residuals less than rmscut * rmsw1
weights[abs_res < rms * rmsw1] = 1
# Apply cosine taper for residuals between rmscut * rmsw1 and rmscut * rmsw2
taper_indices = (abs_res >= rms * rmsw1) & (abs_res < rms * rmsw2)
if np.any(taper_indices):
weights[taper_indices] = 0.5 * (1 + np.cos(np.pi * (abs_res[taper_indices] - rms * rmsw1) /
(rms * (rmsw2 - rmsw1))))
else: # good solution
# Set weights to 1 for residuals less than rmscut * rmsw1
weights[abs_res < rmscut * rmsw1] = 1
# Apply cosine taper for residuals between rmscut * rmsw1 and rmscut * rmsw2
taper_indices = (abs_res >= rmscut * rmsw1) & (abs_res < rmscut * rmsw2)
if np.any(taper_indices):
weights[taper_indices] = 0.5 * (1 + np.cos(np.pi * (abs_res[taper_indices] - rmscut * rmsw1) /
(rmscut * (rmsw2 - rmsw1))))
return weights
[docs]
def solve_lq_weight(Ti, cable_pos, c0, Nbiter=10, fix_z=False, ninit=None,
return_uncertainty=True, residuals=False, verbose=False):
"""
Solve the least squares problem to localize the whale with distance weighting
Parameters
----------
Ti : np.ndarray
Array of arrival times at each cable position [channel x 1]
cable_pos : np.ndarray
Array of cable positions [channel x 3]
c0 : float
Speed of sound in water considered constant
Nbiter : int, optional (default=10)
Number of iterations for the least squares algorithm
fix_z : bool, optional (default=False)
Whether to fix the z-coordinate
ninit : np.ndarray, optional
Initial guess for n
return_uncertainty : bool, optional (default=True)
Whether to compute and return uncertainty information
residuals : bool, optional (default=False)
Whether to return residuals (deprecated, use return_uncertainty)
verbose : bool, optional (default=False)
Whether to print iteration details
Returns
-------
result : LocalizationResult or np.ndarray
If return_uncertainty=True: LocalizationResult with comprehensive information
If return_uncertainty=False: just the position array [x, y, z, t0]
res : np.ndarray, optional
Residuals if residuals=True (for backward compatibility)
"""
# Make a first guess of the whale position
n = np.array([40000, 1000, -30, np.min(Ti)])
if ninit is not None:
n = ninit
# Regularization parameter
lambda_reg = 1e-5
# Distance weighting parameters
discut = 10000 # 10 km
disw1 = 1 # Cosine taper starts
disw2 = 3 # Cosine taper ends
# Residual weighting parameters
rmscut = 0.2 # 0.1 s
rmsw1 = 1
rmsw2 = 3
# Store final weights for uncertainty analysis
final_weights = None
for j in range(Nbiter):
thj = calc_theta_vector(cable_pos, n)
phij = calc_phi_vector(cable_pos, n)
dt = Ti - calc_arrival_times(n[-1], cable_pos, n[:3], c0)
# Start the hypoinverse weighting only after 4 iterations
if j < 4:
W = np.eye(len(Ti))
else:
dist = calc_distance_matrix(cable_pos, n[:3])
w = calc_dist_weighting(dist, discut, disw1, disw2) * calc_res_weighting(dt, rmscut, rmsw1, rmsw2)
W = np.diag(w)
final_weights = w # Store for uncertainty analysis
# Fixed z case
if fix_z:
# Save z value to reappend it after the least squares computation
dz = n[2]
n_fz = np.delete(n, 2) # Remove z from the vector n
del n # Delete n to reassign it with the new value
n = n_fz # Reassign n without z
# Compute the least squares coefficients matrix
G = np.array([np.cos(thj) * np.cos(phij) / c0, np.cos(thj) * np.sin(phij) / c0, np.ones_like(thj)]).T
# Free z case
else:
# Compute the least squares coefficients matrix
G = np.array([np.cos(thj) * np.cos(phij) / c0, np.cos(thj) * np.sin(phij) / c0, np.sin(thj) / c0, np.ones_like(thj)]).T
# Adding regularization to avoid singular matrix error
lambda_identity = lambda_reg * np.eye(G.shape[1])
# Weighted least squares solution: (G^T W G + λI)^(-1) G^T W dt
dn = np.linalg.inv(G.T @ W @ G + lambda_identity) @ G.T @ W @ dt
# Damping factor
if j < 4:
n += 0.7 * dn # Value from USGS trial and error
else:
n += dn
if fix_z:
# reappend z to n in index 2 (before index 3)
n = np.insert(n, 2, dz)
if verbose:
current_rms = np.sqrt(np.mean(dt**2))
print(f'Iteration {j+1}: x = {n[0]:.4f} m, y = {n[1]:.4f}, z = {n[2]:.4f}, ti = {n[3]:.4f}, RMS = {current_rms:.6f}')
# Compute final residuals
res = Ti - calc_arrival_times(n[-1], cable_pos, n[:3], c0)
if return_uncertainty:
# Calculate RMS values
rms_unweighted = calc_rms(res)
# Calculate weighted RMS if weights are available
if final_weights is not None:
weighted_residuals = res * final_weights
rms_weighted = np.sqrt(np.sum(final_weights * res**2) / np.sum(final_weights))
else:
rms_weighted = rms_unweighted
final_weights = np.ones_like(res)
# Calculate variance and covariance matrix
predicted_times = calc_arrival_times(n[-1], cable_pos, n[:3], c0)
# Use weighted variance if weights are available
if final_weights is not None and not np.allclose(final_weights, final_weights[0]):
# We have actual weights (not all equal)
variance = cal_weighted_variance_residuals(Ti, predicted_times, final_weights, fix_z)
else:
# No weights or all weights equal - use unweighted variance
variance = cal_variance_residuals(Ti, predicted_times, fix_z)
final_weights = None # Don't pass equal weights to covariance calculation
covariance = calc_covariance_matrix(cable_pos, n, c0, variance, fix_z, final_weights)
err_ellipse = calc_error_ellipse_params(covariance)
uncertainty = 2 * np.sqrt(np.max(err_ellipse['eigenvals']))
result = LocalizationResult(
position=n,
residuals=res,
rms=rms_unweighted,
weighted_rms=rms_weighted,
covariance=covariance,
uncertainties=uncertainty,
weights=final_weights,
n_iterations=Nbiter
)
if residuals: # Backward compatibility
return result, res
else:
return result
else:
if residuals:
return n, res
else:
return n
[docs]
def cal_variance_residuals(arrtimes, predic_arrtimes, fix_z=False):
"""Compute the variance of the residuals of the arrival times
Parameters
----------
arrtimes : np.ndarray
array of measured arrival times
predic_arrtimes : np.ndarray
array of predicted arrival times
fix_z : bool, optional
True if the z coordinate is fixed, by default False
Returns
-------
var : float
Variance of the residuals
"""
residuals = arrtimes - predic_arrtimes
if fix_z:
var = 1 / (len(residuals) - 3) * np.sum(residuals**2)
else:
var = 1 / (len(residuals) - 4) * np.sum(residuals**2)
return var
[docs]
def cal_weighted_variance_residuals(arrtimes, predic_arrtimes, weights, fix_z=False):
"""Compute the weighted variance of the residuals of the arrival times
Parameters
----------
arrtimes : np.ndarray
array of measured arrival times
predic_arrtimes : np.ndarray
array of predicted arrival times
weights : np.ndarray
weights for each residual
fix_z : bool, optional
True if the z coordinate is fixed, by default False
Returns
-------
var : float
Weighted variance of the residuals
"""
residuals = arrtimes - predic_arrtimes
# Weighted variance formula: Σ(w_i * r_i²) / Σ(w_i)
# But for uncertainty estimation, we need to account for degrees of freedom
n_params = 3 if fix_z else 4
effective_n = len(residuals) - n_params
# Use effective sample size for weighted data
# For weighted least squares, the variance estimate is:
# σ² = Σ(w_i * r_i²) / (effective_n)
var = np.sum(weights * residuals**2) / effective_n
return var
[docs]
def calc_covariance_matrix(cable_pos, whale_pos, c0, var, fix_z=False, weights=None):
"""Compute the covariance matrix of the estimated whale position
Parameters
----------
cable_pos : np.ndarray
Array of cable positions [channel x 3]
whale_pos : np.ndarray
Estimated whale position [x, y, z, t0]
c0 : float
Speed of sound in water considered constant
var : float
Variance of the residuals (should be weighted variance if weights provided)
fix_z : bool, optional
Whether to fix the z coordinate (default: False)
weights : np.ndarray, optional
Weight vector for weighted least-squares (default: None for unweighted)
Returns
-------
cov : np.ndarray
Covariance matrix of the estimated whale position
"""
thj = calc_theta_vector(cable_pos, whale_pos)
phij = calc_phi_vector(cable_pos, whale_pos)
if fix_z:
G = np.array([np.cos(thj) * np.cos(phij) / c0, np.cos(thj) * np.sin(phij) / c0, np.ones_like(thj)]).T
else:
G = np.array([np.cos(thj) * np.cos(phij) / c0, np.cos(thj) * np.sin(phij) / c0, np.sin(thj) / c0, np.ones_like(thj)]).T
# Apply weights if provided (weighted least-squares)
if weights is not None:
W = np.diag(weights) # Convert to diagonal weight matrix
GtWG = G.T @ W @ G
else:
GtWG = G.T @ G
if np.linalg.cond(GtWG) > 1/sys.float_info.epsilon:
print('Matrix is singular')
lambda_reg = 1e-5
lambda_identity = lambda_reg * np.eye(G.shape[1])
cov = var * np.linalg.inv(GtWG + lambda_identity)
else:
cov = var * np.linalg.inv(GtWG)
return cov
[docs]
def loc_from_picks(associated_list, cable_pos, c0, fs, return_uncertainty=True):
"""Localize whale calls from associated picks with uncertainty quantification.
Parameters
----------
associated_list : list
List of associated picks
cable_pos : np.ndarray
Cable positions
c0 : float
Sound speed
fs : float
Sampling frequency
return_uncertainty : bool, optional
Whether to return uncertainty information (default=True)
Returns
-------
list
List of LocalizationResult objects if return_uncertainty=True,
or list of position arrays if return_uncertainty=False
"""
localizations = []
for select in associated_list:
idxmin_t = np.argmin(select[1][:])
apex_loc = cable_pos[:, 0][select[0][idxmin_t]]
Ti = select[1][:] / fs
Nbiter = 20
# Initial guess (apex_loc, mean_y, -30m, min(Ti))
n_init = [apex_loc, np.mean(cable_pos[:,1]), -40, np.min(Ti)]
# Solve the least squares problem with uncertainty
result = solve_lq_weight(Ti, cable_pos[select[0][:]], c0, Nbiter,
fix_z=True, ninit=n_init,
return_uncertainty=return_uncertainty)
localizations.append(result)
return localizations
[docs]
def loc_picks_bicable(n_assoc, s_assoc, cable_pos, c0, fs, Nbiter=20, return_uncertainty=True):
"""
Solve the least squares localization problem for bicable data using the picks' indices.
Parameters
----------
n_assoc : array-like
The north cable association data [indices, times]
s_assoc : array-like
The south cable association data [indices, times]
cable_pos : tuple
A tuple containing the positions of the north and south cables.
c0 : float
The speed of sound for localization.
fs : float
The sampling frequency.
Nbiter : int, optional
The number of iterations for the least squares solution, default is 20.
return_uncertainty : bool, optional
Whether to return uncertainty information (default=True)
Returns
-------
LocalizationResult or tuple
If return_uncertainty=True: LocalizationResult object
If return_uncertainty=False: tuple (position, residuals) for backward compatibility
"""
n_cable_pos, s_cable_pos = cable_pos
bicable_pos = np.concatenate((n_cable_pos[n_assoc[0]], s_cable_pos[s_assoc[0]]))
idx_time = np.concatenate((n_assoc[1], s_assoc[1]))
idxmin_t = np.argmin(idx_time) # Find the index of the minimum time
times = idx_time / fs
apex_loc = bicable_pos[idxmin_t, 0] # Find the apex location from the minimum time index
init = [apex_loc, np.mean(bicable_pos[:, 1]), -40, np.min(times)] # Initial guess for the localization
# Solve the least squares problem using the provided parameters
if return_uncertainty:
result = solve_lq_weight(times, bicable_pos, c0, Nbiter, fix_z=True,
ninit=init, return_uncertainty=True)
return result
else:
n, residuals = solve_lq_weight(times, bicable_pos, c0, Nbiter, fix_z=True,
ninit=init, return_uncertainty=False, residuals=True)
return n, residuals
[docs]
def loc_picks_bicable_list(n_assoc_list, s_assoc_list, cable_pos, c0, fs, Nbiter=20, return_uncertainty=True):
"""Localize multiple bicable associations with uncertainty quantification.
Parameters
----------
n_assoc_list : list
List of north cable associations
s_assoc_list : list
List of south cable associations
cable_pos : tuple
Cable positions for north and south
c0 : float
Sound speed
fs : float
Sampling frequency
Nbiter : int, optional
Number of iterations (default=20)
return_uncertainty : bool, optional
Whether to return uncertainty information (default=True)
Returns
-------
list
List of LocalizationResult objects if return_uncertainty=True,
or list of position arrays if return_uncertainty=False
"""
if len(n_assoc_list) != len(s_assoc_list):
raise ValueError("The lengths of n_assoc_list and s_assoc_list must be equal.")
localizations = []
for i in range(len(n_assoc_list)):
n_assoc = n_assoc_list[i]
s_assoc = s_assoc_list[i]
result = loc_picks_bicable(n_assoc, s_assoc, cable_pos, c0, fs, Nbiter, return_uncertainty)
localizations.append(result)
return localizations
[docs]
def calc_rms(residuals, window_mask=None):
"""Calculate the root mean square of the residuals.
Parameters
----------
residuals : np.ndarray or None
Array of residuals. If None or empty, returns np.inf.
window_mask : np.ndarray, optional
Boolean mask to apply to the residuals, by default None.
If provided, only the residuals where the mask is True will be considered.
Returns
-------
float
Root mean square of the residuals, or np.inf if residuals are invalid or empty.
"""
if residuals is None:
return np.inf
if window_mask is not None:
residuals = residuals[window_mask]
if residuals.size == 0:
return np.inf
return np.sqrt(np.mean(residuals ** 2))
[docs]
def calc_weighted_rms(residuals, weights):
"""Calculate the weighted root mean square of the residuals.
Parameters
----------
residuals : np.ndarray
Array of residuals.
weights : np.ndarray
Array of weights corresponding to each residual.
Returns
-------
float
Weighted root mean square of the residuals.
"""
if residuals is None or weights is None:
return np.inf
if residuals.size == 0 or weights.size == 0:
return np.inf
if np.sum(weights) == 0:
return np.inf
return np.sqrt(np.sum(weights * residuals**2) / np.sum(weights))
[docs]
def calc_error_ellipse_params(covariance_matrix, confidence_level=0.95):
"""Calculate error ellipse parameters from covariance matrix.
Parameters
----------
covariance_matrix : np.ndarray
Covariance matrix (at least 2x2 for x,y components)
confidence_level : float, optional
Confidence level (default 0.95 for 95% confidence)
Returns
-------
dict
Dictionary containing ellipse parameters:
- semi_major_axis: length of semi-major axis
- semi_minor_axis: length of semi-minor axis
- rotation_angle: rotation angle in radians
- area: ellipse area
"""
# Extract 2x2 covariance matrix for x,y coordinates
cov_xy = covariance_matrix[:2, :2]
# Calculate eigenvalues and eigenvectors
eigenvals, eigenvecs = np.linalg.eigh(cov_xy)
# Sort by eigenvalue (largest first)
order = eigenvals.argsort()[::-1]
eigenvals = eigenvals[order]
eigenvecs = eigenvecs[:, order]
# Chi-squared value for confidence level (use a fallback if scipy not available)
try:
from scipy.stats import chi2
chi2_val = chi2.ppf(confidence_level, df=2)
except ImportError:
# Fallback approximation for 95% confidence
if confidence_level == 0.95:
chi2_val = 5.991 # chi2(0.95, df=2)
elif confidence_level == 0.68:
chi2_val = 2.279 # chi2(0.68, df=2)
else:
chi2_val = 5.991 # Default to 95%
# Calculate ellipse parameters for the given confidence level
semi_major_axis = np.sqrt(chi2_val * eigenvals[0])
semi_minor_axis = np.sqrt(chi2_val * eigenvals[1])
# Rotation angle (angle of major axis with x-axis)
rotation_angle = np.arctan2(eigenvecs[1, 0], eigenvecs[0, 0])
# Ellipse area
area = np.pi * semi_major_axis * semi_minor_axis
return {
'semi_major_axis': semi_major_axis,
'semi_minor_axis': semi_minor_axis,
'rotation_angle': rotation_angle,
'area': area,
'eigenvals': eigenvals,
'eigenvects': eigenvecs
}
[docs]
def localization_results_to_dict(results, utc_start=None, sensor='unknown', call_type='unknown'):
"""Convert LocalizationResult objects to dictionary format for CSV export.
Parameters
----------
results : list of LocalizationResult
List of localization results
utc_start : datetime, optional
Start time for converting relative times to absolute UTC
sensor : str, optional
Sensor identifier (default='unknown')
call_type : str, optional
Call type identifier (default='unknown')
Returns
-------
list of dict
List of dictionaries ready for DataFrame conversion
"""
import datetime
rows = []
for i, result in enumerate(results):
if isinstance(result, LocalizationResult):
pos = result.position
ellipse_params = calc_error_ellipse_params(result.covariance)
row = {
'id': i,
'utc': utc_start + datetime.timedelta(seconds=float(pos[3])) if utc_start else None,
'sensor': sensor,
'call_type': call_type,
'x': float(pos[0]),
'y': float(pos[1]),
'z': float(pos[2]),
't0': float(pos[3]),
'rms': float(result.rms),
'weighted_rms': float(result.weighted_rms),
'unc_x': float(result.uncertainties[0]),
'unc_y': float(result.uncertainties[1]),
'unc_z': float(result.uncertainties[2]) if len(result.uncertainties) > 2 else np.nan,
'unc_t': float(result.uncertainties[-1]),
'ellipse_semi_major': float(ellipse_params['semi_major_axis']),
'ellipse_semi_minor': float(ellipse_params['semi_minor_axis']),
'ellipse_rotation': float(ellipse_params['rotation_angle']),
'ellipse_area': float(ellipse_params['area']),
'n_picks': len(result.residuals),
'n_iterations': result.n_iterations
}
else:
# Handle backward compatibility with simple arrays
pos = result if hasattr(result, '__len__') else [result, 0, 0, 0]
row = {
'id': i,
'utc': utc_start + datetime.timedelta(seconds=float(pos[3])) if utc_start and len(pos) > 3 else None,
'sensor': sensor,
'call_type': call_type,
'x': float(pos[0]) if len(pos) > 0 else np.nan,
'y': float(pos[1]) if len(pos) > 1 else np.nan,
'z': float(pos[2]) if len(pos) > 2 else np.nan,
't0': float(pos[3]) if len(pos) > 3 else np.nan,
'rms': np.nan,
'weighted_rms': np.nan,
'unc_x': np.nan,
'unc_y': np.nan,
'unc_z': np.nan,
'unc_t': np.nan,
'ellipse_semi_major': np.nan,
'ellipse_semi_minor': np.nan,
'ellipse_rotation': np.nan,
'ellipse_area': np.nan,
'n_picks': np.nan,
'n_iterations': np.nan
}
rows.append(row)
return rows