Module riskmodels.univariate

This module contains univariate models for risk analysis, namely, empirical (discrete) distributions and semiparametric distributions with Generalised Pareto tail models. Both MLE-based and Bayesian estimation are supported for the GP models; useful diagnostic plots are available for fitted models, and exceedance conditional distributions of the type X | X > u for a fitted model X are implemented through the >= and > operators for all model classes, as well as scalar addition and (positive) rescaling, Finally, Binned distributions with integer support are available and can be convolved to get the distribution of a sum of independent integer random variables. This is useful for risk models in energy procurement.

Expand source code
"""
This module contains univariate models for risk analysis, namely, empirical (discrete) distributions and semiparametric distributions with Generalised Pareto tail models. Both MLE-based and Bayesian estimation are supported for the GP models; useful diagnostic plots are available for fitted models, and exceedance conditional distributions of the type X | X > u for a fitted model X are implemented through the >= and > operators for all model classes, as well as scalar addition and (positive) rescaling, Finally, Binned distributions with integer support are available and can be convolved to get the distribution of a sum of independent integer random variables. This is useful for risk models in energy procurement.
"""
from __future__ import annotations

import logging
import time
import typing as t
import traceback
import warnings
from argparse import Namespace
from abc import ABC, abstractmethod
from multiprocessing import Pool
import copy

import pandas as pd
import scipy as sp

import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from matplotlib import cm
import matplotlib

import numpy as np
import emcee

from scipy.stats import genpareto as gpdist
from scipy.optimize import LinearConstraint, minimize, root_scalar
from scipy.stats import gaussian_kde as kde
from scipy.signal import fftconvolve


from pydantic import BaseModel, ValidationError, validator, PositiveFloat
from functools import reduce

import statsmodels.distributions.empirical_distribution as ed

# class BaseWrapper(BaseModel):

#   class Config:
#     arbitrary_types_allowed = True


class BaseDistribution(BaseModel):

    """Base interface for available data model types"""

    _allowed_scalar_types = (int, float, np.int64, np.int32, np.float32, np.float64)
    _figure_color_palette = ["tab:cyan", "deeppink"]
    _error_tol = 1e-6

    def __repr__(self):
        return "Base distribution object"

    def __str__(self):
        return self.__repr__()

    class Config:
        arbitrary_types_allowed = True

    @abstractmethod
    def simulate(self, size: int) -> np.ndarray:
        """Produces simulated values from model

        Args:
            n (int): Number of samples
        """
        pass

    @abstractmethod
    def moment(self, n: int) -> float:
        """Calculates non-centered moments

        Args:
            n (int): moment order
        """
        pass

    @abstractmethod
    def ppf(self, q: float) -> float:
        """Calculates the corresponding quantile for a given probability value

        Args:
            q (float): probability level
        """
        pass

    def mean(self, **kwargs) -> float:
        """Calculates the expected value

        Args:
            **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

        Returns:
            float: mean value

        """
        return self.moment(1, **kwargs)

    def std(self, **kwargs) -> float:
        """Calculates the standard deviation

        Args:
            **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

        Returns:
            float: standard deviation value

        """
        return np.sqrt(self.moment(2, **kwargs) - self.mean(**kwargs) ** 2)

    @abstractmethod
    def cdf(self, x: float) -> float:
        """Evaluates the cumulative probability function

        Args:
            x (float): a point in the support
        """
        pass

    @abstractmethod
    def pdf(self, x: float) -> float:
        """Calculates the probability mass or probability density function

        Args:
            x (float): a point in the support
        """
        pass

    def histogram(self, size: int = 1000) -> None:
        """Plots a histogram of a simulated sample

        Args:
            size (int, optional): sample size

        """

        # show histogram from 1k samples
        samples = self.simulate(size=size)
        plt.hist(
            samples, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )
        plt.title(f"Histogram from {np.round(size/1000,1)}K simulated samples")
        plt.show()

    def plot(self, size: int = 1000) -> None:
        """Plots a histogram of a simulated sample

        Args:
            size (int, optional): sample size

        """
        self.histogram(size)

    def cvar(self, p: float, **kwargs) -> float:
        """Calculates conditional value at risk for a probability level p, defined as the mean conditioned to an exceedance above the p-quantile.

        Args:
            p (float): Description
            **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

        Returns:
            float: conditional value at risk

        Raises:
            ValueError: Description

        """
        if not isinstance(p, float) or p < 0 or p >= 1:
            raise ValueError("p must be in the open interval (0,1)")

        return (self >= self.ppf(p)).mean(**kwargs)

    @abstractmethod
    def __gt__(self, other: float) -> BaseDistribution:
        pass

    @abstractmethod
    def __ge__(self, other: float) -> BaseDistribution:
        pass

    @abstractmethod
    def __add__(self, other: self._allowed_scalar_types):
        pass

    @abstractmethod
    def __mul__(self, other: self._allowed_scalar_types):
        pass

    def __sub__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"- is implemented for instances of types: {self._allowed_scalar_types}"
            )

        return self.__add__(-other)

    def __rmul__(self, other):
        return self.__mul__(other)

    def __radd__(self, other):
        return self.__add__(other)

    def __rsub__(self, other):
        return self.__sub__(other)

    # __radd__ = __add__

    # __rmul__ = __mul__

    # __rsub__ = __sub__


