Source code for haskoning_atr_tools.signal_processing_tool.frequency_domain.collection

### ===================================================================================================================
###   Frequency Domain Collection
### ===================================================================================================================
# Copyright ©2025 Haskoning Nederland B.V.

### ===================================================================================================================
###   1. Import modules
### ===================================================================================================================

# General imports
import warnings
import numpy as np
from pathlib import Path
from dataclasses import dataclass, field
from typing import Union, List, Optional, Tuple, Dict

# References for functions and classes in the haskoning_atr_tools package
from haskoning_atr_tools.signal_processing_tool.utils import plot_function, bar_plot_function, \
    get_frequency_ban_level_units
from haskoning_atr_tools.signal_processing_tool.frequency_domain.data import FrequencyDomainData
from haskoning_atr_tools.signal_processing_tool.frequency_domain.tools import octave_form_frequency_domain_data


### ===================================================================================================================
###   2. FrequencyDomainCollection class
### ===================================================================================================================

@dataclass
class FrequencyDomainCollection:
    """
    The FrequencyDomainCollection class represents a collection of frequency spectrum objects.

    Input:
        - parent (obj): The parent project or entity that this collection belongs to.
        - name (str): The name of the frequency domain collection.
        - frequency_domain_data (list): List of FrequencyDomainData objects that are part of the collection.
    """
    parent: 'ATRProject'
    name: str
    frequency_domain_data: List['FrequencyDomainData'] = field(default_factory=list)

    def __post_init__(self):
        """ Automatically add this FrequencyDomainCollection to a parent project's collection list."""
        # NOTE(review): this is a truthiness test, so a parent object that evaluates as falsy is skipped just like
        # None - confirm that the parent project type never evaluates as falsy.
        if self.parent:
            self.parent.add(self)

    def __repr__(self):
        """ Returns a string representation of the FrequencyDomainCollection."""
        return f"FrequencyDomainCollection(name={self.name})"

    def add_frequency_domain_data_to_collection(self, frequency_domain_data: 'FrequencyDomainData') -> None:
        """
        Method adds a FrequencyDomainData object to the frequency_domain_data list of the collection.

        Input:
            - frequency_domain_data (FrequencyDomainData): The FrequencyDomainData object to be added to the
              collection.

        Output:
            - The FrequencyDomainData object is appended to the collection. Raises a TypeError if the input is not a
              FrequencyDomainData instance.
        """
        if not isinstance(frequency_domain_data, FrequencyDomainData):
            raise TypeError("ERROR: The provided frequency_domain_data must be an instance of FrequencyDomainData.")
        self.frequency_domain_data.append(frequency_domain_data)

    def check_frequency_alignment(self) -> bool:
        """
        Check if frequencies align across all FrequencyDomainData objects in the collection.

        Output:
            - Returns True when every object has the same frequencies as the first object in the collection, otherwise
              False. Raises a ValueError if the collection is empty.
        """
        if not self.frequency_domain_data:
            raise ValueError("ERROR: The frequency domain list is empty.")
        reference, *others = self.frequency_domain_data
        return all(np.array_equal(reference.frequencies, other.frequencies) for other in others)

    def find_collections_max_amplitudes(
            self, frequency_range: Optional[List[float]] = None) -> Dict[str, Tuple[float, float]]:
        """
        Method to find the maximum amplitude within a given frequency range for all frequency domain objects in the
        collection.

        Input:
            - frequency_range (list of floats): The frequency range to search for the maximum amplitude.

        Output:
            - Returns dictionary with name of the frequency domain object and value tuple with the maximum amplitude as
              float and its corresponding frequency as float. Objects sharing the same name share one entry, the last
              one in the collection is kept.
        """
        return {
            fdd.name: fdd.find_max_amplitude(frequency_range=frequency_range) for fdd in self.frequency_domain_data}

    def find_collections_significant_amplitudes(
            self, amplitude_threshold: Optional[float] = None) -> Dict[str, List[Tuple[float, float, float]]]:
        """
        Method to find the significant amplitudes, with optional threshold, for all frequency domain objects in the
        collection.

        Input:
            - amplitude_threshold (float): Optional input for the threshold for significant amplitudes. Default value
              None, in which case 10% of max amplitude is used.

        Output:
            - Returns dictionary with name of the frequency domain object and value the significant amplitudes as
              float, the frequencies as float and phase angles as float, in [deg].
        """
        return {
            fdd.name: fdd.find_significant_amplitudes(amplitude_threshold=amplitude_threshold)
            for fdd in self.frequency_domain_data}

    def get_average_amplitudes(self) -> Tuple[List[float], List[float]]:
        """
        Method calculates the average amplitudes across all FrequencyDomainData objects in the collection.

        Output:
            - Returns tuple of lists with the average amplitudes and frequencies. The frequencies are those of the
              first object in the collection. Raises a ValueError if the collection is empty or if the objects do not
              all have the same number of amplitudes.
        """
        if not self.frequency_domain_data:
            raise ValueError("ERROR: No frequency domain data available.")

        # Validate the lengths before stacking: numpy cannot build a regular 2D array from rows of unequal length, so
        # checking the shape afterwards would never produce this error message
        amplitude_rows = [fdd.amplitudes for fdd in self.frequency_domain_data]
        expected_length = len(amplitude_rows[0])
        if any(len(row) != expected_length for row in amplitude_rows):
            raise ValueError("ERROR: Inconsistent amplitude lengths among FrequencyDomainData objects.")

        # Compute mean across rows (i.e., across all objects)
        average_amplitudes = np.asarray(amplitude_rows).mean(axis=0).tolist()

        # Assume all FrequencyDomainData objects share the same frequency vector
        return average_amplitudes, self.frequency_domain_data[0].frequencies

    def get_average_power_density(self) -> Tuple[List[float], List[float]]:
        """
        Calculate the average power density of all the FrequencyDomainData objects in the FrequencyDomainCollection.

        Output:
            - Returns tuple of two lists of floats, the average power densities and frequencies. The frequencies are
              those of the first object in the collection. Raises a ValueError if the collection is empty or if the
              objects do not all provide the same number of power density values.
        """
        if not self.frequency_domain_data:
            raise ValueError("ERROR: No frequency domain data available.")

        # Calculate the power densities for each FrequencyDomainData object
        power_densities = [fdd.get_power_density() for fdd in self.frequency_domain_data]

        # Averaging element by element is only meaningful when every object has the same number of values
        expected_length = len(power_densities[0])
        if any(len(row) != expected_length for row in power_densities):
            raise ValueError("ERROR: Inconsistent power density lengths among FrequencyDomainData objects.")

        # Calculate the average power density across all objects
        average_power_density = np.asarray(power_densities).mean(axis=0).tolist()

        # Assuming all FrequencyDomainData objects have the same frequencies
        return average_power_density, self.frequency_domain_data[0].frequencies

    def averaged_octaves(self, third_octave: bool = False, level_type: str = 'RMS') -> Dict[float, float]:
        """
        Calculate the average octave or third-octave bands of all the FrequencyDomainData object in the
        FrequencyDomainCollection, for different octave type.

        Input:
            - third_octave (bool): Input determines the frequency resolution of the output bands. When third-octave is
              selected the function calculates third-octave bands, which divide each octave into three narrower bands.
              Default value is False, the function calculates standard octave bands, which group frequencies more
              broadly.
            - level_type (str): Specifies the method used to compute the level of each frequency band. Options include
              'peak', 'energy', 'power', and 'RMS' (case-insensitive). Default value is 'RMS'.

        Output:
            - Returns dictionary of the octave or third-octave bands. Raises a TypeError if level_type is not a string
              and a ValueError if it is not one of the supported options, or if fewer than two frequencies are
              available for the 'power', 'energy' and 'RMS' level types.
        """
        if not isinstance(level_type, str):
            raise TypeError("ERROR: Input for level-type should be a string. Please check your input.")
        level_type = level_type.lower()
        if level_type not in ['peak', 'energy', 'power', 'rms']:
            raise ValueError(
                "ERROR: Invalid input for level-type. Please select from 'peak', 'energy', 'power', or 'RMS'.")

        # Level-type is 'peak'
        if level_type == 'peak':
            average_amplitudes, frequencies = self.get_average_amplitudes()
            return octave_form_frequency_domain_data(
                quantities=average_amplitudes, frequencies=frequencies, calculation_method=level_type,
                third_octave=third_octave)

        # Calculate octaves based on level-type 'power', 'rms', or 'energy'
        power_densities, frequencies = self.get_average_power_density()

        # The frequency increment is taken from the first two frequencies, so at least two are required
        if len(frequencies) < 2:
            raise ValueError(
                "ERROR: At least two frequencies are required to determine the frequency increment for the octave "
                "bands.")
        # NOTE(review): only the first increment is used - presumably the frequencies are evenly spaced, confirm
        freq_increment = frequencies[1] - frequencies[0]

        power_density_octave = octave_form_frequency_domain_data(
            quantities=power_densities, frequencies=frequencies, calculation_method='cumulative',
            third_octave=third_octave)
        power_octave = {k: v * freq_increment for k, v in power_density_octave.items()}

        if level_type == 'power':
            return power_octave
        if level_type == 'energy':
            # The signal duration of the first object in the collection is applied to the averaged result
            signal_duration = self.frequency_domain_data[0].signal_duration
            return {k: v * signal_duration for k, v in power_octave.items()}
        return {k: np.sqrt(v) for k, v in power_octave.items()}

    def convert_collection_to_time_domain(self, name: Optional[str] = None) -> 'TimeDomainCollection':
        """
        Method converts the frequency domain collection to a time domain collection.

        Input:
            - name (str): The name of the collection to which the time domain data belongs. Default value is None, in
              which case the name of the frequency domain collection is used.

        Output:
            - Returns the converted time domain collection, looked up by name in the time domain collections of the
              parent.
        """
        if not name:
            name = f"time domain Collection from {self.name}"
        for frequency_domain_data in self.frequency_domain_data:
            frequency_domain_data.convert_to_time_domain(collection_name=name)
        return self.parent.time_domain_collections[name]

    def plot(
            self, spectra_type: Union[str, List[str]] = None, log_scale_x: bool = False, log_scale_y: bool = False,
            x_lim: Optional[List[float]] = None, y_lim: Optional[List[float]] = None,
            file: Optional[Union[Path, str]] = None, show: bool = True, line_width: float = 1.0,
            plot_max_values: bool = False) -> Optional[List[Path]]:
        """
        Method to plot the specified spectra type(s) of the FrequencyDomainData objects in the collection.

        Input:
            - spectra_type (str or list of str): The type(s) of spectra to plot. Options are: 'amplitude',
              'power density' or 'psd', 'energy density' or 'esd', and 'rms' or 'linear'. If None, all spectra types
              are plotted.
            - log_scale_x (bool): Select to set the x-axis to a logarithmic scale. Default value is False.
            - log_scale_y (bool): Select to set the y-axis to a logarithmic scale. Default value is False.
            - x_lim (list of float): Optional list to set limits [bottom limit, top limit] for the x-axis. Default
              value is None.
            - y_lim (list of float): Optional list to set limits [bottom limit, top limit] for the y-axis. Default
              value is None.
            - file (Path): Provide the full path and filename for the image of the graph to be created. The image is a
              png-file, if the suffix is omitted it will be added. An error will occur if the picture format is set to
              a non-supported format. It is also possible to provide the folder only, the title is used as filename.
            - show (bool): Select to plot the graph in the environment when running the script. Default value True.
            - line_width (float): Change line width, default = 1.
            - plot_max_values (bool): Select to calculate the maximum y value for each line and add it to the plot.
              Default value is False.

        Output:
            - Returns the paths of the files if the plots were selected to be saved to a file, otherwise returns None.
              Raises a ValueError if the collection is empty or a spectra type is unknown, and a TypeError if the
              spectra types are not strings. All spectra types are validated before the first plot is created.
        """
        if not self.frequency_domain_data:
            raise ValueError("ERROR: The frequency domain list of the collection is empty. Plot cannot be created.")

        if spectra_type is None:
            spectra_type = ['amplitude', 'power density', 'energy density', 'rms']
        elif isinstance(spectra_type, str):
            spectra_type = [spectra_type]
        if not all(isinstance(st, str) for st in spectra_type):
            raise TypeError(
                "ERROR: Input for plot method of FrequencyDomainCollection spectra-type must be a string or a list of "
                "strings.")
        spectra_type = [st.lower() for st in spectra_type]

        # Reject unknown spectra types up front, so a typo does not surface only after earlier plots were created
        known_spectra_types = ('amplitude', 'power density', 'psd', 'energy density', 'esd', 'rms', 'linear')
        for st in spectra_type:
            if st not in known_spectra_types:
                raise ValueError(f"ERROR: Unknown spectrum type: {st} for plot of FrequencyDomainCollection.")

        first_fdd = self.frequency_domain_data[0]
        amplitude_type = first_fdd.amplitude_type
        amplitude_units = first_fdd.amplitude_units
        frequency_units = first_fdd.frequency_units

        # Titles, labels and units are taken from the first object; warn once if any other object deviates
        if any(
                fdd.amplitude_type != amplitude_type or fdd.frequency_units != frequency_units or
                fdd.amplitude_units != amplitude_units for fdd in self.frequency_domain_data):
            warnings.warn(
                "WARNING: Inconsistent amplitude type, amplitude units or frequency units in the frequency domain "
                "list of FrequencyDomainCollection. Please check plot carefully.")

        plots_created = []
        for st in spectra_type:
            if st == 'amplitude':
                y_values = [fdd.amplitudes for fdd in self.frequency_domain_data]
                units = first_fdd.amplitude_units
            elif st in ['power density', 'psd']:
                y_values = [fdd.get_power_density() for fdd in self.frequency_domain_data]
                units = first_fdd.power_units
            elif st in ['energy density', 'esd']:
                y_values = [fdd.get_energy_density() for fdd in self.frequency_domain_data]
                # NOTE(review): the power units are also used for the energy density label - confirm this is intended
                units = first_fdd.power_units
            else:
                # Only 'rms' and 'linear' remain, the spectra types were validated above
                y_values = [fdd.get_rms_spectrum() for fdd in self.frequency_domain_data]
                units = first_fdd.amplitude_units

            plots_created.append(plot_function(
                x_values=[fdd.frequencies for fdd in self.frequency_domain_data], y_values=y_values,
                labels=[fdd.name for fdd in self.frequency_domain_data],
                title=f'{amplitude_type.capitalize()} {st.capitalize()} Spectrum of {self.name}',
                x_label=f'Frequency [{frequency_units}]',
                y_label=f'{amplitude_type.capitalize()} {st.capitalize()} [{units}]', log_scale_x=log_scale_x,
                log_scale_y=log_scale_y, x_lim=x_lim, y_lim=y_lim, file=file, show=show, line_width=line_width,
                plot_max_values=plot_max_values))

        # Hand the results of the plot function back to the caller when saving to file was requested
        # NOTE(review): presumably plot_function returns the path of the saved image - confirm
        if file is None:
            return None
        return plots_created

    def plot_octaves(
            self, third_octave: bool = False, level_type: Optional[str] = None, log_scale_y: bool = False,
            x_lim: Optional[List[float]] = None, y_lim: Optional[List[float]] = None,
            file: Optional[Union[Path, str]] = None, show: bool = True, bar_width: float = 1,
            show_bar_values: bool = True, decimals_displayed: int = 2) -> Optional[Path]:
        """
        Plot the octave or third-octave bands for all FrequencyDomainData objects in the collection.

        Input:
            - third_octave (bool): Input determines the frequency resolution of the output bands. When third-octave is
              selected the function calculates third-octave bands, which divide each octave into three narrower bands.
              Default value is False, the function calculates standard octave bands, which group frequencies more
              broadly.
            - level_type (str): Specifies the method used to compute the level of each frequency band. Options include
              'peak', 'energy', 'power', and 'RMS'. Default value is None, in which case 'RMS' is used.
            - log_scale_y (bool): Select to set the y-axis to a logarithmic scale. Default value is False.
            - x_lim (list of float): Optional list to set limits [bottom limit, top limit] for the x-axis. Default
              value is None.
            - y_lim (list of float): Optional list to set limits [bottom limit, top limit] for the y-axis. Default
              value is None.
            - file (Path): Provide the full path and filename for the image of the graph to be created. The image is a
              png-file, if the suffix is omitted it will be added. An error will occur if the picture format is set to
              a non-supported format. It is also possible to provide the folder only, the title is used as filename.
            - show (bool): Select to plot the graph in the environment when running the script. Default value True.
            - bar_width (float): The width of the bars. Default value is 1.
            - show_bar_values (bool): Select to display the labels on the bars. Default value is True.
            - decimals_displayed (int): The number of decimal places for formatting values. Default value is 2.

        Output:
            - Returns the path of the file if the plot was selected to be saved to a file, otherwise returns None.
              Raises a ValueError if the collection is empty and a TypeError if level_type is not a string.
        """
        if not self.frequency_domain_data:
            raise ValueError("ERROR: The frequency domain list of the collection is empty. Plot cannot be created.")

        # Resolve the documented default here: the title and axis label below call string methods on level_type
        if level_type is None:
            level_type = 'RMS'
        if not isinstance(level_type, str):
            raise TypeError("ERROR: Input for level-type should be a string. Please check your input.")

        first_fdd = self.frequency_domain_data[0]
        amplitude_type = first_fdd.amplitude_type
        frequency_units = first_fdd.frequency_units
        amplitude_units = first_fdd.amplitude_units

        # Labels and units are taken from the first object; warn once if any other object deviates
        if any(
                fdd.amplitude_type != amplitude_type or fdd.frequency_units != frequency_units or
                fdd.amplitude_units != amplitude_units for fdd in self.frequency_domain_data):
            warnings.warn(
                "WARNING: Inconsistent amplitude type, frequency units, or amplitude units in the frequency domain "
                "list. Please check plot.")

        x_values = None
        y_values = []
        group_labels = []
        for fdd in self.frequency_domain_data:
            # Calculate octave or third-octave bands
            bands = fdd.octaves(third_octave=third_octave, level_type=level_type)
            # The band frequencies of the first object define the x-axis for all objects
            if x_values is None:
                x_values = list(bands.keys())
            y_values.append(list(bands.values()))
            group_labels.append(fdd.name)

        # Set units based on the type
        units = get_frequency_ban_level_units(
            level_type=level_type, amplitude_units=amplitude_units, x_units=frequency_units)

        # Plot using the grouped_bar_plot_function
        return bar_plot_function(
            x_values=x_values, y_values=y_values, labels=group_labels,
            title=f'{level_type.capitalize()} {"Third " if third_octave else ""}Octave Band Plot of {self.name}',
            x_label=f'Frequency [{frequency_units}]',
            y_label=f'{amplitude_type.capitalize()} {level_type.capitalize()} [{units}]', log_scale_y=log_scale_y,
            x_lim=x_lim, y_lim=y_lim, file=file, show=show, bar_width=bar_width, show_bar_values=show_bar_values,
            decimals_displayed=decimals_displayed)

### ===================================================================================================================
###   3. End of script
### ===================================================================================================================