Source code for highiq.calc.spectra

import numpy as np
import cupy as cp
import xarray as xr

from scipy.signal import hann, find_peaks
from pint import UnitRegistry
from .gpu_methods import _fast_expand, _gpu_moving_average


[docs]def welchs_method(complex_coeff, fs=50e6, nfft=32, window_skip=16, num_per_block=200): """ This technique is an implementation of Welch's method for processing a Doppler spectral density function from complex autocorrelation function data. Welch's method calculates the Doppler spectral density function by calculating the FFT over (potentially overlapping) windows and then taking the average of the magnitudes of each FFT to create the Doppler spectra. Parameters ---------- complex_coeff: complex nD array An n-D array of complex floats representing the value of the autocorrelation function. The first dimension is the number of entries in time, and the second is the number of entries in the range dimension. fs: The pulse repetition frequency of the radar. nfft: The number of points to include in the FFT for calculating spectra. This must be an even number. window_skip: The number of points to go forward for each window. This cannot exceed the size of the FFT. num_per_block: The number of time periods to send to the GPU at one time. This is implemented because the GPU can only handle a limited number of data points at one time. Returns ------- freq: 1D array The frequencies corresponding to each value in the spectra. power: nD array The power spectral densities for each frequency. """ if nfft % 2 == 1: raise ValueError("The number of points in the FFT must be even!") if window_skip > nfft: raise ValueError("window_skip cannot be greater than nfft!") times = complex_coeff.shape[0] num_points = complex_coeff.shape[-1] window = hann(nfft) if len(complex_coeff.shape) == 3: my_fft = np.zeros((complex_coeff.shape[0], complex_coeff.shape[1], nfft)) for k in range(0, times, num_per_block): j = 0 the_max = min([k + num_per_block, times]) gpu_complex = cp.array(complex_coeff[k:the_max, :, :]) windowt = cp.tile(window, (gpu_complex.shape[0], gpu_complex.shape[1], 1)) temp_fft = cp.zeros((gpu_complex.shape[0], gpu_complex.shape[1], nfft)) for i in range(0, num_points, window_skip): if i + nfft > num_points: start = num_points - nfft temp_fft += cp.square( cp.abs(cp.fft.fft(gpu_complex[:, :, start:] * windowt, axis=-1))) else: temp_fft += cp.square( cp.abs(cp.fft.fft(gpu_complex[:, :, i:i + nfft] * windowt, axis=-1))) j += 1 temp_fft = temp_fft / j my_fft[k:the_max] = temp_fft.get() elif len(complex_coeff.shape) == 2: j = 0 gpu_complex = cp.array(complex_coeff) windowt = cp.tile(window, ((gpu_complex.shape[0], 1))) temp_fft = cp.zeros((gpu_complex.shape[0], nfft)) for i in range(0, num_points, window_skip): if i + nfft > num_points: start = num_points - nfft temp_fft += cp.square( cp.abs(cp.fft.fft(gpu_complex[:, start:] * windowt, axis=-1))) else: temp_fft += cp.square( cp.abs(cp.fft.fft(gpu_complex[:, i:i + nfft] * windowt, axis=-1))) j += 1 temp_fft = temp_fft / j my_fft = temp_fft.get() power = my_fft freq = np.fft.fftfreq(nfft) * fs return freq, np.array(power)
[docs]def get_psd(spectra, gate_resolution=30., wavelength=None, fs=None, nfft=32, acf_name='acf', acf_bkg_name='acf_bkg', block_size_ratio=1.0): """ This function will get the power spectral density from the autocorrelation function. Parameters ---------- spectra: ACT Dataset The dataset containing the autocorrelation function. gate_resolution: float The gate resolution to derive the spectra for. wavelength: float or None The wavelength (in m) of the radar. If None, HighIQ will attempt to load the wavelength from the wavelength attribute of the spectra dataset. fs: float or None The pulse repetition frequency of the radar in Hz. If None, HighIQ will try and extract this information from the ACT dataset. This will require that the dataset have an attribute called sample_rate that contains a string with the magnitude and units of the sample rate. nfft: int The number of points to include in the FFT. acf_name: str The name of the autocorrelation function field. acf_bkg_name: str The name of the autocorrelation function of the background. block_size_ratio: float Increase this value to use more GPU memory for processing. Doing this can poentially optimize processing. Returns ------- spectra: ACT Dataset The dataset containing the power spectral density functions. """ Q_ = UnitRegistry().Quantity if fs is None: if not "sample_rate" in spectra.attrs: raise KeyError("If sample frequency is not specified, then ACT Dataset must contain a sample_rate " + "attribute!") fs_pint = Q_(spectra.attrs["sample_rate"]) fs_pint = fs_pint.to("Hz") fs = fs_pint.magnitude if wavelength is None: if not "wavelength" in spectra.attrs: raise KeyError("If wavelength is not specified, then the dataset must contain a wavelength attribute!") fs_pint = Q_(spectra.attrs["wavelength"]) fs_pint = fs_pint.to("m") wavelength = fs_pint.magnitude num_gates = int(gate_resolution / 3) complex_coeff = spectra[acf_name].sel(complex=1).values +\ spectra[acf_name].sel(complex=2).values * 1j complex_coeff = np.reshape( complex_coeff, (complex_coeff.shape[0], int(complex_coeff.shape[1] / num_gates), int(complex_coeff.shape[2] * num_gates))) freq, power = welchs_method( complex_coeff, fs=fs, nfft=nfft, num_per_block=int(block_size_ratio*200)) attrs_dict = {'long_name': 'Range', 'units': 'm'} spectra['range'] = xr.DataArray( gate_resolution * np.arange(int(complex_coeff.shape[1])), dims=('range'), attrs=attrs_dict) spectra.attrs['nyquist_velocity'] = "%f m s-1" % (wavelength / (4 * 1 / fs)) spectra['freq_bins'] = xr.DataArray(freq, dims=['freq']) spectra['freq_bins'].attrs["long_name"] = "Doppler spectra bins in frequency units" spectra['freq_bins'].attrs["units"] = "Hz" vel_bins = spectra['freq_bins'] * (wavelength / 2) inds_sorted = np.argsort(vel_bins.values) vel_bins = vel_bins[inds_sorted] attrs_dict = {'long_name': 'Doppler velocity', 'units': 'm s-1'} spectra['vel_bins'] = xr.DataArray(vel_bins, dims=('vel_bins'), attrs=attrs_dict) spectra['freq_bins'] = spectra['freq_bins'][inds_sorted] spectra['power'] = xr.DataArray( power[:, :, inds_sorted], dims=(('time', 'range', 'vel_bins'))) complex_coeff = (spectra[acf_bkg_name].sel(complex=1).values + spectra[acf_bkg_name].sel(complex=2).values * 1j) complex_coeff = np.reshape( complex_coeff, (complex_coeff.shape[0], int(complex_coeff.shape[1] / num_gates), int(complex_coeff.shape[2] * num_gates))) freq, power = welchs_method( complex_coeff, fs=50e6, nfft=32, num_per_block=int(200*block_size_ratio)) spectra['power_bkg'] = xr.DataArray( power[:, :, inds_sorted], dims=(('time', 'range', 'vel_bins'))) # Subtract background noise spectra['power_spectral_density'] = spectra['power'] - spectra['power_bkg'] spectra['power_spectral_density'] = spectra['power_spectral_density'].where( spectra['power_spectral_density'] > 0, 0) spectra['power_spectral_density'].attrs["long_name"] = "Power spectral density" tot_power = np.sum(spectra['power_spectral_density'].values, axis=2) dV = np.diff(spectra['vel_bins']) power_tiled = np.stack( [tot_power for x in range(spectra['power'].values.shape[2])], axis=2) spectra['power_spectra_normed'] = spectra['power_spectral_density'] / power_tiled / dV[1] * 100 spectra['power_spectra_normed'].attrs["long_name"] = "p.d.f. of power spectra" spectra['power_spectra_normed'].attrs["units"] = "%" spectra['power_spectral_density'] = 10 * np.log10(spectra['power_spectral_density']) / dV[1] spectra['power_spectral_density'].attrs["units"] = 's dB-1 m-1 ' # Smooth out power spectra interpolated_bins = np.linspace( spectra['vel_bins'].values[0], spectra['vel_bins'].values[-1], 256) spectra['vel_bin_interp'] = xr.DataArray(interpolated_bins, dims=('vel_bin_interp')) spectra['vel_bin_interp'].attrs['long_name'] = "Doppler velocity" spectra['vel_bin_interp'].attrs["units"] = "m s-1" my_array = _gpu_moving_average( _fast_expand(spectra['power_spectral_density'].values, 8)) spectra['power_spectral_density_interp'] = xr.DataArray( my_array, dims=('time', 'range', 'vel_bin_interp')) spectra['power_spectral_density_interp'].attrs['long_name'] = "Power spectral density" spectra['power_spectral_density_interp'].attrs["units"] = "s dB-1 m-1" my_array = _gpu_moving_average( _fast_expand(spectra['power_spectra_normed'].values, 8)) spectra['power_spectra_normed_interp'] = xr.DataArray( my_array, dims=('time', 'range', 'vel_bin_interp'),) spectra['power_spectra_normed_interp'].attrs['long_name'] = "Power spectral density" spectra['power_spectra_normed_interp'].attrs["units"] = "%" spectra['range'].attrs['long_name'] = "Range" spectra['range'].attrs['units'] = 'm' spectra['vel_bins'].attrs['long_name'] = "Doppler velocity" spectra['vel_bins'].attrs['units'] = 'm s-1' return spectra
[docs]def calc_num_peaks(my_spectra, **kwargs): """ This function will calculate the number of peaks in the spectra. Parameters ---------- my_spectra: ACT Dataset The dataset to calculate the number of peaks for. kwargs: Additional keyword arguments are passed into :func:`scipy.signal.find_peaks`. The default minimum height and width of the peak are set to 3 and 8 points respectively. Returns ------- my_spectra: ACT Dataset The dataset with an 'npeaks' variable included that shows the number of peaks. """ spectra = my_spectra['power_spectra_normed_interp'] my_array = spectra.fillna(0).values shp = my_array.shape num_peaks = np.zeros((shp[0], shp[1])) if not 'height' in kwargs.keys(): height = 3 else: height = kwargs.pop('height') if not 'width' in kwargs.keys(): width = 8 else: width = kwargs.pop('height') for i in range(shp[0]): for j in range(shp[1]): num_peaks[i, j] = len( find_peaks(my_array[i,j], height=height, width=width, **kwargs)[0]) my_spectra['npeaks'] = xr.DataArray(num_peaks, dims=('time', 'range')) my_spectra['npeaks'].attrs['long_name'] = "Number of peaks in Doppler spectra" my_spectra['npeaks'].attrs['units'] = "1" return my_spectra