Source code for haskoning_atr_tools.signal_processing_tool.frequency_domain.tools

### ===================================================================================================================
###   Frequency Domain Tools
### ===================================================================================================================
# Copyright ©2025 Haskoning Nederland B.V.

### ===================================================================================================================
###   1. Import modules
### ===================================================================================================================

# General imports
import warnings
import numpy as np
from scipy.fft import ifft
from typing import List, Tuple, Dict, Optional

# References for functions and classes in the haskoning_atr_tools package
from haskoning_atr_tools.signal_processing_tool.utils.constants import WINDOW_PARAMETERS, THIRD_OCTAVE_BANDS, \
    OCTAVE_BANDS

# Formatting warnings
warnings.formatwarning = lambda message, category, filename, lineno, line=None: \
    f"{filename}:{lineno}: {category.__name__}: {message}\n"


### ===================================================================================================================
###   2. Function for manipulating frequency domain data
### ===================================================================================================================

[docs] def convert_double_sided_to_real_single_sided_spectrum( spectrum_values: np.ndarray, frequencies: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Function to convert a complex double-sided spectrum to a real single-sided spectrum. In the case of spectrum form FFT: If the input signal is not complex, a FT creates a spectrum mirrored around the DC frequency, and it goes up to +- nyquist frequency. So the negative frequency section is redundant. .. note:: This function assumes that the positive frequencies are in order. This matches with NumPy and SciPy FFT functions. Input: - spectrum_values (numpy array): The double-sided spectrum values. - frequencies (numpy array): The double-sided spectrum frequencies. Output: - Returns tuple with three numpy arrays: Single-sided amplitude spectrum, corresponding phase angles in degrees and corresponding positive frequencies. """ positive_indices = np.where(frequencies >= 0)[0] positive_spectrum_freq = frequencies[positive_indices] single_sided_values = np.abs(spectrum_values[positive_indices]) # Calculate corresponding phase angles phase_angles = np.angle(spectrum_values[positive_indices], deg=True) # Check if the number of points is even or odd signal_length = len(spectrum_values) if signal_length % 2 == 0: # Even number of points # Double values except for DC and Nyquist to convert to a single-sided spectrum single_sided_values[1:signal_length // 2] *= 2 else: # Odd number of points # Double values except for DC to convert to a single-sided spectrum single_sided_values[1:(signal_length - 1) // 2 + 1] *= 2 return single_sided_values, phase_angles, positive_spectrum_freq
[docs] def reconstruct_2sided_complex_spectrum( spectrum_values: List[float], phase_angle: Optional[List[float]] = None, frequencies: Optional[List[float]] = None) -> Tuple[List[float], List[float]]: """ Function to reconstruct the complete complex spectrum from real amplitudes and phase angles, form the positive frequencies. If frequencies are provided, they are also mirrored on the negative side. Input: - spectrum_values (list of floats): The real values in the frequency domain, form the positive frequencies. - phase_angle (list of floats): Optional input for the phase angles in the frequency domain, form the positive frequencies. Default value is None, in which case an array of zeros will be used. - frequencies (list of floats): Optional input for the frequency values in the frequency domain, form the positive frequencies. Default value is None, in which case only the complex spectrum is returned. Output: - Returns tuple with the complete complex spectrum as list of floats and the mirrored frequencies as list of floats. """ # If phase_angle is not provided, create an array of zeros if phase_angle is None: phase_angle = [0.0] * len(spectrum_values) # Create a complex array from amplitudes and phase angles complex_spectrum = np.array(spectrum_values) * np.exp(1j * np.array(phase_angle)) # Halve the positive frequency components except for DC and Nyquist complex_spectrum[1:len(spectrum_values) - 1] /= 2 # Mirror the positive frequencies to the negative frequencies, excluding the Nyquist frequency complex_spectrum = np.concatenate((complex_spectrum, np.conj(complex_spectrum[-2:0:-1]))) if frequencies is not None: # Mirror the frequencies mirrored_frequencies = np.concatenate((frequencies, -np.array(frequencies[-2:0:-1]))) return complex_spectrum.tolist(), mirrored_frequencies.tolist() return complex_spectrum.tolist(), []
[docs] def time_domain_data_from_frequency_domain( amplitudes: List[float], phase_angle: List[float], frequencies: List[float]) -> Tuple[List[float], List[float]]: """ Function to perform IFFT on the frequency domain data and manipulated to obtain time domain data. Input: - amplitudes (list of floats): The amplitude values of the signal in the frequency domain. - phase_angle (list of floats): The phase angles of the signal in the frequency domain. - frequencies (list of floats): The frequencies of the signal in the frequency domain. Output: - Returns tuple with the time stamps and the IFFT values of the signal in the time domain. """ # Determine the signal length from the length of the amplitude array signal_length = 2 * len(amplitudes) - 2 # Reconstruct the complex spectrum from amplitudes and fase angles complex_spectrum, _ = reconstruct_2sided_complex_spectrum(spectrum_values=amplitudes, phase_angle=phase_angle) # Reverse the signal length normalization complex_spectrum = [value * signal_length for value in complex_spectrum] # Perform the IFFT time_domain = ifft(complex_spectrum) # Calculate the time step based on the signal length and the highest frequency time_steps = 1 / (2 * max(frequencies)) # Generate the time vector time_vector = np.arange(signal_length) * time_steps # Return the real part of the time domain and the time vector return time_domain.real.tolist(), time_vector.tolist()
[docs] def identify_significant_amplitudes( amplitudes: List[float], frequencies: List[float], phase_angles: Optional[List[float]] = None, amplitude_threshold: float = None) -> List[Tuple[float, float, Optional[float]]]: """ Function to identify significant local maxima amplitudes and their respective frequency and phase angle pairs. Input: - amplitudes (list of floats): List of amplitude values. - frequencies (list of floats): List of frequency values. - phase_angles (list of floats): List of phase angles in radians. Default value is None. - amplitude_threshold (float): Optional input for the threshold for significant amplitudes. Defaults value is None, which defaults to 10% of max amplitude. Output: - Returns list of tuples containing significant amplitudes, frequencies and phase angles (if provided). """ if amplitude_threshold is None: amplitude_threshold = 0.1 * max(amplitudes) # Find local maxima local_maxima_indices = (np.diff(np.sign(np.diff(amplitudes))) < 0).nonzero()[0] + 1 # Filter by amplitude threshold filtered_indices = [i for i in local_maxima_indices if amplitudes[i] > amplitude_threshold] # Match the indices and create the list of tuples if phase_angles is not None: return [(amplitudes[i], frequencies[i], phase_angles[i]) for i in filtered_indices] return [(amplitudes[i], frequencies[i], None) for i in filtered_indices]
[docs] def find_max_amplitude( amplitudes: List[float], frequencies: List[float], frequency_range: Optional[List[float]] = None) \ -> Tuple[float, float]: """ Function to find the maximum amplitude within a given frequency range. Input: - amplitudes (list of floats): The list of amplitudes. - frequencies (list of floats): The list of frequencies. - frequency_range (list of floats): Optional input for the frequency range to search for the maximum amplitude. Default value is None, the entire range will be used. Output: - Returns tuple with the maximum amplitude as float and its corresponding frequency as float. """ # Convert the frequencies to numpy array for easy manipulation frequencies = np.array(frequencies) amplitudes = np.array(amplitudes) # If frequency_range is not provided, use the entire frequency range if frequency_range is None: frequency_range = [frequencies.min(), frequencies.max()] # Find the indices of the frequencies that are within the given range indices = np.where((frequencies >= frequency_range[0]) & (frequencies <= frequency_range[1])) # Extract the amplitudes that are within the given frequency range amplitudes_in_range = amplitudes[indices] # Find the maximum amplitude and its index within the given frequency range max_amp_index_in_range = np.argmax(amplitudes_in_range) max_amplitude = round(float(amplitudes_in_range[max_amp_index_in_range]), 2) freq_at_max_amplitude = round(float(frequencies[indices][max_amp_index_in_range]), 2) return max_amplitude, freq_at_max_amplitude
[docs] def power_density_form_amplitude_spectrum( amplitude_spectrum: List[float], frequencies: List[float], window_used: str = 'rectangular') -> list[float]: """ Function to calculate the power spectral density (PSD) from the amplitude spectrum. The PSD is the square of the amplitude spectrum. Input: - amplitude_spectrum (list of floats): The amplitude spectrum values. - frequencies (list of floats): The list of frequencies. - window_used (str): Optional input for the window used for the FFT. Default value is 'rectangular' which is same as no windowing. Output: - Returns list of power spectral density (PSD) values as floats. """ # Check if the window_type input if window_used is None or window_used in ['rectangular', 'Rectangular']: window_used = 'boxcar' # Rectangular window is the same as boxcar window if isinstance(window_used, str): window_used = window_used.lower() else: raise TypeError( f"ERROR: Input for the window type should be a string. Provided was {window_used} of type " f"{type(window_used)}.") if window_used not in WINDOW_PARAMETERS: valid_window_types = ", ".join(WINDOW_PARAMETERS.keys()) warnings.warn(f"WARNING: Invalid window type. Please choose from {valid_window_types}.") return [] # Compensate windowing effects with the Normalized Equivalent Noise Bandwidth (NENBW) factor NENBW = WINDOW_PARAMETERS[window_used][1] # Convert the amplitude spectrum back to a double-sided spectrum ds_amplitude_spectrum, ds_frequencies = ( reconstruct_2sided_complex_spectrum(spectrum_values=amplitude_spectrum, frequencies=frequencies)) # Double-sided Power spectral density (PSD) freq_increment = frequencies[1] - frequencies[0] ds_power_density_spectrum = np.array(ds_amplitude_spectrum) ** 2 / (freq_increment * NENBW) power_density_spectrum, _, _ = convert_double_sided_to_real_single_sided_spectrum( spectrum_values=ds_power_density_spectrum, frequencies=np.array(ds_frequencies)) return power_density_spectrum.tolist()
[docs] def rms_from_psd(power_density_spectrum_segment: List[float], freq_increment: float) -> float: """ Function to calculate the root-mean-square (RMS) value from a segment of the power spectral density (PSD). If the entries PSD is provided, the RMS is the square root of the integral of the PSD. The RMS is the square root of the integral of the PSD segment. Input: - power_density_spectrum_segment (list of floats): The power spectral density values. - freq_increment (float): The frequency increment or Delta f. Output: - Returns the root-mean-square (RMS) value as float. """ return np.sqrt(np.sum(power_density_spectrum_segment) * freq_increment)
[docs] def octave_form_frequency_domain_data( quantities: List[float], frequencies: List[float], calculation_method: str = 'cumulative', third_octave: bool = False) -> Dict[float, float]: """ Calculate octave or third-octave bands directly from frequency domain content. Input: - quantities (list of floats): The quantities in frequency domain. - frequencies (list of floats): The frequency values. - calculation_method (str): The calculation method, select from 'peak', 'mean', or 'cumulative'. Default value is 'cumulative'. - third_octave (bool): Input determines the frequency resolution of the output bands. When third-octave is selected the function calculates third-octave bands, which divide each octave into three narrower bands. Default value is False, the function calculates standard octave bands, which group frequencies more broadly. Output: - A dictionary with center frequencies as keys and corresponding band values as values. """ def _calculate_band_value(freq: np.ndarray, f_l: float, f_u: float, quant: np.ndarray, method: str) -> float: """ Calculate the band value based on the selected calculation method.""" # Find indices corresponding to the band idx = np.where((freq >= f_l) & (freq <= f_u))[0] if method == 'peak': return np.max(quant[idx]) if method == 'mean': return float(np.mean(quant[idx])) return np.sum(quant[idx]) # Ensure inputs are NumPy arrays frequencies = np.array(frequencies) quantities = np.array(quantities) # Select the appropriate band data bands_data = THIRD_OCTAVE_BANDS if third_octave else OCTAVE_BANDS bands = {} for band in bands_data: f_center = band['center'] f_lower = band['lower'] f_upper = band['upper'] if f_center > max(frequencies): break if f_upper - f_lower < frequencies[1] - frequencies[0]: continue # Skip if the band is smaller than the frequency increment # Calculate band value bands[f_center] = _calculate_band_value( freq=frequencies, f_l=f_lower, f_u=f_upper, quant=quantities, method=calculation_method) return bands
### =================================================================================================================== ### 3. End of script ### ===================================================================================================================