Source code for haskoning_atr_tools.signal_processing_tool.time_domain.tools

### ===================================================================================================================
###   Time Domain Tools
### ===================================================================================================================
# Copyright ©2025 Haskoning Nederland B.V.

### ===================================================================================================================
###   1. Import modules
### ===================================================================================================================

# General imports
import warnings
import numpy as np
from scipy.fft import fft, fftfreq, rfft, rfftfreq
from scipy.signal import butter, buttord, sosfiltfilt
from typing import List, Tuple, Optional, Dict, Union

# References for functions and classes in the haskoning_atr_tools package
from haskoning_atr_tools.signal_processing_tool.utils.constants import WINDOW_PARAMETERS
from haskoning_atr_tools.signal_processing_tool.frequency_domain.tools import \
    convert_double_sided_to_real_single_sided_spectrum

# Formatting warnings
warnings.formatwarning = lambda message, category, filename, lineno, line=None: \
    f"{filename}:{lineno}: {category.__name__}: {message}\n"
warnings.simplefilter('always')


### ===================================================================================================================
###   2. Function for editing time domain data
### ===================================================================================================================

[docs] def crop_time_domain_data( time_stamps: List[float], amplitudes: List[float], from_front: bool = True, number_of_items: Optional[int] = None, time_amount: Optional[float] = None, percentage_to_crop: Optional[float] = None, crop_at: Optional[float] = None) -> Tuple[List[float], List[float]]: """ Function to crop signal at the front or back given different cues. Input: - time_stamps (list): List of time stamps. - amplitudes (list): List of amplitude values. - from_front (bool): If True, as default, crop from the front. If False, crop from the back. - number_of_items (int): The number of items to remove. - time_amount (float): The amount of time to remove. - percentage_to_crop (float): The percentage of the signal to remove. (10 = 10%) - crop_at (float): The value of the time_stamp signal from where to remove. Output: - Returns tuple with cropped time stamps as list of floats and amplitudes as list of floats. """ if time_amount: time_step = time_stamps[1] - time_stamps[0] number_of_items = int(time_amount / time_step) elif percentage_to_crop: number_of_items = int(len(time_stamps) * percentage_to_crop / 100) elif crop_at: number_of_items = next((i for i, v in enumerate(time_stamps) if v > crop_at), None) elif number_of_items is None: raise ValueError( "ERROR: To execute the cropping of the signal, at least one of number_of_items, time_amount, " "percentage_to_crop, or crop_at must be provided.") if from_front: return time_stamps[number_of_items:], amplitudes[number_of_items:] return time_stamps[:-number_of_items], amplitudes[:-number_of_items]
[docs] def zero_pad_time_domain_data( time_stamps: List[float], amplitudes: List[float], pad_from: str = 'both', number_of_items: Optional[int] = None, time_amount: Optional[float] = None, percentage_to_pad: Optional[float] = None) -> Tuple[List[float], List[float]]: """ Zero pad the signal at the front, back, or both given different cues. Input: - time_stamps (list of float): List of time stamps. - amplitudes (list of float): List of amplitude values. - pad_from (str): Where to pad the signal. Options are 'front', 'back', 'both'. Defaults to 'both'. - number_of_items (int): Optional input for the number of items to pad. - time_amount (float): Optional input for the amount of time to pad. - percentage_to_pad (float): Optional input for the percentage of the signal to pad (10 = 10%). Output: - Returns tuple with padded time stamps as list of floats and amplitudes as list of floats. """ time_step = time_stamps[1] - time_stamps[0] if time_amount: number_of_items = int(time_amount / time_step) elif percentage_to_pad: number_of_items = int(len(time_stamps) * percentage_to_pad / 100) elif number_of_items is None: raise ValueError( "ERROR: To execute the zero padding of the signal, at least one of number_of_items, time_amount, or " 'percentage_to_pad must be provided.') if pad_from not in ['front', 'back', 'both']: raise ValueError( "ERROR: To execute the zero padding of the signal, select one of the options for pad_from input: 'front', " "'back' or 'both'.") if pad_from in ['front', 'both']: front_time_stamps = [time_stamps[0] - (i + 1) * time_step for i in range(number_of_items)][::-1] front_amplitudes = [0] * number_of_items time_stamps = front_time_stamps + time_stamps amplitudes = front_amplitudes + amplitudes if pad_from in ['back', 'both']: back_time_stamps = [time_stamps[-1] + (i + 1) * time_step for i in range(number_of_items)] back_amplitudes = [0] * number_of_items time_stamps = time_stamps + back_time_stamps amplitudes = amplitudes + back_amplitudes return time_stamps, amplitudes
[docs] def shift_time_stamp_start(time_stamps: List[float], start_time: float) -> List[float]: """ Function to shift the start time of the time domain data. Input: - time_stamps (list of floats): The original time stamps of the time domain. - start_time (float): The new start time for the time domain. Output: - Returns the shifted time stamps as list of float. """ # Calculate the time shift time_shift = start_time - time_stamps[0] # Shift the time stamps return [time_stamp + time_shift for time_stamp in time_stamps]
[docs] def combine_time_domain_data(time_domain_list: List[Dict[str, List[float]]]) -> Tuple[List[float], List[float]]: """ Combine the amplitudes of multiple signals. Input: - time_domain_list (list of dicts): A list of dictionaries, each containing 'time_stamps' (as list of floats), the time stamps of the time domain and 'amplitudes' (as list of floats), the amplitudes of the time domain. Output: - Returns the combined time_stamps and amplitudes as tuple with two lists of floats. """ if not time_domain_list: raise ValueError( "ERROR: To execute the combining of domain data, the list with time domain data should be provided.") # Shift all time stamps to start at 0 for ts in time_domain_list: ts['time_stamps'] = shift_time_stamp_start(ts['time_stamps'], start_time=0) # Determine the maximum length of all time domain max_length = max(len(ts['time_stamps']) for ts in time_domain_list) # Find the time domain with the maximum length longest_time_domain = max(time_domain_list, key=lambda ts: len(ts['time_stamps'])) # Set combined_time_stamps to the time stamps of the longest time domain combined_time_stamps = longest_time_domain['time_stamps'] # Allocate the size of combined_amplitudes combined_amplitudes = np.zeros(max_length) for ts in time_domain_list: # Check if the time stamps of the overlapping parts are the same min_length = min(len(combined_time_stamps), len(ts['time_stamps'])) if not np.array_equal(combined_time_stamps[:min_length], ts['time_stamps'][:min_length]): raise ValueError( "ERROR: To execute the combining of domain data the time stamps must match for the overlapping parts.") # Issue a warning if the lengths of the signals are different if len(combined_time_stamps) != len(ts['time_stamps']): warnings.warn( "WARNING: The lengths of the signals that are being combined are different. The non-overlapping parts " "will be padded with zeros.") # Pad the shorter signal with zeros to match the maximum length padded_amplitudes = np.pad(ts['amplitudes'], (0, max_length - len(ts['amplitudes'])), 'constant') # Fill in the values for combined_amplitudes combined_amplitudes += padded_amplitudes return combined_time_stamps, combined_amplitudes.tolist()
[docs] def stitch_time_domain_data( time_domain_list: List[Dict[str, Union[str, List[float]]]]) -> Tuple[List[float], List[float]]: """ Stitch together multiple signals. Input: - time_domain_list (list of dicts): A list of dictionaries, each containing the following keys and values: 'amplitude_type' as string, the type of the time domain data. 'amplitude_units' as string, the units of the amplitude values. 'time_stamps_units' as string, the units of the time stamps. 'time_stamps' as list of floats, the time stamps of the time domain. Output: - Returns the stitched time_stamps and amplitudes as tuple with two lists of floats. """ if not time_domain_list: raise ValueError( "ERROR: To execute the stitching of domain data, the list with time domain data should be provided.") # Shift all time stamps to start at 0 for ts in time_domain_list: ts['time_stamps'] = shift_time_stamp_start(ts['time_stamps'], start_time=0) stitched_time_stamps = [] stitched_amplitudes = [] for ts in time_domain_list: if stitched_time_stamps: # Adjust the time_stamps of the current signal to start after the end of the previous signal time_interval = stitched_time_stamps[-1] - stitched_time_stamps[-2] ts['time_stamps'] = \ [time_stamp + stitched_time_stamps[-1] + time_interval for time_stamp in ts['time_stamps']] stitched_time_stamps.extend(ts['time_stamps']) stitched_amplitudes.extend(ts['amplitudes']) return stitched_time_stamps, stitched_amplitudes
[docs] def rotate_signals(amplitudes1: List[float], amplitudes2: List[float], angle: float) -> Tuple[List[float], List[float]]: """ Rotate two orthogonal signals by a specified angle. Input: - amplitudes1 (list of floats): Amplitudes of the first signal. - amplitudes2 (list of floats): Amplitudes of the second signal. - angle (float): The angle of rotation, in [deg]. Output: - Returns tuple: Rotated amplitudes of the two signals. """ # Convert angle to radians angle_rad = np.radians(angle) # Create rotation matrix rotation_matrix = np.array([[np.cos(angle_rad), -np.sin(angle_rad)], [np.sin(angle_rad), np.cos(angle_rad)]]) # Stack amplitudes into a 2D array amplitudes = np.vstack((amplitudes1, amplitudes2)) # Apply rotation matrix to amplitudes return np.dot(rotation_matrix, amplitudes).tolist()
### =================================================================================================================== ### 3. Function for checking time domain data ### ===================================================================================================================
[docs] def check_time_stamps_intervals(time_stamps: List[float]) -> bool: """ Function to check if all time steps in the time domain are the same size. Input: - time_stamps (list of floats): The time stamps of the time domain. Output: - Returns boolean with the outcome of the check. Warning is thrown if the time steps are not the same size. """ # Check if time stamps are in the same interval intervals = np.diff(time_stamps) if not np.allclose(intervals, intervals[0]): warnings.warn("WARNING: Not all time steps are the same size.") return False return True
[docs] def check_td_average(amplitudes: List[float], relative_limit: float = 0.1) -> bool: """ Function to check if the average of the signal is more than a specified fraction of the maximum amplitude. Input: - amplitudes (list of float]): The amplitude values of the signal. - relative_limit (float): The fraction of the maximum amplitude to compare against. Default value is 0.1. Output: - Returns boolean with the outcome of the check. Warning is thrown if the average of the signal is more than the specified fraction of the maximum amplitude. """ if np.abs(np.mean(amplitudes)) > np.max(amplitudes) * relative_limit: warnings.warn( f"WARNING: The average of the signal is more than {relative_limit * 100}% the biggest amplitude. (DC>>0)") return False return True
[docs] def check_td_noise(amplitudes: List[float], relative_limit: float = 0.01) -> bool: """ Function to check if the signal has noise by evaluating its variance. Input: - amplitudes (list of float]): The amplitude values of the signal. - relative_limit (float): The fraction of the maximum amplitude to compare against. Default value is 0.01. Output: - Returns boolean with the outcome of the check. Warning is thrown if the signal has noise (variance is significantly different from zero). """ max_amplitude = np.max(amplitudes) - np.min(amplitudes) threshold_variance = (max_amplitude * relative_limit) ** 2 signal_variance = np.var(amplitudes) if signal_variance > threshold_variance: warnings.warn("The time domain has significant noise. Filtering might be needed.") return True return False
[docs] def check_periodicity(amplitudes: List[float], threshold: float = 0.01) -> bool: """ Function to check if the signal is periodic by checking if the amplitude the signal starts and ends with less than 1% of the max amplitude. .. note:: This check is very simplified, check can be extended in future. Input: - amplitudes (list of float]): The amplitude values of the signal. - threshold (float): The threshold for determining periodicity. Output: - Returns boolean with the outcome of the check. True if the signal is periodic, False otherwise. """ fraction_signal_length = int(len(amplitudes) / 20) signal_start = amplitudes[:fraction_signal_length] signal_end = amplitudes[-fraction_signal_length:] min_amplitude = np.abs(np.max(amplitudes) - np.mean(amplitudes)) * threshold start_max = np.abs(np.max(signal_start) - np.mean(amplitudes)) end_max = np.abs(np.max(signal_end) - np.mean(amplitudes)) if not (start_max > min_amplitude or end_max > min_amplitude): warnings.warn( "WARNING: The signal does not start or end close to the average or value, most-likely not periodic.") return False return True
[docs] def check_stationary( amplitudes: List[float], relative_tolerance: float = 0.1, absolute_tolerance: float = 0.1) -> bool: """ Function to check if the signal is stationary (mean and variance do not change over time) by checking if the variance of the first half of the signal is significantly different from the second half, we consider it non-stationary. Input: - amplitudes (List[float]): The amplitude values of the time domain. - relative_tolerance (float): The relative tolerance parameter for comparing variances. Default value is 0.1. - absolute_tolerance (float): The absolute tolerance parameter for comparing means max amplitude difference. Default value is 0.1. Output: - Returns True if the signal is stationary, else False. """ half = len(amplitudes) // 2 if not np.isclose(np.var(amplitudes[:half]), np.var(amplitudes[half:]), rtol=relative_tolerance): warnings.warn("WARNING: The variance of the time domain is not stationary.") # print((np.var(amplitudes[:half]), np.var(amplitudes[half:]))) return False if not np.isclose( np.mean(amplitudes[:half]), np.mean(amplitudes[half:]), atol=absolute_tolerance * (np.max(amplitudes) - np.min(amplitudes))): warnings.warn("WARNING: The mean of the time domain is not stationary.") return False return True
[docs] def check_td_ft_suitability(amplitudes: List[float], time_stamps: List[float]) -> bool: """ Function to check if the signal is suitable for Fourier Transform by checking time intervals, periodicity, and if it is stationary. Input: - amplitudes (list of floats): The amplitude values of the signal. - time_stamps (list of floats): The time stamps corresponding to the amplitude values. Output: - The function checks suitability for Fourier Transform, if the signal does not comply a warning is raised. """ # Check if time stamps are in the same interval if not check_time_stamps_intervals(time_stamps) or check_stationary(amplitudes) or check_periodicity(amplitudes): warnings.warn( "WARNING: The time domain is not suitable for Fourier Transform. It is not stationary, periodic or time " "step are not constant.") return False return True
[docs] def check_time_domain_compatibility( time_domain_list: List[Dict[str, Union[str, List[float]]]], identifier: str = 'time domain data') -> None: """ Function to check if the time domain data are compatible by verifying if amplitude_type, amplitude_units, time_stamps_units, and time_stamps step size are the same. Input: - time_domain_list (list of dicts): A list of dictionaries, each containing the following keys and values: 'amplitude_type' as string, the type of the time domain data. 'amplitude_units' as string, the units of the amplitude values. 'time_stamps_units' as string, the units of the time stamps. 'time_stamps' as list of floats, the time stamps of the time domain. - identifier (str): The identifier of the time domain data. Default value is 'time domain data'. Output: - Check is executed and raises warning for any failing check. """ if not time_domain_list: raise ValueError( "ERROR: To execute the compatibility check for time domain data, the list with time domain data should be " "provided.") reference = time_domain_list[0] ref_amplitude_type = reference['amplitude_type'] ref_amplitude_units = reference['amplitude_units'] ref_time_stamps_units = reference['time_stamps_units'] ref_time_step = reference['time_stamps'][1] - reference['time_stamps'][0] for ts in time_domain_list[1:]: if ts['amplitude_type'] != ref_amplitude_type: warnings.warn(f"WARNING: Amplitude type for {identifier} are different.") if ts['amplitude_units'] != ref_amplitude_units: warnings.warn(f"WARNING: Amplitude units for {identifier} are different.") if ts['time_stamps_units'] != ref_time_stamps_units: warnings.warn(f"WARNING: Time stamp units for {identifier} are different.") if (ts['time_stamps'][1] - ts['time_stamps'][0]) != ref_time_step: warnings.warn(f"WARNING: Time stamp step size for {identifier} are different.")
### =================================================================================================================== ### 4. Function for manipulating time domain data ### ===================================================================================================================
[docs] def apply_window(amplitudes: List[float], window_type: Optional[str] = None, calibrate: bool = True) -> List[float]: """ Function to Apply a window to the signal and windowing compensating factor before performing FFT. Input: - amplitudes (list of float): The amplitude values of the signal. - window_type (str): The type of window to apply. Default value is None, no window is applied. - calibrate (str): Select to apply the windowing compensating factor. Default value is True. Output: - Returns the amplitude values of the signal after applying the window as list of floats. """ if not amplitudes: warnings.warn("WARNING: Applying a window to an empty signal is not possible.") return [] # Check if the window type is in the constant WINDOW_PARAMETERS dictionary if window_type is None or window_type in ['rectangular', 'Rectangular']: # No window applied return amplitudes if isinstance(window_type, str): window_type = window_type.lower() else: raise TypeError( f"ERROR: Input for the window type should be a string. Provided was {window_type} of type " f"{type(window_type)}.") if window_type not in WINDOW_PARAMETERS: valid_window_types = ', '.join(WINDOW_PARAMETERS.keys()) warnings.warn(f"WARNING: Invalid window type. Please choose from {valid_window_types}. No window applied.") return amplitudes # Return the windowing compensation factor based on the calibration type amplitude_correction_factor = 1 if calibrate: amplitude_correction_factor = WINDOW_PARAMETERS[window_type][0] # Apply the appropriate window window_function = WINDOW_PARAMETERS[window_type][2] if len(WINDOW_PARAMETERS[window_type]) == 4: window = window_function(len(amplitudes), **WINDOW_PARAMETERS[window_type][3]) else: window = window_function(len(amplitudes)) # Convert array of amplitudes for the applied window and correction factor return (np.array(amplitudes) * window * amplitude_correction_factor).tolist()
[docs] def apply_filter_to_time_domain_data( amplitudes: List[float], sampling_rate: float, highpass_limit: float = None, lowpass_limit: float = None, gpass: float = 1, gstop: float = 5) -> List[float]: """ Apply a Butterworth filter to a signal. Input: - amplitudes (list of float): List of amplitude values of the signal. - sampling_rate (float): The sampling rate of the signal, in [Hz]. - highpass_limit (float): The high-pass limit for the filter. If None, by default, no high pass filter is applied. - lowpass_limit (float): The low-pass limit for the filter. If None, by default, no low pass filter is applied. - gpass (float): The maximum loss in the passband, in [dB]. Default is 1 dB. - gstop (float): The minimum attenuation in the stopband, in [dB]. Default is 5 dB. Output: - Returns the filtered amplitudes as list of floats. """ if not amplitudes: warnings.warn("WARNING: Empty list of amplitudes provided. Returning an empty list.") return [] if np.any(np.isnan(amplitudes)): warnings.warn("WARNING: Input data contains NaNs.") return [] # Normalising factor n_f = 1 / (sampling_rate / 2) if highpass_limit and lowpass_limit: btype = 'band' wp = [highpass_limit * n_f, lowpass_limit * n_f] ws = [0.8 * highpass_limit * n_f, 1.2 * lowpass_limit * n_f] elif highpass_limit: btype = 'high' wp = highpass_limit * n_f ws = 0.8 * highpass_limit * n_f elif lowpass_limit: btype = 'low' wp = lowpass_limit * n_f ws = 1.2 * lowpass_limit * n_f else: raise ValueError("ERROR: Either highpass_limit or lowpass_limit must be provide to apply filter to signal.") # Determine the minimum filter order and critical frequencies order, wn = buttord(wp, ws, gpass, gstop) while order >= 2: try: # Set up the filter with the determined order and critical frequencies sos = butter(order, wn, btype=btype, output='sos') # Apply the filter to the signal filtered_amplitudes = sosfiltfilt(sos, np.array(amplitudes)) # Check for NaNs in filtered data if np.any(np.isnan(filtered_amplitudes)): raise ValueError("ERROR: Filtered data contains NaNs when applying filter to signal") return filtered_amplitudes.tolist() except ValueError as e: warnings.warn(f"ERROR: Filtered data contains NaNs with order {order}. Reducing order and retrying.") order -= 1 # If it fails with order 2, raise an error raise ValueError("ERROR: Filtered data contains NaNs even with order 2. Cannot apply filter.")
[docs] def fft_from_time_domain_data( amplitudes: List[float], sampling_rate: Union[float, int], window_type: Optional[str] = None, calibrate: bool = True) -> Tuple[np.ndarray, np.ndarray]: """ Function that applies windowing and respective calibration to the time domain data, then performs the full complex spectrum of a signal FFT on the corrected data with usage of the SciPy package. Input: - amplitudes (list of float): The amplitude values of the signal. - sampling_rate (float, int): Sample rate, in [Hz]. - window_type (str): The type of window to apply. Default value is None, no window is applied. - calibrate (str): Select to apply the windowing compensating factor. Default value is True. Output: - Returns tuple of numpy arrays, the FFT values and the corresponding frequencies. """ # Compute FFT ft = fft(apply_window(amplitudes=amplitudes, window_type=window_type, calibrate=calibrate)) # Compute the discrete FFT frequencies ft_freq = fftfreq(n=len(amplitudes), d=1 / sampling_rate) return ft, ft_freq
[docs] def rfft_from_time_domain_data( amplitudes: List[float], sampling_rate: Union[float, int], window_type: Optional[str] = None, calibrate: bool = True) -> Tuple[np.ndarray, np.ndarray]: """ Function that applies windowing and respective calibration to the time domain data, then performs the real-valued signal FFT on the corrected data with usage of the SciPy package. Input: - amplitudes (list of float): The amplitude values of the signal. - sampling_rate (float, int): Sampling rate, in [Hz]. - window_type (str): The type of window to apply. Default value is None, no window is applied. - calibrate (str): Select to apply the windowing compensating factor. Default value is True. Output: - Returns tuple of numpy arrays, the FFT values and the corresponding frequencies. """ # Compute RFFT ft = rfft(apply_window(amplitudes=amplitudes, window_type=window_type, calibrate=calibrate)) # Compute the discrete RFFT frequencies ft_freq = rfftfreq(len(amplitudes), d=1 / sampling_rate) return ft, ft_freq
[docs] def amplitude_spectrum_from_time_domain_data( amplitudes: List[float], sampling_rate: float, window_type: Optional[str] = None, calibrate: bool = True, normalise_length: bool = True) -> Tuple[List[float], List[float], List[float]]: """ This function performs FFT on the time domain data and calculate the corresponding phase angles. It uses the SciPy package for the FFT calculation. If the input amplitudes are all real numbers, it uses the more efficient real FFT computation. Input: - amplitudes (list of float): The amplitude values of the signal. - sampling_rate (float, int): Sample rate of the signal, in [Hz]. - window_type (str): The type of window to apply. Default value is None, no window is applied. - calibrate (str): Select to apply the windowing compensating factor. Default value is True. - normalise_length (bool): Select to normalise the amplitude spectrum by the signal length. Default value is True. Output: - Returns tuple containing three lists of floats: Amplitude spectrum, corresponding phase angles, in [deg] and corresponding frequencies, in [Hz]. """ # Check if all input amplitudes are real numbers if np.isreal(amplitudes).all(): # Compute FFT for real amplitude spectra values (more efficient) ft_amplitude, positive_ft_freq = rfft_from_time_domain_data( amplitudes=amplitudes, sampling_rate=sampling_rate, window_type=window_type, calibrate=calibrate) # Scale FFT output to preserve energy in single-sided spectrum N = len(amplitudes) if N % 2 == 0: # Even: don't scale DC (0) or Nyquist (N//2) ft_amplitude[1:N // 2] *= 2 else: # Odd: don't scale DC (0), but last bin is not Nyquist — it should be scaled ft_amplitude[1:(N + 1) // 2] *= 2 # Extract magnitude and phase real_positive_ft_amplitude = np.abs(ft_amplitude) phase_angles = np.angle(ft_amplitude, deg=True) else: warnings.warn( "WARNING: The input signal is complex. The conversion to a real single-sided spectrum may discard relevant " "information. Full handling of complex signals is not fully implemented.") # Compute FFT for full complex spectrum for amplitude spectra values ft_amplitude, ft_freq = fft_from_time_domain_data( amplitudes=amplitudes, sampling_rate=sampling_rate, window_type=window_type, calibrate=calibrate) # Convert FFT result to real positive-sided spectrum real_positive_ft_amplitude, phase_angles, positive_ft_freq = \ convert_double_sided_to_real_single_sided_spectrum(spectrum_values=ft_amplitude, frequencies=ft_freq) # Normalises by number of data points for correct scaling and consistent comparison with different signal lengths if normalise_length: amplitude_spectrum = real_positive_ft_amplitude / len(amplitudes) else: amplitude_spectrum = real_positive_ft_amplitude return amplitude_spectrum.tolist(), phase_angles.tolist(), positive_ft_freq.tolist()
### =================================================================================================================== ### 5. Benchmarks and Norms ### ===================================================================================================================
[docs] def calculate_progressive_effective_rms_slow( amplitude_data: List[float], time_stamps: List[float], tau: float = 0.125) -> List[float]: """ Calculate the progressive effective RMS value using the slow integration method. `Reference: <https://www.ritchievink.com/blog/2017/05/07/what-should-be-explained-in-the-dutch-sbr-b-guideline/>`_ Input: - amplitude_data (list of floats): The amplitude data. - time_stamps (list of floats): List of time points, in [s]. - tau (float): Time constant, in [s]. Default value 0.125s according SBR-B. Output: - Returns the calculated RMS values. """ time_stamps = np.array(time_stamps) amplitude_data = np.array(amplitude_data) time_step_delta = time_stamps[1] - time_stamps[0] squared_signal = np.square(amplitude_data) signal_length = len(amplitude_data) # Time array starting from 0 shifted_time = time_stamps - time_stamps[0] # Initialise result array v_eff = np.zeros(signal_length) # Calculate exponential weighing function and reverse it g_xi = np.exp(-shifted_time / tau) g_xi = g_xi[::-1] # Calculate RMS at each time step for i in range(signal_length): if i == 0: exp_wgh = 1 else: exp_wgh = g_xi[-i:] v_eff[i] = np.sqrt((1/tau) * np.trapezoid(exp_wgh * squared_signal[:i], dx=time_step_delta)) return v_eff.tolist()
[docs] def calculate_progressive_effective_rms_fast( amplitude_data: List[float], time_stamps: List[float], tau: float = 0.125) -> List[float]: """ Function to calculate the progressive effective RMS value using the fast difference equation method. Input: - amplitude_data (list of floats): The amplitude data. - time_stamps (list of floats): List of time points, in [s]. - tau (float): Time constant, in [s]. Default value 0.125s according SBR-B guideline. Output: - Returns the calculated RMS values. """ time_stamps = np.array(time_stamps) amplitude_data = np.array(amplitude_data) time_step_delta = time_stamps[1] - time_stamps[0] squared_signal = np.square(amplitude_data) signal_length = len(squared_signal) # Calculate alpha parameter for the exponential weighted moving average alpha = 1 - np.exp(-time_step_delta / tau) # Initialise arrays for the Exponential Weighted Moving Average and effective velocity ewma = np.zeros(signal_length) v_eff = np.zeros(signal_length) # Set initial values ewma[0] = squared_signal[0] v_eff[0] = np.sqrt(ewma[0]) # Apply difference equation for i in range(1, signal_length): ewma[i] = alpha * squared_signal[i] + (1 - alpha) * ewma[i - 1] v_eff[i] = np.sqrt(ewma[i]) return v_eff.tolist()
[docs] def frequency_weighting( frequency_data: np.ndarray, amplitudes: np.ndarray, amplitude_type: str = 'velocity', f0: float = 5.6, v0: float = 1.0) -> np.ndarray: """ Calculate frequency weighting factors and apply them to amplitudes at each respective frequency. Input: - frequency_data (np.ndarray): NumPy array of frequency values, in [Hz]. - amplitudes (np.ndarray): Numpy array of amplitude values. - amplitude_type (str): Type of amplitude data. Select from 'velocity' or 'acceleration'. Default value is 'velocity'. - f0 (float): Reference frequency, in [Hz]. Default value 5.6 Hz according SBR-B guideline. - v0 (float): Reference velocity, in [mm/s]. Default value 1.0 mm/s according SBR-B guideline. Output: - Returns the frequency weighted amplitudes. """ # Avoid division by zero for frequency = 0 frequency_data_safe = np.copy(frequency_data) frequency_data_safe[frequency_data_safe == 0] = np.finfo(float).eps if amplitude_type.lower() == 'velocity': # H(f) = 1/v0 * 1/sqrt(1+(f0/f)^2) weighting_factors = (1 / v0) * 1 / np.sqrt(1 + (f0 / frequency_data_safe) ** 2) elif amplitude_type.lower() == 'acceleration': # H(f) = 1/v0 * 1/(2*pi*f0) * 1/sqrt(1+(f0/f)^2) weighting_factors = (1 / v0) * 1 / (2 * np.pi * f0) * 1 / np.sqrt(1 + (f0 / frequency_data_safe) ** 2) else: raise ValueError("ERROR: The amplitude_type must be either 'velocity' or 'acceleration'.") # Apply weighting factors to amplitudes return amplitudes * weighting_factors
[docs] def calculate_sbr_b_rms( amplitudes: List[float], time_stamps: List[float], method: str = 'fast', tau: float = 0.125, f0: float = 5.6, v0: float = 1.0, amplitude_type: str = 'velocity', apply_frequency_weighting: bool = True, highpass_freq: float = 1.0, lowpass_freq: float = 80.0) -> List[float]: """ Calculate the RMS values from time domain data according SBR-B guideline. Input: - amplitudes (list of floats): The amplitude data - time_stamps (List[float]): List of time points in seconds - method (str): Method to use for calculation ('fast' or 'slow') - tau (float): Time constant in seconds (default: 0.125s for SBR-B) - f0 (float): Reference frequency, in [Hz]. Default value 5.6 Hz according SBR-B guideline. - v0 (float): Reference velocity, in [mm/s]. Default value 1.0 mm/s according SBR-B guideline. - amplitude_type (str): Type of amplitude data. Select from 'velocity' or 'acceleration'. Default value is 'velocity'. - apply_frequency_weighting (bool): Select to apply frequency weighting. Default value is True. - highpass_limit (float): The high-pass limit for the filter. By default, 1Hz is applied, according SBR-B guideline. - lowpass_limit (float): The low-pass limit for the filter. By default, 80Hz is applied, according SBR-B guideline. Output: - Returns list of the calculated RMS values. """ # Check suitability of the signal if not check_time_stamps_intervals(time_stamps=time_stamps): raise RuntimeError( "ERROR: The time steps in the time domain are not the same size. The RMS according SBR-B can't be " "calculated.") # Calculate time step delta time_step_delta = time_stamps[1] - time_stamps[0] # Apply bandpass filter filtered_signal = apply_filter_to_time_domain_data( amplitudes=amplitudes, sampling_rate=1 / time_step_delta, highpass_limit=highpass_freq, lowpass_limit=lowpass_freq) # Apply frequency weighting if requested if apply_frequency_weighting: # Perform FFT to get frequency domain representation frequencies = np.fft.rfftfreq(n=len(filtered_signal), d=time_step_delta) fft_data = np.fft.rfft(a=filtered_signal) # Apply frequency weighting weighted_fft = frequency_weighting( frequency_data=frequencies, amplitudes=fft_data, amplitude_type=amplitude_type, f0=f0, v0=v0) # Convert back to time domain filtered_signal = np.fft.irfft(a=weighted_fft, n=len(filtered_signal)) # Calculate the RMS using the specified method if method.lower() == 'fast': return calculate_progressive_effective_rms_fast( amplitude_data=filtered_signal, time_stamps=time_stamps, tau=tau) elif method.lower() == 'slow': return calculate_progressive_effective_rms_slow( amplitude_data=filtered_signal, time_stamps=time_stamps, tau=tau) raise ValueError("ERROR: Method must be either 'fast' or 'slow'.")
### =================================================================================================================== ### 6. End of script ### ===================================================================================================================