### ===================================================================================================================
### Frequency Domain Collection
### ===================================================================================================================
# Copyright ©2025 Haskoning Nederland B.V.
### ===================================================================================================================
### 1. Import modules
### ===================================================================================================================
# General imports
import warnings
import numpy as np
from pathlib import Path
from dataclasses import dataclass, field
from typing import Union, List, Optional, Tuple, Dict
# References for functions and classes in the haskoning_atr_tools package
from haskoning_atr_tools.signal_processing_tool.utils import plot_function, bar_plot_function, \
get_frequency_ban_level_units
from haskoning_atr_tools.signal_processing_tool.frequency_domain.data import FrequencyDomainData
from haskoning_atr_tools.signal_processing_tool.frequency_domain.tools import octave_form_frequency_domain_data
### ===================================================================================================================
### 2. FrequencyDomainCollection class
### ===================================================================================================================
[docs]
@dataclass
class FrequencyDomainCollection:
"""
The FrequencyDomainCollection class represents a collection of frequency spectrum objects.
Input:
- name (str): The name of the frequency domain collection.
- parent (obj): The parent project or entity that this collection belongs to.
- frequency_domain_data (list): List of FrequencyDomainData objects that are part of the collection.
"""
parent: 'ATRProject'
name: str
frequency_domain_data: List[FrequencyDomainData] = field(default_factory=list)
def __post_init__(self):
""" Automatically add this FrequencyDomainCollection to a parent project's collection list."""
if self.parent:
self.parent.add(self)
def __repr__(self):
""" Returns a string representation of the FrequencyDomainCollection."""
return f"FrequencyDomainCollection(name={self.name})"
[docs]
def add_frequency_domain_data_to_collection(self, frequency_domain_data: FrequencyDomainData) -> None:
"""
Method adds a FrequencyDomainData object to the frequency_domain_data.
Input:
- frequency_domain_data (FrequencyDomainData): The FrequencyDomainData object to be added to the collection.
Output:
- FrequencyDomainCollection is added to the collection.
"""
if not isinstance(frequency_domain_data, FrequencyDomainData):
raise TypeError("ERROR: The provided frequency_domain_data must be an instance of FrequencyDomainData.")
self.frequency_domain_data.append(frequency_domain_data)
[docs]
def check_frequency_alignment(self) -> bool:
""" Check if frequencies align across all FrequencyDomainData objects in the collection."""
if not self.frequency_domain_data:
raise ValueError("ERROR: The frequency domain list is empty.")
reference = self.frequency_domain_data[0].frequencies
return all(np.array_equal(reference, data.frequencies) for data in self.frequency_domain_data[1:])
[docs]
def find_collections_max_amplitudes(
self, frequency_range: Optional[List[float]] = None) -> Dict[str, Tuple[float, float]]:
"""
Method to find the maximum amplitude within a given frequency range for all frequency domain objects in the
collection.
Input:
- frequency_range (list of floats): The frequency range to search for the maximum amplitude.
Output:
- Returns dictionary with name of the frequency domain object and value tuple with the maximum amplitude as
float and its corresponding frequency as float.
"""
max_amplitudes = {}
for fdd in self.frequency_domain_data:
max_amplitudes[fdd.name] = fdd.find_max_amplitude(frequency_range=frequency_range)
return max_amplitudes
[docs]
def find_collections_significant_amplitudes(
self, amplitude_threshold: Optional[float] = None) -> Dict[str, List[Tuple[float, float, float]]]:
"""
Method to find the significant amplitudes, with optional threshold, for all frequency domain objects in the
collection.
Input:
- amplitude_threshold (float): Optional input for the threshold for significant amplitudes. Default value
None, in which case 10% of max amplitude is used.
Output:
- Returns dictionary with name of the frequency domain object and value the significant amplitudes as float,
the frequencies as float and phase angles as float, in [deg].
"""
significant_amplitudes = {}
for fdd in self.frequency_domain_data:
significant_amplitudes[fdd.name] = fdd.find_significant_amplitudes(amplitude_threshold=amplitude_threshold)
return significant_amplitudes
[docs]
def get_average_amplitudes(self) -> Tuple[List[float], List[float]]:
"""
Method calculates the average amplitudes across all FrequencyDomainData objects in the collection.
Output:
- Returns tuple of lists with the average amplitudes and frequencies.
"""
if not self.frequency_domain_data:
raise ValueError("ERROR: No frequency domain data available.")
# Stack amplitudes into a 2D NumPy array for efficient averaging
amplitude_matrix = np.array([fdd.amplitudes for fdd in self.frequency_domain_data])
# Validate shape consistency
if not all(len(row) == amplitude_matrix.shape[1] for row in amplitude_matrix):
raise ValueError("ERROR: Inconsistent amplitude lengths among FrequencyDomainData objects.")
# Compute mean across rows (i.e., across all objects)
average_amplitudes = amplitude_matrix.mean(axis=0).tolist()
# Assume all FrequencyDomainData objects share the same frequency vector
frequencies = self.frequency_domain_data[0].frequencies
return average_amplitudes, frequencies
[docs]
def get_average_power_density(self) -> Tuple[List[float], List[float]]:
"""
Calculate the average power density of all the FrequencyDomainData objects in the FrequencyDomainCollection.
Output:
- Returns tuple of two lists of floats, the average power densities and frequencies.
"""
# Calculate the power densities for each FrequencyDomainData object
power_densities = [fdd.get_power_density() for fdd in self.frequency_domain_data]
# Calculate the average power density
num_lists = len(power_densities)
num_elements = len(power_densities[0])
average_power_density = [
sum(power_densities[j][i] for j in range(num_lists)) / num_lists for i in range(num_elements)]
# Assuming all FrequencyDomainData objects have the same frequencies
return average_power_density, self.frequency_domain_data[0].frequencies
[docs]
def averaged_octaves(self, third_octave: bool = False, level_type: str = 'RMS') -> Dict[float, float]:
"""
Calculate the average octave or third-octave bands of all the FrequencyDomainData object in the
FrequencyDomainCollection, for different octave type.
Input:
- third_octave (bool): Input determines the frequency resolution of the output bands. When third-octave is
selected the function calculates third-octave bands, which divide each octave into three narrower bands.
Default value is False, the function calculates standard octave bands, which group frequencies more
broadly.
- level_type (str): Specifies the method used to compute the level of each frequency band. Options include
'peak', 'energy', 'power', and 'RMS'. Default value is 'RMS'.
Output:
- Returns dictionary of the octave or third-octave bands.
"""
if not isinstance(level_type, str):
raise TypeError("ERROR: Input for level-type should be a string. Please check your input.")
level_type = level_type.lower()
if level_type not in ['peak', 'energy', 'power', 'rms']:
raise ValueError(
"ERROR: Invalid input for level-type. Please select from 'peak', 'energy', 'power', or 'RMS'.")
# Calculate octaves based on level-type 'power', 'rms', or 'energy'
if level_type in ['power', 'rms', 'energy']:
power_densities, frequencies = self.get_average_power_density()
power_density_octave = octave_form_frequency_domain_data(
quantities=power_densities, frequencies=frequencies, calculation_method='cumulative',
third_octave=third_octave)
freq_increment = frequencies[1] - frequencies[0]
power_octave = {k: v * freq_increment for k, v in power_density_octave.items()}
if level_type == 'power':
return power_octave
if level_type == 'energy':
return {k: v * self.frequency_domain_data[0].signal_duration for k, v in power_octave.items()}
if level_type == 'rms':
return {k: np.sqrt(v) for k, v in power_octave.items()}
# Level-type is 'peak'
average_amplitudes, frequencies = self.get_average_amplitudes()
return octave_form_frequency_domain_data(
quantities=average_amplitudes, frequencies=frequencies, calculation_method=level_type,
third_octave=third_octave)
[docs]
def convert_collection_to_time_domain(self, name: str = None) -> 'TimeDomainCollection':
"""
Method converts the frequency domain collection to a time domain collection.
Input:
- name (str): The name of the collection to which the time domain data belongs. Default value is None, in
which case the name of the frequency domain collection is used.
Output:
- Returns the converted time domain collection.
"""
if not name:
name = f"time domain Collection from {self.name}"
for frequency_domain_data in self.frequency_domain_data:
frequency_domain_data.convert_to_time_domain(collection_name=name)
return self.parent.time_domain_collections[name]
[docs]
def plot(
self, spectra_type: Union[str, List[str]] = None, log_scale_x: bool = False, log_scale_y: bool = False,
x_lim: Optional[List[float]] = None, y_lim: Optional[List[float]] = None,
file: Optional[Union[Path, str]] = None, show: bool = True, line_width: float = 1.0,
plot_max_values: bool = False) -> Optional[List[Path]]:
"""
Method to plot the specified spectra type(s) of the FrequencyDomainData objects in the collection.
Input:
- spectra_type (str or list of str): The type(s) of spectra to plot. Options are: 'amplitude',
'power density' or 'psd', 'energy density' or 'esd', and 'rms' or 'linear'. If None, all spectra types are
plotted.
- log_scale_x (bool): Select to set the x-axis to a logarithmic scale. Default value is False.
- log_scale_y (bool): Select to set the y-axis to a logarithmic scale. Default value is False.
- x_lim (list of float): Optional list to set limits [bottom limit, top limit] for the x-axis. Default value
is None.
- y_lim (list of float): Optional list to set limits [bottom limit, top limit] for the y-axis. Default value
is None.
- file (Path): Provide the full path and filename for the image of the graph to be created. The image is a
png-file, if the suffix is omitted it will be added. An error will occur if the picture format is set
to a non-supported format. It is also possible to provide the folder only, the title is used as filename.
- show (bool): Select to plot the graph in the environment when running the script. Default value True.
- line_width (float): Change line width, default = 1.
- plot_max_values (bool): Select to calculate the maximum y value for each line and add it to the plot.
Default value is False.
Output:
- Returns the paths of the files if the plots were selected to be saved to a file, otherwise returns None.
"""
if not self.frequency_domain_data:
raise ValueError("ERROR: The frequency domain list of the collection is empty. Plot cannot be created.")
if spectra_type is None:
spectra_type = ['amplitude', 'power density', 'energy density', 'rms']
elif isinstance(spectra_type, str):
spectra_type = [spectra_type]
if not all(isinstance(st, str) for st in spectra_type):
raise TypeError(
"ERROR: Input for plot method of FrequencyDomainCollection spectra-type must be a string or a list of "
"strings.")
spectra_type = [st.lower() for st in spectra_type]
first_fdd = self.frequency_domain_data[0]
amplitude_type = first_fdd.amplitude_type
amplitude_units = first_fdd.amplitude_units
frequency_units = first_fdd.frequency_units
for fdd in self.frequency_domain_data:
if (fdd.amplitude_type != amplitude_type or
fdd.frequency_units != frequency_units or
amplitude_units != fdd.amplitude_units):
warnings.warn(
"WARNING: Inconsistent amplitude type, amplitude units or frequency units in the frequency domain "
"list of FrequencyDomainCollection. Please check plot carefully.")
plots_created = []
for st in spectra_type:
if st == 'amplitude':
y_values = [fdd.amplitudes for fdd in self.frequency_domain_data]
units = first_fdd.amplitude_units
elif st in ['power density', 'psd']:
y_values = [fdd.get_power_density() for fdd in self.frequency_domain_data]
units = first_fdd.power_units
elif st in ['energy density', 'esd']:
y_values = [fdd.get_energy_density() for fdd in self.frequency_domain_data]
units = first_fdd.power_units
elif st in ['rms', 'linear']:
y_values = [fdd.get_rms_spectrum() for fdd in self.frequency_domain_data]
units = first_fdd.amplitude_units
else:
raise ValueError(f"ERROR: Unknown spectrum type: {st} for plot of FrequencyDomainCollection.")
plots_created.append(plot_function(
x_values=[fdd.frequencies for fdd in self.frequency_domain_data], y_values=y_values,
labels=[fdd.name for fdd in self.frequency_domain_data],
title=f'{amplitude_type.capitalize()} {st.capitalize()} Spectrum of {self.name}',
x_label=f'Frequency [{frequency_units}]',
y_label=f'{amplitude_type.capitalize()} {st.capitalize()} [{units}]', log_scale_x=log_scale_x,
log_scale_y=log_scale_y, x_lim=x_lim, y_lim=y_lim, file=file, show=show, line_width=line_width,
plot_max_values=plot_max_values))
[docs]
def plot_octaves(
self, third_octave: bool = False, level_type: Optional[str] = None, log_scale_y: bool = False,
x_lim: Optional[List[float]] = None, y_lim: Optional[List[float]] = None,
file: Optional[Union[Path, str]] = None, show: bool = True, bar_width: float = 1,
show_bar_values: bool = True, decimals_displayed: int = 2) -> Optional[Path]:
"""
Plot the octave or third-octave bands for all FrequencyDomainData objects in the collection.
Input:
- third_octave (bool): Input determines the frequency resolution of the output bands. When third-octave is
selected the function calculates third-octave bands, which divide each octave into three narrower bands.
Default value is False, the function calculates standard octave bands, which group frequencies more
broadly.
- level_type (str): Specifies the method used to compute the level of each frequency band. Options include
'peak', 'energy', 'power', and 'RMS'. Default value is 'RMS'.
- log_scale_y (bool): Select to set the y-axis to a logarithmic scale. Default value is False.
- x_lim (list of float): Optional list to set limits [bottom limit, top limit] for the x-axis. Default value
is None.
- y_lim (list of float): Optional list to set limits [bottom limit, top limit] for the y-axis. Default value
is None.
- file (Path): Provide the full path and filename for the image of the graph to be created. The image is a
png-file, if the suffix is omitted it will be added. An error will occur if the picture format is set
to a non-supported format. It is also possible to provide the folder only, the title is used as filename.
- show (bool): Select to plot the graph in the environment when running the script. Default value True.
- bar_width (float): The width of the bars. Default value is 1.
- show_bar_values (bool): Select to display the labels on the bars. Default value is True.
- decimals_displayed (int): The number of decimal places for formatting values. Default value is 2.
Output:
- Returns the path of the file if the plot was selected to be saved to a file, otherwise returns None.
"""
if not self.frequency_domain_data:
raise ValueError("ERROR: The frequency domain list of the collection is empty. Plot cannot be created.")
first_fdd = next(iter(self.frequency_domain_data))
amplitude_type = first_fdd.amplitude_type
frequency_units = first_fdd.frequency_units
amplitude_units = first_fdd.amplitude_units
x_values = None
y_values = []
group_labels = []
for fdd in self.frequency_domain_data:
if (fdd.amplitude_type != amplitude_type or fdd.frequency_units != frequency_units or
fdd.amplitude_units != amplitude_units):
warnings.warn(
"WARNING: Inconsistent amplitude type, frequency units, or amplitude units in the frequency domain "
"list. Please check plot.")
# Calculate octave or third-octave bands
bands = fdd.octaves(third_octave=third_octave, level_type=level_type)
if x_values is None:
x_values = list(bands.keys())
y_values.append(list(bands.values()))
group_labels.append(fdd.name)
# Set units based on the type
units = get_frequency_ban_level_units(
level_type=level_type, amplitude_units=amplitude_units, x_units=frequency_units)
# Plot using the grouped_bar_plot_function
return bar_plot_function(
x_values=x_values, y_values=y_values, labels=group_labels,
title=f'{level_type.capitalize()} {"Third " if third_octave else ""}Octave Band Plot of {self.name}',
x_label=f'Frequency [{frequency_units}]',
y_label=f'{amplitude_type.capitalize()} {level_type.capitalize()} [{units}]', log_scale_y=log_scale_y,
x_lim=x_lim, y_lim=y_lim, file=file, show=show, bar_width=bar_width, show_bar_values=show_bar_values,
decimals_displayed=decimals_displayed)
### ===================================================================================================================
### 3. End of script
### ===================================================================================================================