"""
data_handle.py - data handling functions for DAS data
This module provides various functions to handle DAS data, including loading, downloading, and conditioning.
It aims at having specific functions for each interrogator type.
Authors: Léa Bouffaut, Quentin Goestchel, Erfan Horeh
Date: 2023-2024-2025
"""
from __future__ import annotations
import csv
import os
import re
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Optional, Tuple, Union, Any
from urllib.parse import urljoin
import dask.array as da
import h5py
import numpy as np
import pandas as pd
import wget
from nptdms import TdmsFile
# Check if simpledas is installed (optional dependency for ASN interrogator support)
try :
from simpledas import simpleDASreader as sd
SIMPLEDAS_AVAILABLE = True
except ImportError:
SIMPLEDAS_AVAILABLE = False
# Test for the package
[docs]
def hello_world_das_package() -> None:
"""Print a hello world message for the package."""
print("Yepee! You now have access to all the functionalities of the das4whale python package!")
# Read metadata
# Definition of the functions for DAS data conditioning
[docs]
def get_acquisition_parameters(filepath: str, interrogator: str = 'optasense') -> Optional[Dict[str, Any]]:
"""
Retrieve acquisition parameters based on the specified interrogator.
Parameters
----------
filepath : str
The file path to the data file.
interrogator : str, optional
The interrogator type, one of {'optasense', 'silixa', 'mars', 'alcatel', 'onyx'}.
Defaults to 'optasense'.
Returns
-------
metadata : dict or None
Metadata related to the acquisition parameters. Returns None if no matching
interrogator is found.
Raises
------
ValueError
If the interrogator name is not in the predefined list.
"""
# List the known used interrogators:
interrogator_list = ['optasense', 'silixa', 'mars', 'asn', 'onyx']
if interrogator in interrogator_list:
if interrogator == 'optasense':
metadata = get_metadata_optasense(filepath)
elif interrogator == 'silixa':
metadata = get_metadata_silixa(filepath)
elif interrogator == 'mars':
metadata = get_metadata_mars(filepath)
elif interrogator == 'asn':
metadata = get_metadata_asn(filepath)
elif interrogator == 'onyx':
metadata = get_metadata_onyx(filepath)
else:
raise ValueError('Interrogator name incorrect')
return metadata
# Load/download das data as strain
[docs]
def raw2strain(trace: np.ndarray, metadata: Dict[str, Any]) -> np.ndarray:
"""
Transform the amplitude of raw das data from unwrapped optical phase (rad) to strain according to scale factor
Parameters
----------
trace : array-like
a [channel x time sample] nparray containing the raw data in the spatio-temporal domain
metadata : dict
dictionary filled with metadata (fs, dx, nx, ns, n, GL, scale_factor)
Returns
-------
trace : array-like
a [channel x time sample] nparray containing the strain data in the spatio-temporal domain
"""
# Remove the mean trend from each channel and scale
trace -= np.mean(trace, axis=1, keepdims=True) # using np.median() is also possible, depending on the nature of the noise
trace *= metadata["scale_factor"]
return trace
[docs]
def load_das_data(filename: str, selected_channels: List[int], metadata: Dict[str, Any], interrogator: str = 'optasense') -> Tuple[np.ndarray, np.ndarray, np.ndarray, datetime]:
"""
Load the DAS data corresponding to the input file name as strain according to the selected channels.
Parameters
----------
filename : str
A string containing the full path to the data to load.
selected_channels : list
A list containing the selected channels.
metadata : dict
A dictionary filled with metadata (sampling frequency, channel spacing, scale factor...).
interrogator : name of used interrogators. Supports
Returns
-------
trace : np.ndarray
A [channel x sample] nparray containing the strain data.
tx : np.ndarray
The corresponding time axis (s).
dist : np.ndarray
The corresponding distance along the FO cable axis (m).
file_begin_time_utc : datetime.datetime
The beginning time of the file, can be printed using file_begin_time_utc.strftime("%Y-%m-%d %H:%M:%S").
"""
if not os.path.exists(filename):
raise FileNotFoundError(f'File {filename} not found')
if interrogator in ['optasense', 'silixa', 'onyx']:
with h5py.File(filename, 'r') as fp:
# Data matrix
raw_data = fp['Acquisition/Raw[0]/RawData']
# Check the orientation of the data compared to the metadata
if raw_data.shape[0] == metadata["nx"]:
# Data is in the correct orientation
pass
elif raw_data.shape[1] == metadata["nx"]:
# Data is transposed without loading in memory
raw_data = raw_data[:,:].T
# Selection the traces corresponding to the desired channels
# Loaded as float64, float 32 might be sufficient?
trace = raw_data[selected_channels[0]:selected_channels[1]:selected_channels[2], :].astype(np.float64)
trace = raw2strain(trace, metadata)
# UTC Time vector for naming
raw_data_time = fp['Acquisition']['Raw[0]']['RawDataTime']
# For future save
file_begin_time_utc = datetime.utcfromtimestamp(raw_data_time[0] * 1e-6)
elif interrogator == 'asn':
if not SIMPLEDAS_AVAILABLE:
raise ImportError("simpledas package is required to load ASN interrogator data. Please install it using: pip install git+https://github.com/qgoestch/simpleDAS")
dfdas = sd.load_DAS_files(filename, chIndex=None, samples=None, sensitivitySelect=-3,
userSensitivity={'sensitivity': metadata['scale_factor'],
'sensitivityUnit': 'rad/(m*strain)'},
integrate=True, unwr=True)
trace = dfdas.values.T
trace = trace[selected_channels[0]:selected_channels[1]:selected_channels[2], :].astype(np.float64)
# For future save
file_begin_time_utc = dfdas.meta['time']
else:
raise ValueError('Interrogator name incorrect or not supported')
# Store the following as the dimensions of our data block
nnx = trace.shape[0]
nns = trace.shape[1]
# Define new time and distance axes
tx = np.arange(nns) / metadata['fs']
dist = (np.arange(nnx) * selected_channels[2] + selected_channels[0]) * metadata['dx']
return trace, tx, dist, file_begin_time_utc
[docs]
def load_mtpl_das_data(filepaths: List[str], selected_channels: List[int], metadata: Dict[str, Any], timestamp: str, time_window: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray, datetime]:
"""
Load the DAS data corresponding to the input file names as strain according to the selected channels. Takes multiple files as input and concatenates them along the time axis starting from the input timestamp for the input time window.
Parameters
----------
filepaths : list
A list containing the full pathes to the data to load.
selected_channels : list
A list containing the selected channels.
metadata : dict
A dictionary filled with metadata (sampling frequency, channel spacing, scale factor...).
timestamp : str
The timestamp to extract the data from.
time_window : float
The time window duration to extract the data from.
Returns
-------
trace : np.ndarray
A [channel x sample] nparray containing the strain data.
tx : np.ndarray
The corresponding time axis (s).
dist : np.ndarray
The corresponding distance along the FO cable axis (m).
file_begin_time_utc : datetime.datetime
The beginning time of the file, can be printed using file_begin_time_utc.strftime("%Y-%m-%d %H:%M:%S").
"""
# Print the input timestamp
print(f'timestamp_input: {timestamp}')
# Convert timestamp to microseconds
timestamp = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
timestamp = timestamp.replace(tzinfo=timezone.utc)
timestamp_us = timestamp.timestamp() * 1e6
trace = None
file_begin_time_utc = None
# Loop through each filepath lazily
for filepath in filepaths:
if not os.path.exists(filepath):
raise FileNotFoundError(f'File {filepath} not found')
with h5py.File(filepath, 'r') as fp:
raw_data = fp['Acquisition/Raw[0]/RawData']
raw_data_time = fp['Acquisition/Raw[0]/RawDataTime']
# Select the traces corresponding to the desired channels lazily
selected_trace = da.from_array(raw_data[selected_channels[0]:selected_channels[1]:selected_channels[2], :], chunks='auto')
# Find the index where raw_data_time >= timestamp lazily
if trace is None:
index = np.searchsorted(raw_data_time, timestamp_us)
file_begin_time_utc = datetime.fromtimestamp(raw_data_time[index] * 1e-6, tz=timezone.utc)
# Concatenate traces along the time axis lazily
trace = selected_trace if trace is None else da.concatenate([trace, selected_trace], axis=1)
# Convert the time window duration to samples
duration_samples = int(time_window * metadata["fs"])
# Extract the desired time window lazily
trace = trace[:, index:index + duration_samples].astype(np.float64)
# Convert raw data to strain
tr = raw2strain(trace, metadata)
print(f'timestamp_output: {file_begin_time_utc}')
# Get dimensions of the data block lazily
nnx, nns = trace.shape
# Define new time and distance axes lazily
time = np.arange(nns) / metadata["fs"]
dist = (np.arange(nnx) * selected_channels[2] + selected_channels[0]) * metadata["dx"]
# Return dask arrays for lazy evaluation
return tr.compute(), time, dist, file_begin_time_utc
[docs]
def dl_file(url: str) -> Tuple[str, str]:
"""Download the file at the given url
Parameters
----------
url : string
url location of the file
Returns
-------
filepath : string
local path destination of the file
"""
filename = url.split('/')[-1]
filepath = os.path.join('data',filename)
if os.path.exists(filepath) == True:
print(f'{filename} already stored locally')
else:
# Create the data subfolder if it doesn't exist
os.makedirs('data', exist_ok=True)
wget.download(url, out='data', bar=wget.bar_adaptive)
print(f'Downloaded {filename}')
return filepath, filename
# Load cable position information
[docs]
def load_cable_coordinates(filepath: str, dx: float) -> pd.DataFrame:
"""
Load the cable coordinates from a text file.
Parameters
----------
filepath : str
The file path to the cable coordinates file.
dx : float
The distance between two channels.
Returns
-------
df : pandas.DataFrame
The cable coordinates dataframe.
"""
# load the .txt file and create a pandas dataframe
df = pd.read_csv(filepath, delimiter = ",", header = None)
df.columns = ['chan_idx','lat', 'lon', 'depth']
df['chan_m'] = df['chan_idx'] * dx
return df
[docs]
def get_cable_lat_lon_depth(file: str, selected_channels: Tuple[int, int, int]) -> Dict[str, List[float]]:
"""
Extract latitude, longitude, and depth information from a CSV or TXT file for selected cable channels.
Parameters
----------
file : str
The file path to the CSV or TXT file containing latitude, longitude, and depth information.
selected_channels : tuple
A tuple containing three integers: (start, stop, step) used to slice and select cable channels.
Returns
-------
position : dict
A dictionary containing the extracted information for the selected channels:
- 'chan_idx_oj': list of floats, representing the selected cable channel indices.
- 'lat': list of floats, representing the selected latitude values.
- 'lon': list of floats, representing the selected longitude values.
- 'depth': list of floats, representing the selected depth values in absolute magnitude.
"""
# Prepare lists
channel = []
lat = []
lon = []
depth = []
# Read the latitude, longitude, and depth data from the CSV/TXT file
with open(file, mode='r') as file:
csv_file = csv.reader(file, delimiter=' ')
for lines in csv_file:
# Filter out empty strings
lines = [line for line in lines if line]
channel.append(float(lines[0]))
lat.append(float(lines[1]))
lon.append(float(lines[2]))
depth.append(abs(float(lines[3])))
# Select the specified channels
channel = channel[selected_channels[0]:selected_channels[1]:selected_channels[2]]
lat = lat[selected_channels[0]:selected_channels[1]:selected_channels[2]]
lon = lon[selected_channels[0]:selected_channels[1]:selected_channels[2]]
depth = depth[selected_channels[0]:selected_channels[1]:selected_channels[2]]
# Store latitude, longitude, and depth in a dictionary
position = {
'chan_idx_oj': channel,
'lat': lat,
'lon': lon,
'depth': depth,
}
return position
# Load annotation files
[docs]
def load_annotation_csv(filepath: str) -> pd.DataFrame:
"""
Load the annotation data from a CSV file. The file must include the following columns:
'file_name', 'apex', 'offset', 'start_time', 'whale_side', as output from the DAS Source Locator
annotation application (https://github.com/leabouffaut/DASSourceLocator).
Parameters
----------
filepath : str
The file path to the annotation CSV file to load.
Returns
-------
annotations : pandas.DataFrame
A DataFrame containing the following columns:
- 'file_name': str, names of files
- 'apex': int, apex values
- 'offset': int, offset values
- 'start_time': float, start times
- 'whale_side': str, indicating the side of the whale (e.g., 'left' or 'right').
"""
# load the .csv file and create a pandas dataframe
annotations = pd.read_csv(filepath, header=0, keep_default_na=False)
annotations.columns = ['file_name', 'apex', 'offset', 'start_time', 'whale_side']
return annotations
[docs]
def calc_dist_to_xidx(x: float, selected_channels_m: List[float], selected_channels: List[int], dx: float) -> int:
"""
Calculate the index of the channel closest to the given distance.
Parameters
----------
x : float
The distance along the cable.
selected_channels_m : list
The selected channels in meters.
selected_channels : list
The selected channels.
dx : float
The distance between two channels.
Returns
-------
int
The index of the channel closest to the given distance.
"""
return int((x-selected_channels_m[0]) / (dx * selected_channels[2]))
[docs]
def get_selected_channels(selected_channels_m: List[float], dx: float) -> List[int]:
"""
Get the selected channels in channel numbers.
Parameters
----------
selected_channels_m : list
The selected channels in meters. [ChannelStart_m, ChannelStop_m, ChannelStep_m]
dx : float
The distance between two channels.
Returns
-------
list
The selected channels in channel numbers. [ChannelStart, ChannelStop, ChannelStep]
"""
selected_channels = [int(selected_channels_m // dx) for selected_channels_m in
selected_channels_m] # list of values in channel number (spatial sample) corresponding to the starting, ending and step wanted
# channels along the FO Cable
# selected_channels = [ChannelStart, ChannelStop, ChannelStep] in channel
# numbers
return selected_channels
[docs]
def generate_file_list(base_url: str, start_file: str, duration: int) -> List[str]:
"""
Generate a list of file URLs that correspond to a given time range, starting from a known file.
Parameters
----------
base_url : str
The base URL where the files are hosted.
start_file : str
Filename of the first file to use as reference.
duration : int
Duration in seconds.
Returns
-------
list of str
List of full file URLs covering the time range.
"""
# Extract start time from filename
start_time = extract_timestamp(start_file)
if start_time is None:
raise ValueError("Could not extract timestamp from filename.")
selected_files = [urljoin(base_url, start_file)]
accumulated_time = 0
current_file = start_file
while accumulated_time < duration:
# Infer the next file timestamp by checking the difference between current and next
next_time = start_time + timedelta(seconds=60) # Default to 60s step
next_filename = re.sub(r'(\d{4}-\d{2}-\d{2}T\d{6})Z', next_time.strftime('%Y-%m-%dT%H%M%S') + 'Z', current_file)
selected_files.append(urljoin(base_url, next_filename))
accumulated_time += 60 # Assume 60s duration per file; adjust as needed
start_time = next_time
current_file = next_filename
return selected_files