from __future__ import annotations
from typing import Dict, List, Tuple, Union, Optional, Any
import dask
import numpy as np
import pandas as pd
import xarray as xr
from matplotlib import pyplot as plt
from scipy import signal, ndimage
[docs]
def fk_filt_chunk(data: xr.DataArray, tint: float, fs: float, xint: float, dx: float, c_min: float, c_max: float) -> xr.DataArray:
'''
fk_filt_chunk - perform fk filtering on single chunk of DAS data
Parameters
----------
data : xr.DataArray
DataArray containing single chunk
tint : float
interval in time between samples
fs : float
sampling frequency
xint : float
interval in space between samples
dx : float
distance between samples
'''
data_fft = np.fft.fft2(signal.detrend(data))
# Make freq and wavenum vectors
nx = data_fft.shape[0]
ns = data_fft.shape[1]
f = np.fft.fftshift(np.fft.fftfreq(ns, d = tint/fs))
k = np.fft.fftshift(np.fft.fftfreq(nx, d = xint*dx))
ff,kk = np.meshgrid(f,k)
# Soundwaves have f/k = c so f = k*c
g = 1.0*((ff < kk*c_min) & (ff < -kk*c_min))
g2 = 1.0*((ff < kk*c_max) & (ff < -kk*c_max))
g = g + np.fliplr(g)
g2 = g2 + np.fliplr(g2)
g = g-g2
g = ndimage.gaussian_filter(g, 40)
# epsilon = 0.0001
# g = np.exp (-epsilon*( ff-kk*c)**2 )
g = (g - np.min(g.flatten())) / (np.max(g.flatten()) - np.min(g.flatten()))
g = g.astype('f')
data_fft_g = np.fft.fftshift(data_fft) * g
data_g = np.fft.ifft2(np.fft.ifftshift(data_fft_g))
#return f,k,g,data_fft_g,data_g
# construct new DataArray
data_gx = xr.DataArray(data_g.real, dims=['distance','time'], coords=data.coords)
return data_gx
[docs]
def fk_filt(data: xr.DataArray, tint: float, fs: float, xint: float, dx: float, c_min: float, c_max: float) -> xr.DataArray:
'''
fk_filt - perform fk filtering on DAS data
Parameters
----------
data : xr.DataArray
DataArray containing DAS data
tint : float
interval in time between samples
fs : float
sampling frequency
xint : float
interval in space between samples
dx : float
distance between samples
'''
kwargs = {'tint':tint, 'fs':fs, 'xint':xint, 'dx':dx, 'c_min':c_min, 'c_max':c_max}
data_gx = data.map_blocks(fk_filt_chunk, kwargs=kwargs, template=data)
return data_gx
def _energy_TimeDomain_chunk(da: xr.DataArray, time_dim: str = 'time') -> xr.DataArray:
'''
_energy_TimeDomain_chunk - chunkwise function for energy_TimeDomain
Parameters
----------
da : xr.DataArray
DataArray containing DAS data
time_dim : string
time dimension of da
Returns
-------
da_energy : xr.DataArray
DataArray containing energy in time domain. Units are V^2 (where V is units of da)
'''
return (da**2).sum(time_dim, keepdims=True)
[docs]
def energy_TimeDomain(da: xr.DataArray, time_dim: str = 'time') -> xr.DataArray:
'''
energy_TimeDomain - calculate energy in time domain using parsevals theorem
energy is calculated for each chunk in time_dim
Parameters
----------
da : xr.DataArray
DataArray containing DAS data
time_dim : string
time dimension of da
Returns
-------
da_energy : xr.DataArray
DataArray containing energy in time domain. Units are V^2 (where V is units of da)
'''
# move time_dim to last dimension
da = da.transpose(..., time_dim)
# Get number of chunks in each dimension
original_dims = list(da.dims)
original_chunksize = dict(zip(original_dims, da.data.chunksize))
nchunks = []
for k, single_dim in enumerate(original_dims):
nchunks.append(da.shape[k]/original_chunksize[single_dim])
nchunks = dict(zip(original_dims, nchunks))
# get size of output
sizes_dict = dict(da.sizes)
sizes_dict.pop(time_dim)
output_sizes = list(sizes_dict.values()) + [nchunks[time_dim]]
# define new chunk sizes
new_chunk_sizes = {}
for k, item in enumerate(da.dims):
if item == time_dim:
new_chunk_sizes[item] = 1
else:
new_chunk_sizes[item] = original_chunksize[item]
# create template for xarray.map_blocks
template = xr.DataArray(
dask.array.random.random(
output_sizes, chunks=list(new_chunk_sizes.values())),
dims=da.dims,
name=f'energy in {time_dim} dimension')
da_energy = da.map_blocks(_energy_TimeDomain_chunk, template=template, kwargs={'time_dim':time_dim})
return da_energy
# I think everything below this is implemented in xrsignal
[docs]
def filtfilt(da: xr.DataArray, dim: str, **kwargs) -> xr.DataArray:
'''
filtfilt - this is an implentation of [scipy.signal.fitlfilt](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.filtfilt.html)
This will filter the DAS data in time for each chunk. This process maps chunks and will therefore have error at the end of chunks in time.
By default, this does not compute, but generates the task graph
Parameters
----------
da : xr.DataArray
DataArray containing DAS data that you want to filter.
dim : string
dimension to filter in (should be dimension in da)
**kwargs : various types
passed to [scipy.signal.filtfilt](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.filtfilt.html)
as per docs, ['x', 'b', and 'a'] are required
Returns
-------
da_filt : xr.DataArray
filtered data array in time. This does not compute the result, but just the task map in dask
'''
kwargs['dim']='time'
da_filt = da.map_blocks(filtfilt_chunk, kwargs=kwargs, template=da)
return da_filt
[docs]
def filtfilt_chunk(da: xr.DataArray, dim: str = 'time', **kwargs) -> xr.DataArray:
'''
converts dataarray to numpy, sends it to signal.filtfilt and then reinhereits all coordinates
Parameters
----------
da : xr.DataArray
dim : string
dimension to filter over (should be dimension in da)
**kwargs : various types
passed to scipy.signal.filtfilt
'''
dim_axis = da.dims.index(dim)
da_np = da.values
da_filt = signal.filtfilt(x=da_np, axis=dim_axis, **kwargs)
da_filtx = xr.DataArray(da_filt, dims=da.dims, coords=da.coords, name=da.name, attrs=da.attrs)
return da_filtx
[docs]
def spec(da: xr.DataArray) -> xr.DataArray:
'''
very quick implementation to calculate spectrogram
PSD is calculated for every chunk
Currently hardcoded for chunk size of 3000 in time
Parameters
----------
da : xr.DataArray
das data to compute spectrogram for
'''
template = xr.DataArray(np.ones((int(da.sizes['time']/3000), 513)), dims=['time','frequency']).chunk({'time':1, 'frequency':513})
return da.map_blocks(__spec_chunk, template=template)
def __spec_chunk(da: xr.DataArray) -> xr.DataArray:
'''
compute PSD for single chunk
Currently hard coded to handle only a time dimension..
'''
f, Pxx = signal.welch(da.values, fs=200, nperseg=1024)
return xr.DataArray(Pxx, dims='frequency', coords={'frequency':f})
[docs]
def disp_comprate(fk_filter: Any) -> None:
"""Display the sizes of the f-k filter matrix (sparse and dense version) and print the compression ratio
Parameters
----------
fk_filter : sparse.COO
f-k filter sparse matrix designed by function dsp.hybrid_filter_design()
"""
# Print some info about the f-k filter and the sparse matrix compression
size_sprfilt_coo = fk_filter.data.nbytes/ (1024 ** 3)
# If the matrix is a scipy.sparse.csr matrix:
# size_sprfilt = (fk_filter.data.nbytes + fk_filter.indices.nbytes + fk_filter.indptr.nbytes) / (1024**3)
densefk_filter = fk_filter.todense() # fk_filter.toarray()
sizefilt = densefk_filter.size * densefk_filter.itemsize / (1024**3)
# TODO: add the relative ratio
print(f'The size of the sparse filter is {size_sprfilt_coo:.4f} Gib')
print(f'The size of the dense filter is {sizefilt:.2f} Gib')
print(f'The compression ratio is {sizefilt / size_sprfilt_coo:.2f} ({abs(sizefilt - size_sprfilt_coo) *100 / sizefilt:.1f} %)')
return