class Mixture(BaseDistribution):

    """This class represents a probability distribution given by a mixture of weighted continuous and empirical densities; as the base continuous densities can only be of class GPTail, this class is intended to represent either a semiparametric model with a Generalised Pareto tail, or the convolution of such a model with an integer distribution, as is the case for the power surplus distribution in power system reliability modeling.

    Args:
        distributions (t.List[BaseDistribution]): list of distributions that make up the mixture
        weights (np.ndarray): weights for each of the distribution. The weights must be a distribution themselves
    """

    distributions: t.List[BaseDistribution]
    weights: np.ndarray

    def __repr__(self):
        return f"Mixture with {len(self.weights)} components"

    @validator("weights", allow_reuse=True)
    def check_weigths(cls, weights):
        if not np.isclose(np.sum(weights), 1, atol=cls._error_tol):
            raise ValidationError(f"Weights don't sum 1 (sum = {np.sum(weights)})")
        elif np.any(weights <= 0):
            raise ValidationError("Negative or null weights are present")
        else:
            return weights

    def __mul__(self, factor: float):

        return Mixture(
            weights=self.weights,
            distributions=[factor * dist for dist in self.distributions],
        )

    def __add__(self, factor: float):

        return Mixture(
            weights=self.weights,
            distributions=[factor + dist for dist in self.distributions],
        )

    def __ge__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f">= is implemented for instances of types : {self._allowed_scalar_types}"
            )

        if self.cdf(other) == 1:
            raise ValueError("There is no probability mass above provided threshold")

        cond_weights = np.array(
            [
                1 - dist.cdf(other) + (isinstance(dist, Empirical)) * dist.pdf(other)
                for dist in self.distributions
            ]
        )
        new_weights = cond_weights * self.weights

        indices = (new_weights > 0).nonzero()[0]

        nz_weights = new_weights[indices]
        nz_dists = [self.distributions[i] for i in indices]

        return Mixture(
            weights=nz_weights / np.sum(nz_weights),
            distributions=[dist >= other for dist in nz_dists],
        )

    def __gt__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"> is implemented for instances of types : {self._allowed_scalar_types}"
            )

        if self.cdf(other) == 1:
            raise ValueError("There is no probability mass above provided threshold")

        cond_weights = np.array(
            [
                1 - dist.cdf(other) + (isinstance(dist, Empirical)) * dist.pdf(other)
                for dist in self.distributions
            ]
        )

        new_weights = cond_weights * self.weights

        indices = (new_weights > 0).nonzero()[0]

        nz_weights = new_weights[indices]
        nz_dists = [self.distributions[i] for i in indices]

        return Mixture(
            weights=nz_weights / np.sum(nz_weights),
            distributions=[dist > other for dist in nz_dists],
        )

        # index = self.support > other

        # return type(self)(
        #   self.support[index],
        #   self.pdf_values[index]/np.sum(self.pdf_values[index]),
        #   self.data[self.data > other])

    def simulate(self, size: int) -> np.ndarray:
        """Simulate values from mixture distribution

        Args:
            size (int): Sample size

        Returns:
            np.ndarray: simulated sample
        """
        n_samples = np.random.multinomial(n=size, pvals=self.weights, size=1)[0]
        indices = (n_samples > 0).nonzero()[0]
        samples = [
            dist.simulate(size=k)
            for dist, k in zip(
                [self.distributions[k] for k in indices], n_samples[indices]
            )
        ]
        samples = np.concatenate(samples, axis=0)
        np.random.shuffle(samples)
        return samples

    def cdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Evaluate Mixture's CDF function

        Args:
            x (t.Union[float, np.ndarray]): point at which to evaluate CDF
            **kwargs: Additional arguments passed to individual mixture component's CDF function

        Returns:
            t.Union[float, np.ndarray]: CDF value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.cdf(x, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def pdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Evaluate Mixture's pdf function

        Args:
            x (t.Union[float, np.ndarray]): point at which to evaluate CDF
            **kwargs: Additional arguments passed to individual mixture component's pdf function

        Returns:
            t.Union[float, np.ndarray]: pdf value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.pdf(x, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def ppf(
        self, q: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Evaluate Mixture's quantile function function

        Args:
            x (t.Union[float, np.ndarray]): point at which to evaluate quantile function
            **kwargs: Additional arguments passed to individual mixture component's quantile function

        Returns:
            t.Union[float, np.ndarray]: quantile value
        """

        if isinstance(q, np.ndarray):
            return np.array([self.ppf(elem, **kwargs) for elem in q])

        def target_function(x):
            return self.cdf(x) - q

        vals = [
            w * dist.ppf(q, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        x0 = reduce(lambda x, y: x + y, vals)

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.ppf(0.5 + q / 2, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        x1 = reduce(lambda x, y: x + y, vals)

        return root_scalar(target_function, x0=x0, x1=x1, method="secant").root

    def moment(self, n: int, **kwargs) -> float:
        """Evaluate Mixture's n-th moment

        Args:
            x (t.Union[float, np.ndarray]): Moment order
            **kwargs: Additional arguments passed to individual mixture components' moment function

        Returns:
            t.Union[float, np.ndarray]: moment value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.moment(n, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def mean(self, **kwargs) -> float:
        """Evaluate Mixture's mean

        Args:
            **kwargs: Additional arguments passed to individual mixture components' mean function

        Returns:
            float: mean value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.mean(**kwargs) for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def std(self, **kwargs) -> float:
        """Evaluate Mixture's standard deviation

        Args:
            **kwargs: Additional arguments passed to individual mixture components' standard deviation function

        Returns:
            float: standard deviation value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * (dist.std(**kwargs) ** 2 + dist.mean(**kwargs) ** 2)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return np.sqrt(reduce(lambda x, y: x + y, vals) - self.mean() ** 2)


class GPTail(BaseDistribution):
    """Representation of a fitted Generalized Pareto distribution as an exceedance model. It's density is given by

    $$ f(x) = \\frac{1}{\\sigma} \\left( 1 + \\xi \\left( \\frac{x - \\mu}{\\sigma} \\right) \\right)_{+}^{-(1 + 1/\\xi)} $$

    where \\( \\mu, \\sigma, \\xi \\) are the location, scale and shape parameters, and \\( (\\cdot)_{+} = \\max(\\cdot, 0)\\). The location parameter is also the lower endpoint (or threshold) of the distribution.

    Args:
        threshold (float): modeling threshold
        shape (float): fitted shape parameter
        scale (float): fitted scale parameter
        data (np.array, optional): exceedance data
    """

    threshold: float
    shape: float
    scale: PositiveFloat
    data: t.Optional[np.ndarray]

    def __repr__(self):
        return f"Generalised Pareto tail model with (mu, scale, shape) = ({self.threshold},{self.scale},{self.shape}) components"

    @property
    def endpoint(self):
        return self.threshold - self.scale / self.shape if self.shape < 0 else np.Inf

    @property
    def model(self):
        return gpdist(loc=self.threshold, c=self.shape, scale=self.scale)

    # @classmethod
    # def fit(cls, data: np.array, threshold: float) -> GPTail:
    #   exceedances = data[data > threshold]
    #   shape, _, scale = gpdist.fit(exceedances, floc=threshold)
    #   return cls(
    #     threshold = threshold,
    #     shape = shape,
    #     scale = scale,
    #     data = exceedances)

    def __add__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"+ is implemented for instances of types: {self._allowed_scalar_types}"
            )

        return GPTail(
            threshold=self.threshold + other,
            shape=self.shape,
            scale=self.scale,
            data=self.data + other if self.data is not None else None,
        )

    def __ge__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f">= and > are implemented for instances of types: {self._allowed_scalar_types}"
            )

        if other >= self.endpoint:
            raise ValueError(
                f"No probability mass above endpoint ({self.endpoint}); conditional distribution X | X >= {other} does not exist"
            )

        if other < self.threshold:
            return self >= self.threshold
        else:
            # condition on empirical data if applicable
            if self.data is None or max(self.data) < other:
                new_data = None
                warnings.warn(
                    f"No observed data above {other}; setting data to None in conditional model.",
                    stacklevel=2,
                )
            else:
                new_data = self.data[self.data >= other]

            return GPTail(
                threshold=other,
                shape=self.shape,
                scale=self.scale + self.shape * (other - self.threshold),
                data=new_data,
            )

    def __gt__(self, other: float) -> GPTail:

        return self.__ge__(other)

    def __mul__(self, other: float) -> GPTail:

        if not isinstance(other, self._allowed_scalar_types) or other <= 0:
            raise TypeError(
                f"* is implemented for positive instances of: {self._allowed_scalar_types}"
            )

        new_data = other * self.data if self.data is not None else None

        return GPTail(
            threshold=other * self.threshold,
            shape=self.shape,
            scale=other * self.scale,
            data=new_data,
        )

    def simulate(self, size: int) -> np.ndarray:
        return self.model.rvs(size=size)

    def moment(self, n: int, **kwargs) -> float:
        return self.model.moment(n)

    def ppf(
        self, q: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.model.ppf(q)

    def cdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.model.cdf(x)

    def pdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.model.pdf(x)

    def std(self, **kwargs):
        return self.model.std()

    def mle_cov(self) -> np.ndarray:
        """Returns the estimated parameter covariance matrix evaluated at the fitted parameters

        Returns:
            np.ndarray: Covariance matrix
        """

        if self.data is None:
            raise ValueError(
                "exceedance data not provided for this instance of GPTail; covariance matrix can't be estimated"
            )
        else:
            hess = self.loglik_hessian(
                [self.scale, self.shape], threshold=self.threshold, data=self.data
            )
            return np.linalg.inv(-hess)

    @classmethod
    def loglik(cls, params: t.List[float], threshold: float, data: np.ndarray) -> float:
        """Returns the negative log-likelihood for a Generalised Pareto model

        Args:
            *params (t.List[float]): Vector parameter with scale and shape values, in that order
            threshold (float): model threshold
            data (np.ndarray): exceedance data

        Returns:
            float
        """
        scale, shape = params
        return np.sum(gpdist.logpdf(data, loc=threshold, c=shape, scale=scale))

    @classmethod
    def loglik_grad(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Returns the gradient of the negative log-likelihood for a Generalised Pareto model

        Args:
            *params (t.List[float]): Vector parameter with scale and shape values, in that order
            threshold (float): model threshold
            data (np.ndarray): exceedance data

        Returns:
            np.ndarray: gradient array
        """
        scale, shape = params
        y = data - threshold

        if not np.isclose(shape, 0, atol=cls._error_tol):
            grad_scale = np.sum((y - scale) / (scale * (y * shape + scale)))
            grad_shape = np.sum(
                -((y * (1 + shape)) / (shape * (y * shape + scale)))
                + np.log(1 + (y * shape) / scale) / shape**2
            )
        else:
            grad_scale = np.sum((y - scale) / scale**2)
            grad_shape = np.sum(y * (y - 2 * scale) / (2 * scale**2))

        return np.array([grad_scale, grad_shape])

    @classmethod
    def loglik_hessian(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Returns the Hessian matrix of the negative log-likelihood for a Generalised Pareto model

        Args:
            *params (t.List[float]): Vector parameter with scale and shape values, in that order
            threshold (float): model threshold
            data (np.ndarray): exceedance data above the threshold

        Returns:
            np.ndarray: hessian matrix array
        """
        scale, shape = params
        y = data - threshold

        if not np.isclose(shape, 0, atol=cls._error_tol):
            d2scale = np.sum(
                -(1 / (shape * scale**2))
                + (1 + shape) / (shape * (y * shape + scale) ** 2)
            )

            # d2shape = (y (3 y ξ + y ξ^2 + 2 σ))/(ξ^2 (y ξ + σ)^2) - (2 Log[1 + (y ξ)/σ])/ξ^3
            d2shape = np.sum(
                (y * (3 * y * shape + y * shape**2 + 2 * scale))
                / (shape**2 * (y * shape + scale) ** 2)
                - (2 * np.log(1 + (y * shape) / scale)) / shape**3
            )

            # dscale_dshape = (y (-y + σ))/(σ (y ξ + σ)^2)
            dscale_dshape = np.sum(
                (y * (-y + scale)) / (scale * (y * shape + scale) ** 2)
            )
        else:
            d2scale = np.sum((scale - 2 * y) / scale**3)
            dscale_dshape = np.sum(-y * (y - scale) / scale**3)
            d2shape = np.sum(y**2 * (3 * scale - 2 * y) / (3 * scale**3))

        hessian = np.array([[d2scale, dscale_dshape], [dscale_dshape, d2shape]])

        return hessian

    @classmethod
    def logreg(cls, params: t.List[float]) -> float:
        """The regularisation terms are inspired in uninformative and zero-mean Gaussian priors for the scale and shape respectively, thus it is given by

        $$r(\\sigma, \\xi) = 0.5 \\cdot \\log(\\sigma) - 0.5 \\xi^2$$

        Args:
            params (t.List[float]): scale and shape parameters in that order

        Returns:
            float: Description
        """
        scale, shape = params

        return 0.5 * (np.log(scale) + shape**2)

    @classmethod
    def logreg_grad(cls, params: t.List[float]) -> float:
        """Returns the gradient of the regularisation term

        Args:
            params (t.List[float]): scale and shape parameters in that order

        """
        scale, shape = params
        return 0.5 * np.array([1.0 / scale, 2 * shape])

    @classmethod
    def logreg_hessian(cls, params: t.List[float]) -> float:
        """Returns the Hessian of the regularisation term

        Args:
            params (t.List[float]): scale and shape parameters in that order

        """
        scale, shape = params
        return 0.5 * np.array([-1.0 / scale**2, 0, 0, 2]).reshape((2, 2))

    @classmethod
    def loss(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Calculate the loss function on the provided parameters; this is the sum of the (negative) data log-likelihood and (negative) regularisation terms for the scale and shape. Everything is divided by the number of data points.

        Args:
            params (t.List[float]): Description
            threshold (float): Model threshold; eequivalently, location parameter
            data (np.ndarray): exceedance data above threshold

        """
        scale, shape = params
        n = len(data)
        unnorm = cls.loglik(params, threshold, data) + cls.logreg(params)
        return -unnorm / n

    @classmethod
    def loss_grad(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Calculate the loss function's gradient on the provided parameters

        Args:
            params (t.List[float]): Description
            threshold (float): Model threshold; eequivalently, location parameter
            data (np.ndarray): exceedance data above threshold

        """
        n = len(data)
        return -(cls.loglik_grad(params, threshold, data) + cls.logreg_grad(params)) / n

    @classmethod
    def loss_hessian(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Calculate the loss function's Hessian on the provided parameters

        Args:
            params (t.List[float]): Description
            threshold (float): Model threshold; eequivalently, location parameter
            data (np.ndarray): exceedance data above threshold

        """
        n = len(data)
        return (
            -(cls.loglik_hessian(params, threshold, data) + cls.logreg_hessian(params))
            / n
        )

    @classmethod
    def fit(
        cls,
        data: np.ndarray,
        threshold: float,
        x0: np.ndarray = None,
        return_opt_results=False,
    ) -> t.Union[GPTail, sp.optimize.OptimizeResult]:
        """Fits a eneralised Pareto tail model using a constrained trust region method

        Args:
            data (np.ndarray): exceedance data above threshold
            threshold (float): Model threshold
            x0 (np.ndarray, optional): Initial guess for optimization; if None, the result of scipy.stats.genpareto.fit is used as a starting point.
            return_opt_results (bool, optional): If True, return the OptimizeResult object; otherwise return fitted instance of GPTail

        Returns:
            t.Union[GPTail, sp.optimize.OptimizeResult]: Description
        """
        exceedances = data[data > threshold]

        # rescale exceedances and threshold so that both parameters are in roughly the same scale, improving numerical conditioning
        sdev = np.std(exceedances)

        # rescaling the data rescales the location and scale parameter, and leaves the shape parameter unchanged
        norm_exceedances = exceedances / sdev
        norm_threshold = threshold / sdev

        norm_max = max(norm_exceedances)

        constraints = LinearConstraint(
            A=np.array([[1 / (norm_max - norm_threshold), 1], [1, 0]]),
            lb=np.zeros((2,)),
            ub=np.Inf,
        )

        if x0 is None:
            # use default scipy fitter to get initial estimate
            # this is almost always good enough
            shape, _, scale = gpdist.fit(norm_exceedances, floc=norm_threshold)
            x0 = np.array([scale, shape])

        loss_func = lambda params: GPTail.loss(params, norm_threshold, norm_exceedances)
        loss_grad = lambda params: GPTail.loss_grad(
            params, norm_threshold, norm_exceedances
        )
        loss_hessian = lambda params: GPTail.loss_hessian(
            params, norm_threshold, norm_exceedances
        )

        res = minimize(
            fun=loss_func,
            x0=x0,
            method="trust-constr",
            jac=loss_grad,
            hess=loss_hessian,
            constraints=[constraints],
        )

        # print(res)
        if return_opt_results:
            warnings.warn(
                "Returning raw results for rescaled exceedance data (sdev ~ 1)."
            )
            return res
        else:
            scale, shape = list(res.x)

            return cls(
                threshold=sdev * norm_threshold,
                scale=sdev * scale,
                shape=shape,
                data=sdev * norm_exceedances,
            )

    def plot_diagnostics(self) -> None:
        """Returns a figure with fit diagnostic for the GP model"""
        if self.data is None:
            raise ValueError("Exceedance data was not provided for this model.")

        fig, axs = plt.subplots(3, 2)

        #################### Profile log-likelihood

        # set profile intervals based on MLE variance
        mle_cov = self.mle_cov()
        scale_bounds, shape_bounds = (
            self.scale - np.sqrt(mle_cov[0, 0]),
            self.scale + np.sqrt(mle_cov[0, 0]),
        ), (self.shape - np.sqrt(mle_cov[1, 1]), self.shape + np.sqrt(mle_cov[1, 1]))

        # set profile grids
        scale_grid = np.linspace(scale_bounds[0], scale_bounds[1], 50)
        shape_grid = np.linspace(shape_bounds[0], shape_bounds[1], 50)

        # declare profile functions
        scale_profile_func = lambda x: self.loglik(
            [x, self.shape], self.threshold, self.data
        )
        shape_profile_func = lambda x: self.loglik(
            [self.scale, x], self.threshold, self.data
        )

        loss_value = scale_profile_func(self.scale)

        scale_profile = np.array([scale_profile_func(x) for x in scale_grid])
        shape_profile = np.array([shape_profile_func(x) for x in shape_grid])

        alpha = 2 if loss_value > 0 else 0.5

        # filter to almost-optimal values

        def filter_grid(grid, optimum):
            radius = 2 * np.abs(optimum)
            return np.logical_and(
                np.logical_not(np.isnan(grid)),
                np.isfinite(grid),
                np.abs(grid - optimum) < radius,
            )

        scale_filter = filter_grid(scale_profile, loss_value)
        shape_filter = filter_grid(shape_profile, loss_value)

        valid_scales = scale_grid[scale_filter]
        valid_scale_profile = scale_profile[scale_filter]

        axs[0, 0].plot(
            valid_scales, valid_scale_profile, color=self._figure_color_palette[0]
        )
        axs[0, 0].vlines(
            x=self.scale,
            ymin=min(valid_scale_profile),
            ymax=max(valid_scale_profile),
            linestyle="dashed",
            colors=self._figure_color_palette[1],
        )
        axs[0, 0].title.set_text("Profile scale log-likelihood")
        axs[0, 0].set_xlabel("Scale")
        axs[0, 0].set_ylabel("log-likelihood")
        axs[0, 0].grid()

        valid_shapes = shape_grid[shape_filter]
        valid_shape_profile = shape_profile[shape_filter]

        axs[0, 1].plot(
            valid_shapes, valid_shape_profile, color=self._figure_color_palette[0]
        )
        axs[0, 1].vlines(
            x=self.shape,
            ymin=min(valid_shape_profile),
            ymax=max(valid_shape_profile),
            linestyle="dashed",
            colors=self._figure_color_palette[1],
        )
        axs[0, 1].title.set_text("Profile shape log-likelihood")
        axs[0, 1].set_xlabel("Shape")
        axs[0, 1].set_ylabel("log-likelihood")
        axs[0, 1].grid()

        ######################## Log-likelihood surface ###############
        scale_grid, shape_grid = np.mgrid[
            min(valid_scales) : max(valid_scales) : 2 * np.sqrt(mle_cov[0, 0]) / 50,
            shape_bounds[0] : shape_bounds[1] : 2 * np.sqrt(mle_cov[0, 0]) / 50,
        ]

        scale_mesh, shape_mesh = np.meshgrid(
            np.linspace(min(valid_scales), max(valid_scales), 50),
            np.linspace(min(valid_shapes), max(valid_shapes), 50),
        )

        max_x = max(self.data)

        z = np.empty(scale_mesh.shape)
        for i in range(scale_mesh.shape[0]):
            for j in range(scale_mesh.shape[1]):
                shape = shape_mesh[i, j]
                scale = scale_mesh[i, j]
                if shape < 0 and self.threshold - scale / shape < max_x:
                    z[i, j] = np.nan
                else:
                    z[i, j] = self.loglik([scale, shape], self.threshold, self.data)

        # negate z to recover true loglikelihood
        axs[1, 0].contourf(scale_mesh, shape_mesh, z, levels=15)
        axs[1, 0].scatter([self.scale], [self.shape], color="darkorange", s=2)
        axs[1, 0].annotate(text="MLE", xy=(self.scale, self.shape), color="darkorange")
        axs[1, 0].title.set_text("Log-likelihood surface")
        axs[1, 0].set_xlabel("Scale")
        axs[1, 0].set_ylabel("Shape")

        ############## histogram vs density ################
        hist_data = axs[1, 1].hist(
            self.data, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )

        range_min, range_max = min(self.data), max(self.data)
        x_axis = np.linspace(range_min, range_max, 100)
        pdf_vals = self.pdf(x_axis)
        y_axis = hist_data[0][0] / pdf_vals[0] * pdf_vals

        axs[1, 1].plot(x_axis, y_axis, color=self._figure_color_palette[1])
        axs[1, 1].title.set_text("Data vs rescaled fitted density")
        axs[1, 1].set_xlabel("Exceedance data")
        axs[1, 1].yaxis.set_visible(False)  # Hide only x axis
        # axs[0, 0].set_aspect('equal', 'box')

        ############# Q-Q plot ################
        probability_range = np.linspace(0.01, 0.99, 99)
        empirical_quantiles = np.quantile(self.data, probability_range)
        tail_quantiles = self.ppf(probability_range)

        axs[2, 0].scatter(
            tail_quantiles, empirical_quantiles, color=self._figure_color_palette[0]
        )
        min_x, max_x = min(tail_quantiles), max(tail_quantiles)
        # axs[0,1].set_aspect('equal', 'box')
        axs[2, 0].title.set_text("Q-Q plot")
        axs[2, 0].set_xlabel("model quantiles")
        axs[2, 0].set_ylabel("Data quantiles")
        axs[2, 0].grid()
        axs[2, 0].plot([min_x, max_x], [min_x, max_x], linestyle="--", color="black")

        ############ Mean return plot ###############
        # scale, shape = self.scale, self.shape

        # n_obs = len(self.data)
        # exceedance_frequency = 1/np.logspace(1,4,20)
        # return_levels = self.ppf(1 - exceedance_frequency)

        # axs[2,1].plot(1.0/exceedance_frequency,return_levels,color=self._figure_color_palette[0])
        # axs[2,1].set_xscale("log")
        # axs[2,1].title.set_text("Exceedance model's return levels")
        # axs[2,1].set_xlabel('1/frequency')
        # axs[2,1].set_ylabel('Return level')
        # axs[2,1].grid()

        # exs_prob = 1 #carried over from older code

        # m = 10**np.linspace(np.log(1/exs_prob + 1)/np.log(10), 3,20)
        # return_levels = self.ppf(1 - 1/(exs_prob*m))

        # axs[2,1].plot(m,return_levels,color=self._figure_color_palette[0])
        # axs[2,1].set_xscale("log")
        # axs[2,1].title.set_text('Exceedance return levels')
        # axs[2,1].set_xlabel('1/frequency')
        # axs[2,1].set_ylabel('Return level')
        # axs[2,1].grid()

        # try:
        #   #for this bit, look at An Introduction to Statistical selfing of Extreme Values, p.82
        #   mle_cov = self.mle_cov()
        #   eigenvals, eigenvecs = np.linalg.eig(mle_cov)
        #   if np.all(eigenvals > 0):
        #     covariance = np.eye(3)
        #     covariance[1::,1::] = mle_cov
        #     covariance[0,0] = exs_prob*(1-exs_prob)/n_obs
        #     #
        #     return_stdevs = []
        #     for m_ in m:
        #       quantile_grad = np.array([
        #         scale*m_**(shape)*exs_prob**(shape-1),
        #         shape**(-1)*((exs_prob*m_)**shape-1),
        #         -scale*shape**(-2)*((exs_prob*m_)**shape-1)+scale*shape**(-1)*(exs_prob*m_)**shape*np.log(exs_prob*m_)
        #         ])
        #       #
        #       sdev = np.sqrt(quantile_grad.T.dot(covariance).dot(quantile_grad))
        #       return_stdevs.append(sdev)
        #     #
        #     axs[2,1].fill_between(m, return_levels - return_stdevs, return_levels + return_stdevs, alpha=0.2, color=self._figure_color_palette[1])
        #   else:
        #     warnings.warn("Covariance MLE matrix is not positive definite; it might be ill-conditioned", stacklevel=2)
        # except Exception as e:
        #   warnings.warn(f"Confidence bands for return level could not be calculated; covariance matrix might be ill-conditioned; full trace: {traceback.format_exc()}", stacklevel=2)

        plt.tight_layout()
        return fig


class GPTailMixture(BaseDistribution):

    """Mixture distribution with generalised Pareto components. This is a base class for  Bayesian generalised Pareto tail models, which can be seen as an uniformly weighted mixture of the posterior samples; the convolution of a discrete (or empirical) distribution with a generalised Pareto distribution also results in a mixture of this kind. Most methods inherited from `BaseDistribution` have an extra argument `return_all`. When it is True, the full posterior sample of the method evaluation is returned.

    Args:
        data(np.ndarray, optional): Data that induced the model, if applicable
        weights (np.ndarray): component weights
        thresholds (np.ndarray): vector of threshold parameters, one for each component
        scales (np.ndarray): vector of scale parameters, one for each component
        shapes (np.ndarray): vector of shape parameters, one for each component
    """

    data: t.Optional[np.ndarray]
    weights: np.ndarray
    thresholds: np.ndarray
    scales: np.ndarray
    shapes: np.ndarray

    def __repr__(self):
        return f"Mixture of generalised Pareto distributions with {len(self.weights)} components"

    @validator("weights", allow_reuse=True)
    def check_weigths(cls, weights):
        if not np.isclose(np.sum(weights), 1, atol=cls._error_tol):
            raise ValueError(f"Weights don't sum 1 (sum = {np.sum(weights)})")
        elif np.any(weights <= 0):
            raise ValueError("Negative or null weights are present")
        else:
            return weights

    def moment(self, n: int, return_all: bool = False) -> float:
        """Returns the n-th order moment

        Args:
            n (int): Moment's order
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the moment value of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            float: moment value
        """
        vals = np.array(
            [
                gpdist.moment(n, loc=threshold, c=shape, scale=scale)
                for threshold, shape, scale in zip(
                    self.thresholds, self.shapes, self.scales
                )
            ]
        )

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def mean(self, return_all: bool = False) -> float:
        """Returns the mean value

        Args:
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; the mean of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            float: mean value
        """
        vals = gpdist.mean(loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def std(self, return_all: bool = False) -> float:
        """Standard deviation value

        Args:
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise the standard deviation of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            float: standard deviation value
        """
        var = gpdist.var(loc=self.thresholds, c=self.shapes, scale=self.scales)

        mean = gpdist.mean(loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return np.sqrt(var)
        else:
            return np.sqrt(np.dot(self.weights, var + mean**2) - self.mean() ** 2)

    def cdf(
        self, x: t.Union[float, np.ndarray], return_all: bool = False
    ) -> t.Union[float, np.ndarray]:
        """Evaluates the CDF function

        Args:
            x (t.Union[float, np.ndarray]): Point at which to evaluate it
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; the CDF of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: CDF value
        """

        if isinstance(x, np.ndarray):
            return np.array([self.cdf(elem, return_all) for elem in x])

        vals = gpdist.cdf(x, loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def pdf(
        self, x: t.Union[float, np.ndarray], return_all: bool = False
    ) -> t.Union[float, np.ndarray]:
        """Evaluates the pdf function

        Args:
            x (t.Union[float, np.ndarray]): Point at which to evaluate it
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise the pdf of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: pdf value
        """

        if isinstance(x, np.ndarray):
            return np.array([self.pdf(elem, return_all) for elem in x])

        vals = gpdist.pdf(x, loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def ppf(
        self, q: t.Union[float, np.ndarray], return_all: bool = False
    ) -> t.Union[float, np.ndarray]:
        """Evaluates the quantile function

        Args:
            x (t.Union[float, np.ndarray]): probability level
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the quantile function of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: quantile function value value
        """
        if isinstance(q, np.ndarray):
            return np.array([self.ppf(elem, return_all) for elem in q])

        vals = gpdist.ppf(q, loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:

            def target_function(x):
                return self.cdf(x) - q

            x0 = np.dot(self.weights, vals)
            return root_scalar(target_function, x0=x0, x1=x0 + 1, method="secant").root

    def cvar(self, p: float, return_all: bool = False) -> float:
        """Returns the conditional value at risk for a given probability level

        Args:
            x (t.Union[float, np.ndarray]): probability level
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the cvar of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: conditional value at risk
        """
        if p < 0 or p >= 1:
            raise ValueError("p must be in the open interval (0,1)")

        return (self >= self.ppf(p)).mean(return_all)

    def simulate(self, size: int) -> np.ndarray:
        n_samples = np.random.multinomial(n=size, pvals=self.weights, size=1)[0]
        indices = (n_samples > 0).nonzero()[0]
        samples = [
            gpdist.rvs(
                size=n_samples[i],
                c=self.shapes[i],
                scale=self.scales[i],
                loc=self.thresholds[i],
            )
            for i in indices
        ]
        return np.concatenate(samples, axis=0)

    def __gt__(self, other: float) -> GPTailMixture:
        exceedance_prob = 1 - self.cdf(other)
        # prob_exceedance = np.dot(self.weights, prob_cond_exceedance)
        if exceedance_prob == 0:
            raise ValueError(
                f"There is no probability mass above {other}; conditional distribution does not exist."
            )

        conditional_weights = (
            self.weights
            * gpdist.sf(other, c=self.shapes, scale=self.scales, loc=self.thresholds)
            / exceedance_prob
        )

        indices = (conditional_weights > 0).nonzero()[
            0
        ]  # indices of mixture components with nonzero exceedance probability

        new_weights = conditional_weights[indices]

        # disable warnings temporarily
        # with warnings.catch_warnings():
        #   warnings.simplefilter("ignore")
        #   new_thresholds = np.array([(GPTail(threshold = mu, shape = xi, scale = sigma) >= other).threshold for mu, sigma, xi in zip(self.thresholds[indices], self.scales[indices], self.shapes[indices])])

        new_thresholds = np.array(
            [max(other, threshold) for threshold in self.thresholds[indices]]
        )
        new_shapes = self.shapes[indices]
        new_scales = self.scales[indices] + new_shapes * (
            new_thresholds - self.thresholds[indices]
        )
        # new_thresholds = np.clip(self.thresholds[indices], a_min=other, a_max = np.Inf)
        # new_shapes = self.shapes[indices]
        # new_scales = self.scales[indices] + new_shapes*np.clip(other - new_thresholds, a_min = 0.0, a_max=np.Inf)

        if self.data is not None and np.all(self.data < other):
            warnings.warn(
                f"No observed data above {other}; setting data to None in conditioned model",
                stacklevel=2,
            )
            new_data = None
        elif self.data is None:
            new_data = None
        else:
            new_data = self.data[self.data > other]

        return type(self)(
            weights=new_weights,
            thresholds=new_thresholds,
            shapes=new_shapes,
            scales=new_scales,
            data=new_data,
        )

    def __ge__(self, other: float) -> BaseDistribution:
        return self.__gt__(other)

    def __add__(self, other: self._allowed_scalar_types):

        if type(other) not in self._allowed_scalar_types:
            raise TypeError(f"+ is implemented for types {self._allowed_scalar_types}")

        new_data = None if self.data is None else self.data + other

        return type(self)(
            weights=self.weights,
            thresholds=self.thresholds + other,
            shapes=self.shapes,
            scales=self.scales,
            data=new_data,
        )

    def __mul__(self, other: self._allowed_scalar_types):

        if type(other) not in self._allowed_scalar_types:
            raise TypeError(f"* is implemented for types {self._allowed_scalar_types}")

        if other <= 0:
            raise ValueError(f"product supported for positive scalars only")

        new_data = None if self.data is None else other * self.data

        return type(self)(
            weights=self.weights,
            thresholds=other * self.thresholds,
            shapes=self.shapes,
            scales=other * self.scales,
            data=new_data,
        )


class Empirical(BaseDistribution):

    """Model for an empirical probability distribution, induced by a sample of data.

    Args:
        support (np.ndarray): distribution support
        pdf_values (np.ndarray): pdf array
        data (np.array, optional): data
    """

    _sum_compatible = (GPTail, Mixture, GPTailMixture)

    support: np.ndarray
    pdf_values: np.ndarray
    data: t.Optional[np.ndarray]

    def __repr__(self):
        if self.data is not None:
            return f"Empirical distribution with {len(self.data)} points"
        else:
            return f"Discrete distribution with support of size {len(self.pdf_values)}"

    @validator("pdf_values", allow_reuse=True)
    def check_pdf_values(cls, pdf_values):
        if np.any(pdf_values < -cls._error_tol):
            raise ValueError("There are negative pdf values")
        if not np.isclose(np.sum(pdf_values), 1, atol=cls._error_tol):
            print(f"sum: {np.sum(pdf_values)}, pdf vals: {pdf_values}")
            raise ValueError("pdf values don't sum 1")
        # pdf_values = np.clip(pdf_values, a_min = 0.0, a_max = 1.0)
        # # normalise
        # pdf_values = pdf_values/np.sum(pdf_values)

        return pdf_values

    @property
    def is_valid(self):
        n = len(self.support)
        m = len(self.pdf_values)

        if n != m:
            raise ValueError("Lengths of support and pdf arrays don't match")

        if not np.all(np.diff(self.support) > 0):
            raise ValueError("Support array must be in increasing order")

        return True

    @property
    def pdf_lookup(self):
        """Mapping from values in the support to their probability mass"""
        return {key: val for key, val in zip(self.support, self.pdf_values)}

    @property
    def min(self):
        """Minimum value in the support"""
        return self.support[0]

    @property
    def max(self):
        """Maximum value in the support"""
        return self.support[-1]

    @property
    def cdf_values(self):
        """Mapping from values in the support to their cumulative probability"""
        x = np.cumsum(self.pdf_values)
        # make sure cdf reaches 1
        x[-1] = 1.0
        return x

    @property
    def ecdf(self):
        """Mapping from values in the support to their cumulative probability"""
        cdf_vals = np.cumsum(self.pdf_values)
        # make sure cdf reaches 1
        cdf_vals[-1] = 1.0

        return ed.StepFunction(x=self.support, y=cdf_vals, side="right")

    @property
    def ecdf_inv(self):
        """Linearly interpolated mapping from probability values to their quantiles"""
        return ed.monotone_fn_inverter(self.ecdf, self.support)

    def __mul__(self, factor: float):

        if not isinstance(factor, self._allowed_scalar_types) or factor == 0:
            raise TypeError(
                f"multiplication is supported only for nonzero instances of type:{self._allowed_scalar_types}"
            )

        new_data = None if self.data is None else self.data * factor
        return Empirical(
            support=factor * self.support,
            pdf_values=np.copy(self.pdf_values, order="C"),
            data=new_data,
        )

    def __add__(self, other: t.Union[int, float, GPTail, Mixture, GPTailMixture]):

        if isinstance(other, self._allowed_scalar_types):
            new_data = None if self.data is None else self.data + other
            return Empirical(
                support=self.support + other,
                pdf_values=np.copy(self.pdf_values, order="C"),
                data=new_data,
            )

        elif isinstance(other, GPTail):

            indices = (self.pdf_values > 0).nonzero()[0]
            nz_pdf = self.pdf_values[indices]
            nz_support = self.support[indices]
            # mixed GPTails don't carry over exceedance data for efficient memory use
            # dists = [GPTail(threshold=other.threshold + x, scale=other.scale, shape=other.shape) for x in nz_support]

            return GPTailMixture(
                data=other.data,
                weights=nz_pdf,
                thresholds=other.threshold + nz_support,
                scales=np.array([other.scale for w in nz_support]),
                shapes=np.array([other.shape for w in nz_support]),
            )

        elif isinstance(other, Mixture):
            return Mixture(
                weights=other.weights,
                distributions=[self + dist for dist in other.distributions],
            )

        elif isinstance(other, GPTailMixture):
            # Return a new mixture where the old mixture is replicated for each point in the discrete support
            new_weights = np.concatenate(
                [w * other.weights for w in self.pdf_values], axis=0
            )
            new_th = np.concatenate(
                [other.thresholds + t for t in self.support], axis=0
            )
            new_scales = np.tile(other.scales, len(self.support))
            new_shapes = np.tile(other.shapes, len(self.support))
            return GPTailMixture(
                data=other.data,
                weights=new_weights[new_weights > 0],
                thresholds=new_th[new_weights > 0],
                scales=new_scales[new_weights > 0],
                shapes=new_shapes[new_weights > 0],
            )

        else:
            raise TypeError(f"+ is supported only for types: {self._sum_compatible}")

    def __neg__(self):

        return self.map(lambda x: -x)

    def __sub__(self, other: Empirical):

        if isinstance(other, (Empirical,) + self._allowed_scalar_types):

            return self + (-other)

        else:
            raise TypeError(
                "Subtraction is only defined for instances of Empirical or float "
            )

    def __ge__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f">= is implemented for instances of types : {self._allowed_scalar_types}"
            )

        if 1 - self.cdf(other) + self.pdf(other) == 0.0:
            raise ValueError(
                f"No probability mass above conditional threshold ({other})."
            )

        index = self.support >= other

        new_data = None if self.data is None else self.data[self.data >= other]

        pdf_vals = self.pdf_values[index] / np.sum(self.pdf_values[index])

        return type(self)(
            support=self.support[index], pdf_values=pdf_vals, data=new_data
        )

    def __gt__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"> is implemented for instances of types: {self._allowed_scalar_types}"
            )

        if 1 - self.cdf(other) == 0:
            raise ValueError(
                f"No probability mass above conditional threshold ({other})."
            )

        index = self.support > other

        new_data = None if self.data is None else self.data[self.data > other]

        return type(self)(
            support=self.support[index],
            pdf_values=self.pdf_values[index] / np.sum(self.pdf_values[index]),
            data=new_data,
        )

    def simulate(self, size: int) -> np.ndarray:
        """Draws simulated values from the distribution

        Args:
            size (int): Sample size

        Returns:
            np.ndarray: simulated sample
        """
        return np.random.choice(self.support, size=size, p=self.pdf_values)

    def moment(self, n: int, **kwargs) -> float:
        """Evaluates the n-th moment of the distribution

        Args:
            n (int): moment order
            **kwargs: dummy additional arguments (not used)

        Returns:
            float: n-th moment value
        """
        return np.sum(self.pdf_values * self.support**n)

    def ppf(
        self, q: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Inverse CDF function; it uses linear interpolation.

        Args:
            q (t.Union[float, np.ndarray]): probability level

        Returns:
            t.Union[float, np.ndarray]: Linearly interpolated quantile function

        """
        is_scalar = isinstance(q, self._allowed_scalar_types)

        if is_scalar:
            q = np.array([q])

        if np.any(q < 0) or np.any(q > 1):
            raise ValueError(f"q needs to be in the interval [0,1]")

        ppf_values = np.empty((len(q),))

        left_vals_idx = q <= self.ecdf_inv.x[0]
        right_vals_idx = q >= self.ecdf_inv.x[-1]
        inside_vals_idx = np.logical_and(
            np.logical_not(left_vals_idx), np.logical_not(right_vals_idx)
        )

        ppf_values[left_vals_idx] = self.ecdf_inv.y[0]
        ppf_values[right_vals_idx] = self.ecdf_inv.y[-1]
        ppf_values[inside_vals_idx] = self.ecdf_inv(q[inside_vals_idx])

        if is_scalar:
            return ppf_values[0]
        else:
            return ppf_values

    def cdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.ecdf(x)

    def pdf(self, x: t.Union[float, np.ndarray], **kwargs):

        if isinstance(x, np.ndarray):
            return np.array([self.pdf(elem) for elem in x])

        try:
            pdf_val = self.pdf_lookup[x]
            return pdf_val
        except KeyError as e:
            return 0.0

    def std(self, **kwargs):
        return np.sqrt(self.map(lambda x: x - self.mean()).moment(2))

    @classmethod
    def from_data(cls, data: np.array):

        support, unnorm_pdf = np.unique(data, return_counts=True)
        n = np.sum(unnorm_pdf)
        return cls(support=support, pdf_values=unnorm_pdf / n, data=data)

    def plot_mean_residual_life(self, threshold: float) -> matplotlib.figure.Figure:
        """Produces a mean residual life plot for the tail of the distribution above a given threshold

        Args:
            threshold (float): threshold value

        Returns:
            matplotlib.figure.Figure: figure


        """
        fig = plt.figure()
        fitted = self.fit_tail_model(threshold)
        scale, shape = fitted.tail.scale, fitted.tail.shape

        x_vals = np.linspace(threshold, max(self.data))
        if shape >= 1:
            raise ValueError(
                f"Expectation is not finite: fitted shape parameter is {shape}"
            )
        # y_vals = (scale + shape*x_vals)/(1-shape)
        y_vals = np.array(
            [np.mean(fitted.tail.data[fitted.tail.data >= x]) for x in x_vals]
        )
        plt.plot(x_vals, y_vals, color=self._figure_color_palette[0])
        plt.scatter(x_vals, y_vals, color=self._figure_color_palette[0])
        plt.title("Mean residual life plot")
        plt.xlabel("Threshold")
        plt.ylabel("Mean exceedance")
        return fig

    def fit_tail_model(
        self, threshold: float, bayesian=False, **kwargs
    ) -> t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]:
        """Fits a tail GP model above a specified threshold and return the fitted semiparametric model

        Args:
            threshold (float): Threshold above which a Generalised Pareto distribution will be fitted
            bayesian (bool, optional): If True, fit model through Bayesian inference
            **kwargs: Additional parameters passed to BayesianGPTail.fit or to GPTail.fit

        Returns:
            t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]

        """

        if threshold >= self.max:
            raise ValueError(
                "Empirical pdf is 0 above the provided threshold. Select a lower threshold for estimation."
            )

        if self.data is None:
            raise ValueError(
                "Data is not set for this distribution, so a tail model cannot be fitted. You can simulate from it and use the sampled data instead"
            )
        else:
            data = self.data

        if bayesian:
            return EmpiricalWithBayesianGPTail.from_data(
                data, threshold, bin_empirical=isinstance(self, Binned), **kwargs
            )
        else:
            return EmpiricalWithGPTail.from_data(
                data, threshold, bin_empirical=isinstance(self, Binned), **kwargs
            )

    def map(self, f: t.Callable) -> Empirical:
        """Returns the distribution resulting from an arbitrary transformation

        Args:
            f (t.Callable): Target transformation; it should take a numpy array as input

        """
        dist_df = pd.DataFrame({"pdf": self.pdf_values, "support": f(self.support)})
        mapped_dist_df = (
            dist_df.groupby("support").sum().reset_index().sort_values("support")
        )

        return Empirical(
            support=np.array(mapped_dist_df["support"]),
            pdf_values=np.array(mapped_dist_df["pdf"]),
            data=f(self.data) if self.data is not None else None,
        )

    def to_integer(self):
        """Convert to Binned distribution"""
        return Binned.from_empirical(self)


class Binned(Empirical):

    """Empirical distribution with an integer support. This allows it to be convolved with other integer distribution to obtain the distribution of a sum of random variables, assuming independence between the summands."""

    _supported_types = [np.int64, int]

    def __repr__(self):
        return f"Integer distribution with support of size {len(self.pdf_values)} ({np.sum(self.pdf_values> 0)} non-zero)"

    @validator("support", allow_reuse=True)
    def integer_support(cls, support):
        n = len(support)
        if not np.all(np.diff(support) == 1):
            raise ValueError(
                "The support vector must contain every integer between its minimum and maximum value"
            )
        elif support.dtype not in cls._supported_types:
            raise ValueError(
                f"Support entry types must be one of {self._supported_types}"
            )
        else:
            return support

    def __mul__(self, factor: self._allowed_scalar_types):

        if not isinstance(factor, self._allowed_scalar_types) or factor == 0:
            raise TypeError(
                f"multiplication is supported only for nonzero instances of type:{self._allowed_scalar_types}"
            )

        if isinstance(factor, int):
            return self.from_empirical(float(factor) * self)
            # new_data = None if self.data is None else self.data*factor
            # return Binned(support = factor*self.support, pdf_values = self.pdf_values, data = new_data)

        else:
            return super().__mul__(factor)

    def __add__(self, other: t.Union[float, int, Binned, GPTail, Mixture]):

        if isinstance(other, int):
            new_support = np.arange(
                min(self.support) + other, max(self.support) + other + 1
            )
            new_data = None if self.data is None else self.data + other
            return Binned(
                support=new_support,
                pdf_values=np.copy(self.pdf_values, order="C"),
                data=new_data,
            )

        if isinstance(other, Binned):
            new_support = np.arange(
                min(self.support) + min(other.support),
                max(self.support) + max(other.support) + 1,
            )

            pdf_vals = fftconvolve(self.pdf_values, other.pdf_values)
            pdf_vals = np.abs(
                pdf_vals
            )  # some values are negative due to numerical rounding error, set to positive (they are infinitesimal in any case)
            pdf_vals = pdf_vals / np.sum(pdf_vals)

            # this block removes trailing support elements with zero probability
            while pdf_vals[-1] == 0.0:
                new_support = new_support[0 : len(pdf_vals) - 1]
                pdf_vals = pdf_vals[0 : len(pdf_vals) - 1]

            # add any missing probability mass
            error = 1.0 - np.sum(pdf_vals)
            pdf_vals[0] += np.sign(error) * np.abs(error)

            return Binned(
                support=new_support,
                pdf_values=np.abs(pdf_vals),  # some negative values persist
                data=None,
            )

        else:
            return super().__add__(other)

    def __sub__(self, other: float):

        if isinstance(other, (int, Binned)):
            return self + (-other)
        else:
            super().__sub__(other)

    def __neg__(self):

        return super().__neg__().to_integer()

    @classmethod
    def from_data(cls, data: np.ndarray) -> Binned:
        """Instantiates a Binned object from observed data

        Args:
            data (np.ndarray): Observed data

        Returns:
            Binned: Integer distribution object
        """
        data = np.array(data)
        if data.dtype not in self._supported_types:
            warnings.warn(
                "Casting input data to integer values by rounding", stacklevel=2
            )
            data = data.astype(np.int64)

        return super().from_data(data).to_integer()

    @classmethod
    def from_empirical(cls, dist: Empirical) -> Binned:
        """Takes an Empirical instance with discrete support and creates a Binned instance by casting the support to integer values and filling the gaps in the support


        Args:
            empirical_dist (Empirical): Empirical instance

        Returns:
            Binned: Binned distribution


        """
        empirical_dist = dist.map(lambda x: np.round(x))
        base_support = empirical_dist.support.astype(Binned._supported_types[0])
        full_support = np.arange(min(base_support), max(base_support) + 1)

        full_pdf = np.zeros(
            (
                len(
                    full_support,
                )
            ),
            dtype=np.float64,
        )
        indices = base_support - min(base_support)
        full_pdf[indices] = empirical_dist.pdf_values
        # try:
        #   full_pdf[indices] = empirical_dist.pdf_values
        # except Exception as e:
        #   print(f"base support: {base_support}")
        #   print(f"full support: {full_support}")

        data = (
            None
            if empirical_dist.data is None
            else empirical_dist.data.astype(Binned._supported_types[0])
        )

        return Binned(
            support=full_support.astype(cls._supported_types[0]),
            pdf_values=full_pdf,
            data=data,
        )

    def pdf(self, x: t.Union[float, np.ndarray], **kwargs):

        if isinstance(x, np.ndarray):
            return np.array([self.pdf(elem) for elem in x])

        if not isinstance(x, (int, np.int32, np.int64)) or x > self.max or x < self.min:
            return 0.0
        else:
            return self.pdf_values[x - self.min]


class EmpiricalWithGPTail(Mixture):

    """Represents a semiparametric extreme value model with a fitted Generalized Pareto distribution above a certain threshold, and an empirical distribution below it"""

    threshold: float

    def __repr__(self):
        return f"Semiparametric model with generalised Pareto tail. Modeling threshold: {self.threshold}, exceedance probability: {self.exs_prob}"

    @property
    def empirical(self) -> Empirical:
        """Empirical distribution below the modeling threshold

        Returns:
            Empirical: Distribution object
        """
        return self.distributions[0]

    @property
    def tail(self) -> GPTail:
        """Generalised Pareto tail model above modeling threshold

        Returns:
            GPTail: Distribution
        """
        return self.distributions[1]

    # @property
    # def threshold(self) -> float:
    #   """Modeling threshold

    #   Returns:
    #       float: threshold value
    #   """
    #   return self.distributions[1].thresholds

    @property
    def exs_prob(self) -> float:
        """Probability mass above threshold; probability weight of tail model.

        Returns:
            float: weight
        """
        return self.weights[1]

    def ppf(self, q: t.Union[float, np.ndarray]) -> t.Union[float, np.ndarray]:

        is_scalar = isinstance(q, self._allowed_scalar_types)

        if is_scalar:
            q = np.array([q])

        lower_idx = q <= 1 - self.exs_prob
        higher_idx = np.logical_not(lower_idx)

        ppf_values = np.empty((len(q),))
        ppf_values[lower_idx] = self.empirical.ppf(q[lower_idx] / (1 - self.exs_prob))
        ppf_values[higher_idx] = self.tail.ppf(
            (q[higher_idx] - (1 - self.exs_prob)) / self.exs_prob
        )

        if is_scalar:
            return ppf_values[0]
        else:
            return ppf_values

    @classmethod
    def from_data(
        cls, data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs
    ) -> EmpiricalWithGPTail:
        """Fits a model from a given data array and threshold value

        Args:
            data (np.ndarray): Data
            threshold (float): Threshold value to use for the tail model
            bin_empirical (bool, optional): Whether to cast empirical mixture component to an integer distribution by rounding
            **kwargs: Additional arguments passed to GPTail.fit


        Returns:
            EmpiricalWithGPTail: Fitted model
        """
        exs_prob = 1 - Empirical.from_data(data).cdf(threshold)

        exceedances = data[data > threshold]

        empirical = Empirical.from_data(data[data <= threshold])
        if bin_empirical:
            empirical = empirical.to_integer()

        tail = GPTail.fit(data=exceedances, threshold=threshold, **kwargs)

        return cls(
            distributions=[empirical, tail],
            weights=np.array([1 - exs_prob, exs_prob]),
            threshold=threshold,
        )

    def plot_diagnostics(self) -> matplotlib.figure.Figure:
        return self.tail.plot_diagnostics()

    def plot_return_levels(self) -> matplotlib.figure.Figure:
        """Returns a figure with a return level plot using the fitted tail model

        Returns:
            matplotlib.figure.Figure: figure

        """
        fig = plt.figure()
        scale, shape = self.tail.scale, self.tail.shape
        exs_prob = self.exs_prob

        if self.tail.data is None:
            n_obs = (
                np.Inf
            )  # this only means that threshold estimation variance is ignored in figure confidence bounds
        else:
            n_obs = len(self.tail.data)

        # exceedance_frequency = 1/np.logspace(1,4,20)
        # exceedance_frequency = exceedance_frequency[exceedance_frequency < exs_prob] #plot only levels inside fitted tail model
        # shown return levels go from largest power of 10th below exceedance prob, to 1/10000-th of that.

        x_min = np.floor(np.log(exs_prob) / np.log(10))
        x_max = x_min - 4
        exceedance_frequency = 10 ** (np.linspace(x_min, x_max, 50))
        return_levels = self.ppf(1 - exceedance_frequency)

        plt.plot(
            1.0 / exceedance_frequency,
            return_levels,
            color=self._figure_color_palette[0],
        )
        plt.xscale("log")
        plt.title(" Return levels")
        plt.xlabel("Return period")
        plt.ylabel("Return level")
        plt.grid()

        try:
            # for this bit, look at An Introduction to Statistical selfing of Extreme Values, p.82
            mle_cov = self.tail.mle_cov()
            eigenvals, eigenvecs = np.linalg.eig(mle_cov)
            if np.all(eigenvals > 0):
                covariance = np.eye(3)
                covariance[1::, 1::] = mle_cov
                covariance[0, 0] = exs_prob * (1 - exs_prob) / n_obs
                #
                return_stdevs = []
                for m in 1.0 / exceedance_frequency:
                    quantile_grad = np.array(
                        [
                            scale * m * (m * exs_prob) ** (shape - 1),
                            1 / shape * ((exs_prob * m) ** shape - 1),
                            -scale / shape**2 * ((exs_prob * m) ** shape - 1)
                            + scale
                            / shape
                            * (exs_prob * m) ** shape
                            * np.log(exs_prob * m),
                        ]
                    )
                    #
                    sdev = np.sqrt(quantile_grad.T.dot(covariance).dot(quantile_grad))
                    return_stdevs.append(sdev)
                #
                return_stdevs = np.array(return_stdevs)
                plt.fill_between(
                    1.0 / exceedance_frequency,
                    return_levels - 1.96 * return_stdevs,
                    return_levels + 1.96 * return_stdevs,
                    alpha=0.2,
                    color=self._figure_color_palette[1],
                    linestyle="dashed",
                )
            else:
                warnings.warn(
                    "Covariance MLE matrix is not positive definite; it might be ill-conditioned",
                    stacklevel=2,
                )
        except Exception as e:
            warnings.warn(
                f"Confidence bands for return level could not be calculated; covariance matrix might be ill-conditioned; full trace: {traceback.format_exc()}",
                stacklevel=2,
            )

        return fig


class BayesianGPTail(GPTailMixture):

    """Generalised Pareto tail model which is fitted through Bayesian inference, using uninformative (uniform) priors for the shape and scale parameters

    Args:
        threshold (float): modeling threshold
        data (np.array, optional): exceedance data
        shape (np.ndarray): sample from posterior shape distribution
        scale (np.ndarray): sample from posterior scale distribution
    """

    # _self.posterior_trace = None

    @classmethod
    def fit(
        cls,
        data: np.ndarray,
        threshold: float,
        max_posterior_samples: int = 1000,
        chain_length: int = 2000,
        x0: np.ndarray = None,
        plot_diagnostics: bool = False,
        n_walkers: int = 32,
        n_cores: int = 4,
        burn_in: int = 100,
        thinning: int = None,
        log_prior: t.Callable = None,
    ) -> BayesianGPTail:
        """Fits a Generalised Pareto model through Bayesian inference using exceedance data, starting with flat, uninformative priors for both and sampling from posterior shape and scale parameter distributions.

        Args:
            data (np.ndarray): observational data
            threshold (float): modeling threshold; location parameter for Generalised Pareto model
            max_posterior_samples (int, optional): Maximum number of posterior samples to keep
            chain_length (int, optional): timesteps in each chain
            x0 (np.ndarray, optional): Starting point for the chains. If None, MLE estimates are used.
            plot_diagnostics (bool, optional): If True, plots MCMC diagnostics in the background.
            n_walkers (int, optional): Number of concurrent paths to use
            n_cores (int, optional): Number of cores to use
            burn_in (int, optional): Number of initial samples to drop
            thinning (int, optional): Thinning factor to reduce autocorrelation; if None, an automatic estimate from emcee's `get_autocorr_time` is used.
            log_prior (t.Callable, optional): Function that takes as input a single length-2 iterable with scale and shape parameters and outputs the prior log-likelihood. If None, a constant prior on the valid parameter support is used.

        Returns:
            BayesianGPTail: fitted model

        """
        exceedances = data[data > threshold]
        x_max = max(data - threshold)

        # def log_likelihood(theta, data):
        #   scale, shape = theta
        #   return np.sum(gpdist.logpdf(data, c=shape, scale=scale, loc=threshold))

        if log_prior is None:

            def log_prior(theta):
                scale, shape = theta
                if scale > 0 and shape > -scale / (x_max):
                    return 0.0
                else:
                    return -np.Inf

        def log_probability(theta, data):
            prior = log_prior(theta)
            if np.isfinite(prior):
                ll = prior + GPTail.loglik(
                    theta, threshold, data
                )  # log_likelihood(theta, data)
                # print(theta, ll)
                return ll
            else:
                return -np.Inf

        exceedances = data[data > threshold]
        ndim = 2

        if x0 is None:
            # make initial guess
            mle_model = GPTail.fit(data=exceedances, threshold=threshold)
            shape, scale = mle_model.shape, mle_model.scale
            x0 = np.array([scale, shape])

        # create random walkers
        pos = x0 + 1e-4 * np.random.randn(n_walkers, ndim)

        with Pool(n_cores) as pool:
            sampler = emcee.EnsembleSampler(
                nwalkers=n_walkers,
                ndim=ndim,
                log_prob_fn=log_probability,
                args=(exceedances,),
            )
            sampler.run_mcmc(pos, chain_length, progress=True)

        samples = sampler.get_chain()

        tau = sampler.get_autocorr_time()
        thinning = int(np.round(np.mean(tau)))
        print(
            f"Using a thinning factor of {thinning} (from emcee.EnsembleSampler.get_autocorr_time)"
        )
        flat_samples = sampler.get_chain(discard=burn_in, thin=thinning, flat=True)

        if flat_samples.shape[0] > max_posterior_samples:
            np.random.shuffle(flat_samples)
            flat_samples = flat_samples[0:max_posterior_samples, :]

        n_samples = flat_samples.shape[0]
        print(f"Got {n_samples} posterior samples.")

        if plot_diagnostics:
            fig, axes = plt.subplots(2, figsize=(10, 7), sharex=True)
            labels = ["scale", "shape"]
            for i in range(ndim):
                ax = axes[i]
                ax.plot(samples[:, :, i], alpha=0.3, color=cls._figure_color_palette[0])
                ax.set_xlim(0, len(samples))
                ax.set_ylabel(labels[i])
                if i == 0:
                    ax.set_title("Chain mixing")
                ax.yaxis.set_label_coords(-0.1, 0.5)
            print("Use pyplot.show() to view chain diagnostics.")

        scale_posterior = flat_samples[:, 0]
        shape_posterior = flat_samples[:, 1]

        return cls(
            weights=1 / n_samples * np.ones((n_samples,), dtype=np.float64),
            thresholds=threshold * np.ones((n_samples,)),
            data=exceedances,
            shapes=shape_posterior,
            scales=scale_posterior,
        )

    def plot_diagnostics(self) -> matplotlib.figure.Figure:
        """Returns a figure with fit diagnostic plots for the GP model

        Returns:
            matplotlib.figure.Figure: figure

        """
        if self.data is None:
            raise ValueError("Exceedance data was not provided for this model.")

        def map_to_colors(vals):
            colours = np.zeros((len(vals), 3))
            norm = Normalize(vmin=vals.min(), vmax=vals.max())
            # Can put any colormap you like here.
            colours = [
                cm.ScalarMappable(norm=norm, cmap="cool").to_rgba(val) for val in vals
            ]
            return colours

        fig, axs = plt.subplots(3, 2)

        #################### Bayesian inference diagnostics: posterior histograms and scatterplot

        axs[0, 0].hist(
            self.scales, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )
        axs[0, 0].title.set_text("Posterior scale histogram")

        axs[0, 1].hist(
            self.shapes, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )
        axs[0, 1].title.set_text("Posterior shape histogram")

        posterior = np.concatenate(
            [self.scales.reshape((-1, 1)), self.shapes.reshape((-1, 1))], axis=1
        )
        kernel = kde(posterior.T)

        colours = map_to_colors(kernel.evaluate(posterior.T))

        axs[1, 0].scatter(self.scales, self.shapes, color=colours)
        axs[1, 0].title.set_text("Posterior sample")
        axs[1, 0].set_xlabel("Scale")
        axs[1, 0].set_ylabel("Shape")

        ############## histogram vs density ################
        hist_data = axs[1, 1].hist(
            self.data, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )

        range_min, range_max = min(self.data), max(self.data)
        x_axis = np.linspace(range_min, range_max, 100)

        mean_pdf_vals = np.array([self.pdf(x) for x in x_axis])
        # q025_pdf_vals = np.array( [np.quantile(self.pdf(x, return_all=True),0.025) for x in x_axis])
        # q975_pdf_vals = np.array( [np.quantile(self.pdf(x, return_all=True),0.975) for x in x_axis])

        y_axis = hist_data[0][0] / mean_pdf_vals[0] * mean_pdf_vals

        axs[1, 1].plot(x_axis, y_axis, color=self._figure_color_palette[1])
        # axs[1,1].fill_between(x_axis, q025_pdf_vals, q975_pdf_vals, alpha=0.2, color=self._figure_color_palette[1])
        axs[1, 1].title.set_text("Data vs fitted density")
        axs[1, 1].set_xlabel("Exceedance data")
        axs[1, 1].yaxis.set_visible(False)  # Hide only x axis
        # axs[0, 0].set_aspect('equal', 'box')

        ############# Q-Q plot ################
        probability_range = np.linspace(0.01, 0.99, 99)
        empirical_quantiles = np.quantile(self.data, probability_range)

        posterior_quantiles = [self.ppf(p, return_all=True) for p in probability_range]

        # hat_return_levels are not the mean of posterior return level samples, as the mean is not an unbiased estimator
        hat_tail_quantiles = np.array([self.ppf(p) for p in probability_range])

        # q025_tail_quantiles = np.array([np.quantile(q, 0.025) for q in posterior_quantiles])
        # q975_tail_quantiles = np.array([np.quantile(q, 0.975) for q in posterior_quantiles])

        axs[2, 0].scatter(
            hat_tail_quantiles, empirical_quantiles, color=self._figure_color_palette[0]
        )
        min_x, max_x = min(hat_tail_quantiles), max(hat_tail_quantiles)
        # axs[0,1].set_aspect('equal', 'box')
        axs[2, 0].title.set_text("Q-Q plot")
        axs[2, 0].set_xlabel("model quantiles")
        axs[2, 0].set_ylabel("Exceedance quantiles")
        axs[2, 0].grid()
        axs[2, 0].plot([min_x, max_x], [min_x, max_x], linestyle="--", color="black")
        # axs[2,0].fill_between(hat_tail_quantiles, q025_tail_quantiles, q975_tail_quantiles, alpha=0.2, color=self._figure_color_palette[1])

        plt.tight_layout()
        return fig


class EmpiricalWithBayesianGPTail(EmpiricalWithGPTail):

    """Semiparametric Bayesian model with an empirical data distribution below a specified threshold and a Generalised Pareto exceedance model above it, fitted through Bayesian inference."""

    @classmethod
    def from_data(
        cls, data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs
    ) -> EmpiricalWithBayesianGPTail:
        """Fits a Generalied Pareto tail model from a given data array and threshold value, using Jeffrey's priors

        Args:
            data (np.ndarray): data array
            threshold (float): Threshold value to use for the tail model
            bin_empirical (bool, optional): Whether to cast empirical mixture component to an integer distribution by rounding
            **kwargs: Additional arguments to be passed to BayesianGPTail.fit

        Returns:
            EmpiricalWithBayesianGPTail: Fitted model

        Deleted Parameters:
            n_posterior_samples (int): Number of samples from posterior distribution

        """
        exs_prob = 1 - Empirical.from_data(data).cdf(threshold)

        exceedances = data[data > threshold]

        empirical = Empirical.from_data(data[data <= threshold])
        if bin_empirical:
            empirical = empirical.to_integer()

        tail = BayesianGPTail.fit(data=exceedances, threshold=threshold, **kwargs)

        return cls(
            weights=np.array([1 - exs_prob, exs_prob]),
            distributions=[empirical, tail],
            threshold=threshold,
        )

    def ppf(
        self, q: t.Union[float, np.ndarray], return_all=False
    ) -> t.Union[float, np.ndarray]:
        """Returns the quantile function evaluated at some probability level

        Args:
            q (t.Union[float, np.ndarray]): probability level
            return_all (bool, optional): If True, returns posterior ppf sample; otherwise return pointwise estimator

        Returns:
            t.Union[float, np.ndarray]: ppf value(s).
        """
        if isinstance(q, np.ndarray):
            return np.array([self.ppf(elem) for elem in q])

        if q <= 1 - self.exs_prob:
            val = self.empirical.ppf(q / (1 - self.exs_prob), return_all=return_all)
            # if a vector is expected as output, vectorize scalar
            if return_all:
                return val * np.ones((len(self.tail.shapes),))
            else:
                return val
        else:
            return self.tail.ppf(
                (q - (1 - self.exs_prob)) / self.exs_prob, return_all=return_all
            )

    def plot_return_levels(self) -> matplotlib.figure.Figure:
        """Returns a figure with a return levels using the fitted tail model

        Returns:
            matplotlib.figure.Figure: figure

        """
        fig = plt.figure()
        exs_prob = self.exs_prob

        exceedance_frequency = 1 / np.logspace(1, 4, 20)
        exceedance_frequency = exceedance_frequency[
            exceedance_frequency < exs_prob
        ]  # plot only levels inside fitted tail model

        return_levels = [self.ppf(1 - x, return_all=True) for x in exceedance_frequency]

        # hat_return_levels are not the mean of posterior return level samples, as the mean is not an unbiased estimator
        hat_return_levels = np.array(
            [self.ppf(1 - x, return_all=False) for x in exceedance_frequency]
        )
        q025_return_levels = np.array([np.quantile(r, 0.025) for r in return_levels])
        q975_return_levels = np.array([np.quantile(r, 0.975) for r in return_levels])

        plt.plot(
            1.0 / exceedance_frequency,
            hat_return_levels,
            color=self._figure_color_palette[0],
        )
        plt.fill_between(
            1.0 / exceedance_frequency,
            q025_return_levels,
            q975_return_levels,
            alpha=0.2,
            color=self._figure_color_palette[1],
            linestyle="dashed",
        )
        plt.xscale("log")
        plt.title("Exceedance return levels")
        plt.xlabel("1/frequency")
        plt.ylabel("Return level")
        plt.grid()

        return fig

Classes

class BaseDistribution (**data: Any)

Base interface for available data model types

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class BaseDistribution(BaseModel):

    """Base interface for available data model types"""

    _allowed_scalar_types = (int, float, np.int64, np.int32, np.float32, np.float64)
    _figure_color_palette = ["tab:cyan", "deeppink"]
    _error_tol = 1e-6

    def __repr__(self):
        return "Base distribution object"

    def __str__(self):
        return self.__repr__()

    class Config:
        arbitrary_types_allowed = True

    @abstractmethod
    def simulate(self, size: int) -> np.ndarray:
        """Produces simulated values from model

        Args:
            n (int): Number of samples
        """
        pass

    @abstractmethod
    def moment(self, n: int) -> float:
        """Calculates non-centered moments

        Args:
            n (int): moment order
        """
        pass

    @abstractmethod
    def ppf(self, q: float) -> float:
        """Calculates the corresponding quantile for a given probability value

        Args:
            q (float): probability level
        """
        pass

    def mean(self, **kwargs) -> float:
        """Calculates the expected value

        Args:
            **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

        Returns:
            float: mean value

        """
        return self.moment(1, **kwargs)

    def std(self, **kwargs) -> float:
        """Calculates the standard deviation

        Args:
            **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

        Returns:
            float: standard deviation value

        """
        return np.sqrt(self.moment(2, **kwargs) - self.mean(**kwargs) ** 2)

    @abstractmethod
    def cdf(self, x: float) -> float:
        """Evaluates the cumulative probability function

        Args:
            x (float): a point in the support
        """
        pass

    @abstractmethod
    def pdf(self, x: float) -> float:
        """Calculates the probability mass or probability density function

        Args:
            x (float): a point in the support
        """
        pass

    def histogram(self, size: int = 1000) -> None:
        """Plots a histogram of a simulated sample

        Args:
            size (int, optional): sample size

        """

        # show histogram from 1k samples
        samples = self.simulate(size=size)
        plt.hist(
            samples, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )
        plt.title(f"Histogram from {np.round(size/1000,1)}K simulated samples")
        plt.show()

    def plot(self, size: int = 1000) -> None:
        """Plots a histogram of a simulated sample

        Args:
            size (int, optional): sample size

        """
        self.histogram(size)

    def cvar(self, p: float, **kwargs) -> float:
        """Calculates conditional value at risk for a probability level p, defined as the mean conditioned to an exceedance above the p-quantile.

        Args:
            p (float): Description
            **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

        Returns:
            float: conditional value at risk

        Raises:
            ValueError: Description

        """
        if not isinstance(p, float) or p < 0 or p >= 1:
            raise ValueError("p must be in the open interval (0,1)")

        return (self >= self.ppf(p)).mean(**kwargs)

    @abstractmethod
    def __gt__(self, other: float) -> BaseDistribution:
        pass

    @abstractmethod
    def __ge__(self, other: float) -> BaseDistribution:
        pass

    @abstractmethod
    def __add__(self, other: self._allowed_scalar_types):
        pass

    @abstractmethod
    def __mul__(self, other: self._allowed_scalar_types):
        pass

    def __sub__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"- is implemented for instances of types: {self._allowed_scalar_types}"
            )

        return self.__add__(-other)

    def __rmul__(self, other):
        return self.__mul__(other)

    def __radd__(self, other):
        return self.__add__(other)

    def __rsub__(self, other):
        return self.__sub__(other)

    # __radd__ = __add__

    # __rmul__ = __mul__

    # __rsub__ = __sub__

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Subclasses

Class variables

var Config

Methods

def cdf(self, x: float) ‑> float

Evaluates the cumulative probability function

Args

x : float
a point in the support
Expand source code
@abstractmethod
def cdf(self, x: float) -> float:
    """Evaluates the cumulative probability function

    Args:
        x (float): a point in the support
    """
    pass
def cvar(self, p: float, **kwargs) ‑> float

Calculates conditional value at risk for a probability level p, defined as the mean conditioned to an exceedance above the p-quantile.

Args

p : float
Description
**kwargs
Additional arguments passed to moments. This is needed for Bayesian model instances in which a return_all parameter can be passed.

Returns

float
conditional value at risk

Raises

ValueError
Description
Expand source code
def cvar(self, p: float, **kwargs) -> float:
    """Calculates conditional value at risk for a probability level p, defined as the mean conditioned to an exceedance above the p-quantile.

    Args:
        p (float): Description
        **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

    Returns:
        float: conditional value at risk

    Raises:
        ValueError: Description

    """
    if not isinstance(p, float) or p < 0 or p >= 1:
        raise ValueError("p must be in the open interval (0,1)")

    return (self >= self.ppf(p)).mean(**kwargs)
def histogram(self, size: int = 1000) ‑> None

Plots a histogram of a simulated sample

Args

size : int, optional
sample size
Expand source code
def histogram(self, size: int = 1000) -> None:
    """Plots a histogram of a simulated sample

    Args:
        size (int, optional): sample size

    """

    # show histogram from 1k samples
    samples = self.simulate(size=size)
    plt.hist(
        samples, bins=25, edgecolor="white", color=self._figure_color_palette[0]
    )
    plt.title(f"Histogram from {np.round(size/1000,1)}K simulated samples")
    plt.show()
def mean(self, **kwargs) ‑> float

Calculates the expected value

Args

**kwargs
Additional arguments passed to moments. This is needed for Bayesian model instances in which a return_all parameter can be passed.

Returns

float
mean value
Expand source code
def mean(self, **kwargs) -> float:
    """Calculates the expected value

    Args:
        **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

    Returns:
        float: mean value

    """
    return self.moment(1, **kwargs)
def moment(self, n: int) ‑> float

Calculates non-centered moments

Args

n : int
moment order
Expand source code
@abstractmethod
def moment(self, n: int) -> float:
    """Calculates non-centered moments

    Args:
        n (int): moment order
    """
    pass
def pdf(self, x: float) ‑> float

Calculates the probability mass or probability density function

Args

x : float
a point in the support
Expand source code
@abstractmethod
def pdf(self, x: float) -> float:
    """Calculates the probability mass or probability density function

    Args:
        x (float): a point in the support
    """
    pass
def plot(self, size: int = 1000) ‑> None

Plots a histogram of a simulated sample

Args

size : int, optional
sample size
Expand source code
def plot(self, size: int = 1000) -> None:
    """Plots a histogram of a simulated sample

    Args:
        size (int, optional): sample size

    """
    self.histogram(size)
def ppf(self, q: float) ‑> float

Calculates the corresponding quantile for a given probability value

Args

q : float
probability level
Expand source code
@abstractmethod
def ppf(self, q: float) -> float:
    """Calculates the corresponding quantile for a given probability value

    Args:
        q (float): probability level
    """
    pass
def simulate(self, size: int) ‑> numpy.ndarray

Produces simulated values from model

Args

n : int
Number of samples
Expand source code
@abstractmethod
def simulate(self, size: int) -> np.ndarray:
    """Produces simulated values from model

    Args:
        n (int): Number of samples
    """
    pass
def std(self, **kwargs) ‑> float

Calculates the standard deviation

Args

**kwargs
Additional arguments passed to moments. This is needed for Bayesian model instances in which a return_all parameter can be passed.

Returns

float
standard deviation value
Expand source code
def std(self, **kwargs) -> float:
    """Calculates the standard deviation

    Args:
        **kwargs: Additional arguments passed to `moments`. This is needed for Bayesian model instances in which a `return_all` parameter can be passed.

    Returns:
        float: standard deviation value

    """
    return np.sqrt(self.moment(2, **kwargs) - self.mean(**kwargs) ** 2)
class BayesianGPTail (**data: Any)

Generalised Pareto tail model which is fitted through Bayesian inference, using uninformative (uniform) priors for the shape and scale parameters

Args

threshold : float
modeling threshold
data : np.array, optional
exceedance data
shape : np.ndarray
sample from posterior shape distribution
scale : np.ndarray
sample from posterior scale distribution

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class BayesianGPTail(GPTailMixture):

    """Generalised Pareto tail model which is fitted through Bayesian inference, using uninformative (uniform) priors for the shape and scale parameters

    Args:
        threshold (float): modeling threshold
        data (np.array, optional): exceedance data
        shape (np.ndarray): sample from posterior shape distribution
        scale (np.ndarray): sample from posterior scale distribution
    """

    # _self.posterior_trace = None

    @classmethod
    def fit(
        cls,
        data: np.ndarray,
        threshold: float,
        max_posterior_samples: int = 1000,
        chain_length: int = 2000,
        x0: np.ndarray = None,
        plot_diagnostics: bool = False,
        n_walkers: int = 32,
        n_cores: int = 4,
        burn_in: int = 100,
        thinning: int = None,
        log_prior: t.Callable = None,
    ) -> BayesianGPTail:
        """Fits a Generalised Pareto model through Bayesian inference using exceedance data, starting with flat, uninformative priors for both and sampling from posterior shape and scale parameter distributions.

        Args:
            data (np.ndarray): observational data
            threshold (float): modeling threshold; location parameter for Generalised Pareto model
            max_posterior_samples (int, optional): Maximum number of posterior samples to keep
            chain_length (int, optional): timesteps in each chain
            x0 (np.ndarray, optional): Starting point for the chains. If None, MLE estimates are used.
            plot_diagnostics (bool, optional): If True, plots MCMC diagnostics in the background.
            n_walkers (int, optional): Number of concurrent paths to use
            n_cores (int, optional): Number of cores to use
            burn_in (int, optional): Number of initial samples to drop
            thinning (int, optional): Thinning factor to reduce autocorrelation; if None, an automatic estimate from emcee's `get_autocorr_time` is used.
            log_prior (t.Callable, optional): Function that takes as input a single length-2 iterable with scale and shape parameters and outputs the prior log-likelihood. If None, a constant prior on the valid parameter support is used.

        Returns:
            BayesianGPTail: fitted model

        """
        exceedances = data[data > threshold]
        x_max = max(data - threshold)

        # def log_likelihood(theta, data):
        #   scale, shape = theta
        #   return np.sum(gpdist.logpdf(data, c=shape, scale=scale, loc=threshold))

        if log_prior is None:

            def log_prior(theta):
                scale, shape = theta
                if scale > 0 and shape > -scale / (x_max):
                    return 0.0
                else:
                    return -np.Inf

        def log_probability(theta, data):
            prior = log_prior(theta)
            if np.isfinite(prior):
                ll = prior + GPTail.loglik(
                    theta, threshold, data
                )  # log_likelihood(theta, data)
                # print(theta, ll)
                return ll
            else:
                return -np.Inf

        exceedances = data[data > threshold]
        ndim = 2

        if x0 is None:
            # make initial guess
            mle_model = GPTail.fit(data=exceedances, threshold=threshold)
            shape, scale = mle_model.shape, mle_model.scale
            x0 = np.array([scale, shape])

        # create random walkers
        pos = x0 + 1e-4 * np.random.randn(n_walkers, ndim)

        with Pool(n_cores) as pool:
            sampler = emcee.EnsembleSampler(
                nwalkers=n_walkers,
                ndim=ndim,
                log_prob_fn=log_probability,
                args=(exceedances,),
            )
            sampler.run_mcmc(pos, chain_length, progress=True)

        samples = sampler.get_chain()

        tau = sampler.get_autocorr_time()
        thinning = int(np.round(np.mean(tau)))
        print(
            f"Using a thinning factor of {thinning} (from emcee.EnsembleSampler.get_autocorr_time)"
        )
        flat_samples = sampler.get_chain(discard=burn_in, thin=thinning, flat=True)

        if flat_samples.shape[0] > max_posterior_samples:
            np.random.shuffle(flat_samples)
            flat_samples = flat_samples[0:max_posterior_samples, :]

        n_samples = flat_samples.shape[0]
        print(f"Got {n_samples} posterior samples.")

        if plot_diagnostics:
            fig, axes = plt.subplots(2, figsize=(10, 7), sharex=True)
            labels = ["scale", "shape"]
            for i in range(ndim):
                ax = axes[i]
                ax.plot(samples[:, :, i], alpha=0.3, color=cls._figure_color_palette[0])
                ax.set_xlim(0, len(samples))
                ax.set_ylabel(labels[i])
                if i == 0:
                    ax.set_title("Chain mixing")
                ax.yaxis.set_label_coords(-0.1, 0.5)
            print("Use pyplot.show() to view chain diagnostics.")

        scale_posterior = flat_samples[:, 0]
        shape_posterior = flat_samples[:, 1]

        return cls(
            weights=1 / n_samples * np.ones((n_samples,), dtype=np.float64),
            thresholds=threshold * np.ones((n_samples,)),
            data=exceedances,
            shapes=shape_posterior,
            scales=scale_posterior,
        )

    def plot_diagnostics(self) -> matplotlib.figure.Figure:
        """Returns a figure with fit diagnostic plots for the GP model

        Returns:
            matplotlib.figure.Figure: figure

        """
        if self.data is None:
            raise ValueError("Exceedance data was not provided for this model.")

        def map_to_colors(vals):
            colours = np.zeros((len(vals), 3))
            norm = Normalize(vmin=vals.min(), vmax=vals.max())
            # Can put any colormap you like here.
            colours = [
                cm.ScalarMappable(norm=norm, cmap="cool").to_rgba(val) for val in vals
            ]
            return colours

        fig, axs = plt.subplots(3, 2)

        #################### Bayesian inference diagnostics: posterior histograms and scatterplot

        axs[0, 0].hist(
            self.scales, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )
        axs[0, 0].title.set_text("Posterior scale histogram")

        axs[0, 1].hist(
            self.shapes, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )
        axs[0, 1].title.set_text("Posterior shape histogram")

        posterior = np.concatenate(
            [self.scales.reshape((-1, 1)), self.shapes.reshape((-1, 1))], axis=1
        )
        kernel = kde(posterior.T)

        colours = map_to_colors(kernel.evaluate(posterior.T))

        axs[1, 0].scatter(self.scales, self.shapes, color=colours)
        axs[1, 0].title.set_text("Posterior sample")
        axs[1, 0].set_xlabel("Scale")
        axs[1, 0].set_ylabel("Shape")

        ############## histogram vs density ################
        hist_data = axs[1, 1].hist(
            self.data, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )

        range_min, range_max = min(self.data), max(self.data)
        x_axis = np.linspace(range_min, range_max, 100)

        mean_pdf_vals = np.array([self.pdf(x) for x in x_axis])
        # q025_pdf_vals = np.array( [np.quantile(self.pdf(x, return_all=True),0.025) for x in x_axis])
        # q975_pdf_vals = np.array( [np.quantile(self.pdf(x, return_all=True),0.975) for x in x_axis])

        y_axis = hist_data[0][0] / mean_pdf_vals[0] * mean_pdf_vals

        axs[1, 1].plot(x_axis, y_axis, color=self._figure_color_palette[1])
        # axs[1,1].fill_between(x_axis, q025_pdf_vals, q975_pdf_vals, alpha=0.2, color=self._figure_color_palette[1])
        axs[1, 1].title.set_text("Data vs fitted density")
        axs[1, 1].set_xlabel("Exceedance data")
        axs[1, 1].yaxis.set_visible(False)  # Hide only x axis
        # axs[0, 0].set_aspect('equal', 'box')

        ############# Q-Q plot ################
        probability_range = np.linspace(0.01, 0.99, 99)
        empirical_quantiles = np.quantile(self.data, probability_range)

        posterior_quantiles = [self.ppf(p, return_all=True) for p in probability_range]

        # hat_return_levels are not the mean of posterior return level samples, as the mean is not an unbiased estimator
        hat_tail_quantiles = np.array([self.ppf(p) for p in probability_range])

        # q025_tail_quantiles = np.array([np.quantile(q, 0.025) for q in posterior_quantiles])
        # q975_tail_quantiles = np.array([np.quantile(q, 0.975) for q in posterior_quantiles])

        axs[2, 0].scatter(
            hat_tail_quantiles, empirical_quantiles, color=self._figure_color_palette[0]
        )
        min_x, max_x = min(hat_tail_quantiles), max(hat_tail_quantiles)
        # axs[0,1].set_aspect('equal', 'box')
        axs[2, 0].title.set_text("Q-Q plot")
        axs[2, 0].set_xlabel("model quantiles")
        axs[2, 0].set_ylabel("Exceedance quantiles")
        axs[2, 0].grid()
        axs[2, 0].plot([min_x, max_x], [min_x, max_x], linestyle="--", color="black")
        # axs[2,0].fill_between(hat_tail_quantiles, q025_tail_quantiles, q975_tail_quantiles, alpha=0.2, color=self._figure_color_palette[1])

        plt.tight_layout()
        return fig

Ancestors

Class variables

var data : Optional[numpy.ndarray]
var scales : numpy.ndarray
var shapes : numpy.ndarray
var thresholds : numpy.ndarray
var weights : numpy.ndarray

Static methods

def fit(data: np.ndarray, threshold: float, max_posterior_samples: int = 1000, chain_length: int = 2000, x0: np.ndarray = None, plot_diagnostics: bool = False, n_walkers: int = 32, n_cores: int = 4, burn_in: int = 100, thinning: int = None, log_prior: t.Callable = None) ‑> BayesianGPTail

Fits a Generalised Pareto model through Bayesian inference using exceedance data, starting with flat, uninformative priors for both and sampling from posterior shape and scale parameter distributions.

Args

data : np.ndarray
observational data
threshold : float
modeling threshold; location parameter for Generalised Pareto model
max_posterior_samples : int, optional
Maximum number of posterior samples to keep
chain_length : int, optional
timesteps in each chain
x0 : np.ndarray, optional
Starting point for the chains. If None, MLE estimates are used.
plot_diagnostics : bool, optional
If True, plots MCMC diagnostics in the background.
n_walkers : int, optional
Number of concurrent paths to use
n_cores : int, optional
Number of cores to use
burn_in : int, optional
Number of initial samples to drop
thinning : int, optional
Thinning factor to reduce autocorrelation; if None, an automatic estimate from emcee's get_autocorr_time is used.
log_prior : t.Callable, optional
Function that takes as input a single length-2 iterable with scale and shape parameters and outputs the prior log-likelihood. If None, a constant prior on the valid parameter support is used.

Returns

BayesianGPTail
fitted model
Expand source code
@classmethod
def fit(
    cls,
    data: np.ndarray,
    threshold: float,
    max_posterior_samples: int = 1000,
    chain_length: int = 2000,
    x0: np.ndarray = None,
    plot_diagnostics: bool = False,
    n_walkers: int = 32,
    n_cores: int = 4,
    burn_in: int = 100,
    thinning: int = None,
    log_prior: t.Callable = None,
) -> BayesianGPTail:
    """Fits a Generalised Pareto model through Bayesian inference using exceedance data, starting with flat, uninformative priors for both and sampling from posterior shape and scale parameter distributions.

    Args:
        data (np.ndarray): observational data
        threshold (float): modeling threshold; location parameter for Generalised Pareto model
        max_posterior_samples (int, optional): Maximum number of posterior samples to keep
        chain_length (int, optional): timesteps in each chain
        x0 (np.ndarray, optional): Starting point for the chains. If None, MLE estimates are used.
        plot_diagnostics (bool, optional): If True, plots MCMC diagnostics in the background.
        n_walkers (int, optional): Number of concurrent paths to use
        n_cores (int, optional): Number of cores to use
        burn_in (int, optional): Number of initial samples to drop
        thinning (int, optional): Thinning factor to reduce autocorrelation; if None, an automatic estimate from emcee's `get_autocorr_time` is used.
        log_prior (t.Callable, optional): Function that takes as input a single length-2 iterable with scale and shape parameters and outputs the prior log-likelihood. If None, a constant prior on the valid parameter support is used.

    Returns:
        BayesianGPTail: fitted model

    """
    exceedances = data[data > threshold]
    x_max = max(data - threshold)

    # def log_likelihood(theta, data):
    #   scale, shape = theta
    #   return np.sum(gpdist.logpdf(data, c=shape, scale=scale, loc=threshold))

    if log_prior is None:

        def log_prior(theta):
            scale, shape = theta
            if scale > 0 and shape > -scale / (x_max):
                return 0.0
            else:
                return -np.Inf

    def log_probability(theta, data):
        prior = log_prior(theta)
        if np.isfinite(prior):
            ll = prior + GPTail.loglik(
                theta, threshold, data
            )  # log_likelihood(theta, data)
            # print(theta, ll)
            return ll
        else:
            return -np.Inf

    exceedances = data[data > threshold]
    ndim = 2

    if x0 is None:
        # make initial guess
        mle_model = GPTail.fit(data=exceedances, threshold=threshold)
        shape, scale = mle_model.shape, mle_model.scale
        x0 = np.array([scale, shape])

    # create random walkers
    pos = x0 + 1e-4 * np.random.randn(n_walkers, ndim)

    with Pool(n_cores) as pool:
        sampler = emcee.EnsembleSampler(
            nwalkers=n_walkers,
            ndim=ndim,
            log_prob_fn=log_probability,
            args=(exceedances,),
        )
        sampler.run_mcmc(pos, chain_length, progress=True)

    samples = sampler.get_chain()

    tau = sampler.get_autocorr_time()
    thinning = int(np.round(np.mean(tau)))
    print(
        f"Using a thinning factor of {thinning} (from emcee.EnsembleSampler.get_autocorr_time)"
    )
    flat_samples = sampler.get_chain(discard=burn_in, thin=thinning, flat=True)

    if flat_samples.shape[0] > max_posterior_samples:
        np.random.shuffle(flat_samples)
        flat_samples = flat_samples[0:max_posterior_samples, :]

    n_samples = flat_samples.shape[0]
    print(f"Got {n_samples} posterior samples.")

    if plot_diagnostics:
        fig, axes = plt.subplots(2, figsize=(10, 7), sharex=True)
        labels = ["scale", "shape"]
        for i in range(ndim):
            ax = axes[i]
            ax.plot(samples[:, :, i], alpha=0.3, color=cls._figure_color_palette[0])
            ax.set_xlim(0, len(samples))
            ax.set_ylabel(labels[i])
            if i == 0:
                ax.set_title("Chain mixing")
            ax.yaxis.set_label_coords(-0.1, 0.5)
        print("Use pyplot.show() to view chain diagnostics.")

    scale_posterior = flat_samples[:, 0]
    shape_posterior = flat_samples[:, 1]

    return cls(
        weights=1 / n_samples * np.ones((n_samples,), dtype=np.float64),
        thresholds=threshold * np.ones((n_samples,)),
        data=exceedances,
        shapes=shape_posterior,
        scales=scale_posterior,
    )

Methods

def plot_diagnostics(self) ‑> matplotlib.figure.Figure

Returns a figure with fit diagnostic plots for the GP model

Returns

matplotlib.figure.Figure
figure
Expand source code
def plot_diagnostics(self) -> matplotlib.figure.Figure:
    """Returns a figure with fit diagnostic plots for the GP model

    Returns:
        matplotlib.figure.Figure: figure

    """
    if self.data is None:
        raise ValueError("Exceedance data was not provided for this model.")

    def map_to_colors(vals):
        colours = np.zeros((len(vals), 3))
        norm = Normalize(vmin=vals.min(), vmax=vals.max())
        # Can put any colormap you like here.
        colours = [
            cm.ScalarMappable(norm=norm, cmap="cool").to_rgba(val) for val in vals
        ]
        return colours

    fig, axs = plt.subplots(3, 2)

    #################### Bayesian inference diagnostics: posterior histograms and scatterplot

    axs[0, 0].hist(
        self.scales, bins=25, edgecolor="white", color=self._figure_color_palette[0]
    )
    axs[0, 0].title.set_text("Posterior scale histogram")

    axs[0, 1].hist(
        self.shapes, bins=25, edgecolor="white", color=self._figure_color_palette[0]
    )
    axs[0, 1].title.set_text("Posterior shape histogram")

    posterior = np.concatenate(
        [self.scales.reshape((-1, 1)), self.shapes.reshape((-1, 1))], axis=1
    )
    kernel = kde(posterior.T)

    colours = map_to_colors(kernel.evaluate(posterior.T))

    axs[1, 0].scatter(self.scales, self.shapes, color=colours)
    axs[1, 0].title.set_text("Posterior sample")
    axs[1, 0].set_xlabel("Scale")
    axs[1, 0].set_ylabel("Shape")

    ############## histogram vs density ################
    hist_data = axs[1, 1].hist(
        self.data, bins=25, edgecolor="white", color=self._figure_color_palette[0]
    )

    range_min, range_max = min(self.data), max(self.data)
    x_axis = np.linspace(range_min, range_max, 100)

    mean_pdf_vals = np.array([self.pdf(x) for x in x_axis])
    # q025_pdf_vals = np.array( [np.quantile(self.pdf(x, return_all=True),0.025) for x in x_axis])
    # q975_pdf_vals = np.array( [np.quantile(self.pdf(x, return_all=True),0.975) for x in x_axis])

    y_axis = hist_data[0][0] / mean_pdf_vals[0] * mean_pdf_vals

    axs[1, 1].plot(x_axis, y_axis, color=self._figure_color_palette[1])
    # axs[1,1].fill_between(x_axis, q025_pdf_vals, q975_pdf_vals, alpha=0.2, color=self._figure_color_palette[1])
    axs[1, 1].title.set_text("Data vs fitted density")
    axs[1, 1].set_xlabel("Exceedance data")
    axs[1, 1].yaxis.set_visible(False)  # Hide only x axis
    # axs[0, 0].set_aspect('equal', 'box')

    ############# Q-Q plot ################
    probability_range = np.linspace(0.01, 0.99, 99)
    empirical_quantiles = np.quantile(self.data, probability_range)

    posterior_quantiles = [self.ppf(p, return_all=True) for p in probability_range]

    # hat_return_levels are not the mean of posterior return level samples, as the mean is not an unbiased estimator
    hat_tail_quantiles = np.array([self.ppf(p) for p in probability_range])

    # q025_tail_quantiles = np.array([np.quantile(q, 0.025) for q in posterior_quantiles])
    # q975_tail_quantiles = np.array([np.quantile(q, 0.975) for q in posterior_quantiles])

    axs[2, 0].scatter(
        hat_tail_quantiles, empirical_quantiles, color=self._figure_color_palette[0]
    )
    min_x, max_x = min(hat_tail_quantiles), max(hat_tail_quantiles)
    # axs[0,1].set_aspect('equal', 'box')
    axs[2, 0].title.set_text("Q-Q plot")
    axs[2, 0].set_xlabel("model quantiles")
    axs[2, 0].set_ylabel("Exceedance quantiles")
    axs[2, 0].grid()
    axs[2, 0].plot([min_x, max_x], [min_x, max_x], linestyle="--", color="black")
    # axs[2,0].fill_between(hat_tail_quantiles, q025_tail_quantiles, q975_tail_quantiles, alpha=0.2, color=self._figure_color_palette[1])

    plt.tight_layout()
    return fig

Inherited members

class Binned (**data: Any)

Empirical distribution with an integer support. This allows it to be convolved with other integer distribution to obtain the distribution of a sum of random variables, assuming independence between the summands.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class Binned(Empirical):

    """Empirical distribution with an integer support. This allows it to be convolved with other integer distribution to obtain the distribution of a sum of random variables, assuming independence between the summands."""

    _supported_types = [np.int64, int]

    def __repr__(self):
        return f"Integer distribution with support of size {len(self.pdf_values)} ({np.sum(self.pdf_values> 0)} non-zero)"

    @validator("support", allow_reuse=True)
    def integer_support(cls, support):
        n = len(support)
        if not np.all(np.diff(support) == 1):
            raise ValueError(
                "The support vector must contain every integer between its minimum and maximum value"
            )
        elif support.dtype not in cls._supported_types:
            raise ValueError(
                f"Support entry types must be one of {self._supported_types}"
            )
        else:
            return support

    def __mul__(self, factor: self._allowed_scalar_types):

        if not isinstance(factor, self._allowed_scalar_types) or factor == 0:
            raise TypeError(
                f"multiplication is supported only for nonzero instances of type:{self._allowed_scalar_types}"
            )

        if isinstance(factor, int):
            return self.from_empirical(float(factor) * self)
            # new_data = None if self.data is None else self.data*factor
            # return Binned(support = factor*self.support, pdf_values = self.pdf_values, data = new_data)

        else:
            return super().__mul__(factor)

    def __add__(self, other: t.Union[float, int, Binned, GPTail, Mixture]):

        if isinstance(other, int):
            new_support = np.arange(
                min(self.support) + other, max(self.support) + other + 1
            )
            new_data = None if self.data is None else self.data + other
            return Binned(
                support=new_support,
                pdf_values=np.copy(self.pdf_values, order="C"),
                data=new_data,
            )

        if isinstance(other, Binned):
            new_support = np.arange(
                min(self.support) + min(other.support),
                max(self.support) + max(other.support) + 1,
            )

            pdf_vals = fftconvolve(self.pdf_values, other.pdf_values)
            pdf_vals = np.abs(
                pdf_vals
            )  # some values are negative due to numerical rounding error, set to positive (they are infinitesimal in any case)
            pdf_vals = pdf_vals / np.sum(pdf_vals)

            # this block removes trailing support elements with zero probability
            while pdf_vals[-1] == 0.0:
                new_support = new_support[0 : len(pdf_vals) - 1]
                pdf_vals = pdf_vals[0 : len(pdf_vals) - 1]

            # add any missing probability mass
            error = 1.0 - np.sum(pdf_vals)
            pdf_vals[0] += np.sign(error) * np.abs(error)

            return Binned(
                support=new_support,
                pdf_values=np.abs(pdf_vals),  # some negative values persist
                data=None,
            )

        else:
            return super().__add__(other)

    def __sub__(self, other: float):

        if isinstance(other, (int, Binned)):
            return self + (-other)
        else:
            super().__sub__(other)

    def __neg__(self):

        return super().__neg__().to_integer()

    @classmethod
    def from_data(cls, data: np.ndarray) -> Binned:
        """Instantiates a Binned object from observed data

        Args:
            data (np.ndarray): Observed data

        Returns:
            Binned: Integer distribution object
        """
        data = np.array(data)
        if data.dtype not in self._supported_types:
            warnings.warn(
                "Casting input data to integer values by rounding", stacklevel=2
            )
            data = data.astype(np.int64)

        return super().from_data(data).to_integer()

    @classmethod
    def from_empirical(cls, dist: Empirical) -> Binned:
        """Takes an Empirical instance with discrete support and creates a Binned instance by casting the support to integer values and filling the gaps in the support


        Args:
            empirical_dist (Empirical): Empirical instance

        Returns:
            Binned: Binned distribution


        """
        empirical_dist = dist.map(lambda x: np.round(x))
        base_support = empirical_dist.support.astype(Binned._supported_types[0])
        full_support = np.arange(min(base_support), max(base_support) + 1)

        full_pdf = np.zeros(
            (
                len(
                    full_support,
                )
            ),
            dtype=np.float64,
        )
        indices = base_support - min(base_support)
        full_pdf[indices] = empirical_dist.pdf_values
        # try:
        #   full_pdf[indices] = empirical_dist.pdf_values
        # except Exception as e:
        #   print(f"base support: {base_support}")
        #   print(f"full support: {full_support}")

        data = (
            None
            if empirical_dist.data is None
            else empirical_dist.data.astype(Binned._supported_types[0])
        )

        return Binned(
            support=full_support.astype(cls._supported_types[0]),
            pdf_values=full_pdf,
            data=data,
        )

    def pdf(self, x: t.Union[float, np.ndarray], **kwargs):

        if isinstance(x, np.ndarray):
            return np.array([self.pdf(elem) for elem in x])

        if not isinstance(x, (int, np.int32, np.int64)) or x > self.max or x < self.min:
            return 0.0
        else:
            return self.pdf_values[x - self.min]

Ancestors

Subclasses

Class variables

var data : Optional[numpy.ndarray]
var pdf_values : numpy.ndarray
var support : numpy.ndarray

Static methods

def from_data(data: np.ndarray) ‑> Binned

Instantiates a Binned object from observed data

Args

data : np.ndarray
Observed data

Returns

Binned
Integer distribution object
Expand source code
@classmethod
def from_data(cls, data: np.ndarray) -> Binned:
    """Instantiates a Binned object from observed data

    Args:
        data (np.ndarray): Observed data

    Returns:
        Binned: Integer distribution object
    """
    data = np.array(data)
    if data.dtype not in self._supported_types:
        warnings.warn(
            "Casting input data to integer values by rounding", stacklevel=2
        )
        data = data.astype(np.int64)

    return super().from_data(data).to_integer()
def from_empirical(dist: Empirical) ‑> Binned

Takes an Empirical instance with discrete support and creates a Binned instance by casting the support to integer values and filling the gaps in the support

Args

empirical_dist : Empirical
Empirical instance

Returns

Binned
Binned distribution
Expand source code
@classmethod
def from_empirical(cls, dist: Empirical) -> Binned:
    """Takes an Empirical instance with discrete support and creates a Binned instance by casting the support to integer values and filling the gaps in the support


    Args:
        empirical_dist (Empirical): Empirical instance

    Returns:
        Binned: Binned distribution


    """
    empirical_dist = dist.map(lambda x: np.round(x))
    base_support = empirical_dist.support.astype(Binned._supported_types[0])
    full_support = np.arange(min(base_support), max(base_support) + 1)

    full_pdf = np.zeros(
        (
            len(
                full_support,
            )
        ),
        dtype=np.float64,
    )
    indices = base_support - min(base_support)
    full_pdf[indices] = empirical_dist.pdf_values
    # try:
    #   full_pdf[indices] = empirical_dist.pdf_values
    # except Exception as e:
    #   print(f"base support: {base_support}")
    #   print(f"full support: {full_support}")

    data = (
        None
        if empirical_dist.data is None
        else empirical_dist.data.astype(Binned._supported_types[0])
    )

    return Binned(
        support=full_support.astype(cls._supported_types[0]),
        pdf_values=full_pdf,
        data=data,
    )
def integer_support(support)
Expand source code
@validator("support", allow_reuse=True)
def integer_support(cls, support):
    n = len(support)
    if not np.all(np.diff(support) == 1):
        raise ValueError(
            "The support vector must contain every integer between its minimum and maximum value"
        )
    elif support.dtype not in cls._supported_types:
        raise ValueError(
            f"Support entry types must be one of {self._supported_types}"
        )
    else:
        return support

Inherited members

class Empirical (**data: Any)

Model for an empirical probability distribution, induced by a sample of data.

Args

support : np.ndarray
distribution support
pdf_values : np.ndarray
pdf array
data : np.array, optional
data

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class Empirical(BaseDistribution):

    """Model for an empirical probability distribution, induced by a sample of data.

    Args:
        support (np.ndarray): distribution support
        pdf_values (np.ndarray): pdf array
        data (np.array, optional): data
    """

    _sum_compatible = (GPTail, Mixture, GPTailMixture)

    support: np.ndarray
    pdf_values: np.ndarray
    data: t.Optional[np.ndarray]

    def __repr__(self):
        if self.data is not None:
            return f"Empirical distribution with {len(self.data)} points"
        else:
            return f"Discrete distribution with support of size {len(self.pdf_values)}"

    @validator("pdf_values", allow_reuse=True)
    def check_pdf_values(cls, pdf_values):
        if np.any(pdf_values < -cls._error_tol):
            raise ValueError("There are negative pdf values")
        if not np.isclose(np.sum(pdf_values), 1, atol=cls._error_tol):
            print(f"sum: {np.sum(pdf_values)}, pdf vals: {pdf_values}")
            raise ValueError("pdf values don't sum 1")
        # pdf_values = np.clip(pdf_values, a_min = 0.0, a_max = 1.0)
        # # normalise
        # pdf_values = pdf_values/np.sum(pdf_values)

        return pdf_values

    @property
    def is_valid(self):
        n = len(self.support)
        m = len(self.pdf_values)

        if n != m:
            raise ValueError("Lengths of support and pdf arrays don't match")

        if not np.all(np.diff(self.support) > 0):
            raise ValueError("Support array must be in increasing order")

        return True

    @property
    def pdf_lookup(self):
        """Mapping from values in the support to their probability mass"""
        return {key: val for key, val in zip(self.support, self.pdf_values)}

    @property
    def min(self):
        """Minimum value in the support"""
        return self.support[0]

    @property
    def max(self):
        """Maximum value in the support"""
        return self.support[-1]

    @property
    def cdf_values(self):
        """Mapping from values in the support to their cumulative probability"""
        x = np.cumsum(self.pdf_values)
        # make sure cdf reaches 1
        x[-1] = 1.0
        return x

    @property
    def ecdf(self):
        """Mapping from values in the support to their cumulative probability"""
        cdf_vals = np.cumsum(self.pdf_values)
        # make sure cdf reaches 1
        cdf_vals[-1] = 1.0

        return ed.StepFunction(x=self.support, y=cdf_vals, side="right")

    @property
    def ecdf_inv(self):
        """Linearly interpolated mapping from probability values to their quantiles"""
        return ed.monotone_fn_inverter(self.ecdf, self.support)

    def __mul__(self, factor: float):

        if not isinstance(factor, self._allowed_scalar_types) or factor == 0:
            raise TypeError(
                f"multiplication is supported only for nonzero instances of type:{self._allowed_scalar_types}"
            )

        new_data = None if self.data is None else self.data * factor
        return Empirical(
            support=factor * self.support,
            pdf_values=np.copy(self.pdf_values, order="C"),
            data=new_data,
        )

    def __add__(self, other: t.Union[int, float, GPTail, Mixture, GPTailMixture]):

        if isinstance(other, self._allowed_scalar_types):
            new_data = None if self.data is None else self.data + other
            return Empirical(
                support=self.support + other,
                pdf_values=np.copy(self.pdf_values, order="C"),
                data=new_data,
            )

        elif isinstance(other, GPTail):

            indices = (self.pdf_values > 0).nonzero()[0]
            nz_pdf = self.pdf_values[indices]
            nz_support = self.support[indices]
            # mixed GPTails don't carry over exceedance data for efficient memory use
            # dists = [GPTail(threshold=other.threshold + x, scale=other.scale, shape=other.shape) for x in nz_support]

            return GPTailMixture(
                data=other.data,
                weights=nz_pdf,
                thresholds=other.threshold + nz_support,
                scales=np.array([other.scale for w in nz_support]),
                shapes=np.array([other.shape for w in nz_support]),
            )

        elif isinstance(other, Mixture):
            return Mixture(
                weights=other.weights,
                distributions=[self + dist for dist in other.distributions],
            )

        elif isinstance(other, GPTailMixture):
            # Return a new mixture where the old mixture is replicated for each point in the discrete support
            new_weights = np.concatenate(
                [w * other.weights for w in self.pdf_values], axis=0
            )
            new_th = np.concatenate(
                [other.thresholds + t for t in self.support], axis=0
            )
            new_scales = np.tile(other.scales, len(self.support))
            new_shapes = np.tile(other.shapes, len(self.support))
            return GPTailMixture(
                data=other.data,
                weights=new_weights[new_weights > 0],
                thresholds=new_th[new_weights > 0],
                scales=new_scales[new_weights > 0],
                shapes=new_shapes[new_weights > 0],
            )

        else:
            raise TypeError(f"+ is supported only for types: {self._sum_compatible}")

    def __neg__(self):

        return self.map(lambda x: -x)

    def __sub__(self, other: Empirical):

        if isinstance(other, (Empirical,) + self._allowed_scalar_types):

            return self + (-other)

        else:
            raise TypeError(
                "Subtraction is only defined for instances of Empirical or float "
            )

    def __ge__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f">= is implemented for instances of types : {self._allowed_scalar_types}"
            )

        if 1 - self.cdf(other) + self.pdf(other) == 0.0:
            raise ValueError(
                f"No probability mass above conditional threshold ({other})."
            )

        index = self.support >= other

        new_data = None if self.data is None else self.data[self.data >= other]

        pdf_vals = self.pdf_values[index] / np.sum(self.pdf_values[index])

        return type(self)(
            support=self.support[index], pdf_values=pdf_vals, data=new_data
        )

    def __gt__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"> is implemented for instances of types: {self._allowed_scalar_types}"
            )

        if 1 - self.cdf(other) == 0:
            raise ValueError(
                f"No probability mass above conditional threshold ({other})."
            )

        index = self.support > other

        new_data = None if self.data is None else self.data[self.data > other]

        return type(self)(
            support=self.support[index],
            pdf_values=self.pdf_values[index] / np.sum(self.pdf_values[index]),
            data=new_data,
        )

    def simulate(self, size: int) -> np.ndarray:
        """Draws simulated values from the distribution

        Args:
            size (int): Sample size

        Returns:
            np.ndarray: simulated sample
        """
        return np.random.choice(self.support, size=size, p=self.pdf_values)

    def moment(self, n: int, **kwargs) -> float:
        """Evaluates the n-th moment of the distribution

        Args:
            n (int): moment order
            **kwargs: dummy additional arguments (not used)

        Returns:
            float: n-th moment value
        """
        return np.sum(self.pdf_values * self.support**n)

    def ppf(
        self, q: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Inverse CDF function; it uses linear interpolation.

        Args:
            q (t.Union[float, np.ndarray]): probability level

        Returns:
            t.Union[float, np.ndarray]: Linearly interpolated quantile function

        """
        is_scalar = isinstance(q, self._allowed_scalar_types)

        if is_scalar:
            q = np.array([q])

        if np.any(q < 0) or np.any(q > 1):
            raise ValueError(f"q needs to be in the interval [0,1]")

        ppf_values = np.empty((len(q),))

        left_vals_idx = q <= self.ecdf_inv.x[0]
        right_vals_idx = q >= self.ecdf_inv.x[-1]
        inside_vals_idx = np.logical_and(
            np.logical_not(left_vals_idx), np.logical_not(right_vals_idx)
        )

        ppf_values[left_vals_idx] = self.ecdf_inv.y[0]
        ppf_values[right_vals_idx] = self.ecdf_inv.y[-1]
        ppf_values[inside_vals_idx] = self.ecdf_inv(q[inside_vals_idx])

        if is_scalar:
            return ppf_values[0]
        else:
            return ppf_values

    def cdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.ecdf(x)

    def pdf(self, x: t.Union[float, np.ndarray], **kwargs):

        if isinstance(x, np.ndarray):
            return np.array([self.pdf(elem) for elem in x])

        try:
            pdf_val = self.pdf_lookup[x]
            return pdf_val
        except KeyError as e:
            return 0.0

    def std(self, **kwargs):
        return np.sqrt(self.map(lambda x: x - self.mean()).moment(2))

    @classmethod
    def from_data(cls, data: np.array):

        support, unnorm_pdf = np.unique(data, return_counts=True)
        n = np.sum(unnorm_pdf)
        return cls(support=support, pdf_values=unnorm_pdf / n, data=data)

    def plot_mean_residual_life(self, threshold: float) -> matplotlib.figure.Figure:
        """Produces a mean residual life plot for the tail of the distribution above a given threshold

        Args:
            threshold (float): threshold value

        Returns:
            matplotlib.figure.Figure: figure


        """
        fig = plt.figure()
        fitted = self.fit_tail_model(threshold)
        scale, shape = fitted.tail.scale, fitted.tail.shape

        x_vals = np.linspace(threshold, max(self.data))
        if shape >= 1:
            raise ValueError(
                f"Expectation is not finite: fitted shape parameter is {shape}"
            )
        # y_vals = (scale + shape*x_vals)/(1-shape)
        y_vals = np.array(
            [np.mean(fitted.tail.data[fitted.tail.data >= x]) for x in x_vals]
        )
        plt.plot(x_vals, y_vals, color=self._figure_color_palette[0])
        plt.scatter(x_vals, y_vals, color=self._figure_color_palette[0])
        plt.title("Mean residual life plot")
        plt.xlabel("Threshold")
        plt.ylabel("Mean exceedance")
        return fig

    def fit_tail_model(
        self, threshold: float, bayesian=False, **kwargs
    ) -> t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]:
        """Fits a tail GP model above a specified threshold and return the fitted semiparametric model

        Args:
            threshold (float): Threshold above which a Generalised Pareto distribution will be fitted
            bayesian (bool, optional): If True, fit model through Bayesian inference
            **kwargs: Additional parameters passed to BayesianGPTail.fit or to GPTail.fit

        Returns:
            t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]

        """

        if threshold >= self.max:
            raise ValueError(
                "Empirical pdf is 0 above the provided threshold. Select a lower threshold for estimation."
            )

        if self.data is None:
            raise ValueError(
                "Data is not set for this distribution, so a tail model cannot be fitted. You can simulate from it and use the sampled data instead"
            )
        else:
            data = self.data

        if bayesian:
            return EmpiricalWithBayesianGPTail.from_data(
                data, threshold, bin_empirical=isinstance(self, Binned), **kwargs
            )
        else:
            return EmpiricalWithGPTail.from_data(
                data, threshold, bin_empirical=isinstance(self, Binned), **kwargs
            )

    def map(self, f: t.Callable) -> Empirical:
        """Returns the distribution resulting from an arbitrary transformation

        Args:
            f (t.Callable): Target transformation; it should take a numpy array as input

        """
        dist_df = pd.DataFrame({"pdf": self.pdf_values, "support": f(self.support)})
        mapped_dist_df = (
            dist_df.groupby("support").sum().reset_index().sort_values("support")
        )

        return Empirical(
            support=np.array(mapped_dist_df["support"]),
            pdf_values=np.array(mapped_dist_df["pdf"]),
            data=f(self.data) if self.data is not None else None,
        )

    def to_integer(self):
        """Convert to Binned distribution"""
        return Binned.from_empirical(self)

Ancestors

Subclasses

Class variables

var data : Optional[numpy.ndarray]
var pdf_values : numpy.ndarray
var support : numpy.ndarray

Static methods

def check_pdf_values(pdf_values)
Expand source code
@validator("pdf_values", allow_reuse=True)
def check_pdf_values(cls, pdf_values):
    if np.any(pdf_values < -cls._error_tol):
        raise ValueError("There are negative pdf values")
    if not np.isclose(np.sum(pdf_values), 1, atol=cls._error_tol):
        print(f"sum: {np.sum(pdf_values)}, pdf vals: {pdf_values}")
        raise ValueError("pdf values don't sum 1")
    # pdf_values = np.clip(pdf_values, a_min = 0.0, a_max = 1.0)
    # # normalise
    # pdf_values = pdf_values/np.sum(pdf_values)

    return pdf_values
def from_data(data: np.array)
Expand source code
@classmethod
def from_data(cls, data: np.array):

    support, unnorm_pdf = np.unique(data, return_counts=True)
    n = np.sum(unnorm_pdf)
    return cls(support=support, pdf_values=unnorm_pdf / n, data=data)

Instance variables

var cdf_values

Mapping from values in the support to their cumulative probability

Expand source code
@property
def cdf_values(self):
    """Mapping from values in the support to their cumulative probability"""
    x = np.cumsum(self.pdf_values)
    # make sure cdf reaches 1
    x[-1] = 1.0
    return x
var ecdf

Mapping from values in the support to their cumulative probability

Expand source code
@property
def ecdf(self):
    """Mapping from values in the support to their cumulative probability"""
    cdf_vals = np.cumsum(self.pdf_values)
    # make sure cdf reaches 1
    cdf_vals[-1] = 1.0

    return ed.StepFunction(x=self.support, y=cdf_vals, side="right")
var ecdf_inv

Linearly interpolated mapping from probability values to their quantiles

Expand source code
@property
def ecdf_inv(self):
    """Linearly interpolated mapping from probability values to their quantiles"""
    return ed.monotone_fn_inverter(self.ecdf, self.support)
var is_valid
Expand source code
@property
def is_valid(self):
    n = len(self.support)
    m = len(self.pdf_values)

    if n != m:
        raise ValueError("Lengths of support and pdf arrays don't match")

    if not np.all(np.diff(self.support) > 0):
        raise ValueError("Support array must be in increasing order")

    return True
var max

Maximum value in the support

Expand source code
@property
def max(self):
    """Maximum value in the support"""
    return self.support[-1]
var min

Minimum value in the support

Expand source code
@property
def min(self):
    """Minimum value in the support"""
    return self.support[0]
var pdf_lookup

Mapping from values in the support to their probability mass

Expand source code
@property
def pdf_lookup(self):
    """Mapping from values in the support to their probability mass"""
    return {key: val for key, val in zip(self.support, self.pdf_values)}

Methods

def fit_tail_model(self, threshold: float, bayesian=False, **kwargs) ‑> Union[EmpiricalWithGPTailEmpiricalWithBayesianGPTail]

Fits a tail GP model above a specified threshold and return the fitted semiparametric model

Args

threshold : float
Threshold above which a Generalised Pareto distribution will be fitted
bayesian : bool, optional
If True, fit model through Bayesian inference
**kwargs
Additional parameters passed to BayesianGPTail.fit or to GPTail.fit

Returns

t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]

Expand source code
def fit_tail_model(
    self, threshold: float, bayesian=False, **kwargs
) -> t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]:
    """Fits a tail GP model above a specified threshold and return the fitted semiparametric model

    Args:
        threshold (float): Threshold above which a Generalised Pareto distribution will be fitted
        bayesian (bool, optional): If True, fit model through Bayesian inference
        **kwargs: Additional parameters passed to BayesianGPTail.fit or to GPTail.fit

    Returns:
        t.Union[EmpiricalWithGPTail, EmpiricalWithBayesianGPTail]

    """

    if threshold >= self.max:
        raise ValueError(
            "Empirical pdf is 0 above the provided threshold. Select a lower threshold for estimation."
        )

    if self.data is None:
        raise ValueError(
            "Data is not set for this distribution, so a tail model cannot be fitted. You can simulate from it and use the sampled data instead"
        )
    else:
        data = self.data

    if bayesian:
        return EmpiricalWithBayesianGPTail.from_data(
            data, threshold, bin_empirical=isinstance(self, Binned), **kwargs
        )
    else:
        return EmpiricalWithGPTail.from_data(
            data, threshold, bin_empirical=isinstance(self, Binned), **kwargs
        )
def map(self, f: t.Callable) ‑> Empirical

Returns the distribution resulting from an arbitrary transformation

Args

f : t.Callable
Target transformation; it should take a numpy array as input
Expand source code
def map(self, f: t.Callable) -> Empirical:
    """Returns the distribution resulting from an arbitrary transformation

    Args:
        f (t.Callable): Target transformation; it should take a numpy array as input

    """
    dist_df = pd.DataFrame({"pdf": self.pdf_values, "support": f(self.support)})
    mapped_dist_df = (
        dist_df.groupby("support").sum().reset_index().sort_values("support")
    )

    return Empirical(
        support=np.array(mapped_dist_df["support"]),
        pdf_values=np.array(mapped_dist_df["pdf"]),
        data=f(self.data) if self.data is not None else None,
    )
def moment(self, n: int, **kwargs) ‑> float

Evaluates the n-th moment of the distribution

Args

n : int
moment order
**kwargs
dummy additional arguments (not used)

Returns

float
n-th moment value
Expand source code
def moment(self, n: int, **kwargs) -> float:
    """Evaluates the n-th moment of the distribution

    Args:
        n (int): moment order
        **kwargs: dummy additional arguments (not used)

    Returns:
        float: n-th moment value
    """
    return np.sum(self.pdf_values * self.support**n)
def plot_mean_residual_life(self, threshold: float) ‑> matplotlib.figure.Figure

Produces a mean residual life plot for the tail of the distribution above a given threshold

Args

threshold : float
threshold value

Returns

matplotlib.figure.Figure
figure
Expand source code
def plot_mean_residual_life(self, threshold: float) -> matplotlib.figure.Figure:
    """Produces a mean residual life plot for the tail of the distribution above a given threshold

    Args:
        threshold (float): threshold value

    Returns:
        matplotlib.figure.Figure: figure


    """
    fig = plt.figure()
    fitted = self.fit_tail_model(threshold)
    scale, shape = fitted.tail.scale, fitted.tail.shape

    x_vals = np.linspace(threshold, max(self.data))
    if shape >= 1:
        raise ValueError(
            f"Expectation is not finite: fitted shape parameter is {shape}"
        )
    # y_vals = (scale + shape*x_vals)/(1-shape)
    y_vals = np.array(
        [np.mean(fitted.tail.data[fitted.tail.data >= x]) for x in x_vals]
    )
    plt.plot(x_vals, y_vals, color=self._figure_color_palette[0])
    plt.scatter(x_vals, y_vals, color=self._figure_color_palette[0])
    plt.title("Mean residual life plot")
    plt.xlabel("Threshold")
    plt.ylabel("Mean exceedance")
    return fig
def ppf(self, q: t.Union[float, np.ndarray], **kwargs) ‑> Union[float, numpy.ndarray]

Inverse CDF function; it uses linear interpolation.

Args

q : t.Union[float, np.ndarray]
probability level

Returns

t.Union[float, np.ndarray]
Linearly interpolated quantile function
Expand source code
def ppf(
    self, q: t.Union[float, np.ndarray], **kwargs
) -> t.Union[float, np.ndarray]:
    """Inverse CDF function; it uses linear interpolation.

    Args:
        q (t.Union[float, np.ndarray]): probability level

    Returns:
        t.Union[float, np.ndarray]: Linearly interpolated quantile function

    """
    is_scalar = isinstance(q, self._allowed_scalar_types)

    if is_scalar:
        q = np.array([q])

    if np.any(q < 0) or np.any(q > 1):
        raise ValueError(f"q needs to be in the interval [0,1]")

    ppf_values = np.empty((len(q),))

    left_vals_idx = q <= self.ecdf_inv.x[0]
    right_vals_idx = q >= self.ecdf_inv.x[-1]
    inside_vals_idx = np.logical_and(
        np.logical_not(left_vals_idx), np.logical_not(right_vals_idx)
    )

    ppf_values[left_vals_idx] = self.ecdf_inv.y[0]
    ppf_values[right_vals_idx] = self.ecdf_inv.y[-1]
    ppf_values[inside_vals_idx] = self.ecdf_inv(q[inside_vals_idx])

    if is_scalar:
        return ppf_values[0]
    else:
        return ppf_values
def simulate(self, size: int) ‑> numpy.ndarray

Draws simulated values from the distribution

Args

size : int
Sample size

Returns

np.ndarray
simulated sample
Expand source code
def simulate(self, size: int) -> np.ndarray:
    """Draws simulated values from the distribution

    Args:
        size (int): Sample size

    Returns:
        np.ndarray: simulated sample
    """
    return np.random.choice(self.support, size=size, p=self.pdf_values)
def to_integer(self)

Convert to Binned distribution

Expand source code
def to_integer(self):
    """Convert to Binned distribution"""
    return Binned.from_empirical(self)

Inherited members

class EmpiricalWithBayesianGPTail (**data: Any)

Semiparametric Bayesian model with an empirical data distribution below a specified threshold and a Generalised Pareto exceedance model above it, fitted through Bayesian inference.

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class EmpiricalWithBayesianGPTail(EmpiricalWithGPTail):

    """Semiparametric Bayesian model with an empirical data distribution below a specified threshold and a Generalised Pareto exceedance model above it, fitted through Bayesian inference."""

    @classmethod
    def from_data(
        cls, data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs
    ) -> EmpiricalWithBayesianGPTail:
        """Fits a Generalied Pareto tail model from a given data array and threshold value, using Jeffrey's priors

        Args:
            data (np.ndarray): data array
            threshold (float): Threshold value to use for the tail model
            bin_empirical (bool, optional): Whether to cast empirical mixture component to an integer distribution by rounding
            **kwargs: Additional arguments to be passed to BayesianGPTail.fit

        Returns:
            EmpiricalWithBayesianGPTail: Fitted model

        Deleted Parameters:
            n_posterior_samples (int): Number of samples from posterior distribution

        """
        exs_prob = 1 - Empirical.from_data(data).cdf(threshold)

        exceedances = data[data > threshold]

        empirical = Empirical.from_data(data[data <= threshold])
        if bin_empirical:
            empirical = empirical.to_integer()

        tail = BayesianGPTail.fit(data=exceedances, threshold=threshold, **kwargs)

        return cls(
            weights=np.array([1 - exs_prob, exs_prob]),
            distributions=[empirical, tail],
            threshold=threshold,
        )

    def ppf(
        self, q: t.Union[float, np.ndarray], return_all=False
    ) -> t.Union[float, np.ndarray]:
        """Returns the quantile function evaluated at some probability level

        Args:
            q (t.Union[float, np.ndarray]): probability level
            return_all (bool, optional): If True, returns posterior ppf sample; otherwise return pointwise estimator

        Returns:
            t.Union[float, np.ndarray]: ppf value(s).
        """
        if isinstance(q, np.ndarray):
            return np.array([self.ppf(elem) for elem in q])

        if q <= 1 - self.exs_prob:
            val = self.empirical.ppf(q / (1 - self.exs_prob), return_all=return_all)
            # if a vector is expected as output, vectorize scalar
            if return_all:
                return val * np.ones((len(self.tail.shapes),))
            else:
                return val
        else:
            return self.tail.ppf(
                (q - (1 - self.exs_prob)) / self.exs_prob, return_all=return_all
            )

    def plot_return_levels(self) -> matplotlib.figure.Figure:
        """Returns a figure with a return levels using the fitted tail model

        Returns:
            matplotlib.figure.Figure: figure

        """
        fig = plt.figure()
        exs_prob = self.exs_prob

        exceedance_frequency = 1 / np.logspace(1, 4, 20)
        exceedance_frequency = exceedance_frequency[
            exceedance_frequency < exs_prob
        ]  # plot only levels inside fitted tail model

        return_levels = [self.ppf(1 - x, return_all=True) for x in exceedance_frequency]

        # hat_return_levels are not the mean of posterior return level samples, as the mean is not an unbiased estimator
        hat_return_levels = np.array(
            [self.ppf(1 - x, return_all=False) for x in exceedance_frequency]
        )
        q025_return_levels = np.array([np.quantile(r, 0.025) for r in return_levels])
        q975_return_levels = np.array([np.quantile(r, 0.975) for r in return_levels])

        plt.plot(
            1.0 / exceedance_frequency,
            hat_return_levels,
            color=self._figure_color_palette[0],
        )
        plt.fill_between(
            1.0 / exceedance_frequency,
            q025_return_levels,
            q975_return_levels,
            alpha=0.2,
            color=self._figure_color_palette[1],
            linestyle="dashed",
        )
        plt.xscale("log")
        plt.title("Exceedance return levels")
        plt.xlabel("1/frequency")
        plt.ylabel("Return level")
        plt.grid()

        return fig

Ancestors

Class variables

var threshold : float

Static methods

def from_data(data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs) ‑> EmpiricalWithBayesianGPTail

Fits a Generalied Pareto tail model from a given data array and threshold value, using Jeffrey's priors

Args

data : np.ndarray
data array
threshold : float
Threshold value to use for the tail model
bin_empirical : bool, optional
Whether to cast empirical mixture component to an integer distribution by rounding
**kwargs
Additional arguments to be passed to BayesianGPTail.fit

Returns

EmpiricalWithBayesianGPTail
Fitted model

Deleted Parameters: n_posterior_samples (int): Number of samples from posterior distribution

Expand source code
@classmethod
def from_data(
    cls, data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs
) -> EmpiricalWithBayesianGPTail:
    """Fits a Generalied Pareto tail model from a given data array and threshold value, using Jeffrey's priors

    Args:
        data (np.ndarray): data array
        threshold (float): Threshold value to use for the tail model
        bin_empirical (bool, optional): Whether to cast empirical mixture component to an integer distribution by rounding
        **kwargs: Additional arguments to be passed to BayesianGPTail.fit

    Returns:
        EmpiricalWithBayesianGPTail: Fitted model

    Deleted Parameters:
        n_posterior_samples (int): Number of samples from posterior distribution

    """
    exs_prob = 1 - Empirical.from_data(data).cdf(threshold)

    exceedances = data[data > threshold]

    empirical = Empirical.from_data(data[data <= threshold])
    if bin_empirical:
        empirical = empirical.to_integer()

    tail = BayesianGPTail.fit(data=exceedances, threshold=threshold, **kwargs)

    return cls(
        weights=np.array([1 - exs_prob, exs_prob]),
        distributions=[empirical, tail],
        threshold=threshold,
    )

Methods

def plot_return_levels(self) ‑> matplotlib.figure.Figure

Returns a figure with a return levels using the fitted tail model

Returns

matplotlib.figure.Figure
figure
Expand source code
def plot_return_levels(self) -> matplotlib.figure.Figure:
    """Returns a figure with a return levels using the fitted tail model

    Returns:
        matplotlib.figure.Figure: figure

    """
    fig = plt.figure()
    exs_prob = self.exs_prob

    exceedance_frequency = 1 / np.logspace(1, 4, 20)
    exceedance_frequency = exceedance_frequency[
        exceedance_frequency < exs_prob
    ]  # plot only levels inside fitted tail model

    return_levels = [self.ppf(1 - x, return_all=True) for x in exceedance_frequency]

    # hat_return_levels are not the mean of posterior return level samples, as the mean is not an unbiased estimator
    hat_return_levels = np.array(
        [self.ppf(1 - x, return_all=False) for x in exceedance_frequency]
    )
    q025_return_levels = np.array([np.quantile(r, 0.025) for r in return_levels])
    q975_return_levels = np.array([np.quantile(r, 0.975) for r in return_levels])

    plt.plot(
        1.0 / exceedance_frequency,
        hat_return_levels,
        color=self._figure_color_palette[0],
    )
    plt.fill_between(
        1.0 / exceedance_frequency,
        q025_return_levels,
        q975_return_levels,
        alpha=0.2,
        color=self._figure_color_palette[1],
        linestyle="dashed",
    )
    plt.xscale("log")
    plt.title("Exceedance return levels")
    plt.xlabel("1/frequency")
    plt.ylabel("Return level")
    plt.grid()

    return fig
def ppf(self, q: t.Union[float, np.ndarray], return_all=False) ‑> Union[float, numpy.ndarray]

Returns the quantile function evaluated at some probability level

Args

q : t.Union[float, np.ndarray]
probability level
return_all : bool, optional
If True, returns posterior ppf sample; otherwise return pointwise estimator

Returns

t.Union[float, np.ndarray]
ppf value(s).
Expand source code
def ppf(
    self, q: t.Union[float, np.ndarray], return_all=False
) -> t.Union[float, np.ndarray]:
    """Returns the quantile function evaluated at some probability level

    Args:
        q (t.Union[float, np.ndarray]): probability level
        return_all (bool, optional): If True, returns posterior ppf sample; otherwise return pointwise estimator

    Returns:
        t.Union[float, np.ndarray]: ppf value(s).
    """
    if isinstance(q, np.ndarray):
        return np.array([self.ppf(elem) for elem in q])

    if q <= 1 - self.exs_prob:
        val = self.empirical.ppf(q / (1 - self.exs_prob), return_all=return_all)
        # if a vector is expected as output, vectorize scalar
        if return_all:
            return val * np.ones((len(self.tail.shapes),))
        else:
            return val
    else:
        return self.tail.ppf(
            (q - (1 - self.exs_prob)) / self.exs_prob, return_all=return_all
        )

Inherited members

class EmpiricalWithGPTail (**data: Any)

Represents a semiparametric extreme value model with a fitted Generalized Pareto distribution above a certain threshold, and an empirical distribution below it

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class EmpiricalWithGPTail(Mixture):

    """Represents a semiparametric extreme value model with a fitted Generalized Pareto distribution above a certain threshold, and an empirical distribution below it"""

    threshold: float

    def __repr__(self):
        return f"Semiparametric model with generalised Pareto tail. Modeling threshold: {self.threshold}, exceedance probability: {self.exs_prob}"

    @property
    def empirical(self) -> Empirical:
        """Empirical distribution below the modeling threshold

        Returns:
            Empirical: Distribution object
        """
        return self.distributions[0]

    @property
    def tail(self) -> GPTail:
        """Generalised Pareto tail model above modeling threshold

        Returns:
            GPTail: Distribution
        """
        return self.distributions[1]

    # @property
    # def threshold(self) -> float:
    #   """Modeling threshold

    #   Returns:
    #       float: threshold value
    #   """
    #   return self.distributions[1].thresholds

    @property
    def exs_prob(self) -> float:
        """Probability mass above threshold; probability weight of tail model.

        Returns:
            float: weight
        """
        return self.weights[1]

    def ppf(self, q: t.Union[float, np.ndarray]) -> t.Union[float, np.ndarray]:

        is_scalar = isinstance(q, self._allowed_scalar_types)

        if is_scalar:
            q = np.array([q])

        lower_idx = q <= 1 - self.exs_prob
        higher_idx = np.logical_not(lower_idx)

        ppf_values = np.empty((len(q),))
        ppf_values[lower_idx] = self.empirical.ppf(q[lower_idx] / (1 - self.exs_prob))
        ppf_values[higher_idx] = self.tail.ppf(
            (q[higher_idx] - (1 - self.exs_prob)) / self.exs_prob
        )

        if is_scalar:
            return ppf_values[0]
        else:
            return ppf_values

    @classmethod
    def from_data(
        cls, data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs
    ) -> EmpiricalWithGPTail:
        """Fits a model from a given data array and threshold value

        Args:
            data (np.ndarray): Data
            threshold (float): Threshold value to use for the tail model
            bin_empirical (bool, optional): Whether to cast empirical mixture component to an integer distribution by rounding
            **kwargs: Additional arguments passed to GPTail.fit


        Returns:
            EmpiricalWithGPTail: Fitted model
        """
        exs_prob = 1 - Empirical.from_data(data).cdf(threshold)

        exceedances = data[data > threshold]

        empirical = Empirical.from_data(data[data <= threshold])
        if bin_empirical:
            empirical = empirical.to_integer()

        tail = GPTail.fit(data=exceedances, threshold=threshold, **kwargs)

        return cls(
            distributions=[empirical, tail],
            weights=np.array([1 - exs_prob, exs_prob]),
            threshold=threshold,
        )

    def plot_diagnostics(self) -> matplotlib.figure.Figure:
        return self.tail.plot_diagnostics()

    def plot_return_levels(self) -> matplotlib.figure.Figure:
        """Returns a figure with a return level plot using the fitted tail model

        Returns:
            matplotlib.figure.Figure: figure

        """
        fig = plt.figure()
        scale, shape = self.tail.scale, self.tail.shape
        exs_prob = self.exs_prob

        if self.tail.data is None:
            n_obs = (
                np.Inf
            )  # this only means that threshold estimation variance is ignored in figure confidence bounds
        else:
            n_obs = len(self.tail.data)

        # exceedance_frequency = 1/np.logspace(1,4,20)
        # exceedance_frequency = exceedance_frequency[exceedance_frequency < exs_prob] #plot only levels inside fitted tail model
        # shown return levels go from largest power of 10th below exceedance prob, to 1/10000-th of that.

        x_min = np.floor(np.log(exs_prob) / np.log(10))
        x_max = x_min - 4
        exceedance_frequency = 10 ** (np.linspace(x_min, x_max, 50))
        return_levels = self.ppf(1 - exceedance_frequency)

        plt.plot(
            1.0 / exceedance_frequency,
            return_levels,
            color=self._figure_color_palette[0],
        )
        plt.xscale("log")
        plt.title(" Return levels")
        plt.xlabel("Return period")
        plt.ylabel("Return level")
        plt.grid()

        try:
            # for this bit, look at An Introduction to Statistical selfing of Extreme Values, p.82
            mle_cov = self.tail.mle_cov()
            eigenvals, eigenvecs = np.linalg.eig(mle_cov)
            if np.all(eigenvals > 0):
                covariance = np.eye(3)
                covariance[1::, 1::] = mle_cov
                covariance[0, 0] = exs_prob * (1 - exs_prob) / n_obs
                #
                return_stdevs = []
                for m in 1.0 / exceedance_frequency:
                    quantile_grad = np.array(
                        [
                            scale * m * (m * exs_prob) ** (shape - 1),
                            1 / shape * ((exs_prob * m) ** shape - 1),
                            -scale / shape**2 * ((exs_prob * m) ** shape - 1)
                            + scale
                            / shape
                            * (exs_prob * m) ** shape
                            * np.log(exs_prob * m),
                        ]
                    )
                    #
                    sdev = np.sqrt(quantile_grad.T.dot(covariance).dot(quantile_grad))
                    return_stdevs.append(sdev)
                #
                return_stdevs = np.array(return_stdevs)
                plt.fill_between(
                    1.0 / exceedance_frequency,
                    return_levels - 1.96 * return_stdevs,
                    return_levels + 1.96 * return_stdevs,
                    alpha=0.2,
                    color=self._figure_color_palette[1],
                    linestyle="dashed",
                )
            else:
                warnings.warn(
                    "Covariance MLE matrix is not positive definite; it might be ill-conditioned",
                    stacklevel=2,
                )
        except Exception as e:
            warnings.warn(
                f"Confidence bands for return level could not be calculated; covariance matrix might be ill-conditioned; full trace: {traceback.format_exc()}",
                stacklevel=2,
            )

        return fig

Ancestors

Subclasses

Class variables

var threshold : float

Static methods

def from_data(data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs) ‑> EmpiricalWithGPTail

Fits a model from a given data array and threshold value

Args

data : np.ndarray
Data
threshold : float
Threshold value to use for the tail model
bin_empirical : bool, optional
Whether to cast empirical mixture component to an integer distribution by rounding
**kwargs
Additional arguments passed to GPTail.fit

Returns

EmpiricalWithGPTail
Fitted model
Expand source code
@classmethod
def from_data(
    cls, data: np.ndarray, threshold: float, bin_empirical: bool = False, **kwargs
) -> EmpiricalWithGPTail:
    """Fits a model from a given data array and threshold value

    Args:
        data (np.ndarray): Data
        threshold (float): Threshold value to use for the tail model
        bin_empirical (bool, optional): Whether to cast empirical mixture component to an integer distribution by rounding
        **kwargs: Additional arguments passed to GPTail.fit


    Returns:
        EmpiricalWithGPTail: Fitted model
    """
    exs_prob = 1 - Empirical.from_data(data).cdf(threshold)

    exceedances = data[data > threshold]

    empirical = Empirical.from_data(data[data <= threshold])
    if bin_empirical:
        empirical = empirical.to_integer()

    tail = GPTail.fit(data=exceedances, threshold=threshold, **kwargs)

    return cls(
        distributions=[empirical, tail],
        weights=np.array([1 - exs_prob, exs_prob]),
        threshold=threshold,
    )

Instance variables

var empiricalEmpirical

Empirical distribution below the modeling threshold

Returns

Empirical
Distribution object
Expand source code
@property
def empirical(self) -> Empirical:
    """Empirical distribution below the modeling threshold

    Returns:
        Empirical: Distribution object
    """
    return self.distributions[0]
var exs_prob : float

Probability mass above threshold; probability weight of tail model.

Returns

float
weight
Expand source code
@property
def exs_prob(self) -> float:
    """Probability mass above threshold; probability weight of tail model.

    Returns:
        float: weight
    """
    return self.weights[1]
var tailGPTail

Generalised Pareto tail model above modeling threshold

Returns

GPTail
Distribution
Expand source code
@property
def tail(self) -> GPTail:
    """Generalised Pareto tail model above modeling threshold

    Returns:
        GPTail: Distribution
    """
    return self.distributions[1]

Methods

def plot_diagnostics(self) ‑> matplotlib.figure.Figure
Expand source code
def plot_diagnostics(self) -> matplotlib.figure.Figure:
    return self.tail.plot_diagnostics()
def plot_return_levels(self) ‑> matplotlib.figure.Figure

Returns a figure with a return level plot using the fitted tail model

Returns

matplotlib.figure.Figure
figure
Expand source code
def plot_return_levels(self) -> matplotlib.figure.Figure:
    """Returns a figure with a return level plot using the fitted tail model

    Returns:
        matplotlib.figure.Figure: figure

    """
    fig = plt.figure()
    scale, shape = self.tail.scale, self.tail.shape
    exs_prob = self.exs_prob

    if self.tail.data is None:
        n_obs = (
            np.Inf
        )  # this only means that threshold estimation variance is ignored in figure confidence bounds
    else:
        n_obs = len(self.tail.data)

    # exceedance_frequency = 1/np.logspace(1,4,20)
    # exceedance_frequency = exceedance_frequency[exceedance_frequency < exs_prob] #plot only levels inside fitted tail model
    # shown return levels go from largest power of 10th below exceedance prob, to 1/10000-th of that.

    x_min = np.floor(np.log(exs_prob) / np.log(10))
    x_max = x_min - 4
    exceedance_frequency = 10 ** (np.linspace(x_min, x_max, 50))
    return_levels = self.ppf(1 - exceedance_frequency)

    plt.plot(
        1.0 / exceedance_frequency,
        return_levels,
        color=self._figure_color_palette[0],
    )
    plt.xscale("log")
    plt.title(" Return levels")
    plt.xlabel("Return period")
    plt.ylabel("Return level")
    plt.grid()

    try:
        # for this bit, look at An Introduction to Statistical selfing of Extreme Values, p.82
        mle_cov = self.tail.mle_cov()
        eigenvals, eigenvecs = np.linalg.eig(mle_cov)
        if np.all(eigenvals > 0):
            covariance = np.eye(3)
            covariance[1::, 1::] = mle_cov
            covariance[0, 0] = exs_prob * (1 - exs_prob) / n_obs
            #
            return_stdevs = []
            for m in 1.0 / exceedance_frequency:
                quantile_grad = np.array(
                    [
                        scale * m * (m * exs_prob) ** (shape - 1),
                        1 / shape * ((exs_prob * m) ** shape - 1),
                        -scale / shape**2 * ((exs_prob * m) ** shape - 1)
                        + scale
                        / shape
                        * (exs_prob * m) ** shape
                        * np.log(exs_prob * m),
                    ]
                )
                #
                sdev = np.sqrt(quantile_grad.T.dot(covariance).dot(quantile_grad))
                return_stdevs.append(sdev)
            #
            return_stdevs = np.array(return_stdevs)
            plt.fill_between(
                1.0 / exceedance_frequency,
                return_levels - 1.96 * return_stdevs,
                return_levels + 1.96 * return_stdevs,
                alpha=0.2,
                color=self._figure_color_palette[1],
                linestyle="dashed",
            )
        else:
            warnings.warn(
                "Covariance MLE matrix is not positive definite; it might be ill-conditioned",
                stacklevel=2,
            )
    except Exception as e:
        warnings.warn(
            f"Confidence bands for return level could not be calculated; covariance matrix might be ill-conditioned; full trace: {traceback.format_exc()}",
            stacklevel=2,
        )

    return fig

Inherited members

class GPTail (**data: Any)

Representation of a fitted Generalized Pareto distribution as an exceedance model. It's density is given by

f(x) = \frac{1}{\sigma} \left( 1 + \xi \left( \frac{x - \mu}{\sigma} \right) \right)_{+}^{-(1 + 1/\xi)}

where \mu, \sigma, \xi are the location, scale and shape parameters, and (\cdot)_{+} = \max(\cdot, 0). The location parameter is also the lower endpoint (or threshold) of the distribution.

Args

threshold : float
modeling threshold
shape : float
fitted shape parameter
scale : float
fitted scale parameter
data : np.array, optional
exceedance data

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class GPTail(BaseDistribution):
    """Representation of a fitted Generalized Pareto distribution as an exceedance model. It's density is given by

    $$ f(x) = \\frac{1}{\\sigma} \\left( 1 + \\xi \\left( \\frac{x - \\mu}{\\sigma} \\right) \\right)_{+}^{-(1 + 1/\\xi)} $$

    where \\( \\mu, \\sigma, \\xi \\) are the location, scale and shape parameters, and \\( (\\cdot)_{+} = \\max(\\cdot, 0)\\). The location parameter is also the lower endpoint (or threshold) of the distribution.

    Args:
        threshold (float): modeling threshold
        shape (float): fitted shape parameter
        scale (float): fitted scale parameter
        data (np.array, optional): exceedance data
    """

    threshold: float
    shape: float
    scale: PositiveFloat
    data: t.Optional[np.ndarray]

    def __repr__(self):
        return f"Generalised Pareto tail model with (mu, scale, shape) = ({self.threshold},{self.scale},{self.shape}) components"

    @property
    def endpoint(self):
        return self.threshold - self.scale / self.shape if self.shape < 0 else np.Inf

    @property
    def model(self):
        return gpdist(loc=self.threshold, c=self.shape, scale=self.scale)

    # @classmethod
    # def fit(cls, data: np.array, threshold: float) -> GPTail:
    #   exceedances = data[data > threshold]
    #   shape, _, scale = gpdist.fit(exceedances, floc=threshold)
    #   return cls(
    #     threshold = threshold,
    #     shape = shape,
    #     scale = scale,
    #     data = exceedances)

    def __add__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"+ is implemented for instances of types: {self._allowed_scalar_types}"
            )

        return GPTail(
            threshold=self.threshold + other,
            shape=self.shape,
            scale=self.scale,
            data=self.data + other if self.data is not None else None,
        )

    def __ge__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f">= and > are implemented for instances of types: {self._allowed_scalar_types}"
            )

        if other >= self.endpoint:
            raise ValueError(
                f"No probability mass above endpoint ({self.endpoint}); conditional distribution X | X >= {other} does not exist"
            )

        if other < self.threshold:
            return self >= self.threshold
        else:
            # condition on empirical data if applicable
            if self.data is None or max(self.data) < other:
                new_data = None
                warnings.warn(
                    f"No observed data above {other}; setting data to None in conditional model.",
                    stacklevel=2,
                )
            else:
                new_data = self.data[self.data >= other]

            return GPTail(
                threshold=other,
                shape=self.shape,
                scale=self.scale + self.shape * (other - self.threshold),
                data=new_data,
            )

    def __gt__(self, other: float) -> GPTail:

        return self.__ge__(other)

    def __mul__(self, other: float) -> GPTail:

        if not isinstance(other, self._allowed_scalar_types) or other <= 0:
            raise TypeError(
                f"* is implemented for positive instances of: {self._allowed_scalar_types}"
            )

        new_data = other * self.data if self.data is not None else None

        return GPTail(
            threshold=other * self.threshold,
            shape=self.shape,
            scale=other * self.scale,
            data=new_data,
        )

    def simulate(self, size: int) -> np.ndarray:
        return self.model.rvs(size=size)

    def moment(self, n: int, **kwargs) -> float:
        return self.model.moment(n)

    def ppf(
        self, q: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.model.ppf(q)

    def cdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.model.cdf(x)

    def pdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        return self.model.pdf(x)

    def std(self, **kwargs):
        return self.model.std()

    def mle_cov(self) -> np.ndarray:
        """Returns the estimated parameter covariance matrix evaluated at the fitted parameters

        Returns:
            np.ndarray: Covariance matrix
        """

        if self.data is None:
            raise ValueError(
                "exceedance data not provided for this instance of GPTail; covariance matrix can't be estimated"
            )
        else:
            hess = self.loglik_hessian(
                [self.scale, self.shape], threshold=self.threshold, data=self.data
            )
            return np.linalg.inv(-hess)

    @classmethod
    def loglik(cls, params: t.List[float], threshold: float, data: np.ndarray) -> float:
        """Returns the negative log-likelihood for a Generalised Pareto model

        Args:
            *params (t.List[float]): Vector parameter with scale and shape values, in that order
            threshold (float): model threshold
            data (np.ndarray): exceedance data

        Returns:
            float
        """
        scale, shape = params
        return np.sum(gpdist.logpdf(data, loc=threshold, c=shape, scale=scale))

    @classmethod
    def loglik_grad(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Returns the gradient of the negative log-likelihood for a Generalised Pareto model

        Args:
            *params (t.List[float]): Vector parameter with scale and shape values, in that order
            threshold (float): model threshold
            data (np.ndarray): exceedance data

        Returns:
            np.ndarray: gradient array
        """
        scale, shape = params
        y = data - threshold

        if not np.isclose(shape, 0, atol=cls._error_tol):
            grad_scale = np.sum((y - scale) / (scale * (y * shape + scale)))
            grad_shape = np.sum(
                -((y * (1 + shape)) / (shape * (y * shape + scale)))
                + np.log(1 + (y * shape) / scale) / shape**2
            )
        else:
            grad_scale = np.sum((y - scale) / scale**2)
            grad_shape = np.sum(y * (y - 2 * scale) / (2 * scale**2))

        return np.array([grad_scale, grad_shape])

    @classmethod
    def loglik_hessian(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Returns the Hessian matrix of the negative log-likelihood for a Generalised Pareto model

        Args:
            *params (t.List[float]): Vector parameter with scale and shape values, in that order
            threshold (float): model threshold
            data (np.ndarray): exceedance data above the threshold

        Returns:
            np.ndarray: hessian matrix array
        """
        scale, shape = params
        y = data - threshold

        if not np.isclose(shape, 0, atol=cls._error_tol):
            d2scale = np.sum(
                -(1 / (shape * scale**2))
                + (1 + shape) / (shape * (y * shape + scale) ** 2)
            )

            # d2shape = (y (3 y ξ + y ξ^2 + 2 σ))/(ξ^2 (y ξ + σ)^2) - (2 Log[1 + (y ξ)/σ])/ξ^3
            d2shape = np.sum(
                (y * (3 * y * shape + y * shape**2 + 2 * scale))
                / (shape**2 * (y * shape + scale) ** 2)
                - (2 * np.log(1 + (y * shape) / scale)) / shape**3
            )

            # dscale_dshape = (y (-y + σ))/(σ (y ξ + σ)^2)
            dscale_dshape = np.sum(
                (y * (-y + scale)) / (scale * (y * shape + scale) ** 2)
            )
        else:
            d2scale = np.sum((scale - 2 * y) / scale**3)
            dscale_dshape = np.sum(-y * (y - scale) / scale**3)
            d2shape = np.sum(y**2 * (3 * scale - 2 * y) / (3 * scale**3))

        hessian = np.array([[d2scale, dscale_dshape], [dscale_dshape, d2shape]])

        return hessian

    @classmethod
    def logreg(cls, params: t.List[float]) -> float:
        """The regularisation terms are inspired in uninformative and zero-mean Gaussian priors for the scale and shape respectively, thus it is given by

        $$r(\\sigma, \\xi) = 0.5 \\cdot \\log(\\sigma) - 0.5 \\xi^2$$

        Args:
            params (t.List[float]): scale and shape parameters in that order

        Returns:
            float: Description
        """
        scale, shape = params

        return 0.5 * (np.log(scale) + shape**2)

    @classmethod
    def logreg_grad(cls, params: t.List[float]) -> float:
        """Returns the gradient of the regularisation term

        Args:
            params (t.List[float]): scale and shape parameters in that order

        """
        scale, shape = params
        return 0.5 * np.array([1.0 / scale, 2 * shape])

    @classmethod
    def logreg_hessian(cls, params: t.List[float]) -> float:
        """Returns the Hessian of the regularisation term

        Args:
            params (t.List[float]): scale and shape parameters in that order

        """
        scale, shape = params
        return 0.5 * np.array([-1.0 / scale**2, 0, 0, 2]).reshape((2, 2))

    @classmethod
    def loss(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Calculate the loss function on the provided parameters; this is the sum of the (negative) data log-likelihood and (negative) regularisation terms for the scale and shape. Everything is divided by the number of data points.

        Args:
            params (t.List[float]): Description
            threshold (float): Model threshold; eequivalently, location parameter
            data (np.ndarray): exceedance data above threshold

        """
        scale, shape = params
        n = len(data)
        unnorm = cls.loglik(params, threshold, data) + cls.logreg(params)
        return -unnorm / n

    @classmethod
    def loss_grad(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Calculate the loss function's gradient on the provided parameters

        Args:
            params (t.List[float]): Description
            threshold (float): Model threshold; eequivalently, location parameter
            data (np.ndarray): exceedance data above threshold

        """
        n = len(data)
        return -(cls.loglik_grad(params, threshold, data) + cls.logreg_grad(params)) / n

    @classmethod
    def loss_hessian(
        cls, params: t.List[float], threshold: float, data: np.ndarray
    ) -> np.ndarray:
        """Calculate the loss function's Hessian on the provided parameters

        Args:
            params (t.List[float]): Description
            threshold (float): Model threshold; eequivalently, location parameter
            data (np.ndarray): exceedance data above threshold

        """
        n = len(data)
        return (
            -(cls.loglik_hessian(params, threshold, data) + cls.logreg_hessian(params))
            / n
        )

    @classmethod
    def fit(
        cls,
        data: np.ndarray,
        threshold: float,
        x0: np.ndarray = None,
        return_opt_results=False,
    ) -> t.Union[GPTail, sp.optimize.OptimizeResult]:
        """Fits a eneralised Pareto tail model using a constrained trust region method

        Args:
            data (np.ndarray): exceedance data above threshold
            threshold (float): Model threshold
            x0 (np.ndarray, optional): Initial guess for optimization; if None, the result of scipy.stats.genpareto.fit is used as a starting point.
            return_opt_results (bool, optional): If True, return the OptimizeResult object; otherwise return fitted instance of GPTail

        Returns:
            t.Union[GPTail, sp.optimize.OptimizeResult]: Description
        """
        exceedances = data[data > threshold]

        # rescale exceedances and threshold so that both parameters are in roughly the same scale, improving numerical conditioning
        sdev = np.std(exceedances)

        # rescaling the data rescales the location and scale parameter, and leaves the shape parameter unchanged
        norm_exceedances = exceedances / sdev
        norm_threshold = threshold / sdev

        norm_max = max(norm_exceedances)

        constraints = LinearConstraint(
            A=np.array([[1 / (norm_max - norm_threshold), 1], [1, 0]]),
            lb=np.zeros((2,)),
            ub=np.Inf,
        )

        if x0 is None:
            # use default scipy fitter to get initial estimate
            # this is almost always good enough
            shape, _, scale = gpdist.fit(norm_exceedances, floc=norm_threshold)
            x0 = np.array([scale, shape])

        loss_func = lambda params: GPTail.loss(params, norm_threshold, norm_exceedances)
        loss_grad = lambda params: GPTail.loss_grad(
            params, norm_threshold, norm_exceedances
        )
        loss_hessian = lambda params: GPTail.loss_hessian(
            params, norm_threshold, norm_exceedances
        )

        res = minimize(
            fun=loss_func,
            x0=x0,
            method="trust-constr",
            jac=loss_grad,
            hess=loss_hessian,
            constraints=[constraints],
        )

        # print(res)
        if return_opt_results:
            warnings.warn(
                "Returning raw results for rescaled exceedance data (sdev ~ 1)."
            )
            return res
        else:
            scale, shape = list(res.x)

            return cls(
                threshold=sdev * norm_threshold,
                scale=sdev * scale,
                shape=shape,
                data=sdev * norm_exceedances,
            )

    def plot_diagnostics(self) -> None:
        """Returns a figure with fit diagnostic for the GP model"""
        if self.data is None:
            raise ValueError("Exceedance data was not provided for this model.")

        fig, axs = plt.subplots(3, 2)

        #################### Profile log-likelihood

        # set profile intervals based on MLE variance
        mle_cov = self.mle_cov()
        scale_bounds, shape_bounds = (
            self.scale - np.sqrt(mle_cov[0, 0]),
            self.scale + np.sqrt(mle_cov[0, 0]),
        ), (self.shape - np.sqrt(mle_cov[1, 1]), self.shape + np.sqrt(mle_cov[1, 1]))

        # set profile grids
        scale_grid = np.linspace(scale_bounds[0], scale_bounds[1], 50)
        shape_grid = np.linspace(shape_bounds[0], shape_bounds[1], 50)

        # declare profile functions
        scale_profile_func = lambda x: self.loglik(
            [x, self.shape], self.threshold, self.data
        )
        shape_profile_func = lambda x: self.loglik(
            [self.scale, x], self.threshold, self.data
        )

        loss_value = scale_profile_func(self.scale)

        scale_profile = np.array([scale_profile_func(x) for x in scale_grid])
        shape_profile = np.array([shape_profile_func(x) for x in shape_grid])

        alpha = 2 if loss_value > 0 else 0.5

        # filter to almost-optimal values

        def filter_grid(grid, optimum):
            radius = 2 * np.abs(optimum)
            return np.logical_and(
                np.logical_not(np.isnan(grid)),
                np.isfinite(grid),
                np.abs(grid - optimum) < radius,
            )

        scale_filter = filter_grid(scale_profile, loss_value)
        shape_filter = filter_grid(shape_profile, loss_value)

        valid_scales = scale_grid[scale_filter]
        valid_scale_profile = scale_profile[scale_filter]

        axs[0, 0].plot(
            valid_scales, valid_scale_profile, color=self._figure_color_palette[0]
        )
        axs[0, 0].vlines(
            x=self.scale,
            ymin=min(valid_scale_profile),
            ymax=max(valid_scale_profile),
            linestyle="dashed",
            colors=self._figure_color_palette[1],
        )
        axs[0, 0].title.set_text("Profile scale log-likelihood")
        axs[0, 0].set_xlabel("Scale")
        axs[0, 0].set_ylabel("log-likelihood")
        axs[0, 0].grid()

        valid_shapes = shape_grid[shape_filter]
        valid_shape_profile = shape_profile[shape_filter]

        axs[0, 1].plot(
            valid_shapes, valid_shape_profile, color=self._figure_color_palette[0]
        )
        axs[0, 1].vlines(
            x=self.shape,
            ymin=min(valid_shape_profile),
            ymax=max(valid_shape_profile),
            linestyle="dashed",
            colors=self._figure_color_palette[1],
        )
        axs[0, 1].title.set_text("Profile shape log-likelihood")
        axs[0, 1].set_xlabel("Shape")
        axs[0, 1].set_ylabel("log-likelihood")
        axs[0, 1].grid()

        ######################## Log-likelihood surface ###############
        scale_grid, shape_grid = np.mgrid[
            min(valid_scales) : max(valid_scales) : 2 * np.sqrt(mle_cov[0, 0]) / 50,
            shape_bounds[0] : shape_bounds[1] : 2 * np.sqrt(mle_cov[0, 0]) / 50,
        ]

        scale_mesh, shape_mesh = np.meshgrid(
            np.linspace(min(valid_scales), max(valid_scales), 50),
            np.linspace(min(valid_shapes), max(valid_shapes), 50),
        )

        max_x = max(self.data)

        z = np.empty(scale_mesh.shape)
        for i in range(scale_mesh.shape[0]):
            for j in range(scale_mesh.shape[1]):
                shape = shape_mesh[i, j]
                scale = scale_mesh[i, j]
                if shape < 0 and self.threshold - scale / shape < max_x:
                    z[i, j] = np.nan
                else:
                    z[i, j] = self.loglik([scale, shape], self.threshold, self.data)

        # negate z to recover true loglikelihood
        axs[1, 0].contourf(scale_mesh, shape_mesh, z, levels=15)
        axs[1, 0].scatter([self.scale], [self.shape], color="darkorange", s=2)
        axs[1, 0].annotate(text="MLE", xy=(self.scale, self.shape), color="darkorange")
        axs[1, 0].title.set_text("Log-likelihood surface")
        axs[1, 0].set_xlabel("Scale")
        axs[1, 0].set_ylabel("Shape")

        ############## histogram vs density ################
        hist_data = axs[1, 1].hist(
            self.data, bins=25, edgecolor="white", color=self._figure_color_palette[0]
        )

        range_min, range_max = min(self.data), max(self.data)
        x_axis = np.linspace(range_min, range_max, 100)
        pdf_vals = self.pdf(x_axis)
        y_axis = hist_data[0][0] / pdf_vals[0] * pdf_vals

        axs[1, 1].plot(x_axis, y_axis, color=self._figure_color_palette[1])
        axs[1, 1].title.set_text("Data vs rescaled fitted density")
        axs[1, 1].set_xlabel("Exceedance data")
        axs[1, 1].yaxis.set_visible(False)  # Hide only x axis
        # axs[0, 0].set_aspect('equal', 'box')

        ############# Q-Q plot ################
        probability_range = np.linspace(0.01, 0.99, 99)
        empirical_quantiles = np.quantile(self.data, probability_range)
        tail_quantiles = self.ppf(probability_range)

        axs[2, 0].scatter(
            tail_quantiles, empirical_quantiles, color=self._figure_color_palette[0]
        )
        min_x, max_x = min(tail_quantiles), max(tail_quantiles)
        # axs[0,1].set_aspect('equal', 'box')
        axs[2, 0].title.set_text("Q-Q plot")
        axs[2, 0].set_xlabel("model quantiles")
        axs[2, 0].set_ylabel("Data quantiles")
        axs[2, 0].grid()
        axs[2, 0].plot([min_x, max_x], [min_x, max_x], linestyle="--", color="black")

        ############ Mean return plot ###############
        # scale, shape = self.scale, self.shape

        # n_obs = len(self.data)
        # exceedance_frequency = 1/np.logspace(1,4,20)
        # return_levels = self.ppf(1 - exceedance_frequency)

        # axs[2,1].plot(1.0/exceedance_frequency,return_levels,color=self._figure_color_palette[0])
        # axs[2,1].set_xscale("log")
        # axs[2,1].title.set_text("Exceedance model's return levels")
        # axs[2,1].set_xlabel('1/frequency')
        # axs[2,1].set_ylabel('Return level')
        # axs[2,1].grid()

        # exs_prob = 1 #carried over from older code

        # m = 10**np.linspace(np.log(1/exs_prob + 1)/np.log(10), 3,20)
        # return_levels = self.ppf(1 - 1/(exs_prob*m))

        # axs[2,1].plot(m,return_levels,color=self._figure_color_palette[0])
        # axs[2,1].set_xscale("log")
        # axs[2,1].title.set_text('Exceedance return levels')
        # axs[2,1].set_xlabel('1/frequency')
        # axs[2,1].set_ylabel('Return level')
        # axs[2,1].grid()

        # try:
        #   #for this bit, look at An Introduction to Statistical selfing of Extreme Values, p.82
        #   mle_cov = self.mle_cov()
        #   eigenvals, eigenvecs = np.linalg.eig(mle_cov)
        #   if np.all(eigenvals > 0):
        #     covariance = np.eye(3)
        #     covariance[1::,1::] = mle_cov
        #     covariance[0,0] = exs_prob*(1-exs_prob)/n_obs
        #     #
        #     return_stdevs = []
        #     for m_ in m:
        #       quantile_grad = np.array([
        #         scale*m_**(shape)*exs_prob**(shape-1),
        #         shape**(-1)*((exs_prob*m_)**shape-1),
        #         -scale*shape**(-2)*((exs_prob*m_)**shape-1)+scale*shape**(-1)*(exs_prob*m_)**shape*np.log(exs_prob*m_)
        #         ])
        #       #
        #       sdev = np.sqrt(quantile_grad.T.dot(covariance).dot(quantile_grad))
        #       return_stdevs.append(sdev)
        #     #
        #     axs[2,1].fill_between(m, return_levels - return_stdevs, return_levels + return_stdevs, alpha=0.2, color=self._figure_color_palette[1])
        #   else:
        #     warnings.warn("Covariance MLE matrix is not positive definite; it might be ill-conditioned", stacklevel=2)
        # except Exception as e:
        #   warnings.warn(f"Confidence bands for return level could not be calculated; covariance matrix might be ill-conditioned; full trace: {traceback.format_exc()}", stacklevel=2)

        plt.tight_layout()
        return fig

Ancestors

Class variables

var data : Optional[numpy.ndarray]
var scale : pydantic.types.PositiveFloat
var shape : float
var threshold : float

Static methods

def fit(data: np.ndarray, threshold: float, x0: np.ndarray = None, return_opt_results=False) ‑> Union[GPTail, scipy.optimize.optimize.OptimizeResult]

Fits a eneralised Pareto tail model using a constrained trust region method

Args

data : np.ndarray
exceedance data above threshold
threshold : float
Model threshold
x0 : np.ndarray, optional
Initial guess for optimization; if None, the result of scipy.stats.genpareto.fit is used as a starting point.
return_opt_results : bool, optional
If True, return the OptimizeResult object; otherwise return fitted instance of GPTail

Returns

t.Union[GPTail, sp.optimize.OptimizeResult]
Description
Expand source code
@classmethod
def fit(
    cls,
    data: np.ndarray,
    threshold: float,
    x0: np.ndarray = None,
    return_opt_results=False,
) -> t.Union[GPTail, sp.optimize.OptimizeResult]:
    """Fits a eneralised Pareto tail model using a constrained trust region method

    Args:
        data (np.ndarray): exceedance data above threshold
        threshold (float): Model threshold
        x0 (np.ndarray, optional): Initial guess for optimization; if None, the result of scipy.stats.genpareto.fit is used as a starting point.
        return_opt_results (bool, optional): If True, return the OptimizeResult object; otherwise return fitted instance of GPTail

    Returns:
        t.Union[GPTail, sp.optimize.OptimizeResult]: Description
    """
    exceedances = data[data > threshold]

    # rescale exceedances and threshold so that both parameters are in roughly the same scale, improving numerical conditioning
    sdev = np.std(exceedances)

    # rescaling the data rescales the location and scale parameter, and leaves the shape parameter unchanged
    norm_exceedances = exceedances / sdev
    norm_threshold = threshold / sdev

    norm_max = max(norm_exceedances)

    constraints = LinearConstraint(
        A=np.array([[1 / (norm_max - norm_threshold), 1], [1, 0]]),
        lb=np.zeros((2,)),
        ub=np.Inf,
    )

    if x0 is None:
        # use default scipy fitter to get initial estimate
        # this is almost always good enough
        shape, _, scale = gpdist.fit(norm_exceedances, floc=norm_threshold)
        x0 = np.array([scale, shape])

    loss_func = lambda params: GPTail.loss(params, norm_threshold, norm_exceedances)
    loss_grad = lambda params: GPTail.loss_grad(
        params, norm_threshold, norm_exceedances
    )
    loss_hessian = lambda params: GPTail.loss_hessian(
        params, norm_threshold, norm_exceedances
    )

    res = minimize(
        fun=loss_func,
        x0=x0,
        method="trust-constr",
        jac=loss_grad,
        hess=loss_hessian,
        constraints=[constraints],
    )

    # print(res)
    if return_opt_results:
        warnings.warn(
            "Returning raw results for rescaled exceedance data (sdev ~ 1)."
        )
        return res
    else:
        scale, shape = list(res.x)

        return cls(
            threshold=sdev * norm_threshold,
            scale=sdev * scale,
            shape=shape,
            data=sdev * norm_exceedances,
        )
def loglik(params: t.List[float], threshold: float, data: np.ndarray) ‑> float

Returns the negative log-likelihood for a Generalised Pareto model

Args

*params : t.List[float]
Vector parameter with scale and shape values, in that order
threshold : float
model threshold
data : np.ndarray
exceedance data

Returns

float

Expand source code
@classmethod
def loglik(cls, params: t.List[float], threshold: float, data: np.ndarray) -> float:
    """Returns the negative log-likelihood for a Generalised Pareto model

    Args:
        *params (t.List[float]): Vector parameter with scale and shape values, in that order
        threshold (float): model threshold
        data (np.ndarray): exceedance data

    Returns:
        float
    """
    scale, shape = params
    return np.sum(gpdist.logpdf(data, loc=threshold, c=shape, scale=scale))
def loglik_grad(params: t.List[float], threshold: float, data: np.ndarray) ‑> numpy.ndarray

Returns the gradient of the negative log-likelihood for a Generalised Pareto model

Args

*params : t.List[float]
Vector parameter with scale and shape values, in that order
threshold : float
model threshold
data : np.ndarray
exceedance data

Returns

np.ndarray
gradient array
Expand source code
@classmethod
def loglik_grad(
    cls, params: t.List[float], threshold: float, data: np.ndarray
) -> np.ndarray:
    """Returns the gradient of the negative log-likelihood for a Generalised Pareto model

    Args:
        *params (t.List[float]): Vector parameter with scale and shape values, in that order
        threshold (float): model threshold
        data (np.ndarray): exceedance data

    Returns:
        np.ndarray: gradient array
    """
    scale, shape = params
    y = data - threshold

    if not np.isclose(shape, 0, atol=cls._error_tol):
        grad_scale = np.sum((y - scale) / (scale * (y * shape + scale)))
        grad_shape = np.sum(
            -((y * (1 + shape)) / (shape * (y * shape + scale)))
            + np.log(1 + (y * shape) / scale) / shape**2
        )
    else:
        grad_scale = np.sum((y - scale) / scale**2)
        grad_shape = np.sum(y * (y - 2 * scale) / (2 * scale**2))

    return np.array([grad_scale, grad_shape])
def loglik_hessian(params: t.List[float], threshold: float, data: np.ndarray) ‑> numpy.ndarray

Returns the Hessian matrix of the negative log-likelihood for a Generalised Pareto model

Args

*params : t.List[float]
Vector parameter with scale and shape values, in that order
threshold : float
model threshold
data : np.ndarray
exceedance data above the threshold

Returns

np.ndarray
hessian matrix array
Expand source code
@classmethod
def loglik_hessian(
    cls, params: t.List[float], threshold: float, data: np.ndarray
) -> np.ndarray:
    """Returns the Hessian matrix of the negative log-likelihood for a Generalised Pareto model

    Args:
        *params (t.List[float]): Vector parameter with scale and shape values, in that order
        threshold (float): model threshold
        data (np.ndarray): exceedance data above the threshold

    Returns:
        np.ndarray: hessian matrix array
    """
    scale, shape = params
    y = data - threshold

    if not np.isclose(shape, 0, atol=cls._error_tol):
        d2scale = np.sum(
            -(1 / (shape * scale**2))
            + (1 + shape) / (shape * (y * shape + scale) ** 2)
        )

        # d2shape = (y (3 y ξ + y ξ^2 + 2 σ))/(ξ^2 (y ξ + σ)^2) - (2 Log[1 + (y ξ)/σ])/ξ^3
        d2shape = np.sum(
            (y * (3 * y * shape + y * shape**2 + 2 * scale))
            / (shape**2 * (y * shape + scale) ** 2)
            - (2 * np.log(1 + (y * shape) / scale)) / shape**3
        )

        # dscale_dshape = (y (-y + σ))/(σ (y ξ + σ)^2)
        dscale_dshape = np.sum(
            (y * (-y + scale)) / (scale * (y * shape + scale) ** 2)
        )
    else:
        d2scale = np.sum((scale - 2 * y) / scale**3)
        dscale_dshape = np.sum(-y * (y - scale) / scale**3)
        d2shape = np.sum(y**2 * (3 * scale - 2 * y) / (3 * scale**3))

    hessian = np.array([[d2scale, dscale_dshape], [dscale_dshape, d2shape]])

    return hessian
def logreg(params: t.List[float]) ‑> float

The regularisation terms are inspired in uninformative and zero-mean Gaussian priors for the scale and shape respectively, thus it is given by

r(\sigma, \xi) = 0.5 \cdot \log(\sigma) - 0.5 \xi^2

Args

params : t.List[float]
scale and shape parameters in that order

Returns

float
Description
Expand source code
@classmethod
def logreg(cls, params: t.List[float]) -> float:
    """The regularisation terms are inspired in uninformative and zero-mean Gaussian priors for the scale and shape respectively, thus it is given by

    $$r(\\sigma, \\xi) = 0.5 \\cdot \\log(\\sigma) - 0.5 \\xi^2$$

    Args:
        params (t.List[float]): scale and shape parameters in that order

    Returns:
        float: Description
    """
    scale, shape = params

    return 0.5 * (np.log(scale) + shape**2)
def logreg_grad(params: t.List[float]) ‑> float

Returns the gradient of the regularisation term

Args

params : t.List[float]
scale and shape parameters in that order
Expand source code
@classmethod
def logreg_grad(cls, params: t.List[float]) -> float:
    """Returns the gradient of the regularisation term

    Args:
        params (t.List[float]): scale and shape parameters in that order

    """
    scale, shape = params
    return 0.5 * np.array([1.0 / scale, 2 * shape])
def logreg_hessian(params: t.List[float]) ‑> float

Returns the Hessian of the regularisation term

Args

params : t.List[float]
scale and shape parameters in that order
Expand source code
@classmethod
def logreg_hessian(cls, params: t.List[float]) -> float:
    """Returns the Hessian of the regularisation term

    Args:
        params (t.List[float]): scale and shape parameters in that order

    """
    scale, shape = params
    return 0.5 * np.array([-1.0 / scale**2, 0, 0, 2]).reshape((2, 2))
def loss(params: t.List[float], threshold: float, data: np.ndarray) ‑> numpy.ndarray

Calculate the loss function on the provided parameters; this is the sum of the (negative) data log-likelihood and (negative) regularisation terms for the scale and shape. Everything is divided by the number of data points.

Args

params : t.List[float]
Description
threshold : float
Model threshold; eequivalently, location parameter
data : np.ndarray
exceedance data above threshold
Expand source code
@classmethod
def loss(
    cls, params: t.List[float], threshold: float, data: np.ndarray
) -> np.ndarray:
    """Calculate the loss function on the provided parameters; this is the sum of the (negative) data log-likelihood and (negative) regularisation terms for the scale and shape. Everything is divided by the number of data points.

    Args:
        params (t.List[float]): Description
        threshold (float): Model threshold; eequivalently, location parameter
        data (np.ndarray): exceedance data above threshold

    """
    scale, shape = params
    n = len(data)
    unnorm = cls.loglik(params, threshold, data) + cls.logreg(params)
    return -unnorm / n
def loss_grad(params: t.List[float], threshold: float, data: np.ndarray) ‑> numpy.ndarray

Calculate the loss function's gradient on the provided parameters

Args

params : t.List[float]
Description
threshold : float
Model threshold; eequivalently, location parameter
data : np.ndarray
exceedance data above threshold
Expand source code
@classmethod
def loss_grad(
    cls, params: t.List[float], threshold: float, data: np.ndarray
) -> np.ndarray:
    """Calculate the loss function's gradient on the provided parameters

    Args:
        params (t.List[float]): Description
        threshold (float): Model threshold; eequivalently, location parameter
        data (np.ndarray): exceedance data above threshold

    """
    n = len(data)
    return -(cls.loglik_grad(params, threshold, data) + cls.logreg_grad(params)) / n
def loss_hessian(params: t.List[float], threshold: float, data: np.ndarray) ‑> numpy.ndarray

Calculate the loss function's Hessian on the provided parameters

Args

params : t.List[float]
Description
threshold : float
Model threshold; eequivalently, location parameter
data : np.ndarray
exceedance data above threshold
Expand source code
@classmethod
def loss_hessian(
    cls, params: t.List[float], threshold: float, data: np.ndarray
) -> np.ndarray:
    """Calculate the loss function's Hessian on the provided parameters

    Args:
        params (t.List[float]): Description
        threshold (float): Model threshold; eequivalently, location parameter
        data (np.ndarray): exceedance data above threshold

    """
    n = len(data)
    return (
        -(cls.loglik_hessian(params, threshold, data) + cls.logreg_hessian(params))
        / n
    )

Instance variables

var endpoint
Expand source code
@property
def endpoint(self):
    return self.threshold - self.scale / self.shape if self.shape < 0 else np.Inf
var model
Expand source code
@property
def model(self):
    return gpdist(loc=self.threshold, c=self.shape, scale=self.scale)

Methods

def mle_cov(self) ‑> numpy.ndarray

Returns the estimated parameter covariance matrix evaluated at the fitted parameters

Returns

np.ndarray
Covariance matrix
Expand source code
def mle_cov(self) -> np.ndarray:
    """Returns the estimated parameter covariance matrix evaluated at the fitted parameters

    Returns:
        np.ndarray: Covariance matrix
    """

    if self.data is None:
        raise ValueError(
            "exceedance data not provided for this instance of GPTail; covariance matrix can't be estimated"
        )
    else:
        hess = self.loglik_hessian(
            [self.scale, self.shape], threshold=self.threshold, data=self.data
        )
        return np.linalg.inv(-hess)
def plot_diagnostics(self) ‑> None

Returns a figure with fit diagnostic for the GP model

Expand source code
def plot_diagnostics(self) -> None:
    """Returns a figure with fit diagnostic for the GP model"""
    if self.data is None:
        raise ValueError("Exceedance data was not provided for this model.")

    fig, axs = plt.subplots(3, 2)

    #################### Profile log-likelihood

    # set profile intervals based on MLE variance
    mle_cov = self.mle_cov()
    scale_bounds, shape_bounds = (
        self.scale - np.sqrt(mle_cov[0, 0]),
        self.scale + np.sqrt(mle_cov[0, 0]),
    ), (self.shape - np.sqrt(mle_cov[1, 1]), self.shape + np.sqrt(mle_cov[1, 1]))

    # set profile grids
    scale_grid = np.linspace(scale_bounds[0], scale_bounds[1], 50)
    shape_grid = np.linspace(shape_bounds[0], shape_bounds[1], 50)

    # declare profile functions
    scale_profile_func = lambda x: self.loglik(
        [x, self.shape], self.threshold, self.data
    )
    shape_profile_func = lambda x: self.loglik(
        [self.scale, x], self.threshold, self.data
    )

    loss_value = scale_profile_func(self.scale)

    scale_profile = np.array([scale_profile_func(x) for x in scale_grid])
    shape_profile = np.array([shape_profile_func(x) for x in shape_grid])

    alpha = 2 if loss_value > 0 else 0.5

    # filter to almost-optimal values

    def filter_grid(grid, optimum):
        radius = 2 * np.abs(optimum)
        return np.logical_and(
            np.logical_not(np.isnan(grid)),
            np.isfinite(grid),
            np.abs(grid - optimum) < radius,
        )

    scale_filter = filter_grid(scale_profile, loss_value)
    shape_filter = filter_grid(shape_profile, loss_value)

    valid_scales = scale_grid[scale_filter]
    valid_scale_profile = scale_profile[scale_filter]

    axs[0, 0].plot(
        valid_scales, valid_scale_profile, color=self._figure_color_palette[0]
    )
    axs[0, 0].vlines(
        x=self.scale,
        ymin=min(valid_scale_profile),
        ymax=max(valid_scale_profile),
        linestyle="dashed",
        colors=self._figure_color_palette[1],
    )
    axs[0, 0].title.set_text("Profile scale log-likelihood")
    axs[0, 0].set_xlabel("Scale")
    axs[0, 0].set_ylabel("log-likelihood")
    axs[0, 0].grid()

    valid_shapes = shape_grid[shape_filter]
    valid_shape_profile = shape_profile[shape_filter]

    axs[0, 1].plot(
        valid_shapes, valid_shape_profile, color=self._figure_color_palette[0]
    )
    axs[0, 1].vlines(
        x=self.shape,
        ymin=min(valid_shape_profile),
        ymax=max(valid_shape_profile),
        linestyle="dashed",
        colors=self._figure_color_palette[1],
    )
    axs[0, 1].title.set_text("Profile shape log-likelihood")
    axs[0, 1].set_xlabel("Shape")
    axs[0, 1].set_ylabel("log-likelihood")
    axs[0, 1].grid()

    ######################## Log-likelihood surface ###############
    scale_grid, shape_grid = np.mgrid[
        min(valid_scales) : max(valid_scales) : 2 * np.sqrt(mle_cov[0, 0]) / 50,
        shape_bounds[0] : shape_bounds[1] : 2 * np.sqrt(mle_cov[0, 0]) / 50,
    ]

    scale_mesh, shape_mesh = np.meshgrid(
        np.linspace(min(valid_scales), max(valid_scales), 50),
        np.linspace(min(valid_shapes), max(valid_shapes), 50),
    )

    max_x = max(self.data)

    z = np.empty(scale_mesh.shape)
    for i in range(scale_mesh.shape[0]):
        for j in range(scale_mesh.shape[1]):
            shape = shape_mesh[i, j]
            scale = scale_mesh[i, j]
            if shape < 0 and self.threshold - scale / shape < max_x:
                z[i, j] = np.nan
            else:
                z[i, j] = self.loglik([scale, shape], self.threshold, self.data)

    # negate z to recover true loglikelihood
    axs[1, 0].contourf(scale_mesh, shape_mesh, z, levels=15)
    axs[1, 0].scatter([self.scale], [self.shape], color="darkorange", s=2)
    axs[1, 0].annotate(text="MLE", xy=(self.scale, self.shape), color="darkorange")
    axs[1, 0].title.set_text("Log-likelihood surface")
    axs[1, 0].set_xlabel("Scale")
    axs[1, 0].set_ylabel("Shape")

    ############## histogram vs density ################
    hist_data = axs[1, 1].hist(
        self.data, bins=25, edgecolor="white", color=self._figure_color_palette[0]
    )

    range_min, range_max = min(self.data), max(self.data)
    x_axis = np.linspace(range_min, range_max, 100)
    pdf_vals = self.pdf(x_axis)
    y_axis = hist_data[0][0] / pdf_vals[0] * pdf_vals

    axs[1, 1].plot(x_axis, y_axis, color=self._figure_color_palette[1])
    axs[1, 1].title.set_text("Data vs rescaled fitted density")
    axs[1, 1].set_xlabel("Exceedance data")
    axs[1, 1].yaxis.set_visible(False)  # Hide only x axis
    # axs[0, 0].set_aspect('equal', 'box')

    ############# Q-Q plot ################
    probability_range = np.linspace(0.01, 0.99, 99)
    empirical_quantiles = np.quantile(self.data, probability_range)
    tail_quantiles = self.ppf(probability_range)

    axs[2, 0].scatter(
        tail_quantiles, empirical_quantiles, color=self._figure_color_palette[0]
    )
    min_x, max_x = min(tail_quantiles), max(tail_quantiles)
    # axs[0,1].set_aspect('equal', 'box')
    axs[2, 0].title.set_text("Q-Q plot")
    axs[2, 0].set_xlabel("model quantiles")
    axs[2, 0].set_ylabel("Data quantiles")
    axs[2, 0].grid()
    axs[2, 0].plot([min_x, max_x], [min_x, max_x], linestyle="--", color="black")

    ############ Mean return plot ###############
    # scale, shape = self.scale, self.shape

    # n_obs = len(self.data)
    # exceedance_frequency = 1/np.logspace(1,4,20)
    # return_levels = self.ppf(1 - exceedance_frequency)

    # axs[2,1].plot(1.0/exceedance_frequency,return_levels,color=self._figure_color_palette[0])
    # axs[2,1].set_xscale("log")
    # axs[2,1].title.set_text("Exceedance model's return levels")
    # axs[2,1].set_xlabel('1/frequency')
    # axs[2,1].set_ylabel('Return level')
    # axs[2,1].grid()

    # exs_prob = 1 #carried over from older code

    # m = 10**np.linspace(np.log(1/exs_prob + 1)/np.log(10), 3,20)
    # return_levels = self.ppf(1 - 1/(exs_prob*m))

    # axs[2,1].plot(m,return_levels,color=self._figure_color_palette[0])
    # axs[2,1].set_xscale("log")
    # axs[2,1].title.set_text('Exceedance return levels')
    # axs[2,1].set_xlabel('1/frequency')
    # axs[2,1].set_ylabel('Return level')
    # axs[2,1].grid()

    # try:
    #   #for this bit, look at An Introduction to Statistical selfing of Extreme Values, p.82
    #   mle_cov = self.mle_cov()
    #   eigenvals, eigenvecs = np.linalg.eig(mle_cov)
    #   if np.all(eigenvals > 0):
    #     covariance = np.eye(3)
    #     covariance[1::,1::] = mle_cov
    #     covariance[0,0] = exs_prob*(1-exs_prob)/n_obs
    #     #
    #     return_stdevs = []
    #     for m_ in m:
    #       quantile_grad = np.array([
    #         scale*m_**(shape)*exs_prob**(shape-1),
    #         shape**(-1)*((exs_prob*m_)**shape-1),
    #         -scale*shape**(-2)*((exs_prob*m_)**shape-1)+scale*shape**(-1)*(exs_prob*m_)**shape*np.log(exs_prob*m_)
    #         ])
    #       #
    #       sdev = np.sqrt(quantile_grad.T.dot(covariance).dot(quantile_grad))
    #       return_stdevs.append(sdev)
    #     #
    #     axs[2,1].fill_between(m, return_levels - return_stdevs, return_levels + return_stdevs, alpha=0.2, color=self._figure_color_palette[1])
    #   else:
    #     warnings.warn("Covariance MLE matrix is not positive definite; it might be ill-conditioned", stacklevel=2)
    # except Exception as e:
    #   warnings.warn(f"Confidence bands for return level could not be calculated; covariance matrix might be ill-conditioned; full trace: {traceback.format_exc()}", stacklevel=2)

    plt.tight_layout()
    return fig

Inherited members

class GPTailMixture (**data: Any)

Mixture distribution with generalised Pareto components. This is a base class for Bayesian generalised Pareto tail models, which can be seen as an uniformly weighted mixture of the posterior samples; the convolution of a discrete (or empirical) distribution with a generalised Pareto distribution also results in a mixture of this kind. Most methods inherited from BaseDistribution have an extra argument return_all. When it is True, the full posterior sample of the method evaluation is returned.

Args

data(np.ndarray, optional): Data that induced the model, if applicable
weights : np.ndarray
component weights
thresholds : np.ndarray
vector of threshold parameters, one for each component
scales : np.ndarray
vector of scale parameters, one for each component
shapes : np.ndarray
vector of shape parameters, one for each component

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class GPTailMixture(BaseDistribution):

    """Mixture distribution with generalised Pareto components. This is a base class for  Bayesian generalised Pareto tail models, which can be seen as an uniformly weighted mixture of the posterior samples; the convolution of a discrete (or empirical) distribution with a generalised Pareto distribution also results in a mixture of this kind. Most methods inherited from `BaseDistribution` have an extra argument `return_all`. When it is True, the full posterior sample of the method evaluation is returned.

    Args:
        data(np.ndarray, optional): Data that induced the model, if applicable
        weights (np.ndarray): component weights
        thresholds (np.ndarray): vector of threshold parameters, one for each component
        scales (np.ndarray): vector of scale parameters, one for each component
        shapes (np.ndarray): vector of shape parameters, one for each component
    """

    data: t.Optional[np.ndarray]
    weights: np.ndarray
    thresholds: np.ndarray
    scales: np.ndarray
    shapes: np.ndarray

    def __repr__(self):
        return f"Mixture of generalised Pareto distributions with {len(self.weights)} components"

    @validator("weights", allow_reuse=True)
    def check_weigths(cls, weights):
        if not np.isclose(np.sum(weights), 1, atol=cls._error_tol):
            raise ValueError(f"Weights don't sum 1 (sum = {np.sum(weights)})")
        elif np.any(weights <= 0):
            raise ValueError("Negative or null weights are present")
        else:
            return weights

    def moment(self, n: int, return_all: bool = False) -> float:
        """Returns the n-th order moment

        Args:
            n (int): Moment's order
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the moment value of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            float: moment value
        """
        vals = np.array(
            [
                gpdist.moment(n, loc=threshold, c=shape, scale=scale)
                for threshold, shape, scale in zip(
                    self.thresholds, self.shapes, self.scales
                )
            ]
        )

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def mean(self, return_all: bool = False) -> float:
        """Returns the mean value

        Args:
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; the mean of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            float: mean value
        """
        vals = gpdist.mean(loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def std(self, return_all: bool = False) -> float:
        """Standard deviation value

        Args:
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise the standard deviation of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            float: standard deviation value
        """
        var = gpdist.var(loc=self.thresholds, c=self.shapes, scale=self.scales)

        mean = gpdist.mean(loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return np.sqrt(var)
        else:
            return np.sqrt(np.dot(self.weights, var + mean**2) - self.mean() ** 2)

    def cdf(
        self, x: t.Union[float, np.ndarray], return_all: bool = False
    ) -> t.Union[float, np.ndarray]:
        """Evaluates the CDF function

        Args:
            x (t.Union[float, np.ndarray]): Point at which to evaluate it
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; the CDF of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: CDF value
        """

        if isinstance(x, np.ndarray):
            return np.array([self.cdf(elem, return_all) for elem in x])

        vals = gpdist.cdf(x, loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def pdf(
        self, x: t.Union[float, np.ndarray], return_all: bool = False
    ) -> t.Union[float, np.ndarray]:
        """Evaluates the pdf function

        Args:
            x (t.Union[float, np.ndarray]): Point at which to evaluate it
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise the pdf of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: pdf value
        """

        if isinstance(x, np.ndarray):
            return np.array([self.pdf(elem, return_all) for elem in x])

        vals = gpdist.pdf(x, loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:
            return np.dot(self.weights, vals)

    def ppf(
        self, q: t.Union[float, np.ndarray], return_all: bool = False
    ) -> t.Union[float, np.ndarray]:
        """Evaluates the quantile function

        Args:
            x (t.Union[float, np.ndarray]): probability level
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the quantile function of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: quantile function value value
        """
        if isinstance(q, np.ndarray):
            return np.array([self.ppf(elem, return_all) for elem in q])

        vals = gpdist.ppf(q, loc=self.thresholds, c=self.shapes, scale=self.scales)

        if return_all:
            return vals
        else:

            def target_function(x):
                return self.cdf(x) - q

            x0 = np.dot(self.weights, vals)
            return root_scalar(target_function, x0=x0, x1=x0 + 1, method="secant").root

    def cvar(self, p: float, return_all: bool = False) -> float:
        """Returns the conditional value at risk for a given probability level

        Args:
            x (t.Union[float, np.ndarray]): probability level
            return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the cvar of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

        Returns:
            t.Union[float, np.ndarray]: conditional value at risk
        """
        if p < 0 or p >= 1:
            raise ValueError("p must be in the open interval (0,1)")

        return (self >= self.ppf(p)).mean(return_all)

    def simulate(self, size: int) -> np.ndarray:
        n_samples = np.random.multinomial(n=size, pvals=self.weights, size=1)[0]
        indices = (n_samples > 0).nonzero()[0]
        samples = [
            gpdist.rvs(
                size=n_samples[i],
                c=self.shapes[i],
                scale=self.scales[i],
                loc=self.thresholds[i],
            )
            for i in indices
        ]
        return np.concatenate(samples, axis=0)

    def __gt__(self, other: float) -> GPTailMixture:
        exceedance_prob = 1 - self.cdf(other)
        # prob_exceedance = np.dot(self.weights, prob_cond_exceedance)
        if exceedance_prob == 0:
            raise ValueError(
                f"There is no probability mass above {other}; conditional distribution does not exist."
            )

        conditional_weights = (
            self.weights
            * gpdist.sf(other, c=self.shapes, scale=self.scales, loc=self.thresholds)
            / exceedance_prob
        )

        indices = (conditional_weights > 0).nonzero()[
            0
        ]  # indices of mixture components with nonzero exceedance probability

        new_weights = conditional_weights[indices]

        # disable warnings temporarily
        # with warnings.catch_warnings():
        #   warnings.simplefilter("ignore")
        #   new_thresholds = np.array([(GPTail(threshold = mu, shape = xi, scale = sigma) >= other).threshold for mu, sigma, xi in zip(self.thresholds[indices], self.scales[indices], self.shapes[indices])])

        new_thresholds = np.array(
            [max(other, threshold) for threshold in self.thresholds[indices]]
        )
        new_shapes = self.shapes[indices]
        new_scales = self.scales[indices] + new_shapes * (
            new_thresholds - self.thresholds[indices]
        )
        # new_thresholds = np.clip(self.thresholds[indices], a_min=other, a_max = np.Inf)
        # new_shapes = self.shapes[indices]
        # new_scales = self.scales[indices] + new_shapes*np.clip(other - new_thresholds, a_min = 0.0, a_max=np.Inf)

        if self.data is not None and np.all(self.data < other):
            warnings.warn(
                f"No observed data above {other}; setting data to None in conditioned model",
                stacklevel=2,
            )
            new_data = None
        elif self.data is None:
            new_data = None
        else:
            new_data = self.data[self.data > other]

        return type(self)(
            weights=new_weights,
            thresholds=new_thresholds,
            shapes=new_shapes,
            scales=new_scales,
            data=new_data,
        )

    def __ge__(self, other: float) -> BaseDistribution:
        return self.__gt__(other)

    def __add__(self, other: self._allowed_scalar_types):

        if type(other) not in self._allowed_scalar_types:
            raise TypeError(f"+ is implemented for types {self._allowed_scalar_types}")

        new_data = None if self.data is None else self.data + other

        return type(self)(
            weights=self.weights,
            thresholds=self.thresholds + other,
            shapes=self.shapes,
            scales=self.scales,
            data=new_data,
        )

    def __mul__(self, other: self._allowed_scalar_types):

        if type(other) not in self._allowed_scalar_types:
            raise TypeError(f"* is implemented for types {self._allowed_scalar_types}")

        if other <= 0:
            raise ValueError(f"product supported for positive scalars only")

        new_data = None if self.data is None else other * self.data

        return type(self)(
            weights=self.weights,
            thresholds=other * self.thresholds,
            shapes=self.shapes,
            scales=other * self.scales,
            data=new_data,
        )

Ancestors

Subclasses

Class variables

var data : Optional[numpy.ndarray]
var scales : numpy.ndarray
var shapes : numpy.ndarray
var thresholds : numpy.ndarray
var weights : numpy.ndarray

Static methods

def check_weigths(weights)
Expand source code
@validator("weights", allow_reuse=True)
def check_weigths(cls, weights):
    if not np.isclose(np.sum(weights), 1, atol=cls._error_tol):
        raise ValueError(f"Weights don't sum 1 (sum = {np.sum(weights)})")
    elif np.any(weights <= 0):
        raise ValueError("Negative or null weights are present")
    else:
        return weights

Methods

def cdf(self, x: t.Union[float, np.ndarray], return_all: bool = False) ‑> Union[float, numpy.ndarray]

Evaluates the CDF function

Args

x : t.Union[float, np.ndarray]
Point at which to evaluate it
return_all : bool, optional
If True, all posterior samples of the value are returned; the CDF of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

t.Union[float, np.ndarray]
CDF value
Expand source code
def cdf(
    self, x: t.Union[float, np.ndarray], return_all: bool = False
) -> t.Union[float, np.ndarray]:
    """Evaluates the CDF function

    Args:
        x (t.Union[float, np.ndarray]): Point at which to evaluate it
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; the CDF of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        t.Union[float, np.ndarray]: CDF value
    """

    if isinstance(x, np.ndarray):
        return np.array([self.cdf(elem, return_all) for elem in x])

    vals = gpdist.cdf(x, loc=self.thresholds, c=self.shapes, scale=self.scales)

    if return_all:
        return vals
    else:
        return np.dot(self.weights, vals)
def cvar(self, p: float, return_all: bool = False) ‑> float

Returns the conditional value at risk for a given probability level

Args

x : t.Union[float, np.ndarray]
probability level
return_all : bool, optional
If True, all posterior samples of the value are returned; otherwise the cvar of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

t.Union[float, np.ndarray]
conditional value at risk
Expand source code
def cvar(self, p: float, return_all: bool = False) -> float:
    """Returns the conditional value at risk for a given probability level

    Args:
        x (t.Union[float, np.ndarray]): probability level
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the cvar of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        t.Union[float, np.ndarray]: conditional value at risk
    """
    if p < 0 or p >= 1:
        raise ValueError("p must be in the open interval (0,1)")

    return (self >= self.ppf(p)).mean(return_all)
def mean(self, return_all: bool = False) ‑> float

Returns the mean value

Args

return_all : bool, optional
If True, all posterior samples of the value are returned; the mean of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

float
mean value
Expand source code
def mean(self, return_all: bool = False) -> float:
    """Returns the mean value

    Args:
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; the mean of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        float: mean value
    """
    vals = gpdist.mean(loc=self.thresholds, c=self.shapes, scale=self.scales)

    if return_all:
        return vals
    else:
        return np.dot(self.weights, vals)
def moment(self, n: int, return_all: bool = False) ‑> float

Returns the n-th order moment

Args

n : int
Moment's order
return_all : bool, optional
If True, all posterior samples of the value are returned; otherwise the moment value of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

float
moment value
Expand source code
def moment(self, n: int, return_all: bool = False) -> float:
    """Returns the n-th order moment

    Args:
        n (int): Moment's order
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the moment value of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        float: moment value
    """
    vals = np.array(
        [
            gpdist.moment(n, loc=threshold, c=shape, scale=scale)
            for threshold, shape, scale in zip(
                self.thresholds, self.shapes, self.scales
            )
        ]
    )

    if return_all:
        return vals
    else:
        return np.dot(self.weights, vals)
def pdf(self, x: t.Union[float, np.ndarray], return_all: bool = False) ‑> Union[float, numpy.ndarray]

Evaluates the pdf function

Args

x : t.Union[float, np.ndarray]
Point at which to evaluate it
return_all : bool, optional
If True, all posterior samples of the value are returned; otherwise the pdf of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

t.Union[float, np.ndarray]
pdf value
Expand source code
def pdf(
    self, x: t.Union[float, np.ndarray], return_all: bool = False
) -> t.Union[float, np.ndarray]:
    """Evaluates the pdf function

    Args:
        x (t.Union[float, np.ndarray]): Point at which to evaluate it
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise the pdf of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        t.Union[float, np.ndarray]: pdf value
    """

    if isinstance(x, np.ndarray):
        return np.array([self.pdf(elem, return_all) for elem in x])

    vals = gpdist.pdf(x, loc=self.thresholds, c=self.shapes, scale=self.scales)

    if return_all:
        return vals
    else:
        return np.dot(self.weights, vals)
def ppf(self, q: t.Union[float, np.ndarray], return_all: bool = False) ‑> Union[float, numpy.ndarray]

Evaluates the quantile function

Args

x : t.Union[float, np.ndarray]
probability level
return_all : bool, optional
If True, all posterior samples of the value are returned; otherwise the quantile function of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

t.Union[float, np.ndarray]
quantile function value value
Expand source code
def ppf(
    self, q: t.Union[float, np.ndarray], return_all: bool = False
) -> t.Union[float, np.ndarray]:
    """Evaluates the quantile function

    Args:
        x (t.Union[float, np.ndarray]): probability level
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise  the quantile function of the posterior predictive distribution is evaluated. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        t.Union[float, np.ndarray]: quantile function value value
    """
    if isinstance(q, np.ndarray):
        return np.array([self.ppf(elem, return_all) for elem in q])

    vals = gpdist.ppf(q, loc=self.thresholds, c=self.shapes, scale=self.scales)

    if return_all:
        return vals
    else:

        def target_function(x):
            return self.cdf(x) - q

        x0 = np.dot(self.weights, vals)
        return root_scalar(target_function, x0=x0, x1=x0 + 1, method="secant").root
def std(self, return_all: bool = False) ‑> float

Standard deviation value

Args

return_all : bool, optional
If True, all posterior samples of the value are returned; otherwise the standard deviation of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

Returns

float
standard deviation value
Expand source code
def std(self, return_all: bool = False) -> float:
    """Standard deviation value

    Args:
        return_all (bool, optional): If `True`, all posterior samples of the value are returned; otherwise the standard deviation of the posterior predictive distribution is returned. The posterior distribution is taken to be the empirical distribution of posterior samples.

    Returns:
        float: standard deviation value
    """
    var = gpdist.var(loc=self.thresholds, c=self.shapes, scale=self.scales)

    mean = gpdist.mean(loc=self.thresholds, c=self.shapes, scale=self.scales)

    if return_all:
        return np.sqrt(var)
    else:
        return np.sqrt(np.dot(self.weights, var + mean**2) - self.mean() ** 2)

Inherited members

class Mixture (**data: Any)

This class represents a probability distribution given by a mixture of weighted continuous and empirical densities; as the base continuous densities can only be of class GPTail, this class is intended to represent either a semiparametric model with a Generalised Pareto tail, or the convolution of such a model with an integer distribution, as is the case for the power surplus distribution in power system reliability modeling.

Args

distributions : t.List[BaseDistribution]
list of distributions that make up the mixture
weights : np.ndarray
weights for each of the distribution. The weights must be a distribution themselves

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class Mixture(BaseDistribution):

    """This class represents a probability distribution given by a mixture of weighted continuous and empirical densities; as the base continuous densities can only be of class GPTail, this class is intended to represent either a semiparametric model with a Generalised Pareto tail, or the convolution of such a model with an integer distribution, as is the case for the power surplus distribution in power system reliability modeling.

    Args:
        distributions (t.List[BaseDistribution]): list of distributions that make up the mixture
        weights (np.ndarray): weights for each of the distribution. The weights must be a distribution themselves
    """

    distributions: t.List[BaseDistribution]
    weights: np.ndarray

    def __repr__(self):
        return f"Mixture with {len(self.weights)} components"

    @validator("weights", allow_reuse=True)
    def check_weigths(cls, weights):
        if not np.isclose(np.sum(weights), 1, atol=cls._error_tol):
            raise ValidationError(f"Weights don't sum 1 (sum = {np.sum(weights)})")
        elif np.any(weights <= 0):
            raise ValidationError("Negative or null weights are present")
        else:
            return weights

    def __mul__(self, factor: float):

        return Mixture(
            weights=self.weights,
            distributions=[factor * dist for dist in self.distributions],
        )

    def __add__(self, factor: float):

        return Mixture(
            weights=self.weights,
            distributions=[factor + dist for dist in self.distributions],
        )

    def __ge__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f">= is implemented for instances of types : {self._allowed_scalar_types}"
            )

        if self.cdf(other) == 1:
            raise ValueError("There is no probability mass above provided threshold")

        cond_weights = np.array(
            [
                1 - dist.cdf(other) + (isinstance(dist, Empirical)) * dist.pdf(other)
                for dist in self.distributions
            ]
        )
        new_weights = cond_weights * self.weights

        indices = (new_weights > 0).nonzero()[0]

        nz_weights = new_weights[indices]
        nz_dists = [self.distributions[i] for i in indices]

        return Mixture(
            weights=nz_weights / np.sum(nz_weights),
            distributions=[dist >= other for dist in nz_dists],
        )

    def __gt__(self, other: float):

        if not isinstance(other, self._allowed_scalar_types):
            raise TypeError(
                f"> is implemented for instances of types : {self._allowed_scalar_types}"
            )

        if self.cdf(other) == 1:
            raise ValueError("There is no probability mass above provided threshold")

        cond_weights = np.array(
            [
                1 - dist.cdf(other) + (isinstance(dist, Empirical)) * dist.pdf(other)
                for dist in self.distributions
            ]
        )

        new_weights = cond_weights * self.weights

        indices = (new_weights > 0).nonzero()[0]

        nz_weights = new_weights[indices]
        nz_dists = [self.distributions[i] for i in indices]

        return Mixture(
            weights=nz_weights / np.sum(nz_weights),
            distributions=[dist > other for dist in nz_dists],
        )

        # index = self.support > other

        # return type(self)(
        #   self.support[index],
        #   self.pdf_values[index]/np.sum(self.pdf_values[index]),
        #   self.data[self.data > other])

    def simulate(self, size: int) -> np.ndarray:
        """Simulate values from mixture distribution

        Args:
            size (int): Sample size

        Returns:
            np.ndarray: simulated sample
        """
        n_samples = np.random.multinomial(n=size, pvals=self.weights, size=1)[0]
        indices = (n_samples > 0).nonzero()[0]
        samples = [
            dist.simulate(size=k)
            for dist, k in zip(
                [self.distributions[k] for k in indices], n_samples[indices]
            )
        ]
        samples = np.concatenate(samples, axis=0)
        np.random.shuffle(samples)
        return samples

    def cdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Evaluate Mixture's CDF function

        Args:
            x (t.Union[float, np.ndarray]): point at which to evaluate CDF
            **kwargs: Additional arguments passed to individual mixture component's CDF function

        Returns:
            t.Union[float, np.ndarray]: CDF value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.cdf(x, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def pdf(
        self, x: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Evaluate Mixture's pdf function

        Args:
            x (t.Union[float, np.ndarray]): point at which to evaluate CDF
            **kwargs: Additional arguments passed to individual mixture component's pdf function

        Returns:
            t.Union[float, np.ndarray]: pdf value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.pdf(x, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def ppf(
        self, q: t.Union[float, np.ndarray], **kwargs
    ) -> t.Union[float, np.ndarray]:
        """Evaluate Mixture's quantile function function

        Args:
            x (t.Union[float, np.ndarray]): point at which to evaluate quantile function
            **kwargs: Additional arguments passed to individual mixture component's quantile function

        Returns:
            t.Union[float, np.ndarray]: quantile value
        """

        if isinstance(q, np.ndarray):
            return np.array([self.ppf(elem, **kwargs) for elem in q])

        def target_function(x):
            return self.cdf(x) - q

        vals = [
            w * dist.ppf(q, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        x0 = reduce(lambda x, y: x + y, vals)

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.ppf(0.5 + q / 2, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        x1 = reduce(lambda x, y: x + y, vals)

        return root_scalar(target_function, x0=x0, x1=x1, method="secant").root

    def moment(self, n: int, **kwargs) -> float:
        """Evaluate Mixture's n-th moment

        Args:
            x (t.Union[float, np.ndarray]): Moment order
            **kwargs: Additional arguments passed to individual mixture components' moment function

        Returns:
            t.Union[float, np.ndarray]: moment value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.moment(n, **kwargs)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def mean(self, **kwargs) -> float:
        """Evaluate Mixture's mean

        Args:
            **kwargs: Additional arguments passed to individual mixture components' mean function

        Returns:
            float: mean value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * dist.mean(**kwargs) for w, dist in zip(self.weights, self.distributions)
        ]
        return reduce(lambda x, y: x + y, vals)

    def std(self, **kwargs) -> float:
        """Evaluate Mixture's standard deviation

        Args:
            **kwargs: Additional arguments passed to individual mixture components' standard deviation function

        Returns:
            float: standard deviation value
        """

        # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
        vals = [
            w * (dist.std(**kwargs) ** 2 + dist.mean(**kwargs) ** 2)
            for w, dist in zip(self.weights, self.distributions)
        ]
        return np.sqrt(reduce(lambda x, y: x + y, vals) - self.mean() ** 2)

Ancestors

Subclasses

Class variables

var distributions : List[BaseDistribution]
var weights : numpy.ndarray

Static methods

def check_weigths(weights)
Expand source code
@validator("weights", allow_reuse=True)
def check_weigths(cls, weights):
    if not np.isclose(np.sum(weights), 1, atol=cls._error_tol):
        raise ValidationError(f"Weights don't sum 1 (sum = {np.sum(weights)})")
    elif np.any(weights <= 0):
        raise ValidationError("Negative or null weights are present")
    else:
        return weights

Methods

def cdf(self, x: t.Union[float, np.ndarray], **kwargs) ‑> Union[float, numpy.ndarray]

Evaluate Mixture's CDF function

Args

x : t.Union[float, np.ndarray]
point at which to evaluate CDF
**kwargs
Additional arguments passed to individual mixture component's CDF function

Returns

t.Union[float, np.ndarray]
CDF value
Expand source code
def cdf(
    self, x: t.Union[float, np.ndarray], **kwargs
) -> t.Union[float, np.ndarray]:
    """Evaluate Mixture's CDF function

    Args:
        x (t.Union[float, np.ndarray]): point at which to evaluate CDF
        **kwargs: Additional arguments passed to individual mixture component's CDF function

    Returns:
        t.Union[float, np.ndarray]: CDF value
    """

    # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
    vals = [
        w * dist.cdf(x, **kwargs)
        for w, dist in zip(self.weights, self.distributions)
    ]
    return reduce(lambda x, y: x + y, vals)
def mean(self, **kwargs) ‑> float

Evaluate Mixture's mean

Args

**kwargs
Additional arguments passed to individual mixture components' mean function

Returns

float
mean value
Expand source code
def mean(self, **kwargs) -> float:
    """Evaluate Mixture's mean

    Args:
        **kwargs: Additional arguments passed to individual mixture components' mean function

    Returns:
        float: mean value
    """

    # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
    vals = [
        w * dist.mean(**kwargs) for w, dist in zip(self.weights, self.distributions)
    ]
    return reduce(lambda x, y: x + y, vals)
def moment(self, n: int, **kwargs) ‑> float

Evaluate Mixture's n-th moment

Args

x : t.Union[float, np.ndarray]
Moment order
**kwargs
Additional arguments passed to individual mixture components' moment function

Returns

t.Union[float, np.ndarray]
moment value
Expand source code
def moment(self, n: int, **kwargs) -> float:
    """Evaluate Mixture's n-th moment

    Args:
        x (t.Union[float, np.ndarray]): Moment order
        **kwargs: Additional arguments passed to individual mixture components' moment function

    Returns:
        t.Union[float, np.ndarray]: moment value
    """

    # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
    vals = [
        w * dist.moment(n, **kwargs)
        for w, dist in zip(self.weights, self.distributions)
    ]
    return reduce(lambda x, y: x + y, vals)
def pdf(self, x: t.Union[float, np.ndarray], **kwargs) ‑> Union[float, numpy.ndarray]

Evaluate Mixture's pdf function

Args

x : t.Union[float, np.ndarray]
point at which to evaluate CDF
**kwargs
Additional arguments passed to individual mixture component's pdf function

Returns

t.Union[float, np.ndarray]
pdf value
Expand source code
def pdf(
    self, x: t.Union[float, np.ndarray], **kwargs
) -> t.Union[float, np.ndarray]:
    """Evaluate Mixture's pdf function

    Args:
        x (t.Union[float, np.ndarray]): point at which to evaluate CDF
        **kwargs: Additional arguments passed to individual mixture component's pdf function

    Returns:
        t.Union[float, np.ndarray]: pdf value
    """

    # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
    vals = [
        w * dist.pdf(x, **kwargs)
        for w, dist in zip(self.weights, self.distributions)
    ]
    return reduce(lambda x, y: x + y, vals)
def ppf(self, q: t.Union[float, np.ndarray], **kwargs) ‑> Union[float, numpy.ndarray]

Evaluate Mixture's quantile function function

Args

x : t.Union[float, np.ndarray]
point at which to evaluate quantile function
**kwargs
Additional arguments passed to individual mixture component's quantile function

Returns

t.Union[float, np.ndarray]
quantile value
Expand source code
def ppf(
    self, q: t.Union[float, np.ndarray], **kwargs
) -> t.Union[float, np.ndarray]:
    """Evaluate Mixture's quantile function function

    Args:
        x (t.Union[float, np.ndarray]): point at which to evaluate quantile function
        **kwargs: Additional arguments passed to individual mixture component's quantile function

    Returns:
        t.Union[float, np.ndarray]: quantile value
    """

    if isinstance(q, np.ndarray):
        return np.array([self.ppf(elem, **kwargs) for elem in q])

    def target_function(x):
        return self.cdf(x) - q

    vals = [
        w * dist.ppf(q, **kwargs)
        for w, dist in zip(self.weights, self.distributions)
    ]
    x0 = reduce(lambda x, y: x + y, vals)

    # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
    vals = [
        w * dist.ppf(0.5 + q / 2, **kwargs)
        for w, dist in zip(self.weights, self.distributions)
    ]
    x1 = reduce(lambda x, y: x + y, vals)

    return root_scalar(target_function, x0=x0, x1=x1, method="secant").root
def simulate(self, size: int) ‑> numpy.ndarray

Simulate values from mixture distribution

Args

size : int
Sample size

Returns

np.ndarray
simulated sample
Expand source code
def simulate(self, size: int) -> np.ndarray:
    """Simulate values from mixture distribution

    Args:
        size (int): Sample size

    Returns:
        np.ndarray: simulated sample
    """
    n_samples = np.random.multinomial(n=size, pvals=self.weights, size=1)[0]
    indices = (n_samples > 0).nonzero()[0]
    samples = [
        dist.simulate(size=k)
        for dist, k in zip(
            [self.distributions[k] for k in indices], n_samples[indices]
        )
    ]
    samples = np.concatenate(samples, axis=0)
    np.random.shuffle(samples)
    return samples
def std(self, **kwargs) ‑> float

Evaluate Mixture's standard deviation

Args

**kwargs
Additional arguments passed to individual mixture components' standard deviation function

Returns

float
standard deviation value
Expand source code
def std(self, **kwargs) -> float:
    """Evaluate Mixture's standard deviation

    Args:
        **kwargs: Additional arguments passed to individual mixture components' standard deviation function

    Returns:
        float: standard deviation value
    """

    # the use of a list and a reduce is needed because mixture components might return scalars or vectors depending on their class and on the passed kwargs.
    vals = [
        w * (dist.std(**kwargs) ** 2 + dist.mean(**kwargs) ** 2)
        for w, dist in zip(self.weights, self.distributions)
    ]
    return np.sqrt(reduce(lambda x, y: x + y, vals) - self.mean() ** 2)

Inherited members