Source code for autopwlf.autopwlf

"""
MIT License

Copyright (c) 2024 Nedeesha Weerasuriya

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

"""

__date__ = "2025-02-13"
__author__ = "NedeeshaWeerasuriya"
__version__ = "0.9.2"


from typing import Tuple

import numpy as np
import pwlf
from scipy.interpolate import interp1d
from scipy.signal import find_peaks, peak_prominences, savgol_filter
from sklearn.linear_model import ElasticNetCV


[docs]class AutoPWLF(object): """ Class to find the optimal number of breaks for a piecewise linear fit using the Bayesian Information Criterion (BIC) and then fit the piecewise linear function to the given data set """
[docs] def __init__( self, x_data: np.array, y_data: np.array, disp_res: bool = False, lapack_driver: str = "gelsd", degree: int = 1, weights: np.array = None, random_seed: int = None, smooth_polyorder: int = 0, peak_threshold: float = 0.25, prominence_threshold: float = 0.1, ): """ Initialize the AutoPWLF class to find the optimal number of breaks and fit a continuous piecewise linear function. Supply x and y values which will be used to fit the piecewise linear function to where y(x) = f(x). Args: x_data: independent variable y_data: dependent variable disp_res: display results lapack_driver: LAPACK driver to use for the linear least squares problem degree: degree of the polynomial to fit weights: weights for the least squares problem random_seed: random seed for reproducibility smooth_polyorder: polynomial order for the Savitzky-Golay filter peak_threshold: threshold for identifying peaks and valleys prominence_threshold: threshold for identifying significant peaks and valleys Attributes: stationary_points: number of stationary points in the data set y_smooth: smoothed y values optimal_breaks: optimal number of breaks outliers: array containing the outliers initial_outlier_model: initial piecewise linear model used for the outlier detection outlier_detection_pred: predicted values without the outliers outlier_detection_residuals: residuals without the outliers """ x, y = self._switch_to_np_array(x_data), self._switch_to_np_array(y_data) self.x, self.y = x, y self.disp_res = disp_res self.lapack_driver = lapack_driver self.degree = degree self.weights = weights self.random_seed = random_seed self.smooth_polyorder = smooth_polyorder self.peak_threshold = peak_threshold self.prominence_threshold = prominence_threshold if random_seed: np.random.seed(random_seed) # Initialise empty attributes self.stationary_points = None self.y_smooth = None self.optimal_breaks = None # Outliers attributes self.outliers = None self.initial_outlier_model = None self.outlier_detection_pred = None self.outlier_detection_residuals = None
@staticmethod def _switch_to_np_array(input_): r""" Check the input, if it's not a Numpy array transform it to one. Args: input_: input data Returns: input_: input data as a Numpy array """ if isinstance(input_, np.ndarray) is False: input_ = np.array(input_) return input_ @staticmethod def _filter_outliers( x: np.ndarray, y: np.ndarray, outliers: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Remove outliers from the data. Args: x: Independent variable. y: Dependent variable. outliers: Indices of outliers to be removed. Returns: Tuple[np.ndarray, np.ndarray]: Filtered x and y data without outliers. """ mask = ~np.isin(np.arange(len(x)), outliers) return x[mask], y[mask] @staticmethod def _find_peaks_valleys( data, peak_threshold: float, prominence_threshold: float ) -> tuple: """ Find the peaks and valleys of the smoothed data set Args: data: data set peak_threshold: threshold for identifying peaks and valleys prominence_threshold: threshold for identifying significant peaks and valleys Returns: significant_peaks: peaks of the data set significant_valleys: valleys of the data set """ # Find maxima and minima of the smoothed linear interpolation peaks, _ = find_peaks(data, threshold=peak_threshold) valleys, _ = find_peaks(-data, threshold=peak_threshold) # Filter peaks and valleys by prominence peak_prominence = peak_prominences(data, peaks)[0] valley_prominence = peak_prominences(-data, valleys)[0] significant_peaks = peaks[peak_prominence > prominence_threshold] significant_valleys = valleys[valley_prominence > prominence_threshold] return significant_peaks, significant_valleys @staticmethod def _sav_gol_fit(x: np.array, y: np.array, poly_order: int) -> np.array: """ Apply the Savitzky-Golay filter to the data set to smooth the data Args: x: independent variable y: dependent variable poly_order: polynomial order for the filter Returns: np.array: Smoothed y values """ # Smoothing the data using Savitzky-Golay filter window_size = int(3 + len(x) / 50) # Making sure window size is odd if window_size % 2 == 0: window_size += 1 y_smooth = savgol_filter(y, window_size, poly_order) return y_smooth
[docs] def auto_fit( self, fitfast: bool = True, buffer: int = 2, complexity_penalty=20, outliers: bool = False, outlier_threshold: int = 5, ) -> pwlf.PiecewiseLinFit: """ Fit a piecewise linear function with automated number of breaks found from the stationary points Adding a buffer to the number of breaks to allow for more flexibility with the model chosen by the BIC Args: fitfast: if true, use the fast fitting method for the piecewise linear fit buffer: buffer for the number of breaks to allow for more flexibility complexity_penalty: complexity penalty for the BIC outliers: if true, remove outliers from the data outlier_threshold: threshold for identifying outliers: standard deviations from the mean residual Returns: pwlf.PiecewiseLinFit: PiecewiseLinFit model """ self.stationary_points = self.find_num_stationary_points() if outliers: # Initial fit to detect outliers init_breaks = max(1, self.stationary_points) _, init_pwlf = self.fit(self.x, self.y, init_breaks, init_breaks) self.outliers = self.find_outliers(init_pwlf, outlier_threshold) # Remove outliers from the data x_filtered, y_filtered = self._filter_outliers( self.x, self.y, self.outliers ) else: x_filtered, y_filtered = self.x, self.y min_breaks = self.stationary_points - buffer max_breaks = self.stationary_points + buffer optimal_breaks, my_pwlf = self.fit( x_filtered, y_filtered, min_breaks, max_breaks, complexity_penalty=complexity_penalty, fitfast=fitfast, ) self.optimal_breaks = optimal_breaks return my_pwlf
[docs] def find_num_stationary_points(self) -> int: """ Find the number of stationary points in the data set to use as min and max breaks First fit a smoothened interpolation function on the data Then find the number of peaks and valleys in the data Returns: int: Number of stationary points """ y_smooth = self._sav_gol_fit(self.x, self.y, poly_order=self.smooth_polyorder) # Linear interpolation for quicker runtime f = interp1d(self.x, y_smooth, kind="linear") xnew = np.linspace(self.x.min(), self.x.max(), num=100) ynew = f(xnew) # Find maxima and minima of the smoothened linear interpolation peaks, valleys = self._find_peaks_valleys( ynew, self.peak_threshold, self.prominence_threshold ) stationary_points = ( len(peaks) + len(valleys) ) // 2 + 2 # Adding 2 to account for the end points return stationary_points
[docs] def fit( self, x: np.array, y: np.array, min_breaks: int, max_breaks: int = None, complexity_penalty: int = 20, fitfast: bool = True, ) -> tuple: """ Finds the optimal number of breaks for a piecewise linear fit using the Bayesian Information Criterion (BIC) and then plots the piecewise linear fit of the given data set Args: x: Independent variable. y: Dependent variable. min_breaks: minimum number of breaks max_breaks: maximum number of breaks complexity_penalty: complexity penalty for the BIC fitfast: if true, use the fast fitting method for the piecewise linear fit Returns: tuple: Optimal number of breaks, PiecewiseLinFit model """ if not max_breaks: max_breaks = min_breaks if min_breaks < 1: min_breaks = 1 # BIC calculation bic_values = [] pwlf_list = [] n = len(y) for breaks in range(min_breaks, max_breaks + 1): my_pwlf = pwlf.PiecewiseLinFit( x, y, seed=self.random_seed, degree=self.degree, weights=self.weights, lapack_driver=self.lapack_driver, ) if fitfast: my_pwlf.fitfast(breaks) else: my_pwlf.fit(breaks) # calculate the residual sum of squares yhat = my_pwlf.predict(x) rss = np.sum((y - yhat) ** 2) # calculate the BIC bic = n * np.log(rss / n) + complexity_penalty * breaks * np.log(n) bic_values.append(bic) pwlf_list.append(my_pwlf) optimal_index = np.argmin(bic_values) self.optimal_breaks = min_breaks + optimal_index return self.optimal_breaks, pwlf_list[optimal_index]
[docs] def find_outliers(self, pwlf_model: pwlf.PiecewiseLinFit, outlier_threshold: int): """ This function adjusts the outliers by fitting a new piecewise linear model without the outliers Args: pwlf_model: PiecewiseLinFit model outlier_threshold: threshold for identifying outliers: standard deviations from the mean residual Returns: outliers: array containing the outliers """ # Remove the outliers from the data fit_breaks = pwlf_model.fit_breaks A = pwlf_model.assemble_regression_matrix(fit_breaks, self.x) en_model = ElasticNetCV( cv=5, l1_ratio=[0.1, 0.5, 0.7, 0.9, 0.95, 1], fit_intercept=False, max_iter=1000, n_jobs=-1, ) en_model.fit(A, self.y) # Calculate the residuals y_hat = pwlf_model.predict(self.x) residuals = self.y - y_hat # Store attributes for outlier detection self.initial_outlier_model = pwlf_model self.outlier_detection_pred = y_hat self.outlier_detection_residuals = residuals # Identify outliers as points that are more than 'outlier_threshold' standard deviations from the mean residual outlier_mask = np.abs( residuals - np.mean(residuals) ) > outlier_threshold * np.std(residuals) outliers = self.x[outlier_mask] return outliers