Source code for flexcode.core

import numpy as np

from .basis_functions import BasisCoefs, evaluate_basis
from .helpers import box_transform, make_grid
from .loss_functions import cde_loss
from .post_processing import *


class FlexCodeModel(object):
    """Conditional density estimator built on a basis expansion of ``z``.

    The response ``z`` is rescaled with ``box_transform`` using ``z_min`` and
    ``z_max``, expanded in ``max_basis`` basis functions, and the wrapped
    regression model is fitted to predict each basis coefficient from the
    covariates.
    """

    def __init__(
        self,
        model,
        max_basis,
        basis_system="cosine",
        z_min=None,
        z_max=None,
        regression_params=None,
        custom_model=None,
    ):
        """Initialize FlexCodeModel object

        :param model: A FlexCodeRegression class (or factory); it is called as
            ``model(max_basis, regression_params, custom_model)``
        :param max_basis: int, the maximal number of basis functions
        :param basis_system: string, the basis system: options are "cosine"
        :param z_min: float, the minimum z value; if None will default
            to the minimum of the training values
        :param z_max: float, the maximum z value; if None will default
            to the maximum of the training values
        :param regression_params: A dictionary of tuning parameters for
            the regression model; None (the default) means an empty dictionary
        :param custom_model: a scikit-learn-type model, i.e. with fit and
            predict method.
        """
        # Build a fresh dict per instance: a ``{}`` default in the signature
        # would be one object shared by every model created without params.
        if regression_params is None:
            regression_params = {}

        self.max_basis = max_basis
        self.best_basis = range(max_basis)
        self.basis_system = basis_system
        self.model = model(max_basis, regression_params, custom_model)

        self.z_min = z_min
        self.z_max = z_max

        # Post-processing parameters; they stay None until ``tune`` sets them.
        self.bump_threshold = None
        self.sharpen_alpha = None

    @staticmethod
    def _as_column(array):
        """Return ``array`` as-is, or reshaped to a single column if it is 1-D."""
        if array.ndim == 1:
            return array.reshape(-1, 1)
        return array

    def fit(self, x_train, z_train, weight=None):
        """Fits basis function regression models.

        If ``z_min`` / ``z_max`` were not supplied at construction they are
        set here from the training responses.

        :param x_train: a numpy matrix of training covariates.
        :param z_train: a numpy array of z values.
        :param weight: (optional) a numpy array of weights.
        :returns: None.
        """
        x_train = self._as_column(x_train)
        z_train = self._as_column(z_train)

        # np.min / np.max reduce over the whole array and give a scalar; the
        # builtin min / max would iterate over rows and return a 1-element
        # array instead of the documented float.
        if self.z_min is None:
            self.z_min = float(np.min(z_train))
        if self.z_max is None:
            self.z_max = float(np.max(z_train))

        z_basis = evaluate_basis(
            box_transform(z_train, self.z_min, self.z_max),
            self.max_basis,
            self.basis_system,
        )

        self.model.fit(x_train, z_basis, weight)

    def tune(
        self,
        x_validation,
        z_validation,
        bump_threshold_grid=None,
        sharpen_grid=None,
        n_grid=1000,
    ):
        """Set tuning parameters to minimize CDE loss

        Sets the best_basis attribute, and the bump_threshold and
        sharpen_alpha attributes when the corresponding grid is given.

        :param x_validation: a numpy matrix of covariates
        :param z_validation: a numpy array of z values
        :param bump_threshold_grid: an array of candidate bump threshold values
        :param sharpen_grid: an array of candidate sharpen parameter values
        :param n_grid: integer, the number of grid points to evaluate
        :returns: None
        """
        x_validation = self._as_column(x_validation)
        z_validation = self._as_column(z_validation)

        z_validation = box_transform(z_validation, self.z_min, self.z_max)
        z_basis = evaluate_basis(z_validation, self.max_basis, self.basis_system)

        coefs = self.model.predict(x_validation)

        # Keep exactly the basis functions whose per-basis loss term
        # mean(coef^2) - 2 * mean(coef * basis) is negative.
        term1 = np.mean(coefs**2, 0)
        term2 = np.mean(coefs * z_basis, 0)
        self.best_basis = np.where(term1 - 2 * term2 < 0.0)[0]

        if bump_threshold_grid is None and sharpen_grid is None:
            return

        coefs = coefs[:, self.best_basis]
        z_grid = make_grid(n_grid, self.z_min, self.z_max)
        z_basis = evaluate_basis(
            box_transform(z_grid, self.z_min, self.z_max),
            max(self.best_basis) + 1,
            self.basis_system,
        )
        z_basis = z_basis[:, self.best_basis]
        cdes = np.matmul(coefs, z_basis.T)
        normalize(cdes)

        # NOTE(review): z_validation is on the box-transformed scale here while
        # z_grid is on the original [z_min, z_max] scale -- confirm that the
        # choose_* helpers expect this combination.
        if bump_threshold_grid is not None:
            self.bump_threshold = choose_bump_threshold(
                cdes, z_grid, z_validation, bump_threshold_grid
            )
            remove_bumps(cdes, self.bump_threshold)
            normalize(cdes)

        if sharpen_grid is not None:
            self.sharpen_alpha = choose_sharpen(cdes, z_grid, z_validation, sharpen_grid)

    def predict_coefs(self, x_new):
        """Predict the basis coefficients for new data

        :param x_new: A numpy matrix of covariates at which to predict
        :returns: a BasisCoefs object built from the predicted coefficients of
            the selected basis functions, together with the basis system, the
            z range and the current post-processing parameters
        """
        x_new = self._as_column(x_new)

        coefs = self.model.predict(x_new)[:, self.best_basis]
        return BasisCoefs(
            coefs,
            self.basis_system,
            self.z_min,
            self.z_max,
            self.bump_threshold,
            self.sharpen_alpha,
        )

    def predict(self, x_new, n_grid):
        """Predict conditional density estimates on new data

        :param x_new: A numpy matrix of covariates at which to predict
        :param n_grid: int, the number of grid points at which to
            predict the conditional density
        :returns: A tuple ``(cdes, z_grid)``: a numpy matrix where each row is
            a conditional density estimate at the grid points, and the grid
            of z values between z_min and z_max
        :rtype: tuple
        """
        x_new = self._as_column(x_new)

        # The basis functions are evaluated on the unit interval; the result
        # is rescaled to the original z range at the end.
        unit_grid = make_grid(n_grid, 0.0, 1.0)
        z_basis = evaluate_basis(unit_grid, max(self.best_basis) + 1, self.basis_system)
        z_basis = z_basis[:, self.best_basis]

        coefs = self.model.predict(x_new)[:, self.best_basis]
        cdes = np.matmul(coefs, z_basis.T)

        # Post-process
        normalize(cdes)
        if self.bump_threshold is not None:
            remove_bumps(cdes, self.bump_threshold)
        if self.sharpen_alpha is not None:
            sharpen(cdes, self.sharpen_alpha)

        # Divide by the width of the z range to move from the unit interval
        # back to the original scale.
        cdes /= self.z_max - self.z_min
        return cdes, make_grid(n_grid, self.z_min, self.z_max)

    def estimate_error(self, x_test, z_test, n_grid=1000):
        """Estimates CDE loss on test data

        :param x_test: A numpy matrix of covariates
        :param z_test: A numpy matrix of z values
        :param n_grid: Number of grid points at which to predict the
            conditional density
        :returns: an estimate of the CDE loss
        :rtype: float
        """
        x_test = self._as_column(x_test)
        z_test = self._as_column(z_test)

        cde_estimate, z_grid = self.predict(x_test, n_grid)
        return cde_loss(cde_estimate, z_grid, z_test)