Source code for localpoly.base

import random
import numpy as np

from .utils.helpers import sort_values_by_X, create_partitions
from .utils.kernels import kernel_dict

RANDOM_STATE = 1


[docs]class LocalPolynomialRegression: """Local polynomial regression. LocalPolynomialRegression fits a polynomial of degree 3 in to the sourrounding of each point. The surrounding is realized by a kernel with bandwidth h. The regression returns the fit, as well as its first and second derivative. Parameters: X: X-values of data that is to be fitted (explanatory variable) y: y-values of data that is to be fitted (observations) h: bandwidth for the kernel gridsize: desired size of the fit (granularity) kernel_str: the name of the kernel as a string "gaussian" """ def __init__(self, X, y, h, kernel="gaussian", gridsize=100): self.X = X self.y = y self.h = h self.kernel_str = kernel self.kernel = kernel_dict[self.kernel_str] self.gridsize = gridsize
[docs] def localpoly(self, x): """Calculates estimate for position x via Local Polynomial Regression. The usage of Local Polynomial Regression allows to not only calculate the estimate, but also its first and second derivative in this point. Data (X, y) and regression settings (kernel, h) are saved in self. Args: x (float): Position for which to calculate the estimated value. Returns: dict: Results of regression. The estimated value for point x, its first and second derivative in this point and the weight vector of the influence of the surrounding points.:: {"fit": beta[0], "first": beta[1], "second": beta[2], "weight": W_hi} """ n = self.X.shape[0] K_i = 1 / self.h * self.kernel(x, self.X, self.h) f_i = 1 / n * sum(K_i) if f_i == 0: # doesnt really happen, but in order to avoid possible errors W_hi = np.zeros(n) else: W_hi = K_i / f_i X1 = np.ones(n) X2 = self.X - x X3 = X2 ** 2 X = np.array([X1, X2, X3]).T W = np.diag(W_hi) # (n,n) XTW = (X.T).dot(W) # (3,n) XTWX = XTW.dot(X) # (3,3) XTWy = XTW.dot(self.y) # (3,1) beta = np.linalg.pinv(XTWX).dot(XTWy) # (3,1) return {"fit": beta[0], "first": beta[1], "second": beta[2], "weight": W_hi}
[docs] def fit(self, prediction_interval): """Fit the Local Polynomial Regression model for the prediction interval. Args: prediction_interval (tuple): interval for which the prediction is calculated Returns: dict: Results of the fit. The estimated function (fit) in the prediction interval (X) and its first and second derivative:: { 'X' : X_domain, # prediction interval of fit 'fit': fit, # fit of the function at point x 'first': first, # first derivative at point x 'second': second, # second derivative at point x } """ X_min, X_max = prediction_interval X_domain = np.linspace(X_min, X_max, self.gridsize) fit = np.zeros(len(X_domain)) first = np.zeros(len(X_domain)) second = np.zeros(len(X_domain)) for i, x in enumerate(X_domain): results = self.localpoly(x) fit[i] = results["fit"] first[i] = results["first"] second[i] = results["second"] return {"X": X_domain, "fit": fit, "first": first, "second": second}
[docs]class LocalPolynomialRegressionCV(LocalPolynomialRegression): """Bandwidth Selection via Cross Validation for Local Polynomial Regression. LocalPolynomialRegressionCV performs the parameter optimization for LocalPolynomialRegression. The optimal Bandwidth highly depends on the data (X, y) and the kernel. Args: X (np.array): X-values of data that is to be fitted (explanatory variable) y (np.array): y-values of data that is to be fitted (observations) kernel (str, optional): Name of the kernel. Defaults to "gaussian". gridsize (int, optional): Desired size of the fit - granularity. Defaults to 100. n_sections (int, optional): Amount of sections to devide the dataset in cross validation (k-folds). Defaults to 10. loss (str, optional): Loss function for optimization. Defaults to "MSE". sampling (str, optional): Whether the dataset should be partitioned "random" or as "slicing". Defaults to "random". Attributes: prediction_interval: Interval in which to calculate the estimates, automatically set to (X.min(), X.max()) """ def __init__( self, X, y, kernel="gaussian", gridsize=100, n_sections=10, loss="MSE", sampling="random", ): self.n_sections = n_sections self.loss = loss self.sampling = sampling self.prediction_interval_ = (X.min(), X.max()) # invoking the __init__ of the parent class LocalPolynomialRegression.__init__(self, X=X, y=y, h=None, kernel=kernel, gridsize=gridsize)
[docs] def bandwidth_cv( self, coarse_list_of_bandwidths, ): """Cross Validation for Bandwidth optimization. The CV Routine is performed twice. First, for a ``coarse_list_of_bandwidths``, then on a finer grid which spans around the first optimal value, ``fine_list_of_bandwidths``. Args: coarse_list_of_bandwidths (list): coarse list of bandwidths, it is suggested to give values around the Silverman bandwidth Returns: dict: fine results and coarse results of bandwidth search:: { "fine results": { "bandwidths": fine_list_of_bandwidths, "MSE": # mean squared errors for bandwidths, "h": # optimal bandwidth within fine_list_of_bandwidths, }, "coarse results": { # ... same as above but with coarse_list_of_bandwidths }, } """ # 1) coarse parameter search coarse_results = self._bandwidth_cv_sampling(coarse_list_of_bandwidths) # 2) fine parameter search, around minimum of first search coarse_h = coarse_results["h"] stepsize = coarse_list_of_bandwidths[1] - coarse_list_of_bandwidths[0] fine_list_of_bandwidths = np.linspace(coarse_h - (stepsize * 1.1), coarse_h + (stepsize * 1.1), 10) fine_results = self._bandwidth_cv_sampling(fine_list_of_bandwidths) return { "fine results": fine_results, "coarse results": coarse_results, }
def _bandwidth_cv_sampling(self, list_of_bandwidths): """The actual CV. First, the data is sorted by X, which is important in case the sampling type "slicing" was selected. Then the partitions for the CV are created (slices or random). Finally the CV is performed. Args: list_of_bandwidths (list): list of bandwidths that are evaluated Returns: dict: Results of CV. List of bandwidth that were evaluated. MSE for each bandwidth. Optimal bandwidth "h". """ X_sorted, y_sorted = sort_values_by_X(self.X, self.y) X_sections = create_partitions(X_sorted, y_sorted, self.n_sections, self.sampling) max_comparisons_per_section = min(len(X_sections[0]), 30) num = len(list_of_bandwidths) mse_bw = np.zeros(num) # for each bandwidth have mse - loss function # for bandwidth h in list_of_bandwidths for b, h in enumerate(list_of_bandwidths): # take out chunks of our data and do leave-out-prediction runs, mse = 0, 0 for i, section in enumerate(X_sections): X_train, X_test = np.delete(X_sorted, section), X_sorted[section] y_train, y_test = np.delete(y_sorted, section), y_sorted[section] max_section_comparison_length = min(len(X_test), max_comparisons_per_section) model = LocalPolynomialRegression(X_train, y_train, h, self.kernel_str, self.gridsize) random.seed(RANDOM_STATE) # for random y in y_test, extimate y_hat and calculate mse for idx_test in random.sample(range(len(X_test)), max_section_comparison_length): y_hat = model.localpoly(X_test[idx_test])["fit"] y_true = y_test[idx_test] mse += (y_true - y_hat) ** 2 runs += 1 mse_bw[b] = 1 / runs * mse return { "bandwidths": list_of_bandwidths, "MSE": mse_bw, "h": list_of_bandwidths[mse_bw.argmin()], }